import asyncio
import functools
import logging
import random
import warnings
from enum import IntEnum
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple
# Module-level logger; TaskLoop falls back to it when no logger is supplied.
LOGGER = logging.getLogger(__name__)
# An async callable: invoked with arbitrary arguments, returns an awaitable.
CoroFunc = Callable[..., Awaitable[Any]]
# Adapter signature: (input_args, iteration_return) -> (next_args, next_kwargs).
FArgs = Callable[[List[Any], Any], Tuple[List[Any], Dict[str, Any]]]
class TaskLoopIterationTimeout(TimeoutError):
    """Raised when a TaskLoop iteration exceeds its iteration_timeout.

    ``coromask`` raises it, chained from the underlying
    ``asyncio.TimeoutError``, when ``asyncio.wait_for`` gives up on an
    iteration. It subclasses the builtin ``TimeoutError``, so handlers
    written for ``TimeoutError`` also catch it.
    """
def _resolve_loop(
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> asyncio.AbstractEventLoop:
if loop is not None:
return loop
try:
return asyncio.get_running_loop()
except RuntimeError:
pass
# Legacy fallback: caller relies on the thread's current event loop.
# Prefer passing loop= explicitly or calling inside a running loop.
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
return asyncio.get_event_loop()
async def pause(sleep: float, *args: Any, **kwargs: Any) -> Tuple[List[Any], Dict[str, Any]]:
    """Sleep for ``sleep`` seconds, then echo the call as an iteration payload.

    Returns ``([sleep, *args], kwargs)`` so the result has the same
    ``(args, kwargs)`` shape as any other iteration.
    """
    await asyncio.sleep(sleep)
    echoed: List[Any] = [sleep]
    echoed.extend(args)
    return echoed, kwargs
async def _wait_for_event(
event: asyncio.Event, *args: Any, **kwargs: Any
) -> Tuple[List[Any], Dict[str, Any]]:
await event.wait()
return [event, *args], kwargs
def _is_pause_coro(coro: Any) -> bool:
    """Tell whether ``coro`` is one of the module's internal waiting coroutines.

    The comparison is by identity against ``pause`` and ``_wait_for_event``.
    """
    for internal in (pause, _wait_for_event):
        if coro is internal:
            return True
    return False
def _normalize_iteration_result(result: Any) -> Tuple[List[Any], Dict[str, Any]]:
if not isinstance(result, tuple) or len(result) != 2:
raise TypeError("Iteration result must be a tuple of (args, kwargs)")
next_args, next_kwargs = result
if next_args is None:
next_args = []
if next_kwargs is None:
next_kwargs = {}
if not isinstance(next_kwargs, dict):
raise TypeError("Iteration kwargs must be a dict")
return list(next_args), dict(next_kwargs)
[docs]
async def coromask(
    coro: CoroFunc,
    args: List[Any],
    kwargs: Dict[str, Any],
    fargs: FArgs,
    timeout: Optional[float] = None,
) -> Tuple[List[Any], Dict[str, Any]]:
    """
    Execute a coroutine and normalize the next iteration payload.

    ``coro(*args, **kwargs)`` is awaited, optionally bounded by
    ``timeout`` seconds. Its return value is passed together with a copy
    of ``args`` through ``fargs``, and the adapter's output is validated
    by ``_normalize_iteration_result``.

    Raises:
        TaskLoopIterationTimeout: when ``timeout`` is given and
            ``asyncio.wait_for`` raises ``asyncio.TimeoutError``.
    """
    if timeout is None:
        obtained = await coro(*args, **kwargs)
    else:
        try:
            obtained = await asyncio.wait_for(coro(*args, **kwargs), timeout=timeout)
        except asyncio.TimeoutError as exc:
            message = f"iteration exceeded {timeout}s"
            raise TaskLoopIterationTimeout(message) from exc

    adapted = fargs(list(args), obtained)
    return _normalize_iteration_result(adapted)
def _create_task(
    loop: asyncio.AbstractEventLoop,
    coro: CoroFunc,
    args: List[Any],
    kwargs: Dict[str, Any],
    fargs: FArgs,
    name: Optional[str] = None,
    timeout: Optional[float] = None,
) -> asyncio.Task:
    """Wrap one iteration in ``coromask`` and schedule it on ``loop``.

    The task is given ``name`` only when one is supplied; otherwise
    ``loop.create_task`` is called without a ``name`` argument.
    """
    naming: Dict[str, Any] = {} if name is None else {"name": name}
    iteration = coromask(coro, args, kwargs, fargs, timeout=timeout)
    return loop.create_task(iteration, **naming)
[docs]
def renew(
    task: asyncio.Task,
    coro: CoroFunc,
    fargs: FArgs,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    name: Optional[str] = None,
) -> Any:
    """
    Renew a task when it completes using a done callback chain.

    Intended as a done callback: it reads the finished task's
    ``(args, kwargs)`` payload and schedules ``coro`` again with it,
    attaching itself to the new task.

    Returns ``"STOPPED"`` when ``task`` was cancelled or its kwargs carry
    a truthy ``"stop"`` entry; otherwise returns the newly created task.
    Any exception stored in ``task`` is re-raised by ``task.result()``
    and propagates unchanged.
    """
    if task.cancelled():
        return "STOPPED"

    next_args, next_kwargs = _normalize_iteration_result(task.result())
    if next_kwargs.get("stop"):
        return "STOPPED"

    target_loop = _resolve_loop(loop)
    follow_up = _create_task(
        target_loop, coro, next_args, next_kwargs, fargs, name=name
    )
    # Re-arm the chain, pinning the loop that was just resolved.
    rearm = functools.partial(
        renew, coro=coro, fargs=fargs, loop=target_loop, name=name
    )
    follow_up.add_done_callback(rearm)
    return follow_up
[docs]
def simple_fargs(_in: List[Any], obtained: Any) -> Tuple[List[Any], Dict[str, Any]]:
    """Adapter that replays the input arguments and ignores the return value.

    Returns a copy of ``_in`` and an empty kwargs dict; ``obtained`` is
    not used.
    """
    replayed: List[Any] = list(_in)
    no_kwargs: Dict[str, Any] = {}
    return replayed, no_kwargs
[docs]
def simple_fargs_out(_in: List[Any], obtained: Any) -> Tuple[List[Any], Dict[str, Any]]:
    """Adapter that forwards the iteration's own return value.

    ``obtained`` must already be an ``(args, kwargs)`` tuple; it is
    validated and copied by ``_normalize_iteration_result``. ``_in`` is
    not used.
    """
    forwarded = _normalize_iteration_result(obtained)
    return forwarded
class Steps(IntEnum):
    # Control states stored in ``TaskLoop.control``.
    # Default initial state; ``_decide_next`` also resets CONTINUE to it.
    START = 0
    # Set by ``TaskLoop.task_continue()``.
    CONTINUE = 1
    # Set by ``TaskLoop.pause()``.
    PAUSE = 2
    # Set by ``TaskLoop.stop()`` and when the loop finalizes.
    STOP = 3
    # Set by ``TaskLoop.cancel()`` and when the active task is cancelled.
    CANCEL = 4
class TaskLoop:
    """
    Re-schedule ``coro`` through ``done_callback`` until a stop
    condition is reached.

    Each iteration returns ``(next_args, next_kwargs)`` which become
    the input of the following iteration. A kwarg ``{"stop": True}``
    terminates the loop; an ``{"exception": exc}`` kwarg terminates
    and propagates ``exc``. External control is available via
    ``pause()`` / ``task_continue()`` / ``stop()`` / ``cancel()``.
    """

    def __init__(
        self,
        coro: CoroFunc,
        coro_args: Optional[List[Any]] = None,
        coro_kwargs: Optional[Dict[str, Any]] = None,
        control: Steps = Steps.START,
        fargs: FArgs = simple_fargs_out,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        name: str = "taskloop",
        time_pause: float = 0.1,
        interval: Optional[float] = None,
        iteration_timeout: Optional[float] = None,
        max_retries: Optional[int] = None,
        retry_backoff: float = 0.0,
        retry_backoff_factor: float = 2.0,
        retry_jitter: float = 0.0,
        on_exception: Optional[Callable[[BaseException, int], Any]] = None,
        observer: Optional[Callable[[str, Dict[str, Any]], Any]] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """
        Parameters
        ----------
        coro:
            The async function to run on each iteration. It receives
            ``*args, **kwargs`` plus an injected ``taskloop=`` kwarg
            pointing back at this instance, and must return
            ``(next_args_list, next_kwargs_dict)``.
        coro_args, coro_kwargs:
            Initial positional/keyword arguments for the first
            iteration. Subsequent iterations receive what the
            previous return (filtered through ``fargs``).
        control:
            Initial state. Defaults to ``Steps.START``. Starting with
            ``Steps.PAUSE`` runs the first iteration and then waits
            for ``task_continue()``.
        fargs:
            ``(input_args, iteration_return) -> (next_args, next_kwargs)``
            adapter used to normalize each iteration. Defaults to
            ``simple_fargs_out`` which passes the return through.
        loop:
            Explicit event loop. Falls back to the running loop, then
            to a legacy ``get_event_loop()`` with a silenced warning.
            Prefer passing it or calling ``create()`` from within a
            running loop.
        name:
            Task name used for logging and ``asyncio.Task.get_name()``.
        time_pause:
            Legacy polling interval for paused loops. Kept for
            backward compatibility; since 1.5.0 the paused state is
            gated by an ``asyncio.Event`` and ``time_pause`` is
            unused on the normal pause path.
        interval:
            Optional fixed start-to-start cadence between iterations.
            If an iteration takes longer than ``interval`` the next
            iteration runs immediately.
        iteration_timeout:
            Optional per-iteration watchdog. When exceeded, the
            iteration is cancelled and ``TaskLoopIterationTimeout``
            is propagated (and can trigger retries if enabled).
        max_retries:
            If set, the loop retries a failing iteration up to this
            many times before finalizing with the exception. ``None``
            (default) means no retries.
        retry_backoff:
            Base delay before the first retry. The delay for attempt
            ``n`` (1-indexed) is
            ``retry_backoff * (retry_backoff_factor ** (n - 1)) + random.uniform(0, retry_jitter)``.
        retry_backoff_factor:
            Exponential multiplier applied between retries. Default 2.0.
        retry_jitter:
            Maximum additional random delay added to each retry.
            Default 0.0 (deterministic). Use a small positive value
            to break up thundering-herd retries.
        on_exception:
            Optional ``callable(exc, attempt) -> bool | None``. Called
            before scheduling each retry; returning ``None`` or a truthy
            value allows the retry, any other falsy value aborts retry
            and propagates the exception. Raising from the callback
            also aborts retry.
        observer:
            Optional ``callable(event_name, payload_dict)`` invoked at
            lifecycle points: ``started``, ``iteration_end``,
            ``iteration_error``, ``retry_scheduled``, ``paused``,
            ``resumed``, ``stop_requested``, ``stopped``,
            ``cancel_requested``, ``cancelled``, ``failed``.
            Observer exceptions are caught and logged; they never
            break the loop.
        logger:
            Logger used for internal diagnostics. Defaults to the
            module-level ``LOGGER`` (``logging.getLogger(__name__)``).
        """
        self.coro = coro
        self.coro_args = list(coro_args or [])
        self.coro_kwargs = dict(coro_kwargs or {})
        self.control = control
        self.fargs = fargs
        self.loop = loop
        self._name = name
        self.time_pause = time_pause
        self.interval = interval
        self.iteration_timeout = iteration_timeout
        self.max_retries = max_retries
        self.retry_backoff = retry_backoff
        self.retry_backoff_factor = retry_backoff_factor
        self.retry_jitter = retry_jitter
        self.on_exception = on_exception
        self.observer = observer
        self.logger = logger or LOGGER
        # Task currently scheduled (user iteration, interval sleep,
        # retry back-off or pause gate).
        self.active_task: Optional[asyncio.Task[Any]] = None
        self.done_callback: Optional[Callable[[asyncio.Task[Any]], Any]] = None
        # Payload of the most recent user iteration (or the final result).
        self._last_result: Tuple[List[Any], Dict[str, Any]] = (
            list(self.coro_args),
            dict(self.coro_kwargs),
        )
        # Payload handed to ``coro`` once an internal wait (pause,
        # interval, retry back-off) is over.
        self._resume_payload: Tuple[List[Any], Dict[str, Any]] = self._last_result
        # Resolved once the whole loop stops, fails or is cancelled.
        self._completion_future: Optional[asyncio.Future] = None
        self._iteration_start: Optional[float] = None
        # Arguments of the iteration in flight; replayed on retry.
        self._iteration_input: Tuple[List[Any], Dict[str, Any]] = (
            list(self.coro_args),
            dict(self.coro_kwargs),
        )
        self._retry_count: int = 0
        self._iteration_count: int = 0
        # Created lazily in ``create()``; set means "not paused".
        self._pause_event: Optional[asyncio.Event] = None

    def _notify(self, event: str, payload: Optional[Dict[str, Any]] = None) -> None:
        """Send ``event`` with a copy of ``payload`` to the observer, if any.

        Exceptions raised by the observer are logged and swallowed.
        """
        if self.observer is None:
            return
        try:
            self.observer(event, dict(payload or {}))
        except Exception:
            self.logger.exception("observer callback for %r raised", event)

    @property
    def name(self) -> str:
        """Current task name."""
        return self._name

    def __str__(self) -> str:
        return f"Taskloop {self.name}, coro {self.coro}"

    def __repr__(self) -> str:
        return f"Taskloop({self.coro},{self.name})"

    @property
    def running(self) -> bool:
        """True while a completion future exists and is not yet done."""
        return bool(self._completion_future and not self._completion_future.done())

    def stop(self) -> None:
        """Request a stop; takes effect when the active task completes."""
        self.control = Steps.STOP
        if self._pause_event is not None:
            # Release a paused loop so it can observe the stop request.
            self._pause_event.set()
        self._notify("stop_requested")

    def pause(self) -> None:
        """Request a pause; takes effect when the active task completes."""
        self.control = Steps.PAUSE
        if self._pause_event is not None:
            self._pause_event.clear()
        self._notify("paused")

    def task_continue(self) -> None:
        """Resume a paused loop."""
        self.control = Steps.CONTINUE
        if self._pause_event is not None:
            self._pause_event.set()
        self._notify("resumed")

    def cancel(self) -> bool:
        """Request cancellation of the active task.

        Returns ``False`` when there is no active task, otherwise the
        value of ``active_task.cancel()``. ``cancel_requested`` is only
        emitted when the task accepted the request.
        """
        self.control = Steps.CANCEL
        if self._pause_event is not None:
            self._pause_event.set()
        if not self.active_task:
            return False
        cancelled = self.active_task.cancel()
        if cancelled:
            self._notify("cancel_requested")
        return cancelled

    def finish(self) -> None:
        """Alias for ``stop()``."""
        self.stop()

    def result(self) -> Tuple[List[Any], Dict[str, Any]]:
        """Return the completion future's result.

        Raises ``asyncio.InvalidStateError`` if the loop was never
        started; otherwise behaves like ``Future.result()``.
        """
        if not self._completion_future:
            raise asyncio.InvalidStateError("TaskLoop has not been started")
        return self._completion_future.result()

    def __await__(self):
        """Awaiting the TaskLoop is equivalent to awaiting ``wait()``."""
        return self.wait().__await__()

    async def __aenter__(self) -> "TaskLoop":
        """Start the loop on entering the ``async with`` block."""
        self.create()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Cancel a still-running loop and wait for it to wind down."""
        if self.running and self._completion_future is not None:
            self.cancel()
            try:
                await self._completion_future
            except asyncio.CancelledError:
                pass

    def set_name(self, name: str) -> None:
        """Change the name used for subsequently created tasks and logs."""
        self._name = name

    def start(self) -> asyncio.Task[Any]:
        """Alias for ``create()``."""
        return self.create()

    def create(self) -> asyncio.Task[Any]:
        """Schedule the first iteration and return its task.

        If the loop is already running, the current active task is
        returned instead of starting a second chain.
        """
        if self.running and self.active_task is not None:
            return self.active_task
        loop = _resolve_loop(self.loop)
        self.loop = loop
        self._completion_future = loop.create_future()
        self.done_callback = functools.partial(self.renew, coro=self.coro, fargs=self.fargs)
        initial_kwargs = {**self.coro_kwargs, "taskloop": self}
        self._iteration_start = loop.time()
        self._iteration_input = (list(self.coro_args), dict(initial_kwargs))
        if self._pause_event is None:
            self._pause_event = asyncio.Event()
        # Keep the gate closed when the loop starts paused (``pause()`` was
        # called before ``create()`` or ``control=Steps.PAUSE``). With an
        # open gate the pause wait would return immediately and be
        # rescheduled over and over, spinning the event loop.
        if self.control == Steps.PAUSE:
            self._pause_event.clear()
        else:
            self._pause_event.set()
        self._notify("started", {"name": self._name})
        self.active_task = _create_task(
            loop,
            self.coro,
            self.coro_args,
            initial_kwargs,
            self.fargs,
            self._name,
            timeout=self.iteration_timeout,
        )
        self.active_task.add_done_callback(self.done_callback)
        return self.active_task

    async def wait(self) -> Tuple[List[Any], Dict[str, Any]]:
        """Wait until the loop finishes and return its final payload.

        Raises ``asyncio.InvalidStateError`` if the loop was never started.
        """
        if not self._completion_future:
            raise asyncio.InvalidStateError("TaskLoop has not been started")
        return await self._completion_future

    def _finish_with_result(self, result: Tuple[List[Any], Dict[str, Any]]) -> str:
        """Resolve the completion future with ``result`` and emit ``stopped``."""
        self._last_result = result
        if self._completion_future and not self._completion_future.done():
            self._completion_future.set_result(result)
        self._notify("stopped", {"result": result})
        return "STOPPED"

    def _finish_with_exception(self, exc: BaseException) -> str:
        """Fail the completion future with ``exc`` and emit ``failed``."""
        if self._completion_future and not self._completion_future.done():
            self._completion_future.set_exception(exc)
        self._notify("failed", {"exc": exc})
        return "FAILED"

    def _should_retry(self, exc: BaseException) -> bool:
        """Decide whether a failed iteration gets another attempt."""
        if self.max_retries is None:
            return False
        if self._retry_count >= self.max_retries:
            return False
        if self.on_exception is not None:
            try:
                verdict = self.on_exception(exc, self._retry_count + 1)
            except Exception:
                self.logger.exception("on_exception callback raised; aborting retry")
                return False
            # ``None`` means "no opinion": keep retrying.
            if verdict is None:
                return True
            return bool(verdict)
        return True

    def _schedule_retry(self, fargs: FArgs, exc: BaseException) -> asyncio.Task[Any]:
        """Schedule the back-off sleep that precedes the next attempt."""
        assert self.loop is not None
        self._retry_count += 1
        base = self.retry_backoff * (self.retry_backoff_factor ** (self._retry_count - 1))
        jitter = random.uniform(0, self.retry_jitter) if self.retry_jitter > 0 else 0.0
        delay = base + jitter
        # Replay the failed iteration's input once the back-off is over.
        self._resume_payload = self._iteration_input
        self._notify(
            "retry_scheduled",
            {"attempt": self._retry_count, "delay": delay, "exc": exc},
        )
        retry_task = _create_task(
            self.loop,
            pause,
            [delay],
            {},
            fargs,
            self._name,
        )
        retry_task.add_done_callback(functools.partial(self.renew, coro=pause, fargs=fargs))
        self.active_task = retry_task
        return retry_task

    # --- renew: small helpers --------------------------------------
    def _on_task_cancelled(self, task: asyncio.Task[Any]) -> str:
        """Handle a task that ended cancelled: cancel the completion future."""
        self.control = Steps.CANCEL
        if self._completion_future and not self._completion_future.done():
            self._completion_future.cancel()
        self.active_task = task
        self._notify("cancelled")
        return "CANCELLED"

    def _record_successful_iteration(
        self, coro: CoroFunc, result: Tuple[List[Any], Dict[str, Any]]
    ) -> None:
        """Update bookkeeping after a user iteration returned normally.

        Internal waits (``pause`` / ``_wait_for_event``) are ignored so
        their payloads never replace the user's last result and never
        reset the retry counter.
        """
        if _is_pause_coro(coro):
            return
        self._last_result = result
        self._resume_payload = result
        self._retry_count = 0
        self._iteration_count += 1
        duration: Optional[float] = None
        if self._iteration_start is not None and self.loop is not None:
            duration = self.loop.time() - self._iteration_start
        self._notify(
            "iteration_end",
            {
                "iteration": self._iteration_count,
                "duration": duration,
                "result": result,
            },
        )

    def _finalize_with_explicit_exception(
        self, task: asyncio.Task[Any], exception: BaseException
    ) -> str:
        """Finish the loop with an exception returned via the ``exception`` kwarg."""
        self.control = Steps.STOP
        self.active_task = task
        self.logger.exception("TaskLoop %s failed", self.name, exc_info=exception)
        return self._finish_with_exception(exception)

    def _finalize_cancel(self, task: asyncio.Task[Any]) -> str:
        """Finish as cancelled when ``cancel()`` was requested but the task completed.

        Emits ``cancelled`` just like ``_on_task_cancelled`` so observers
        see the same terminal event on both cancellation paths.
        """
        self.active_task = task
        if self._completion_future and not self._completion_future.done():
            self._completion_future.cancel()
        self._notify("cancelled")
        return "CANCELLED"

    def _finalize_stop(self, task: asyncio.Task[Any]) -> str:
        """Finish normally, resolving with the last resumable payload."""
        self.control = Steps.STOP
        self.active_task = task
        return self._finish_with_result(self._resume_payload)

    def _decide_next(
        self,
        coro: CoroFunc,
        result_args: List[Any],
        result_kwargs: Dict[str, Any],
    ) -> Tuple[CoroFunc, List[Any], Dict[str, Any]]:
        """Choose what to schedule next: a wait or the next user iteration.

        Returns ``(coroutine_function, args, kwargs)``.
        """
        # User-initiated pause: wait on the Event (or legacy pause()).
        if self.control == Steps.PAUSE:
            if self._pause_event is not None:
                return _wait_for_event, [self._pause_event], {}
            return pause, [self.time_pause], {}

        came_from_wait = _is_pause_coro(coro)
        if came_from_wait:
            # The finished task was an internal wait; its own payload is
            # meaningless to the user coroutine.
            next_args, next_kwargs = self._resume_payload
        else:
            next_args, next_kwargs = result_args, result_kwargs

        if self.control == Steps.CONTINUE:
            self.control = Steps.START

        # Enforce the start-to-start cadence only right after a user
        # iteration, never after an internal wait.
        remaining = 0.0
        if (
            self.interval is not None
            and not came_from_wait
            and self._iteration_start is not None
            and self.loop is not None
        ):
            elapsed = self.loop.time() - self._iteration_start
            remaining = max(0.0, self.interval - elapsed)
        if remaining > 0:
            return pause, [remaining], {}

        next_kwargs = dict(next_kwargs)
        next_kwargs["taskloop"] = self
        return self.coro, next_args, next_kwargs

    def _schedule_next(
        self,
        next_coro: CoroFunc,
        next_args: List[Any],
        next_kwargs: Dict[str, Any],
        fargs: FArgs,
    ) -> asyncio.Task[Any]:
        """Create the next task and chain ``renew`` onto it."""
        assert self.loop is not None
        if next_coro is self.coro:
            self._iteration_start = self.loop.time()
            self._iteration_input = (list(next_args), dict(next_kwargs))
            task_timeout = self.iteration_timeout
        else:
            # Internal waits are never subject to the iteration watchdog.
            task_timeout = None
        next_task = _create_task(
            self.loop,
            next_coro,
            next_args,
            next_kwargs,
            fargs,
            self._name,
            timeout=task_timeout,
        )
        next_task.add_done_callback(
            functools.partial(self.renew, coro=next_coro, fargs=fargs)
        )
        self.active_task = next_task
        return next_task

    def _handle_renew_exception(
        self,
        task: asyncio.Task[Any],
        exc: BaseException,
        result_kwargs: Dict[str, Any],
        fargs: FArgs,
    ) -> Any:
        """Log a failed iteration, then retry it or fail the loop."""
        self.active_task = task
        if result_kwargs.get("log"):
            result_kwargs["log"].exception("TaskLoop exception")
        else:
            # Pass the exception explicitly so the traceback is logged
            # even if this helper is called outside an ``except`` block.
            self.logger.exception("TaskLoop %s failed", self.name, exc_info=exc)
        will_retry = self._should_retry(exc)
        self._notify(
            "iteration_error",
            {
                "attempt": self._retry_count + 1,
                "exc": exc,
                "will_retry": will_retry,
            },
        )
        if will_retry:
            return self._schedule_retry(fargs, exc)
        self.control = Steps.STOP
        return self._finish_with_exception(exc)

    def renew(
        self,
        task: asyncio.Task[Any],
        coro: CoroFunc,
        fargs: FArgs,
    ) -> Any:
        """
        done_callback dispatcher: classify what happened, then delegate
        to one of the ``_finalize_*`` / ``_decide_next`` / ``_schedule_next``
        helpers.
        """
        if task.cancelled():
            return self._on_task_cancelled(task)
        result_kwargs: Dict[str, Any] = {}
        # The whole dispatch stays inside the ``try`` on purpose: any
        # failure here must still resolve the completion future, otherwise
        # waiters would hang.
        try:
            result = _normalize_iteration_result(task.result())
            result_args, result_kwargs = result
            self._record_successful_iteration(coro, result)
            explicit_exc = result_kwargs.get("exception")
            if explicit_exc:
                return self._finalize_with_explicit_exception(task, explicit_exc)
            if self.control == Steps.CANCEL:
                return self._finalize_cancel(task)
            if self.control == Steps.STOP or result_kwargs.get("stop"):
                return self._finalize_stop(task)
            next_coro, next_args, next_kwargs = self._decide_next(
                coro, result_args, result_kwargs
            )
            return self._schedule_next(next_coro, next_args, next_kwargs, fargs)
        except Exception as exc:
            return self._handle_renew_exception(task, exc, result_kwargs, fargs)