Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
ee13d5b
Draft for sock_sendfile
asvetlov Dec 22, 2017
8b610ad
Add test for blocked socket
asvetlov Dec 22, 2017
8d658be
Polish tests
asvetlov Dec 22, 2017
24c5a28
Test partial file content
asvetlov Dec 22, 2017
10b0b61
Add NEWS entry
asvetlov Dec 22, 2017
5f782a5
Add test for abstract sock_sendfile
asvetlov Dec 22, 2017
3068aa9
Polishment
asvetlov Dec 22, 2017
898766e
Don't call loop.set_debug(True)
asvetlov Dec 22, 2017
6e70a62
Revert set_debug() back
asvetlov Dec 22, 2017
20b3778
Work on tests
asvetlov Dec 22, 2017
7ed0e67
Improve test cleanup
asvetlov Dec 22, 2017
f4b61b1
Make tests stable
asvetlov Dec 23, 2017
6f6b94b
Merge remote-tracking branch 'upstream/master' into sock_sendfile
asvetlov Dec 24, 2017
1c05795
Refactor _check_sendfile_params helper
asvetlov Dec 24, 2017
f670fad
Use NotImplementedError in private socket.sendfile implementation.
asvetlov Dec 24, 2017
26f6d4a
Refactoring
asvetlov Dec 24, 2017
87d4804
Polish error text
asvetlov Dec 24, 2017
76eeef5
Update docs
asvetlov Dec 24, 2017
35071ea
Fix check for SOCK_STREAM
asvetlov Dec 24, 2017
92ae10b
Accept int fd along with socket instance
asvetlov Dec 24, 2017
8c451d2
Drop support for int FD for socket
asvetlov Dec 24, 2017
921fe69
NotImplementedError -> RuntimeError
asvetlov Dec 24, 2017
0f2a48f
Switch to RuntimeError back
asvetlov Dec 24, 2017
1cc0e8f
Merge branch 'master' into sock_sendfile
asvetlov Dec 30, 2017
272029e
Add sendfile fallback
asvetlov Dec 30, 2017
fa0954e
Fix private names
asvetlov Dec 30, 2017
0ddb410
Another renaming
asvetlov Dec 30, 2017
c18c3c8
Work on
asvetlov Dec 30, 2017
a4a174b
Fix NEWS.d
asvetlov Dec 30, 2017
448e949
More tests
asvetlov Dec 30, 2017
46c92ed
Polish docs
asvetlov Dec 31, 2017
8dd45dc
Revert changes in socket.py
asvetlov Dec 31, 2017
c9112b9
Merge branch 'master' into sock_sendfile
asvetlov Dec 31, 2017
b6273e4
More tests
asvetlov Dec 31, 2017
4d88063
Tests
asvetlov Dec 31, 2017
71b9f93
Test partial file with fallback
asvetlov Dec 31, 2017
db2445e
Improve test coverage
asvetlov Dec 31, 2017
46a6b46
Switch to custom exception type
asvetlov Dec 31, 2017
2ec48f8
read -> readinto
asvetlov Dec 31, 2017
8a6ed3f
Make tests more stable
asvetlov Dec 31, 2017
44da800
Merge remote-tracking branch 'upstream/master' into sock_sendfile
asvetlov Jan 1, 2018
967408e
Change base class for _SendfileNotAvailable to RuntimeError
asvetlov Jan 2, 2018
099dc56
Better exception type when sendfile is not available
asvetlov Jan 2, 2018
f9701cb
Add a test for mixed sock_send and sock_sendfile
asvetlov Jan 2, 2018
a30acc9
Add cancellation callback
asvetlov Jan 2, 2018
f7d9bab
More tests
asvetlov Jan 2, 2018
4d25927
Support tribool for fallback
asvetlov Jan 2, 2018
e303db9
Fix signature of abstract sock_sendfile
asvetlov Jan 2, 2018
84e1057
Merge branch 'master' into sock_sendfile
asvetlov Jan 2, 2018
657aa67
Revert back tribool for fallback
asvetlov Jan 16, 2018
dd4143a
Fix tests
asvetlov Jan 16, 2018
96d0032
Merge branch 'master' into sock_sendfile
asvetlov Jan 16, 2018
5deb0e2
Add a space
asvetlov Jan 16, 2018
3c9abaf
Fix sock_sendfile callback
asvetlov Jan 16, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Lib/asyncio/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
from . import format_helpers


class SendfileUnsupportedError(ValueError):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SendFileNotSupported

Why not raise NotImplementedError('sendfile is not available')?

"""Sendfile preparation failed."""


class Handle:
"""Object returned by callback registration methods."""

Expand Down Expand Up @@ -449,6 +453,9 @@ async def sock_connect(self, sock, address):
async def sock_accept(self, sock):
raise NotImplementedError

async def sock_sendfile(self, sock, file, offset=0, count=None):
raise NotImplementedError

# Signal handling.

def add_signal_handler(self, sig, callback, *args):
Expand Down
94 changes: 94 additions & 0 deletions Lib/asyncio/unix_events.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Selector event loop for Unix with signal handling."""

import errno
import io
import os
import selectors
import signal
Expand Down Expand Up @@ -299,6 +300,99 @@ async def create_unix_server(
ssl_handshake_timeout=ssl_handshake_timeout)
return server

async def sock_sendfile(self, sock, file, offset=0, count=None):
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
try:
os.sendfile
except AttributeError as exc:
raise events.SendfileUnsupportedError(exc)
self._check_sendfile_params(sock, file, offset, count)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should support passing a FD for file.

try:
fileno = file.fileno()
except (AttributeError, io.UnsupportedOperation) as exc:
raise events.SendfileUnsupportedError(exc) # not a regular file

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is redundant. I'd do something like this:

if isinstance(file, int):
    fileno = file
else:
    # factor out the 'fileno()' getting part from `_ensure_fd_no_transport`

try:
fsize = os.fstat(fileno).st_size
except OSError as exc:
raise events.SendfileUnsupportedError(exc) # not a regular file

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

raise events.SendfileUnsupportedError('fstat call failed') from exc

if not fsize:
return 0 # empty file
blocksize = fsize if not count else count

fut = self.create_future()
self._sock_sendfile(fut, None, sock, file, offset, count, blocksize, 0)
return await fut

def _sock_sendfile(self, fut, registered_fd, sock, file, offset,
count, blocksize, total_sent):
if registered_fd is not None:
# Remove the callback early. It should be rare that the
# selector says the fd is ready but the call still returns
# EAGAIN, and I am willing to take a hit in that case in
# order to simplify the common case.
self.remove_writer(registered_fd)
if fut.cancelled():

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You copied this check from other loop.sock_* methods, but in fact it's incorrect. It will only handle a case when a Future was cancelled right away.

The correct way of doing this (and that's btw what I'm doing in uvloop and planned to fix in asyncio) is to register a special callback on fut. That callback should check if the future is cancelled, and if it is, cancel the sendfile operation (remove writer).

Otherwise, we can't cancel sendfile while fd is in selector.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need this check?

self._update_filepos(file, offset, total_sent)
return
if count:
blocksize = count - total_sent
if blocksize <= 0:
self._update_filepos(file, offset, total_sent)
fut.set_result(total_sent)
return

fd = sock.fileno()
try:
sent = os.sendfile(fd, file.fileno(), offset, blocksize)
except (BlockingIOError, InterruptedError):
self.add_writer(fd, self._sock_sendfile, fut, fd, sock,
file, offset, count, blocksize. total_sent)
except OSError as exc:
if total_sent == 0:
# We can get here for different reasons, the main
# one being 'file' is not a regular mmap(2)-like
# file, in which case we'll fall back on using
# plain send().
err = events.SendfileUnsupportedError(exc)
self._update_filepos(file, offset, total_sent)
fut.set_exception(err)
else:
self._update_filepos(file, offset, total_sent)
fut.set_exception(exc)
except Exception as exc:
self._update_filepos(file, offset, total_sent)
fut.set_exception(exc)
else:
if sent == 0:
# EOF
self._update_filepos(file, offset, total_sent)
fut.set_result(total_sent)
else:
offset += sent
total_sent += sent
fd = sock.fileno()
self.add_writer(fd, self._sock_sendfile, fut, fd, sock,
file, offset, count, blocksize, total_sent)

def _update_filepos(self, file, offset, total_sent):
if total_sent > 0 and hasattr(file, 'seek'):
file.seek(offset)

def _check_sendfile_params(self, sock, file, offset, count):
if 'b' not in getattr(file, 'mode', 'b'):
raise ValueError("file should be opened in binary mode")
if not sock.type & socket.SOCK_STREAM:
raise ValueError("only SOCK_STREAM type sockets are supported")
if count is not None:
if not isinstance(count, int):
raise TypeError(
"count must be a positive integer (got {!r})".format(count))
if count <= 0:
raise ValueError(
"count must be a positive integer (got {!r})".format(count))



class _UnixReadPipeTransport(transports.ReadTransport):

Expand Down
2 changes: 2 additions & 0 deletions Lib/test/test_asyncio/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2589,6 +2589,8 @@ async def inner():
await loop.sock_connect(f, f)
with self.assertRaises(NotImplementedError):
await loop.sock_accept(f)
with self.assertRaises(NotImplementedError):
await loop.sock_sendfile(f, mock.Mock())
with self.assertRaises(NotImplementedError):
await loop.connect_read_pipe(f, mock.sentinel.pipe)
with self.assertRaises(NotImplementedError):
Expand Down
87 changes: 87 additions & 0 deletions Lib/test/test_asyncio/test_unix_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,93 @@ def test_create_unix_connection_ssl_timeout_with_plain_sock(self):
self.loop.run_until_complete(coro)


@unittest.skipUnless(hasattr(os, 'sendfile'),

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test like this:

await loop.sock_sendall(5 mb of data)
await loop.sock_sendfile(some file)
await loop.sock_sendall(5 mb of data)

just to ensure that sendfile and sendall work fine together.

'sendfile is not supported')
class SelectorEventLoopUnixSockSendfileTests(test_utils.TestCase):
DATA = b"12345abcde" * 16 * 1024 # 160 KiB

class MyProto(asyncio.Protocol):

def __init__(self):
self.started = False
self.closed = False
self.data = bytearray()

def connection_made(self, transport):
self.started = True

def data_received(self, data):
self.data.extend(data)

def connection_lost(self, exc):
self.closed = True

@classmethod
def setUpClass(cls):
with open(support.TESTFN, 'wb') as fp:
fp.write(cls.DATA)
super().setUpClass()

@classmethod
def tearDownClass(cls):
support.unlink(support.TESTFN)
super().tearDownClass()

def setUp(self):
self.loop = asyncio.new_event_loop()
self.set_event_loop(self.loop)
self.file = open(support.TESTFN, 'rb')
self.addCleanup(self.file.close)
super().setUp()

def tearDown(self):
super().tearDown()

def make_socket(self, blocking=False):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(False)
self.addCleanup(sock.close)
return sock

def run_loop(self, coro):
return self.loop.run_until_complete(coro)

def prepare(self):
sock = self.make_socket()
proto = self.MyProto()
port = support.find_unused_port()
server = self.run_loop(self.loop.create_server(
lambda: proto, support.HOST, port))
self.run_loop(self.loop.sock_connect(sock, (support.HOST, port)))

def cleanup():
server.close()
self.run_loop(server.wait_closed())

self.addCleanup(cleanup)

return sock, proto

def test_success(self):
sock, proto = self.prepare()
self.run_loop(self.loop.sock_sendfile(sock, self.file))

self.assertEqual(proto.data, self.DATA)
self.assertEqual(self.file.tell(), len(self.DATA))

def test_with_offset_and_count(self):
sock, proto = self.prepare()
self.run_loop(self.loop.sock_sendfile(sock, self.file, 1000, 2000))

self.assertEqual(proto.data, self.DATA[1000:3000])
self.assertEqual(self.file.tell(), 3000)

def test_blocking_socket(self):
self.loop.set_debug(True)
sock = self.make_socket(True)
with self.assertRaises(ValueError):
self.run_loop(self.loop.sock_sendfile(sock, self.file))


class UnixReadPipeTransportTests(test_utils.TestCase):

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement ``loop.sock_sendfile`` for asyncio UNIX event loop.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we implement a fallback, it's not just for Unix anymore. Change to "Implement loop.sock_sendfile".