Why do we need an Async While?
If you need to run several coroutines and each must execute independently, for example receiving stream data from a list of sources where every source has a different frequency, you need this.
TaskLoop provides a set of functions and asyncio coroutines that can help you with this task.
You cannot get good results with a plain for loop, because each iteration waits for the previous one to finish. You need free tasks.
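To make the difference concrete, here is a minimal sketch comparing a plain for loop with one free task per source. The names `read_source`, `poll`, `sequential` and `independent` are hypothetical and are not part of tasktools; the periods are arbitrary.

```python
import asyncio


async def read_source(name, period, log):
    """Hypothetical source: waits `period` seconds, then records one reading."""
    await asyncio.sleep(period)
    log.append(name)


async def sequential(sources, rounds, log):
    # Plain for loop: every source waits for the previous one to finish
    for _ in range(rounds):
        for name, period in sources:
            await read_source(name, period, log)


async def poll(name, period, rounds, log):
    # One independent loop per source, running at its own pace
    for _ in range(rounds):
        await read_source(name, period, log)


async def independent(sources, rounds, log):
    # One free task per source; the event loop interleaves them
    tasks = [asyncio.create_task(poll(n, p, rounds, log)) for n, p in sources]
    await asyncio.gather(*tasks)


sources = [("fast", 0.01), ("slow", 0.2)]

seq_log, free_log = [], []
asyncio.run(sequential(sources, 3, seq_log))
asyncio.run(independent(sources, 3, free_log))

print(seq_log)   # readings alternate: the fast source is held back by the slow one
print(free_log)  # the fast source is no longer tied to the slow one's period
```

In the sequential version the fast source can only deliver one reading per slow reading, which is the problem the callback chain described here is meant to avoid.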
So, the model you need for every source coroutine is:
The implementation considers:
Define the coroutine
Does the coroutine need some input?
Is the input always the same, or does it change with the result?
Create a coroutine mask that awaits the coroutine and manages the input
Call the coro-mask in a task generator, which schedules it.
Set a callback for when the task is done
When the task is done, the callback must be the same task generator
Schematically, the system is:
TaskLoop State Model
The previous graph explains the callback chain. It is not a state diagram.
TaskLoop also has an explicit control model, evaluated inside renew when
the active task finishes:
The important detail is that PAUSE and CONTINUE are control signals for
the next callback decision:
PAUSE prevents rescheduling the user coroutine and schedules a short pause coroutine instead.
CONTINUE resumes the user coroutine with the last payload that was produced before the pause loop.
STOP ends the callback chain after the current task completes.
CANCEL cancels the current task immediately.
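The control signals above can be sketched with a small stand-in class. This is only an illustration of the decision made inside the done callback: `Control`, `MiniLoop` and all their attributes are hypothetical names, not the tasktools API, and the pause length is an arbitrary assumption.

```python
import asyncio
import enum


class Control(enum.Enum):
    """Hypothetical signal names mirroring the list above."""
    CONTINUE = "continue"
    PAUSE = "pause"
    STOP = "stop"


class MiniLoop:
    """Illustrative stand-in; not the tasktools TaskLoop class."""

    def __init__(self, coro, args):
        self.coro = coro
        self.payload = list(args)      # last payload produced by the user coroutine
        self.signal = Control.CONTINUE
        self.task = None
        self.finished = asyncio.Event()

    def start(self):
        self._schedule(self._step())

    def cancel(self):
        # CANCEL: act on the running task right away
        self.task.cancel()

    def _schedule(self, awaitable):
        self.task = asyncio.create_task(awaitable)
        self.task.add_done_callback(self.renew)

    async def _step(self):
        # Run the user coroutine once; its result becomes the next payload
        self.payload = [await self.coro(*self.payload)]

    async def _pause(self):
        # Short idle step that leaves the payload untouched
        await asyncio.sleep(0.01)

    def renew(self, task):
        # The signal is evaluated only when the active task finishes
        if task.cancelled() or self.signal is Control.STOP:
            self.finished.set()              # the callback chain ends here
        elif self.signal is Control.PAUSE:
            self._schedule(self._pause())    # idle instead of running the user coroutine
        else:
            self._schedule(self._step())     # CONTINUE: resume from the last payload


seen = []


async def count(n):
    seen.append(n)
    await asyncio.sleep(0.01)
    return n + 1


async def forever():
    await asyncio.sleep(3600)


async def main():
    tl = MiniLoop(count, [0])
    tl.start()
    await asyncio.sleep(0.1)
    tl.signal = Control.PAUSE
    await asyncio.sleep(0.1)           # the step in flight finishes, then pausing begins
    paused_at = len(seen)
    await asyncio.sleep(0.1)
    still_paused = len(seen) == paused_at
    tl.signal = Control.CONTINUE
    await asyncio.sleep(0.1)
    tl.signal = Control.STOP
    await tl.finished.wait()

    other = MiniLoop(forever, [])
    other.start()
    await asyncio.sleep(0.01)
    other.cancel()                     # ends at once, without waiting for the sleep
    await other.finished.wait()
    return paused_at, still_paused


paused_at, still_paused = asyncio.run(main())
print(seen, paused_at, still_paused)
```

Because the payload is stored on the object and the pause step never touches it, the counter picks up after CONTINUE exactly where it stopped.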
The functions available for this scheme are:
| Function | Inputs | Action |
|---|---|---|
| coromask | [coro, args, kwargs, fargs] | await coro |
| renew | [task, coro, fargs] | renew task |
| simple_fargs | [input, output] | return input |
| simple_fargs_out | [input, output] | return output |
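The last two rows can be read as two policies for choosing the next input. The sketch below mirrors them with hypothetical helpers `keep_input` and `use_output` and a simplified `coromask` without kwargs; the exact return shape of the library's own `simple_fargs` functions is not shown in this page, so treat this as an assumption.

```python
import asyncio


def keep_input(_in, obtained):
    # Mirrors the simple_fargs row: the next call receives the same input
    return _in


def use_output(_in, obtained):
    # Mirrors the simple_fargs_out row: the next call receives the coroutine's result
    return obtained


async def coromask(coro, args, fargs):
    # Simplified mask (no kwargs): await the coroutine, then pick the next input
    obtained = await coro(*args)
    return fargs(args, obtained)


async def double(x):
    # Returns a list so that it can be unpacked as the next args
    return [x * 2]


async def run(fargs, rounds):
    args = [1]
    history = []
    for _ in range(rounds):
        history.append(list(args))
        args = await coromask(double, args, fargs)
    return history


same = asyncio.run(run(keep_input, 3))
fed_back = asyncio.run(run(use_output, 3))
print(same)      # [[1], [1], [1]]
print(fed_back)  # [[1], [2], [4]]
```

The first policy suits a source that is always polled with the same parameters; the second suits a computation that feeds on its own result.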
The following code is an example of how to use the taskloop functions. We have two coroutines, define a fargs function for each one, and run them on an asyncio event loop.
    import asyncio
    import functools


    async def holacoro(v):
        print("Hello %d" % v)
        await asyncio.sleep(1)
        return v + 1


    async def sumacoro(*args):
        c = sum(args)
        print("The sum is %d" % c)
        await asyncio.sleep(3)
        return c


    # Each fargs returns the (args, kwargs) pair for the next call,
    # which is what renew unpacks from task.result()
    def fargs_holacoro(args, obtained):
        return [obtained], {}


    def fargs_sumacoro(args, obtained):
        return [args[-1], obtained], {}


    # Mask applied to every coroutine: await it, then compute the next input
    async def coromask(coro, args, kwargs, fargs):
        _in = args
        obtained = await coro(*args, **kwargs)
        result = fargs(_in, obtained)
        return result


    # Done callback: reschedule the coroutine with the input produced by fargs
    def renew(task, coro, fargs):
        result_args, result_kwargs = task.result()
        task = loop.create_task(coromask(coro, result_args, result_kwargs, fargs))
        task.add_done_callback(functools.partial(renew, coro=coro, fargs=fargs))


    # get_event_loop() is deprecated when no loop is running, so create one explicitly
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    args = [1]
    task1 = loop.create_task(coromask(holacoro, args, {}, fargs_holacoro))
    task1.add_done_callback(functools.partial(renew, coro=holacoro, fargs=fargs_holacoro))

    args2 = [1, 2]
    task2 = loop.create_task(coromask(sumacoro, args2, {}, fargs_sumacoro))
    task2.add_done_callback(functools.partial(renew, coro=sumacoro, fargs=fargs_sumacoro))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print('Loop stopped')
These functions are packaged in the tasktools module, inside the taskloop file. To call them, import them in the usual Python way:
    from tasktools.taskloop import coromask, renew, simple_fargs