-
-
Notifications
You must be signed in to change notification settings - Fork 34.8k
bpo-32410: Implement loop.sock_sendfile() #4976
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
ee13d5b
8b610ad
8d658be
24c5a28
10b0b61
5f782a5
3068aa9
898766e
6e70a62
20b3778
7ed0e67
f4b61b1
6f6b94b
1c05795
f670fad
26f6d4a
87d4804
76eeef5
35071ea
92ae10b
8c451d2
921fe69
0f2a48f
1cc0e8f
272029e
fa0954e
0ddb410
c18c3c8
a4a174b
448e949
46c92ed
8dd45dc
c9112b9
b6273e4
4d88063
71b9f93
db2445e
46a6b46
2ec48f8
8a6ed3f
44da800
967408e
099dc56
f9701cb
a30acc9
f7d9bab
4d25927
e303db9
84e1057
657aa67
dd4143a
96d0032
5deb0e2
3c9abaf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| """Selector event loop for Unix with signal handling.""" | ||
|
|
||
| import errno | ||
| import io | ||
| import os | ||
| import selectors | ||
| import signal | ||
|
|
@@ -299,6 +300,99 @@ async def create_unix_server( | |
| ssl_handshake_timeout=ssl_handshake_timeout) | ||
| return server | ||
|
|
||
| async def sock_sendfile(self, sock, file, offset=0, count=None): | ||
| if self._debug and sock.gettimeout() != 0: | ||
| raise ValueError("the socket must be non-blocking") | ||
| try: | ||
| os.sendfile | ||
| except AttributeError as exc: | ||
| raise events.SendfileUnsupportedError(exc) | ||
| self._check_sendfile_params(sock, file, offset, count) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should support passing a FD for |
||
| try: | ||
| fileno = file.fileno() | ||
| except (AttributeError, io.UnsupportedOperation) as exc: | ||
| raise events.SendfileUnsupportedError(exc) # not a regular file | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is redundant. I'd do something like this: if isinstance(file, int):
fileno = file
else:
# factor out the 'fileno()' getting part from `_ensure_fd_no_transport` |
||
| try: | ||
| fsize = os.fstat(fileno).st_size | ||
| except OSError as exc: | ||
| raise events.SendfileUnsupportedError(exc) # not a regular file | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| if not fsize: | ||
| return 0 # empty file | ||
| blocksize = fsize if not count else count | ||
|
|
||
| fut = self.create_future() | ||
| self._sock_sendfile(fut, None, sock, file, offset, count, blocksize, 0) | ||
| return await fut | ||
|
|
||
| def _sock_sendfile(self, fut, registered_fd, sock, file, offset, | ||
| count, blocksize, total_sent): | ||
| if registered_fd is not None: | ||
| # Remove the callback early. It should be rare that the | ||
| # selector says the fd is ready but the call still returns | ||
| # EAGAIN, and I am willing to take a hit in that case in | ||
| # order to simplify the common case. | ||
| self.remove_writer(registered_fd) | ||
| if fut.cancelled(): | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You copied this check from other The correct way of doing this (and that's btw what I'm doing in uvloop and planned to fix in asyncio) is to register a special callback on Otherwise, we can't cancel sendfile while
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we still need this check? |
||
| self._update_filepos(file, offset, total_sent) | ||
| return | ||
| if count: | ||
| blocksize = count - total_sent | ||
| if blocksize <= 0: | ||
| self._update_filepos(file, offset, total_sent) | ||
| fut.set_result(total_sent) | ||
| return | ||
|
|
||
| fd = sock.fileno() | ||
| try: | ||
| sent = os.sendfile(fd, file.fileno(), offset, blocksize) | ||
| except (BlockingIOError, InterruptedError): | ||
| self.add_writer(fd, self._sock_sendfile, fut, fd, sock, | ||
| file, offset, count, blocksize. total_sent) | ||
| except OSError as exc: | ||
| if total_sent == 0: | ||
| # We can get here for different reasons, the main | ||
| # one being 'file' is not a regular mmap(2)-like | ||
| # file, in which case we'll fall back on using | ||
| # plain send(). | ||
| err = events.SendfileUnsupportedError(exc) | ||
| self._update_filepos(file, offset, total_sent) | ||
| fut.set_exception(err) | ||
| else: | ||
| self._update_filepos(file, offset, total_sent) | ||
| fut.set_exception(exc) | ||
| except Exception as exc: | ||
| self._update_filepos(file, offset, total_sent) | ||
| fut.set_exception(exc) | ||
| else: | ||
| if sent == 0: | ||
| # EOF | ||
| self._update_filepos(file, offset, total_sent) | ||
| fut.set_result(total_sent) | ||
| else: | ||
| offset += sent | ||
| total_sent += sent | ||
| fd = sock.fileno() | ||
| self.add_writer(fd, self._sock_sendfile, fut, fd, sock, | ||
| file, offset, count, blocksize, total_sent) | ||
|
|
||
| def _update_filepos(self, file, offset, total_sent): | ||
| if total_sent > 0 and hasattr(file, 'seek'): | ||
| file.seek(offset) | ||
|
|
||
| def _check_sendfile_params(self, sock, file, offset, count): | ||
| if 'b' not in getattr(file, 'mode', 'b'): | ||
| raise ValueError("file should be opened in binary mode") | ||
| if not sock.type & socket.SOCK_STREAM: | ||
| raise ValueError("only SOCK_STREAM type sockets are supported") | ||
| if count is not None: | ||
| if not isinstance(count, int): | ||
| raise TypeError( | ||
| "count must be a positive integer (got {!r})".format(count)) | ||
| if count <= 0: | ||
| raise ValueError( | ||
| "count must be a positive integer (got {!r})".format(count)) | ||
|
|
||
|
|
||
|
|
||
| class _UnixReadPipeTransport(transports.ReadTransport): | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -400,6 +400,93 @@ def test_create_unix_connection_ssl_timeout_with_plain_sock(self): | |
| self.loop.run_until_complete(coro) | ||
|
|
||
|
|
||
| @unittest.skipUnless(hasattr(os, 'sendfile'), | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a test like this: await loop.sock_sendall(5 mb of data)
await loop.sock_sendfile(some file)
await loop.sock_sendall(5 mb of data)just to ensure that sendfile and sendall work fine together. |
||
| 'sendfile is not supported') | ||
| class SelectorEventLoopUnixSockSendfileTests(test_utils.TestCase): | ||
| DATA = b"12345abcde" * 16 * 1024 # 160 KiB | ||
|
|
||
| class MyProto(asyncio.Protocol): | ||
|
|
||
| def __init__(self): | ||
| self.started = False | ||
| self.closed = False | ||
| self.data = bytearray() | ||
|
|
||
| def connection_made(self, transport): | ||
| self.started = True | ||
|
|
||
| def data_received(self, data): | ||
| self.data.extend(data) | ||
|
|
||
| def connection_lost(self, exc): | ||
| self.closed = True | ||
|
|
||
| @classmethod | ||
| def setUpClass(cls): | ||
| with open(support.TESTFN, 'wb') as fp: | ||
| fp.write(cls.DATA) | ||
| super().setUpClass() | ||
|
|
||
| @classmethod | ||
| def tearDownClass(cls): | ||
| support.unlink(support.TESTFN) | ||
| super().tearDownClass() | ||
|
|
||
| def setUp(self): | ||
| self.loop = asyncio.new_event_loop() | ||
| self.set_event_loop(self.loop) | ||
| self.file = open(support.TESTFN, 'rb') | ||
| self.addCleanup(self.file.close) | ||
| super().setUp() | ||
|
|
||
| def tearDown(self): | ||
| super().tearDown() | ||
|
|
||
| def make_socket(self, blocking=False): | ||
| sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
| sock.setblocking(False) | ||
| self.addCleanup(sock.close) | ||
| return sock | ||
|
|
||
| def run_loop(self, coro): | ||
| return self.loop.run_until_complete(coro) | ||
|
|
||
| def prepare(self): | ||
| sock = self.make_socket() | ||
| proto = self.MyProto() | ||
| port = support.find_unused_port() | ||
| server = self.run_loop(self.loop.create_server( | ||
| lambda: proto, support.HOST, port)) | ||
| self.run_loop(self.loop.sock_connect(sock, (support.HOST, port))) | ||
|
|
||
| def cleanup(): | ||
| server.close() | ||
| self.run_loop(server.wait_closed()) | ||
|
|
||
| self.addCleanup(cleanup) | ||
|
|
||
| return sock, proto | ||
|
|
||
| def test_success(self): | ||
| sock, proto = self.prepare() | ||
| self.run_loop(self.loop.sock_sendfile(sock, self.file)) | ||
|
|
||
| self.assertEqual(proto.data, self.DATA) | ||
| self.assertEqual(self.file.tell(), len(self.DATA)) | ||
|
|
||
| def test_with_offset_and_count(self): | ||
| sock, proto = self.prepare() | ||
| self.run_loop(self.loop.sock_sendfile(sock, self.file, 1000, 2000)) | ||
|
|
||
| self.assertEqual(proto.data, self.DATA[1000:3000]) | ||
| self.assertEqual(self.file.tell(), 3000) | ||
|
|
||
| def test_blocking_socket(self): | ||
| self.loop.set_debug(True) | ||
| sock = self.make_socket(True) | ||
| with self.assertRaises(ValueError): | ||
| self.run_loop(self.loop.sock_sendfile(sock, self.file)) | ||
|
|
||
|
|
||
| class UnixReadPipeTransportTests(test_utils.TestCase): | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Implement ``loop.sock_sendfile`` for asyncio UNIX event loop. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given that we implement a fallback, it's not just for Unix anymore. Change to "Implement loop.sock_sendfile". |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SendFileNotSupportedWhy not
raise NotImplementedError('sendfile is not available')?