Closed
24 changes: 24 additions & 0 deletions Lib/asyncio/base_events.py
@@ -248,6 +248,8 @@ def __init__(self):
         self.slow_callback_duration = 0.1
         self._current_handle = None
         self._task_factory = None
+        self._tasks = weakref.WeakSet()

Contributor

I think there should be an unregister_task() method instead of relying on GC and WeakSet. It's weird if holding on to a copy of all_tasks() prevents any of those tasks from leaving the set even if they're finished. (The WeakSet implementation was fine for an internal-only debugging tool, but if it's going to be public I think it should have more well-defined semantics.)
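
The concern about `WeakSet` semantics can be reproduced outside asyncio. The following is a minimal sketch, assuming a hypothetical `FakeTask` class in place of a real task: a snapshot taken the way `all_tasks()` takes one (`set(self._tasks)`) holds strong references, so a finished task stays in the weak registry until the snapshot itself is dropped.

```python
import gc
import weakref


class FakeTask:
    """Hypothetical stand-in for a task; it only needs to be weak-referenceable."""

    def __init__(self, name):
        self.name = name
        self.done = False


registry = weakref.WeakSet()

task = FakeTask("worker")
registry.add(task)

# A caller takes a snapshot, the same way all_tasks() does with set(self._tasks).
snapshot = set(registry)

# The task finishes and its owner drops the last "real" reference.
task.done = True
del task
gc.collect()

# The snapshot still holds a strong reference, so the finished task
# has not left the WeakSet.
alive_with_snapshot = len(registry)

# Only after the snapshot is released does the entry disappear.
del snapshot
gc.collect()
alive_after_snapshot = len(registry)
```

Membership here depends on who else happens to hold a reference, which is the "not well-defined" behaviour the comment objects to.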

Member

I think it's a good idea. asyncio.Task can have a __del__ method which would call self.loop.unregister_task(self).

Contributor Author

Unfortunately that doesn't work: if tasks are stored in a strong set() inside the loop, __del__ is never called, because the refcount never drops to zero.
I could call _unregister_task() from _step() when the coroutine exits.

A note about naming: current_task() and all_tasks() are public methods, but _register_task() and family should never be called by user code; that's why I've added the underscore prefix.
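
To illustrate why a `__del__`-based unregister cannot work with a strong set, here is a small sketch under stated assumptions: `FakeTask`, `strong_registry` and `finish()` are hypothetical names, and `finish()` stands in for the proposed call to `_unregister_task()` when the coroutine exits.

```python
import gc

finalized = []  # names of objects whose __del__ actually ran


class FakeTask:
    """Hypothetical task whose finalizer tries to unregister itself."""

    def __init__(self, name, registry):
        self.name = name
        self._registry = registry
        registry.add(self)  # the registry now holds a strong reference

    def __del__(self):
        finalized.append(self.name)
        self._registry.discard(self)

    def finish(self):
        # Explicit unregistration, analogous to calling a hypothetical
        # _unregister_task() from _step() when the coroutine exits.
        self._registry.discard(self)


strong_registry = set()

leaked = FakeTask("leaked", strong_registry)
del leaked
gc.collect()

# __del__ has not run: the set itself keeps the object alive.
finalized_before = list(finalized)
still_registered = len(strong_registry)

explicit = FakeTask("explicit", strong_registry)
explicit.finish()
registered_after_finish = {t.name for t in strong_registry}
```

The explicit call removes the entry deterministically, while the "leaked" object stays registered for as long as the set exists.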

+        self._current_task = None
         self._coroutine_wrapper_set = False

         if hasattr(sys, 'get_asyncgen_hooks'):
@@ -302,6 +304,28 @@ def get_task_factory(self):
         """Return a task factory, or None if the default one is in use."""
         return self._task_factory

+    def current_task(self):

Contributor

All of the new methods need documentation. Do they operate only on instances of asyncio.Task or is there a "task" interface that could have other implementations?

For Tornado I originally wanted to be able to use a tornado.gen.Runner instead of asyncio.Task, but I've given up on that. If anyone else is still pursuing alternative coroutine runners you may want to get their feedback on this interface too.

If it's only asyncio.Task, then I think it probably makes more sense to just do everything with public classmethods of Task instead of putting it on the event loop. Every event loop is just going to copy these five methods more or less verbatim; there's nothing here that requires anything of the event loop besides a place to store a couple of attributes. The only reason I see to move it from Task to the event loop is if we're aiming to abstract away the Task class itself.

Contributor Author

In my mind a task is a hashable object with .cancel() and .cancelled().
With the patch, a task could be any coroutine runner (sorry, only in Python 3.7+).

We can introduce an AbstractTask if needed.

Member

> All of the new methods need documentation. Do they operate only on instances of asyncio.Task or is there a "task" interface that could have other implementations?

They will work with anything that behaves like asyncio.Task, although I don't think we need to add any type checks to enforce that.

Originally this PR was motivated by the PyO3/tokio event loop, which is written in Rust. The idea is that tokio would have its own Task implementation that can be used in both Python and Rust environments simultaneously; think of it as a bridge between async Python and async Rust code.

There aren't a lot of requirements for a Task-like object: it must implement the asyncio.Future APIs (add_done_callback(), cancel(), etc.), and must be able to wrap a coroutine object. The tokio event loop provides just that. There should be nothing wrong with a program that simultaneously uses a few different Task implementations.

I guess you have a similar need in Tornado. You likely need a custom Task class that supports both generator-based coroutines and async/await, and maybe some Tornado-specific functionality, right?

> If it's only asyncio.Task, then I think it probably makes more sense to just do everything with public classmethods of Task instead of putting it on the event loop. Every event loop is just going to copy these five methods more or less verbatim; [..]

You're right here. I now think that instead of having those as methods on the event loop, they should be just top-level asyncio functions: asyncio.all_tasks(), asyncio.current_task(), etc.

Member

> We can introduce an AbstractTask if needed.

I think it would be great to have both AbstractFuture and AbstractTask ABCs just like we have events.AbstractEventLoop. Just as with event loops it won't be required for Future-like and Task-like objects to actually extend those classes.
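
As a rough illustration of what such an ABC could look like, here is a sketch under stated assumptions: `AbstractTask` below is hypothetical and not part of asyncio, its method list is drawn only from the methods named in this thread, and `ForeignTask` is a made-up third-party class. It shows that a Task-like object can satisfy the interface through `register()` without inheriting from it.

```python
import abc


class AbstractTask(abc.ABC):
    """Hypothetical ABC describing a Task-like object; not part of asyncio."""

    @abc.abstractmethod
    def cancel(self):
        """Request cancellation; return True if the request was accepted."""

    @abc.abstractmethod
    def cancelled(self):
        """Return True if the task was cancelled."""

    @abc.abstractmethod
    def add_done_callback(self, callback):
        """Schedule callback(task) to run when the task finishes."""


class ForeignTask:
    """Hypothetical third-party task that does not inherit from AbstractTask."""

    def __init__(self):
        self._cancelled = False
        self._callbacks = []

    def cancel(self):
        if self._cancelled:
            return False
        self._cancelled = True
        for callback in self._callbacks:
            callback(self)
        return True

    def cancelled(self):
        return self._cancelled

    def add_done_callback(self, callback):
        self._callbacks.append(callback)


# Virtual subclassing: isinstance() checks pass without inheritance.
AbstractTask.register(ForeignTask)

task = ForeignTask()
seen = []
task.add_done_callback(seen.append)
accepted = task.cancel()
```

`ForeignTask` keeps the default identity-based `__hash__`, so instances remain hashable and can be stored in a set of tasks.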

Contributor

Yeah, it sounds like introducing an AbstractTask is the way to go, just to define what exactly is necessary (for example, set_result and set_exception probably don't belong here)

> You likely need a custom Task class that supports both generator-based coroutines and async/await, and maybe some Tornado-specific functionality, right?

Hmm, I hadn't even considered registering tornado's yield-based coroutines with asyncio like this. That could be useful (although it would require us to implement cancellation). I don't actually have any tornado-specific functionality that is compatible with async/await, which is why I'm moving to just use asyncio.Task in that case.

> Just as with event loops it won't be required for Future-like and Task-like objects to actually extend those classes.

That would be great! I thought I had lost that argument and I've spent a lot of time shaving yaks to just move to the asyncio classes, since they weren't amenable to external implementations.

Member

> That would be great! I thought I had lost that argument and I've spent a lot of time shaving yaks to just move to the asyncio classes, since they weren't amenable to external implementations.

BTW, I'm not sure if you know about this as it's a rather obscure thing: since at least 3.6, any object that has an _asyncio_future_blocking attribute is recognized as a Future-like object by asyncio. This is something that we could highlight in the AbstractFuture ABC.

@asvetlov Let's create a separate issue to add the discussed ABCs. For this one -- let's make all_tasks(), current_task(), _register_task(), etc top-level asyncio functions. Ben's right, there's nothing loop-specific about them.
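
The `_asyncio_future_blocking` marker mentioned above can be observed through `asyncio.isfuture()`. This is a minimal sketch: `FutureLike` and `Plain` are hypothetical classes, and carrying the marker only affects recognition; it does not make the object a working future.

```python
import asyncio


class FutureLike:
    """Hypothetical object that carries only the marker attribute."""

    # asyncio.isfuture() looks for this attribute on the class and
    # requires its value to be something other than None.
    _asyncio_future_blocking = False


class Plain:
    """Hypothetical object without the marker."""


marked = asyncio.isfuture(FutureLike())
unmarked = asyncio.isfuture(Plain())
```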

Contributor Author

I don't follow: what is the proposed implementation?
Just move Task._current_tasks and Task._all_tasks into the module namespace?
In my mind a loop is a container for its own tasks.
asyncio.current_task() and asyncio.all_tasks() make sense as a user-level API, but shouldn't the implementation sit in the loop?

@1st1 1st1 Dec 10, 2017

Member

> I don't follow: what is the proposed implementation?
> Just move Task._current_tasks and Task._all_tasks into the module namespace?

Yes, that's what I'm suggesting.

> In my mind a loop is a container for its own tasks.
> asyncio.current_task() and asyncio.all_tasks() make sense as a user-level API, but shouldn't the implementation sit in the loop?

Let me list here some thoughts I have about this:

  1. All event loops will have to copy the implementation from asyncio. The API and its implementation are very simple and will likely be identical between, say, uvloop and asyncio/base_events.py. This duplication of code across loop implementations is the main argument against adding this API to the event loop.

  2. We actually want to minimize the need to get the current event loop. We also want the API to be straightforward to use. Compare the following:

    loop = asyncio.get_event_loop()
    loop.current_task()
    
    # vs
    
    asyncio.Task.current_task()
    
    # vs
    
    asyncio.current_task()

  3. asyncio.Task.current_task() isn't just cumbersome to use. It also requires us to have two implementations of it in asyncio: one in tasks.py, and one in _asynciomodule.c.

  4. asyncio.Task.current_task() is tied to asyncio.Task. It's weird to expect it to point to a different implementation of a Task (say tokio's Task objects).

  5. Currently, the loop argument is optional in asyncio.Task.current_task(). If the loop wasn't passed to it explicitly, it uses asyncio.get_event_loop() to get the current event loop. The caveat here is that asyncio.get_event_loop() can actually create a new event loop, or do something else, depending on the current policy.

    I want the new current_task() implementation to use asyncio._get_running_loop(), and raise an error if there's no currently running event loop.

  6. Implementing current_task() as a top-level asyncio function means that the existing event loops don't need to be updated at all to stay compatible with the new APIs we want to add. This means we can drop asyncio.Task._current_tasks and asyncio.Task._all_tasks attributes. We can't drop them with the current approach.

  7. Maintaining a separate event loop implementation (like uvloop) is already hard. I'd like to minimize the number of APIs we need to implement on the loop. In this case it's easy to do so.

All in all, I now believe that asyncio.current_task() and asyncio.all_tasks() are the way to go. Decoupling them from asyncio.Task and the event loop simply has too many advantages and no downsides.
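
For reference, the top-level form argued for above is how `asyncio.current_task()` and `asyncio.all_tasks()` are exposed in Python 3.7 and later. The sketch below assumes such an interpreter; `child()`, `main()` and `call_outside_loop()` are illustrative names. It also shows the behaviour requested in point 5: without a running event loop the call raises `RuntimeError` rather than creating a loop.

```python
import asyncio


async def child():
    await asyncio.sleep(0)
    # Inside a task, current_task() returns the task running this coroutine.
    return asyncio.current_task()


async def main():
    me = asyncio.current_task()
    spawned = asyncio.create_task(child())
    # all_tasks() returns the not-yet-finished tasks of the running loop.
    pending = asyncio.all_tasks()
    seen_by_child = await spawned
    return me, spawned, pending, seen_by_child


def call_outside_loop():
    # No loop is running here, so no loop is created implicitly.
    try:
        asyncio.current_task()
    except RuntimeError as exc:
        return type(exc).__name__
    return None


outside = call_outside_loop()
me, spawned, pending, seen_by_child = asyncio.run(main())
```

No event loop object is fetched or passed anywhere, which is the usability argument made in point 2.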

Contributor Author

Makes sense.
Thanks for the comprehensive explanation.
I'll rewrite the PR tomorrow.

+        return self._current_task
+
+    def all_tasks(self):
+        return set(self._tasks)
+
+    def _register_task(self, task):
+        self._tasks.add(task)
+
+    def _enter_task(self, task):
+        if self._current_task is not None:
+            raise RuntimeError("Entering into task {!r} "
+                               "when other task {!r} is executed."
+                               .format(task, self._current_task))
+        self._current_task = task
+
+    def _leave_task(self, task):
+        if self._current_task is not task:
+            raise RuntimeError("Leaving task {!r} is not current {!r}."
+                               .format(task, self._current_task))
+        self._current_task = None
+
     def _make_socket_transport(self, sock, protocol, waiter=None, *,
                                extra=None, server=None):
         """Create socket transport."""
17 changes: 17 additions & 0 deletions Lib/asyncio/events.py
@@ -517,6 +517,23 @@ def set_task_factory(self, factory):
     def get_task_factory(self):
         raise NotImplementedError

+    # Tasks introspection
+
+    def current_task(self):
+        raise NotImplementedError
+
+    def all_tasks(self):
+        raise NotImplementedError
+
+    def _register_task(self, task):
+        raise NotImplementedError
+
+    def _enter_task(self, task):
+        raise NotImplementedError
+
+    def _leave_task(self, task):
+        raise NotImplementedError
+
     # Error handlers.

     def get_exception_handler(self):
39 changes: 34 additions & 5 deletions Lib/asyncio/tasks.py
@@ -51,19 +51,39 @@ def current_task(cls, loop=None):

         None is returned when called not in the context of a Task.
         """
+        warnings.warn("Task.current_task() is deprecated, "
+                      "use loop.current_task() instead.",
+                      PendingDeprecationWarning,
+                      stacklevel=2)
         if loop is None:
             loop = events.get_event_loop()
-        return cls._current_tasks.get(loop)
+        try:
+            return loop.current_task()
+        except (AttributeError, NotImplementedError):
+            # This code is needed for third-party event loops that don't
+            # support the loop introspection API yet.
+            # The fallback will be removed in 3.8.
+            return cls._current_tasks.get(loop)

     @classmethod
     def all_tasks(cls, loop=None):
         """Return a set of all tasks for an event loop.

         By default all tasks for the current event loop are returned.
         """
+        warnings.warn("Task.all_tasks() is deprecated, "
+                      "use loop.all_tasks() instead.",
+                      PendingDeprecationWarning,
+                      stacklevel=2)
         if loop is None:
             loop = events.get_event_loop()
-        return {t for t in cls._all_tasks if t._loop is loop}
+        try:
+            return loop.all_tasks()
+        except (AttributeError, NotImplementedError):
+            # This code is needed for third-party event loops that don't
+            # support the loop introspection API yet.
+            # The fallback will be removed in 3.8.
+            return {t for t in cls._all_tasks if t._loop is loop}

     def __init__(self, coro, *, loop=None):
         assert coroutines.iscoroutine(coro), repr(coro)
@@ -74,7 +94,10 @@ def __init__(self, coro, *, loop=None):
         self._fut_waiter = None
         self._must_cancel = False
         self._loop.call_soon(self._step)
-        self.__class__._all_tasks.add(self)
+        try:
+            self._loop._register_task(self)
+        except (AttributeError, NotImplementedError):
+            self.__class__._all_tasks.add(self)

Member

Let's remove Task._all_tasks and Task._current_tasks. They were never public attributes anyways.

Contributor Author

These attributes are present for the sake of backward compatibility.
Imagine a loop without task introspection support (one that does not implement _register_task, _enter_task and _leave_task).
The proposed PR keeps working through the _all_tasks/_current_tasks fallbacks.

Member

Got it. Let's add a comment that this code is needed for third-party event loops and that it will be removed in 3.8.


     def __del__(self):
         if self._state == futures._PENDING and self._log_destroy_pending:
@@ -167,7 +190,10 @@ def _step(self, exc=None):
         coro = self._coro
         self._fut_waiter = None

-        self.__class__._current_tasks[self._loop] = self
+        try:
+            self._loop._enter_task(self)
+        except (AttributeError, NotImplementedError):
+            self.__class__._current_tasks[self._loop] = self
         # Call either coro.throw(exc) or coro.send(None).
         try:
             if exc is None:
@@ -238,7 +264,10 @@ def _step(self, exc=None):
                     RuntimeError(
                         'Task got bad yield: {!r}'.format(result)))
         finally:
-            self.__class__._current_tasks.pop(self._loop)
+            try:
+                self._loop._leave_task(self)
+            except (AttributeError, NotImplementedError):
+                self.__class__._current_tasks.pop(self._loop)
             self = None  # Needed to break cycles when an exception occurs.

     def _wakeup(self, future):