Code’s Documentation

These documentation came from the source code.

The Async While to be a Wheel

Those are the main functions to enable the async while. The core functions to create asynchronous code. Are like the gears for a big future machine.

async tasktools.taskloop.coromask(coro: Callable[[...], Awaitable[Any]], args: List[Any], kwargs: Dict[str, Any], fargs: Callable[[List[Any], Any], Tuple[List[Any], Dict[str, Any]]], timeout: float | None = None) Tuple[List[Any], Dict[str, Any]][source]

Execute a coroutine and normalize the next iteration payload.

tasktools.taskloop.renew(task: Task, coro: Callable[[...], Awaitable[Any]], fargs: Callable[[List[Any], Any], Tuple[List[Any], Dict[str, Any]]], loop: AbstractEventLoop | None = None, name: str | None = None) Any[source]

Renew a task when it completes using a done callback chain.

tasktools.taskloop.simple_fargs(_in: List[Any], obtained: Any) Tuple[List[Any], Dict[str, Any]][source]
tasktools.taskloop.simple_fargs_out(_in: List[Any], obtained: Any) Tuple[List[Any], Dict[str, Any]][source]

The Scheduler explained Step by Step

The Scheduler is a like abstract class the enables a set of features that you have to inherit to your own class.

class tasktools.scheduler.TaskScheduler(*args: Any, **kwargs: Any)[source]

Generic task scheduler composing a SlotRegistry and a ChannelRouter with optional multiprocessing fan-out via manage_tasks().

Since v2.0 the slot bookkeeping lives in SlotRegistry (exposed as self.registry) and the n2s/s2n side channels live in ChannelRouter (self.router). Primary run_task / process_sta_task logic stays on this class so subclasses can keep overriding them as methods.

manage_tasks(ipt: str) None[source]

Legacy blocking entry point.

Deprecated since version 2.0: Use run_ipt() (async, TaskGroup-based) instead. This method creates its own event loop and calls run_forever(); it will be removed in 3.0.

async run_ipt(ipt: str) None[source]

Run the slot group for ipt under structured concurrency.

Constructs one TaskLoop per slot-task plus the process_sta_manager loop, starts them all, and awaits their completion inside an asyncio.TaskGroup. If any loop raises, the others are cancelled. Use shutdown() from outside (or rely on the user iteration returning {"stop": True}) to stop cleanly.

shutdown() None[source]

Request all loops managed by run_ipt() to stop.

Sets stop on each active TaskLoop so the current iteration finishes and the loop exits cleanly.

The Assignator role

This class works fine, you don’t have to implement a subclass of this, you have to use your Scheduler class on it.

class tasktools.assignator.TaskAssignator(scheduler, queue_tasks, queue_answer, sta_assigned, dt_status, dt_group, locker, *args: Any, **kwargs: Any)[source]

Manage the tasks assigned to a TaskScheduler instance.

async new_process(queue_tasks, *args: Any, **kwargs: Any)[source]

Wake idle task slots and assign queued stations.

new_process_task() None[source]

Legacy blocking entry point.

Deprecated since version 2.0: Use run() (async) inside asyncio.run instead. This method creates its own event loop and calls run_forever(); it will be removed in 3.0.

async run() None[source]

Run the assignator loop under structured concurrency.

Creates the underlying TaskLoop on the current running event loop and awaits it. Use shutdown() (or cancel the surrounding task) to stop.

shutdown() None[source]

Request the assignator loop (if running) to stop.