Source code for tasktools.taskloop

import asyncio
import functools
import logging
import random
import warnings
from enum import IntEnum
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple


# Module-level logger; TaskLoop falls back to it when no ``logger`` is given.
LOGGER = logging.getLogger(__name__)

# Async callable executed on each iteration: called with ``*args, **kwargs``
# and its result awaited.
CoroFunc = Callable[..., Awaitable[Any]]
# Adapter ``(input_args, iteration_return) -> (next_args, next_kwargs)`` used
# to turn an iteration's return value into the next iteration's payload.
FArgs = Callable[[List[Any], Any], Tuple[List[Any], Dict[str, Any]]]


class TaskLoopIterationTimeout(TimeoutError):
    """Signal that a single TaskLoop iteration ran past ``iteration_timeout``.

    Subclasses the builtin ``TimeoutError`` so generic timeout handlers
    also catch it.
    """


def _resolve_loop(
    loop: Optional[asyncio.AbstractEventLoop] = None,
) -> asyncio.AbstractEventLoop:
    if loop is not None:
        return loop
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        pass
    # Legacy fallback: caller relies on the thread's current event loop.
    # Prefer passing loop= explicitly or calling inside a running loop.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", DeprecationWarning)
        return asyncio.get_event_loop()


async def pause(sleep: float, *args: Any, **kwargs: Any) -> Tuple[List[Any], Dict[str, Any]]:
    """Sleep for ``sleep`` seconds, then echo the call as an iteration payload.

    Returns ``([sleep, *args], kwargs)``.
    """
    await asyncio.sleep(sleep)
    echoed: List[Any] = [sleep]
    echoed.extend(args)
    return echoed, kwargs


async def _wait_for_event(
    event: asyncio.Event, *args: Any, **kwargs: Any
) -> Tuple[List[Any], Dict[str, Any]]:
    await event.wait()
    return [event, *args], kwargs


def _is_pause_coro(coro: Any) -> bool:
    """Return True when ``coro`` is one of the internal waiting coroutines.

    The check is by identity against ``pause`` and ``_wait_for_event``.
    """
    for waiter in (pause, _wait_for_event):
        if coro is waiter:
            return True
    return False


def _normalize_iteration_result(result: Any) -> Tuple[List[Any], Dict[str, Any]]:
    if not isinstance(result, tuple) or len(result) != 2:
        raise TypeError("Iteration result must be a tuple of (args, kwargs)")
    next_args, next_kwargs = result
    if next_args is None:
        next_args = []
    if next_kwargs is None:
        next_kwargs = {}
    if not isinstance(next_kwargs, dict):
        raise TypeError("Iteration kwargs must be a dict")
    return list(next_args), dict(next_kwargs)


async def coromask(
    coro: CoroFunc,
    args: List[Any],
    kwargs: Dict[str, Any],
    fargs: FArgs,
    timeout: Optional[float] = None,
) -> Tuple[List[Any], Dict[str, Any]]:
    """
    Execute a coroutine and normalize the next iteration payload.

    Parameters
    ----------
    coro:
        Async callable invoked as ``coro(*args, **kwargs)``.
    args, kwargs:
        Arguments for this iteration.
    fargs:
        Adapter called as ``fargs(list(args), obtained)``; its return value
        is validated by ``_normalize_iteration_result``.
    timeout:
        Optional limit in seconds for the awaited call. ``None`` disables
        the watchdog.

    Returns
    -------
    The normalized ``(next_args, next_kwargs)`` pair.

    Raises
    ------
    TaskLoopIterationTimeout
        If ``timeout`` is set and the call does not finish in time.
    """
    if timeout is not None:
        try:
            obtained = await asyncio.wait_for(coro(*args, **kwargs), timeout=timeout)
        except asyncio.TimeoutError as exc:
            raise TaskLoopIterationTimeout(
                f"iteration exceeded {timeout}s"
            ) from exc
    else:
        obtained = await coro(*args, **kwargs)
    # fargs receives a copy of args so it cannot mutate the caller's list.
    return _normalize_iteration_result(fargs(list(args), obtained))
def _create_task(
    loop: asyncio.AbstractEventLoop,
    coro: CoroFunc,
    args: List[Any],
    kwargs: Dict[str, Any],
    fargs: FArgs,
    name: Optional[str] = None,
    timeout: Optional[float] = None,
) -> asyncio.Task:
    """Wrap one iteration in ``coromask`` and schedule it on ``loop``.

    Parameters
    ----------
    loop:
        Event loop that creates the task.
    coro, args, kwargs, fargs, timeout:
        Forwarded unchanged to ``coromask``.
    name:
        Optional task name; only passed to ``loop.create_task`` when given.

    Returns
    -------
    The scheduled ``asyncio.Task``.
    """
    task_coro = coromask(coro, args, kwargs, fargs, timeout=timeout)
    if name is not None:
        return loop.create_task(task_coro, name=name)
    return loop.create_task(task_coro)
def renew(
    task: asyncio.Task,
    coro: CoroFunc,
    fargs: FArgs,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    name: Optional[str] = None,
) -> Any:
    """
    Renew a task when it completes using a done callback chain.

    Parameters
    ----------
    task:
        The finished task whose result feeds the next iteration.
    coro, fargs:
        Coroutine function and adapter used for the next iteration.
    loop:
        Loop to schedule on; resolved through ``_resolve_loop`` when omitted.
    name:
        Optional name for the new task.

    Returns
    -------
    ``"STOPPED"`` if ``task`` was cancelled or its result kwargs contain a
    truthy ``"stop"``; otherwise the newly scheduled task, which carries
    this function as its own done callback.

    Raises
    ------
    Exception
        Whatever ``task.result()`` or result normalization raises is
        propagated unchanged.
    """
    if task.cancelled():
        return "STOPPED"
    # No try/except here: the former ``except Exception: raise`` was a no-op.
    result_args, result_kwargs = _normalize_iteration_result(task.result())
    if result_kwargs.get("stop"):
        return "STOPPED"
    used_loop = _resolve_loop(loop)
    new_task = _create_task(used_loop, coro, result_args, result_kwargs, fargs, name=name)
    new_task.add_done_callback(
        functools.partial(renew, coro=coro, fargs=fargs, loop=used_loop, name=name)
    )
    return new_task
def simple_fargs(_in: List[Any], obtained: Any) -> Tuple[List[Any], Dict[str, Any]]:
    """Repeat the input arguments and discard the iteration's return value.

    Returns a copy of ``_in`` as the next positional arguments and an empty
    kwargs dict; ``obtained`` is ignored.
    """
    return list(_in), {}
def simple_fargs_out(_in: List[Any], obtained: Any) -> Tuple[List[Any], Dict[str, Any]]:
    """Use the iteration's own return value as the next payload.

    ``obtained`` is validated and copied by ``_normalize_iteration_result``;
    ``_in`` is ignored.
    """
    return _normalize_iteration_result(obtained)
class Steps(IntEnum):
    """Control states of a ``TaskLoop``."""

    START = 0
    CONTINUE = 1
    PAUSE = 2
    STOP = 3
    CANCEL = 4


class TaskLoop:
    """
    Re-schedule ``coro`` through ``done_callback`` until a stop condition
    is reached.

    Each iteration returns ``(next_args, next_kwargs)`` which become the
    input of the following iteration. A kwarg ``{"stop": True}`` terminates
    the loop; an ``{"exception": exc}`` kwarg terminates and propagates
    ``exc``.

    External control is available via ``pause()`` / ``task_continue()`` /
    ``stop()`` / ``cancel()``.
    """

    def __init__(
        self,
        coro: CoroFunc,
        coro_args: Optional[List[Any]] = None,
        coro_kwargs: Optional[Dict[str, Any]] = None,
        control: Steps = Steps.START,
        fargs: FArgs = simple_fargs_out,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        name: str = "taskloop",
        time_pause: float = 0.1,
        interval: Optional[float] = None,
        iteration_timeout: Optional[float] = None,
        max_retries: Optional[int] = None,
        retry_backoff: float = 0.0,
        retry_backoff_factor: float = 2.0,
        retry_jitter: float = 0.0,
        on_exception: Optional[Callable[[BaseException, int], Any]] = None,
        observer: Optional[Callable[[str, Dict[str, Any]], Any]] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """
        Parameters
        ----------
        coro:
            The async function to run on each iteration. It receives
            ``*args, **kwargs`` plus an injected ``taskloop=`` kwarg pointing
            back at this instance, and must return
            ``(next_args_list, next_kwargs_dict)``.
        coro_args, coro_kwargs:
            Initial positional/keyword arguments for the first iteration.
            Subsequent iterations receive what the previous one returns
            (filtered through ``fargs``).
        control:
            Initial state. Defaults to ``Steps.START``. With ``Steps.PAUSE``
            the first iteration still runs; the loop then waits for
            ``task_continue()`` before the second one.
        fargs:
            ``(input_args, iteration_return) -> (next_args, next_kwargs)``
            adapter used to normalize each iteration. Defaults to
            ``simple_fargs_out`` which passes the return through.
        loop:
            Explicit event loop. Falls back to the running loop, then to a
            legacy ``get_event_loop()`` with a silenced warning. Prefer
            passing it or calling ``create()`` from within a running loop.
        name:
            Task name used for logging and ``asyncio.Task.get_name()``.
        time_pause:
            Legacy polling interval for paused loops. Kept for backward
            compatibility; since 1.5.0 the paused state is gated by an
            ``asyncio.Event`` and ``time_pause`` is unused on the normal
            pause path.
        interval:
            Optional fixed start-to-start cadence between iterations. If an
            iteration takes longer than ``interval`` the next iteration runs
            immediately.
        iteration_timeout:
            Optional per-iteration watchdog. When exceeded, the iteration is
            cancelled and ``TaskLoopIterationTimeout`` is propagated (and can
            trigger retries if enabled).
        max_retries:
            If set, the loop retries a failing iteration up to this many
            times before finalizing with the exception. ``None`` (default)
            means no retries.
        retry_backoff:
            Base delay before the first retry. The delay for attempt ``n``
            (1-indexed) is
            ``retry_backoff * (retry_backoff_factor ** (n - 1))
            + random.uniform(0, retry_jitter)``.
        retry_backoff_factor:
            Exponential multiplier applied between retries. Default 2.0.
        retry_jitter:
            Maximum additional random delay added to each retry. Default 0.0
            (deterministic). Use a small positive value to break up
            thundering-herd retries.
        on_exception:
            Optional ``callable(exc, attempt) -> bool | None``. Called before
            scheduling each retry; returning ``None`` or a truthy value
            allows the retry, any other falsy value aborts it and
            propagates the exception. Raising from the callback also
            aborts retry.
        observer:
            Optional ``callable(event_name, payload_dict)`` invoked at
            lifecycle points: ``started``, ``iteration_end``,
            ``iteration_error``, ``retry_scheduled``, ``paused``,
            ``resumed``, ``stop_requested``, ``stopped``,
            ``cancel_requested``, ``cancelled``, ``failed``. Observer
            exceptions are caught and logged; they never break the loop.
        logger:
            Logger used for internal diagnostics. Defaults to the
            module-level ``tasktools.taskloop`` logger.
        """
        self.coro = coro
        self.coro_args = list(coro_args or [])
        self.coro_kwargs = dict(coro_kwargs or {})
        self.control = control
        self.fargs = fargs
        self.loop = loop
        self._name = name
        self.time_pause = time_pause
        self.interval = interval
        self.iteration_timeout = iteration_timeout
        self.max_retries = max_retries
        self.retry_backoff = retry_backoff
        self.retry_backoff_factor = retry_backoff_factor
        self.retry_jitter = retry_jitter
        self.on_exception = on_exception
        self.observer = observer
        self.logger = logger or LOGGER
        # Task currently (or most recently) owned by the loop.
        self.active_task: Optional[asyncio.Task[Any]] = None
        self.done_callback: Optional[Callable[[asyncio.Task[Any]], Any]] = None
        self._last_result: Tuple[List[Any], Dict[str, Any]] = (
            list(self.coro_args),
            dict(self.coro_kwargs),
        )
        # Payload handed to ``coro`` when resuming after a pause/sleep/retry.
        self._resume_payload: Tuple[List[Any], Dict[str, Any]] = self._last_result
        # Resolved/cancelled when the whole loop ends; awaited by ``wait()``.
        self._completion_future: Optional[asyncio.Future] = None
        self._iteration_start: Optional[float] = None
        # Input of the iteration in flight, replayed on retry.
        self._iteration_input: Tuple[List[Any], Dict[str, Any]] = (
            list(self.coro_args),
            dict(self.coro_kwargs),
        )
        self._retry_count: int = 0
        self._iteration_count: int = 0
        # Created lazily in ``create()``; set means "not paused".
        self._pause_event: Optional[asyncio.Event] = None

    def _notify(self, event: str, payload: Optional[Dict[str, Any]] = None) -> None:
        """Call the observer with a copy of ``payload``; log and swallow its errors."""
        if self.observer is None:
            return
        try:
            self.observer(event, dict(payload or {}))
        except Exception:
            self.logger.exception("observer callback for %r raised", event)

    @property
    def name(self) -> str:
        """Current task name."""
        return self._name

    def __str__(self) -> str:
        return f"Taskloop {self.name}, coro {self.coro}"

    def __repr__(self) -> str:
        return f"Taskloop({self.coro},{self.name})"

    @property
    def running(self) -> bool:
        """True while the completion future exists and is not done."""
        return bool(self._completion_future and not self._completion_future.done())

    def stop(self) -> None:
        """Request a stop; also releases a paused loop so it can finalize."""
        self.control = Steps.STOP
        if self._pause_event is not None:
            self._pause_event.set()
        self._notify("stop_requested")

    def pause(self) -> None:
        """Request a pause; takes effect when the current task completes."""
        self.control = Steps.PAUSE
        if self._pause_event is not None:
            self._pause_event.clear()
        self._notify("paused")

    def task_continue(self) -> None:
        """Resume a paused loop."""
        self.control = Steps.CONTINUE
        if self._pause_event is not None:
            self._pause_event.set()
        self._notify("resumed")

    def cancel(self) -> bool:
        """Cancel the active task.

        Returns ``False`` when there is no active task, otherwise the value
        of ``Task.cancel()``. ``cancel_requested`` is notified only when
        the cancellation request was accepted.
        """
        self.control = Steps.CANCEL
        if self._pause_event is not None:
            self._pause_event.set()
        if not self.active_task:
            return False
        cancelled = self.active_task.cancel()
        if cancelled:
            self._notify("cancel_requested")
        return cancelled

    def finish(self) -> None:
        """Alias of ``stop()``."""
        self.stop()

    def result(self) -> Tuple[List[Any], Dict[str, Any]]:
        """Return the completion future's result.

        Raises ``asyncio.InvalidStateError`` if the loop was never started;
        otherwise behaves like ``Future.result()``.
        """
        if not self._completion_future:
            raise asyncio.InvalidStateError("TaskLoop has not been started")
        return self._completion_future.result()

    def __await__(self):
        return self.wait().__await__()

    async def __aenter__(self) -> "TaskLoop":
        self.create()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # On exit, cancel a still-running loop and wait for it to wind down.
        if self.running and self._completion_future is not None:
            self.cancel()
            try:
                await self._completion_future
            except asyncio.CancelledError:
                pass

    def set_name(self, name: str) -> None:
        """Change the name used for subsequently created tasks and logging."""
        self._name = name

    def start(self) -> asyncio.Task[Any]:
        """Alias of ``create()``."""
        return self.create()

    def create(self) -> asyncio.Task[Any]:
        """Schedule the first iteration and return its task.

        If the loop is already running, the current active task is returned
        and nothing new is scheduled.
        """
        if self.running and self.active_task is not None:
            return self.active_task
        loop = _resolve_loop(self.loop)
        self.loop = loop
        self._completion_future = loop.create_future()
        self.done_callback = functools.partial(self.renew, coro=self.coro, fargs=self.fargs)
        initial_kwargs = {**self.coro_kwargs, "taskloop": self}
        self._iteration_start = loop.time()
        self._iteration_input = (list(self.coro_args), dict(initial_kwargs))
        if self._pause_event is None:
            self._pause_event = asyncio.Event()
            # Leave the event cleared when a pause was requested before
            # create() (constructor ``control=Steps.PAUSE`` or an early
            # ``pause()``). A set event would make every wait task finish
            # immediately while control is still PAUSE, so the loop would
            # busy-spin instead of waiting for ``task_continue()``.
            if self.control != Steps.PAUSE:
                self._pause_event.set()
        self._notify("started", {"name": self._name})
        self.active_task = _create_task(
            loop,
            self.coro,
            self.coro_args,
            initial_kwargs,
            self.fargs,
            self._name,
            timeout=self.iteration_timeout,
        )
        self.active_task.add_done_callback(self.done_callback)
        return self.active_task

    async def wait(self) -> Tuple[List[Any], Dict[str, Any]]:
        """Await the end of the loop and return its final payload.

        Raises ``asyncio.InvalidStateError`` if the loop was never started.
        """
        if not self._completion_future:
            raise asyncio.InvalidStateError("TaskLoop has not been started")
        return await self._completion_future

    def _finish_with_result(self, result: Tuple[List[Any], Dict[str, Any]]) -> str:
        """Resolve the completion future with ``result`` and notify ``stopped``."""
        self._last_result = result
        if self._completion_future and not self._completion_future.done():
            self._completion_future.set_result(result)
        self._notify("stopped", {"result": result})
        return "STOPPED"

    def _finish_with_exception(self, exc: BaseException) -> str:
        """Fail the completion future with ``exc`` and notify ``failed``."""
        if self._completion_future and not self._completion_future.done():
            self._completion_future.set_exception(exc)
        self._notify("failed", {"exc": exc})
        return "FAILED"

    def _should_retry(self, exc: BaseException) -> bool:
        """Decide whether a failed iteration gets another attempt.

        Retries are off when ``max_retries`` is ``None`` or exhausted. The
        ``on_exception`` callback may veto by returning a falsy non-``None``
        value or by raising.
        """
        if self.max_retries is None:
            return False
        if self._retry_count >= self.max_retries:
            return False
        if self.on_exception is not None:
            try:
                verdict = self.on_exception(exc, self._retry_count + 1)
            except Exception:
                self.logger.exception("on_exception callback raised; aborting retry")
                return False
            if verdict is None:
                return True
            return bool(verdict)
        return True

    def _schedule_retry(self, fargs: FArgs, exc: BaseException) -> asyncio.Task[Any]:
        """Schedule the backoff sleep that precedes the next retry attempt."""
        assert self.loop is not None
        self._retry_count += 1
        base = self.retry_backoff * (self.retry_backoff_factor ** (self._retry_count - 1))
        jitter = random.uniform(0, self.retry_jitter) if self.retry_jitter > 0 else 0.0
        delay = base + jitter
        # After the sleep, the failed iteration is replayed with its input.
        self._resume_payload = self._iteration_input
        self._notify(
            "retry_scheduled",
            {"attempt": self._retry_count, "delay": delay, "exc": exc},
        )
        retry_task = _create_task(
            self.loop,
            pause,
            [delay],
            {},
            fargs,
            self._name,
        )
        retry_task.add_done_callback(functools.partial(self.renew, coro=pause, fargs=fargs))
        self.active_task = retry_task
        return retry_task

    # --- renew: small helpers --------------------------------------

    def _on_task_cancelled(self, task: asyncio.Task[Any]) -> str:
        """Finalize the loop after the active task itself was cancelled."""
        self.control = Steps.CANCEL
        if self._completion_future and not self._completion_future.done():
            self._completion_future.cancel()
        self.active_task = task
        self._notify("cancelled")
        return "CANCELLED"

    def _record_successful_iteration(
        self, coro: CoroFunc, result: Tuple[List[Any], Dict[str, Any]]
    ) -> None:
        """Update counters and notify ``iteration_end`` for a user iteration.

        Internal pause/wait tasks are skipped.
        """
        if _is_pause_coro(coro):
            return
        self._resume_payload = result
        self._retry_count = 0
        self._iteration_count += 1
        duration: Optional[float] = None
        if self._iteration_start is not None and self.loop is not None:
            duration = self.loop.time() - self._iteration_start
        self._notify(
            "iteration_end",
            {
                "iteration": self._iteration_count,
                "duration": duration,
                "result": result,
            },
        )

    def _finalize_with_explicit_exception(
        self, task: asyncio.Task[Any], exception: BaseException
    ) -> str:
        """Finalize when an iteration returned an ``"exception"`` kwarg."""
        self.control = Steps.STOP
        self.active_task = task
        self.logger.exception("TaskLoop %s failed", self.name, exc_info=exception)
        return self._finish_with_exception(exception)

    def _finalize_cancel(self, task: asyncio.Task[Any]) -> str:
        """Finalize when control is CANCEL but the task finished normally."""
        self.active_task = task
        if self._completion_future and not self._completion_future.done():
            self._completion_future.cancel()
        # Send the same lifecycle event as ``_on_task_cancelled`` so
        # observers see every cancellation, whichever path finalized it.
        self._notify("cancelled")
        return "CANCELLED"

    def _finalize_stop(self, task: asyncio.Task[Any]) -> str:
        """Finalize a stop request with the last resumable payload."""
        self.control = Steps.STOP
        self.active_task = task
        return self._finish_with_result(self._resume_payload)

    def _decide_next(
        self,
        coro: CoroFunc,
        result_args: List[Any],
        result_kwargs: Dict[str, Any],
    ) -> Tuple[CoroFunc, List[Any], Dict[str, Any]]:
        """Choose the next coroutine and payload: wait, sleep, or iterate."""
        # User-initiated pause: wait on the Event (or legacy pause()).
        if self.control == Steps.PAUSE:
            if self._pause_event is not None:
                return _wait_for_event, [self._pause_event], {}
            return pause, [self.time_pause], {}

        just_finished_user_iter = not _is_pause_coro(coro)
        if _is_pause_coro(coro):
            # A wait/sleep task's own result is not user data; resume from
            # the saved payload instead.
            next_args, next_kwargs = self._resume_payload
        else:
            next_args, next_kwargs = result_args, result_kwargs

        if self.control == Steps.CONTINUE:
            self.control = Steps.START

        remaining = 0.0
        if (
            self.interval is not None
            and just_finished_user_iter
            and self._iteration_start is not None
            and self.loop is not None
        ):
            elapsed = self.loop.time() - self._iteration_start
            remaining = max(0.0, self.interval - elapsed)
        if remaining > 0:
            return pause, [remaining], {}

        next_kwargs = dict(next_kwargs)
        next_kwargs["taskloop"] = self
        return self.coro, next_args, next_kwargs

    def _schedule_next(
        self,
        next_coro: CoroFunc,
        next_args: List[Any],
        next_kwargs: Dict[str, Any],
        fargs: FArgs,
    ) -> asyncio.Task[Any]:
        """Create the next task and chain ``renew`` as its done callback."""
        assert self.loop is not None
        if next_coro is self.coro:
            self._iteration_start = self.loop.time()
            self._iteration_input = (list(next_args), dict(next_kwargs))
            task_timeout = self.iteration_timeout
        else:
            # Internal wait/sleep tasks are not subject to the watchdog.
            task_timeout = None
        next_task = _create_task(
            self.loop,
            next_coro,
            next_args,
            next_kwargs,
            fargs,
            self._name,
            timeout=task_timeout,
        )
        next_task.add_done_callback(
            functools.partial(self.renew, coro=next_coro, fargs=fargs)
        )
        self.active_task = next_task
        return next_task

    def _handle_renew_exception(
        self,
        task: asyncio.Task[Any],
        exc: BaseException,
        result_kwargs: Dict[str, Any],
        fargs: FArgs,
    ) -> Any:
        """Log a failed iteration, then either schedule a retry or fail the loop."""
        self.active_task = task
        if result_kwargs.get("log"):
            result_kwargs["log"].exception("TaskLoop exception")
        else:
            self.logger.exception("TaskLoop %s failed", self.name)
        will_retry = self._should_retry(exc)
        self._notify(
            "iteration_error",
            {
                "attempt": self._retry_count + 1,
                "exc": exc,
                "will_retry": will_retry,
            },
        )
        if will_retry:
            return self._schedule_retry(fargs, exc)
        self.control = Steps.STOP
        return self._finish_with_exception(exc)

    def renew(
        self,
        task: asyncio.Task[Any],
        coro: CoroFunc,
        fargs: FArgs,
    ) -> Any:
        """
        done_callback dispatcher: classify what happened, then delegate to
        one of the ``_finalize_*`` / ``_decide_next`` / ``_schedule_next``
        helpers.
        """
        if task.cancelled():
            return self._on_task_cancelled(task)
        # Bound before the try so the except handler can always read it.
        result_kwargs: Dict[str, Any] = {}
        try:
            result = _normalize_iteration_result(task.result())
            result_args, result_kwargs = result
            self._last_result = result
            self._record_successful_iteration(coro, result)
            explicit_exc = result_kwargs.get("exception")
            if explicit_exc:
                return self._finalize_with_explicit_exception(task, explicit_exc)
            if self.control == Steps.CANCEL:
                return self._finalize_cancel(task)
            if self.control == Steps.STOP or result_kwargs.get("stop"):
                return self._finalize_stop(task)
            next_coro, next_args, next_kwargs = self._decide_next(
                coro, result_args, result_kwargs
            )
            return self._schedule_next(next_coro, next_args, next_kwargs, fargs)
        except Exception as exc:
            return self._handle_renew_exception(task, exc, result_kwargs, fargs)