Code’s Documentation¶
These documentation came from the source code.
The Async While to be a Wheel¶
Those are the main functions to enable the async while. The core functions to create asynchronous code. Are like the gears for a big future machine.
- async tasktools.taskloop.coromask(coro: Callable[[...], Awaitable[Any]], args: List[Any], kwargs: Dict[str, Any], fargs: Callable[[List[Any], Any], Tuple[List[Any], Dict[str, Any]]], timeout: float | None = None) Tuple[List[Any], Dict[str, Any]][source]¶
Execute a coroutine and normalize the next iteration payload.
- tasktools.taskloop.renew(task: Task, coro: Callable[[...], Awaitable[Any]], fargs: Callable[[List[Any], Any], Tuple[List[Any], Dict[str, Any]]], loop: AbstractEventLoop | None = None, name: str | None = None) Any[source]¶
Renew a task when it completes using a done callback chain.
The Scheduler explained Step by Step¶
The Scheduler is a like abstract class the enables a set of features that you have to inherit to your own class.
- class tasktools.scheduler.TaskScheduler(*args: Any, **kwargs: Any)[source]¶
Generic task scheduler composing a
SlotRegistryand aChannelRouterwith optional multiprocessing fan-out viamanage_tasks().Since v2.0 the slot bookkeeping lives in
SlotRegistry(exposed asself.registry) and the n2s/s2n side channels live inChannelRouter(self.router). Primaryrun_task/process_sta_tasklogic stays on this class so subclasses can keep overriding them as methods.- manage_tasks(ipt: str) None[source]¶
Legacy blocking entry point.
Deprecated since version 2.0: Use
run_ipt()(async, TaskGroup-based) instead. This method creates its own event loop and callsrun_forever(); it will be removed in 3.0.
- async run_ipt(ipt: str) None[source]¶
Run the slot group for
iptunder structured concurrency.Constructs one
TaskLoopper slot-task plus theprocess_sta_managerloop, starts them all, and awaits their completion inside anasyncio.TaskGroup. If any loop raises, the others are cancelled. Useshutdown()from outside (or rely on the user iteration returning{"stop": True}) to stop cleanly.
The Assignator role¶
This class works fine, you don’t have to implement a subclass of this, you have to use your Scheduler class on it.
- class tasktools.assignator.TaskAssignator(scheduler, queue_tasks, queue_answer, sta_assigned, dt_status, dt_group, locker, *args: Any, **kwargs: Any)[source]¶
Manage the tasks assigned to a TaskScheduler instance.
- async new_process(queue_tasks, *args: Any, **kwargs: Any)[source]¶
Wake idle task slots and assign queued stations.
- new_process_task() None[source]¶
Legacy blocking entry point.
Deprecated since version 2.0: Use
run()(async) insideasyncio.runinstead. This method creates its own event loop and callsrun_forever(); it will be removed in 3.0.
- async run() None[source]¶
Run the assignator loop under structured concurrency.
Creates the underlying
TaskLoopon the current running event loop and awaits it. Useshutdown()(or cancel the surrounding task) to stop.