selector_events.py 44 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242
  1. """Event loop using a selector and related classes.
  2. A selector is a "notify-when-ready" multiplexer. For a subclass which
  3. also includes support for signal handling, see the unix_events sub-module.
  4. """
  5. __all__ = 'BaseSelectorEventLoop',
  6. import collections
  7. import errno
  8. import functools
  9. import selectors
  10. import socket
  11. import warnings
  12. import weakref
  13. try:
  14. import ssl
  15. except ImportError: # pragma: no cover
  16. ssl = None
  17. from . import base_events
  18. from . import constants
  19. from . import events
  20. from . import futures
  21. from . import protocols
  22. from . import sslproto
  23. from . import transports
  24. from . import trsock
  25. from .log import logger
  26. def _test_selector_event(selector, fd, event):
  27. # Test if the selector is monitoring 'event' events
  28. # for the file descriptor 'fd'.
  29. try:
  30. key = selector.get_key(fd)
  31. except KeyError:
  32. return False
  33. else:
  34. return bool(key.events & event)
  35. class BaseSelectorEventLoop(base_events.BaseEventLoop):
  36. """Selector event loop.
  37. See events.EventLoop for API specification.
  38. """
  39. def __init__(self, selector=None):
  40. super().__init__()
  41. if selector is None:
  42. selector = selectors.DefaultSelector()
  43. logger.debug('Using selector: %s', selector.__class__.__name__)
  44. self._selector = selector
  45. self._make_self_pipe()
  46. self._transports = weakref.WeakValueDictionary()
  47. def _make_socket_transport(self, sock, protocol, waiter=None, *,
  48. extra=None, server=None):
  49. return _SelectorSocketTransport(self, sock, protocol, waiter,
  50. extra, server)
  51. def _make_ssl_transport(
  52. self, rawsock, protocol, sslcontext, waiter=None,
  53. *, server_side=False, server_hostname=None,
  54. extra=None, server=None,
  55. ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
  56. ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT,
  57. ):
  58. ssl_protocol = sslproto.SSLProtocol(
  59. self, protocol, sslcontext, waiter,
  60. server_side, server_hostname,
  61. ssl_handshake_timeout=ssl_handshake_timeout,
  62. ssl_shutdown_timeout=ssl_shutdown_timeout
  63. )
  64. _SelectorSocketTransport(self, rawsock, ssl_protocol,
  65. extra=extra, server=server)
  66. return ssl_protocol._app_transport
  67. def _make_datagram_transport(self, sock, protocol,
  68. address=None, waiter=None, extra=None):
  69. return _SelectorDatagramTransport(self, sock, protocol,
  70. address, waiter, extra)
  71. def close(self):
  72. if self.is_running():
  73. raise RuntimeError("Cannot close a running event loop")
  74. if self.is_closed():
  75. return
  76. self._close_self_pipe()
  77. super().close()
  78. if self._selector is not None:
  79. self._selector.close()
  80. self._selector = None
  81. def _close_self_pipe(self):
  82. self._remove_reader(self._ssock.fileno())
  83. self._ssock.close()
  84. self._ssock = None
  85. self._csock.close()
  86. self._csock = None
  87. self._internal_fds -= 1
  88. def _make_self_pipe(self):
  89. # A self-socket, really. :-)
  90. self._ssock, self._csock = socket.socketpair()
  91. self._ssock.setblocking(False)
  92. self._csock.setblocking(False)
  93. self._internal_fds += 1
  94. self._add_reader(self._ssock.fileno(), self._read_from_self)
  95. def _process_self_data(self, data):
  96. pass
  97. def _read_from_self(self):
  98. while True:
  99. try:
  100. data = self._ssock.recv(4096)
  101. if not data:
  102. break
  103. self._process_self_data(data)
  104. except InterruptedError:
  105. continue
  106. except BlockingIOError:
  107. break
  108. def _write_to_self(self):
  109. # This may be called from a different thread, possibly after
  110. # _close_self_pipe() has been called or even while it is
  111. # running. Guard for self._csock being None or closed. When
  112. # a socket is closed, send() raises OSError (with errno set to
  113. # EBADF, but let's not rely on the exact error code).
  114. csock = self._csock
  115. if csock is None:
  116. return
  117. try:
  118. csock.send(b'\0')
  119. except OSError:
  120. if self._debug:
  121. logger.debug("Fail to write a null byte into the "
  122. "self-pipe socket",
  123. exc_info=True)
  124. def _start_serving(self, protocol_factory, sock,
  125. sslcontext=None, server=None, backlog=100,
  126. ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
  127. ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
  128. self._add_reader(sock.fileno(), self._accept_connection,
  129. protocol_factory, sock, sslcontext, server, backlog,
  130. ssl_handshake_timeout, ssl_shutdown_timeout)
  131. def _accept_connection(
  132. self, protocol_factory, sock,
  133. sslcontext=None, server=None, backlog=100,
  134. ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
  135. ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
  136. # This method is only called once for each event loop tick where the
  137. # listening socket has triggered an EVENT_READ. There may be multiple
  138. # connections waiting for an .accept() so it is called in a loop.
  139. # See https://bugs.python.org/issue27906 for more details.
  140. for _ in range(backlog):
  141. try:
  142. conn, addr = sock.accept()
  143. if self._debug:
  144. logger.debug("%r got a new connection from %r: %r",
  145. server, addr, conn)
  146. conn.setblocking(False)
  147. except (BlockingIOError, InterruptedError, ConnectionAbortedError):
  148. # Early exit because the socket accept buffer is empty.
  149. return None
  150. except OSError as exc:
  151. # There's nowhere to send the error, so just log it.
  152. if exc.errno in (errno.EMFILE, errno.ENFILE,
  153. errno.ENOBUFS, errno.ENOMEM):
  154. # Some platforms (e.g. Linux keep reporting the FD as
  155. # ready, so we remove the read handler temporarily.
  156. # We'll try again in a while.
  157. self.call_exception_handler({
  158. 'message': 'socket.accept() out of system resource',
  159. 'exception': exc,
  160. 'socket': trsock.TransportSocket(sock),
  161. })
  162. self._remove_reader(sock.fileno())
  163. self.call_later(constants.ACCEPT_RETRY_DELAY,
  164. self._start_serving,
  165. protocol_factory, sock, sslcontext, server,
  166. backlog, ssl_handshake_timeout,
  167. ssl_shutdown_timeout)
  168. else:
  169. raise # The event loop will catch, log and ignore it.
  170. else:
  171. extra = {'peername': addr}
  172. accept = self._accept_connection2(
  173. protocol_factory, conn, extra, sslcontext, server,
  174. ssl_handshake_timeout, ssl_shutdown_timeout)
  175. self.create_task(accept)
  176. async def _accept_connection2(
  177. self, protocol_factory, conn, extra,
  178. sslcontext=None, server=None,
  179. ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
  180. ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
  181. protocol = None
  182. transport = None
  183. try:
  184. protocol = protocol_factory()
  185. waiter = self.create_future()
  186. if sslcontext:
  187. transport = self._make_ssl_transport(
  188. conn, protocol, sslcontext, waiter=waiter,
  189. server_side=True, extra=extra, server=server,
  190. ssl_handshake_timeout=ssl_handshake_timeout,
  191. ssl_shutdown_timeout=ssl_shutdown_timeout)
  192. else:
  193. transport = self._make_socket_transport(
  194. conn, protocol, waiter=waiter, extra=extra,
  195. server=server)
  196. try:
  197. await waiter
  198. except BaseException:
  199. transport.close()
  200. raise
  201. # It's now up to the protocol to handle the connection.
  202. except (SystemExit, KeyboardInterrupt):
  203. raise
  204. except BaseException as exc:
  205. if self._debug:
  206. context = {
  207. 'message':
  208. 'Error on transport creation for incoming connection',
  209. 'exception': exc,
  210. }
  211. if protocol is not None:
  212. context['protocol'] = protocol
  213. if transport is not None:
  214. context['transport'] = transport
  215. self.call_exception_handler(context)
  216. def _ensure_fd_no_transport(self, fd):
  217. fileno = fd
  218. if not isinstance(fileno, int):
  219. try:
  220. fileno = int(fileno.fileno())
  221. except (AttributeError, TypeError, ValueError):
  222. # This code matches selectors._fileobj_to_fd function.
  223. raise ValueError(f"Invalid file object: {fd!r}") from None
  224. try:
  225. transport = self._transports[fileno]
  226. except KeyError:
  227. pass
  228. else:
  229. if not transport.is_closing():
  230. raise RuntimeError(
  231. f'File descriptor {fd!r} is used by transport '
  232. f'{transport!r}')
  233. def _add_reader(self, fd, callback, *args):
  234. self._check_closed()
  235. handle = events.Handle(callback, args, self, None)
  236. try:
  237. key = self._selector.get_key(fd)
  238. except KeyError:
  239. self._selector.register(fd, selectors.EVENT_READ,
  240. (handle, None))
  241. else:
  242. mask, (reader, writer) = key.events, key.data
  243. self._selector.modify(fd, mask | selectors.EVENT_READ,
  244. (handle, writer))
  245. if reader is not None:
  246. reader.cancel()
  247. return handle
  248. def _remove_reader(self, fd):
  249. if self.is_closed():
  250. return False
  251. try:
  252. key = self._selector.get_key(fd)
  253. except KeyError:
  254. return False
  255. else:
  256. mask, (reader, writer) = key.events, key.data
  257. mask &= ~selectors.EVENT_READ
  258. if not mask:
  259. self._selector.unregister(fd)
  260. else:
  261. self._selector.modify(fd, mask, (None, writer))
  262. if reader is not None:
  263. reader.cancel()
  264. return True
  265. else:
  266. return False
  267. def _add_writer(self, fd, callback, *args):
  268. self._check_closed()
  269. handle = events.Handle(callback, args, self, None)
  270. try:
  271. key = self._selector.get_key(fd)
  272. except KeyError:
  273. self._selector.register(fd, selectors.EVENT_WRITE,
  274. (None, handle))
  275. else:
  276. mask, (reader, writer) = key.events, key.data
  277. self._selector.modify(fd, mask | selectors.EVENT_WRITE,
  278. (reader, handle))
  279. if writer is not None:
  280. writer.cancel()
  281. return handle
  282. def _remove_writer(self, fd):
  283. """Remove a writer callback."""
  284. if self.is_closed():
  285. return False
  286. try:
  287. key = self._selector.get_key(fd)
  288. except KeyError:
  289. return False
  290. else:
  291. mask, (reader, writer) = key.events, key.data
  292. # Remove both writer and connector.
  293. mask &= ~selectors.EVENT_WRITE
  294. if not mask:
  295. self._selector.unregister(fd)
  296. else:
  297. self._selector.modify(fd, mask, (reader, None))
  298. if writer is not None:
  299. writer.cancel()
  300. return True
  301. else:
  302. return False
  303. def add_reader(self, fd, callback, *args):
  304. """Add a reader callback."""
  305. self._ensure_fd_no_transport(fd)
  306. self._add_reader(fd, callback, *args)
  307. def remove_reader(self, fd):
  308. """Remove a reader callback."""
  309. self._ensure_fd_no_transport(fd)
  310. return self._remove_reader(fd)
  311. def add_writer(self, fd, callback, *args):
  312. """Add a writer callback.."""
  313. self._ensure_fd_no_transport(fd)
  314. self._add_writer(fd, callback, *args)
  315. def remove_writer(self, fd):
  316. """Remove a writer callback."""
  317. self._ensure_fd_no_transport(fd)
  318. return self._remove_writer(fd)
  319. async def sock_recv(self, sock, n):
  320. """Receive data from the socket.
  321. The return value is a bytes object representing the data received.
  322. The maximum amount of data to be received at once is specified by
  323. nbytes.
  324. """
  325. base_events._check_ssl_socket(sock)
  326. if self._debug and sock.gettimeout() != 0:
  327. raise ValueError("the socket must be non-blocking")
  328. try:
  329. return sock.recv(n)
  330. except (BlockingIOError, InterruptedError):
  331. pass
  332. fut = self.create_future()
  333. fd = sock.fileno()
  334. self._ensure_fd_no_transport(fd)
  335. handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
  336. fut.add_done_callback(
  337. functools.partial(self._sock_read_done, fd, handle=handle))
  338. return await fut
  339. def _sock_read_done(self, fd, fut, handle=None):
  340. if handle is None or not handle.cancelled():
  341. self.remove_reader(fd)
  342. def _sock_recv(self, fut, sock, n):
  343. # _sock_recv() can add itself as an I/O callback if the operation can't
  344. # be done immediately. Don't use it directly, call sock_recv().
  345. if fut.done():
  346. return
  347. try:
  348. data = sock.recv(n)
  349. except (BlockingIOError, InterruptedError):
  350. return # try again next time
  351. except (SystemExit, KeyboardInterrupt):
  352. raise
  353. except BaseException as exc:
  354. fut.set_exception(exc)
  355. else:
  356. fut.set_result(data)
  357. async def sock_recv_into(self, sock, buf):
  358. """Receive data from the socket.
  359. The received data is written into *buf* (a writable buffer).
  360. The return value is the number of bytes written.
  361. """
  362. base_events._check_ssl_socket(sock)
  363. if self._debug and sock.gettimeout() != 0:
  364. raise ValueError("the socket must be non-blocking")
  365. try:
  366. return sock.recv_into(buf)
  367. except (BlockingIOError, InterruptedError):
  368. pass
  369. fut = self.create_future()
  370. fd = sock.fileno()
  371. self._ensure_fd_no_transport(fd)
  372. handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
  373. fut.add_done_callback(
  374. functools.partial(self._sock_read_done, fd, handle=handle))
  375. return await fut
  376. def _sock_recv_into(self, fut, sock, buf):
  377. # _sock_recv_into() can add itself as an I/O callback if the operation
  378. # can't be done immediately. Don't use it directly, call
  379. # sock_recv_into().
  380. if fut.done():
  381. return
  382. try:
  383. nbytes = sock.recv_into(buf)
  384. except (BlockingIOError, InterruptedError):
  385. return # try again next time
  386. except (SystemExit, KeyboardInterrupt):
  387. raise
  388. except BaseException as exc:
  389. fut.set_exception(exc)
  390. else:
  391. fut.set_result(nbytes)
  392. async def sock_recvfrom(self, sock, bufsize):
  393. """Receive a datagram from a datagram socket.
  394. The return value is a tuple of (bytes, address) representing the
  395. datagram received and the address it came from.
  396. The maximum amount of data to be received at once is specified by
  397. nbytes.
  398. """
  399. base_events._check_ssl_socket(sock)
  400. if self._debug and sock.gettimeout() != 0:
  401. raise ValueError("the socket must be non-blocking")
  402. try:
  403. return sock.recvfrom(bufsize)
  404. except (BlockingIOError, InterruptedError):
  405. pass
  406. fut = self.create_future()
  407. fd = sock.fileno()
  408. self._ensure_fd_no_transport(fd)
  409. handle = self._add_reader(fd, self._sock_recvfrom, fut, sock, bufsize)
  410. fut.add_done_callback(
  411. functools.partial(self._sock_read_done, fd, handle=handle))
  412. return await fut
  413. def _sock_recvfrom(self, fut, sock, bufsize):
  414. # _sock_recvfrom() can add itself as an I/O callback if the operation
  415. # can't be done immediately. Don't use it directly, call
  416. # sock_recvfrom().
  417. if fut.done():
  418. return
  419. try:
  420. result = sock.recvfrom(bufsize)
  421. except (BlockingIOError, InterruptedError):
  422. return # try again next time
  423. except (SystemExit, KeyboardInterrupt):
  424. raise
  425. except BaseException as exc:
  426. fut.set_exception(exc)
  427. else:
  428. fut.set_result(result)
  429. async def sock_recvfrom_into(self, sock, buf, nbytes=0):
  430. """Receive data from the socket.
  431. The received data is written into *buf* (a writable buffer).
  432. The return value is a tuple of (number of bytes written, address).
  433. """
  434. base_events._check_ssl_socket(sock)
  435. if self._debug and sock.gettimeout() != 0:
  436. raise ValueError("the socket must be non-blocking")
  437. if not nbytes:
  438. nbytes = len(buf)
  439. try:
  440. return sock.recvfrom_into(buf, nbytes)
  441. except (BlockingIOError, InterruptedError):
  442. pass
  443. fut = self.create_future()
  444. fd = sock.fileno()
  445. self._ensure_fd_no_transport(fd)
  446. handle = self._add_reader(fd, self._sock_recvfrom_into, fut, sock, buf,
  447. nbytes)
  448. fut.add_done_callback(
  449. functools.partial(self._sock_read_done, fd, handle=handle))
  450. return await fut
  451. def _sock_recvfrom_into(self, fut, sock, buf, bufsize):
  452. # _sock_recv_into() can add itself as an I/O callback if the operation
  453. # can't be done immediately. Don't use it directly, call
  454. # sock_recv_into().
  455. if fut.done():
  456. return
  457. try:
  458. result = sock.recvfrom_into(buf, bufsize)
  459. except (BlockingIOError, InterruptedError):
  460. return # try again next time
  461. except (SystemExit, KeyboardInterrupt):
  462. raise
  463. except BaseException as exc:
  464. fut.set_exception(exc)
  465. else:
  466. fut.set_result(result)
  467. async def sock_sendall(self, sock, data):
  468. """Send data to the socket.
  469. The socket must be connected to a remote socket. This method continues
  470. to send data from data until either all data has been sent or an
  471. error occurs. None is returned on success. On error, an exception is
  472. raised, and there is no way to determine how much data, if any, was
  473. successfully processed by the receiving end of the connection.
  474. """
  475. base_events._check_ssl_socket(sock)
  476. if self._debug and sock.gettimeout() != 0:
  477. raise ValueError("the socket must be non-blocking")
  478. try:
  479. n = sock.send(data)
  480. except (BlockingIOError, InterruptedError):
  481. n = 0
  482. if n == len(data):
  483. # all data sent
  484. return
  485. fut = self.create_future()
  486. fd = sock.fileno()
  487. self._ensure_fd_no_transport(fd)
  488. # use a trick with a list in closure to store a mutable state
  489. handle = self._add_writer(fd, self._sock_sendall, fut, sock,
  490. memoryview(data), [n])
  491. fut.add_done_callback(
  492. functools.partial(self._sock_write_done, fd, handle=handle))
  493. return await fut
  494. def _sock_sendall(self, fut, sock, view, pos):
  495. if fut.done():
  496. # Future cancellation can be scheduled on previous loop iteration
  497. return
  498. start = pos[0]
  499. try:
  500. n = sock.send(view[start:])
  501. except (BlockingIOError, InterruptedError):
  502. return
  503. except (SystemExit, KeyboardInterrupt):
  504. raise
  505. except BaseException as exc:
  506. fut.set_exception(exc)
  507. return
  508. start += n
  509. if start == len(view):
  510. fut.set_result(None)
  511. else:
  512. pos[0] = start
  513. async def sock_sendto(self, sock, data, address):
  514. """Send data to the socket.
  515. The socket must be connected to a remote socket. This method continues
  516. to send data from data until either all data has been sent or an
  517. error occurs. None is returned on success. On error, an exception is
  518. raised, and there is no way to determine how much data, if any, was
  519. successfully processed by the receiving end of the connection.
  520. """
  521. base_events._check_ssl_socket(sock)
  522. if self._debug and sock.gettimeout() != 0:
  523. raise ValueError("the socket must be non-blocking")
  524. try:
  525. return sock.sendto(data, address)
  526. except (BlockingIOError, InterruptedError):
  527. pass
  528. fut = self.create_future()
  529. fd = sock.fileno()
  530. self._ensure_fd_no_transport(fd)
  531. # use a trick with a list in closure to store a mutable state
  532. handle = self._add_writer(fd, self._sock_sendto, fut, sock, data,
  533. address)
  534. fut.add_done_callback(
  535. functools.partial(self._sock_write_done, fd, handle=handle))
  536. return await fut
  537. def _sock_sendto(self, fut, sock, data, address):
  538. if fut.done():
  539. # Future cancellation can be scheduled on previous loop iteration
  540. return
  541. try:
  542. n = sock.sendto(data, 0, address)
  543. except (BlockingIOError, InterruptedError):
  544. return
  545. except (SystemExit, KeyboardInterrupt):
  546. raise
  547. except BaseException as exc:
  548. fut.set_exception(exc)
  549. else:
  550. fut.set_result(n)
  551. async def sock_connect(self, sock, address):
  552. """Connect to a remote socket at address.
  553. This method is a coroutine.
  554. """
  555. base_events._check_ssl_socket(sock)
  556. if self._debug and sock.gettimeout() != 0:
  557. raise ValueError("the socket must be non-blocking")
  558. if sock.family == socket.AF_INET or (
  559. base_events._HAS_IPv6 and sock.family == socket.AF_INET6):
  560. resolved = await self._ensure_resolved(
  561. address, family=sock.family, type=sock.type, proto=sock.proto,
  562. loop=self,
  563. )
  564. _, _, _, _, address = resolved[0]
  565. fut = self.create_future()
  566. self._sock_connect(fut, sock, address)
  567. try:
  568. return await fut
  569. finally:
  570. # Needed to break cycles when an exception occurs.
  571. fut = None
  572. def _sock_connect(self, fut, sock, address):
  573. fd = sock.fileno()
  574. try:
  575. sock.connect(address)
  576. except (BlockingIOError, InterruptedError):
  577. # Issue #23618: When the C function connect() fails with EINTR, the
  578. # connection runs in background. We have to wait until the socket
  579. # becomes writable to be notified when the connection succeed or
  580. # fails.
  581. self._ensure_fd_no_transport(fd)
  582. handle = self._add_writer(
  583. fd, self._sock_connect_cb, fut, sock, address)
  584. fut.add_done_callback(
  585. functools.partial(self._sock_write_done, fd, handle=handle))
  586. except (SystemExit, KeyboardInterrupt):
  587. raise
  588. except BaseException as exc:
  589. fut.set_exception(exc)
  590. else:
  591. fut.set_result(None)
  592. finally:
  593. fut = None
  594. def _sock_write_done(self, fd, fut, handle=None):
  595. if handle is None or not handle.cancelled():
  596. self.remove_writer(fd)
  597. def _sock_connect_cb(self, fut, sock, address):
  598. if fut.done():
  599. return
  600. try:
  601. err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
  602. if err != 0:
  603. # Jump to any except clause below.
  604. raise OSError(err, f'Connect call failed {address}')
  605. except (BlockingIOError, InterruptedError):
  606. # socket is still registered, the callback will be retried later
  607. pass
  608. except (SystemExit, KeyboardInterrupt):
  609. raise
  610. except BaseException as exc:
  611. fut.set_exception(exc)
  612. else:
  613. fut.set_result(None)
  614. finally:
  615. fut = None
  616. async def sock_accept(self, sock):
  617. """Accept a connection.
  618. The socket must be bound to an address and listening for connections.
  619. The return value is a pair (conn, address) where conn is a new socket
  620. object usable to send and receive data on the connection, and address
  621. is the address bound to the socket on the other end of the connection.
  622. """
  623. base_events._check_ssl_socket(sock)
  624. if self._debug and sock.gettimeout() != 0:
  625. raise ValueError("the socket must be non-blocking")
  626. fut = self.create_future()
  627. self._sock_accept(fut, sock)
  628. return await fut
  629. def _sock_accept(self, fut, sock):
  630. fd = sock.fileno()
  631. try:
  632. conn, address = sock.accept()
  633. conn.setblocking(False)
  634. except (BlockingIOError, InterruptedError):
  635. self._ensure_fd_no_transport(fd)
  636. handle = self._add_reader(fd, self._sock_accept, fut, sock)
  637. fut.add_done_callback(
  638. functools.partial(self._sock_read_done, fd, handle=handle))
  639. except (SystemExit, KeyboardInterrupt):
  640. raise
  641. except BaseException as exc:
  642. fut.set_exception(exc)
  643. else:
  644. fut.set_result((conn, address))
  645. async def _sendfile_native(self, transp, file, offset, count):
  646. del self._transports[transp._sock_fd]
  647. resume_reading = transp.is_reading()
  648. transp.pause_reading()
  649. await transp._make_empty_waiter()
  650. try:
  651. return await self.sock_sendfile(transp._sock, file, offset, count,
  652. fallback=False)
  653. finally:
  654. transp._reset_empty_waiter()
  655. if resume_reading:
  656. transp.resume_reading()
  657. self._transports[transp._sock_fd] = transp
  658. def _process_events(self, event_list):
  659. for key, mask in event_list:
  660. fileobj, (reader, writer) = key.fileobj, key.data
  661. if mask & selectors.EVENT_READ and reader is not None:
  662. if reader._cancelled:
  663. self._remove_reader(fileobj)
  664. else:
  665. self._add_callback(reader)
  666. if mask & selectors.EVENT_WRITE and writer is not None:
  667. if writer._cancelled:
  668. self._remove_writer(fileobj)
  669. else:
  670. self._add_callback(writer)
  671. def _stop_serving(self, sock):
  672. self._remove_reader(sock.fileno())
  673. sock.close()
  674. class _SelectorTransport(transports._FlowControlMixin,
  675. transports.Transport):
  676. max_size = 256 * 1024 # Buffer size passed to recv().
  677. _buffer_factory = bytearray # Constructs initial value for self._buffer.
  678. # Attribute used in the destructor: it must be set even if the constructor
  679. # is not called (see _SelectorSslTransport which may start by raising an
  680. # exception)
  681. _sock = None
  682. def __init__(self, loop, sock, protocol, extra=None, server=None):
  683. super().__init__(extra, loop)
  684. self._extra['socket'] = trsock.TransportSocket(sock)
  685. try:
  686. self._extra['sockname'] = sock.getsockname()
  687. except OSError:
  688. self._extra['sockname'] = None
  689. if 'peername' not in self._extra:
  690. try:
  691. self._extra['peername'] = sock.getpeername()
  692. except socket.error:
  693. self._extra['peername'] = None
  694. self._sock = sock
  695. self._sock_fd = sock.fileno()
  696. self._protocol_connected = False
  697. self.set_protocol(protocol)
  698. self._server = server
  699. self._buffer = self._buffer_factory()
  700. self._conn_lost = 0 # Set when call to connection_lost scheduled.
  701. self._closing = False # Set when close() called.
  702. if self._server is not None:
  703. self._server._attach()
  704. loop._transports[self._sock_fd] = self
  705. def __repr__(self):
  706. info = [self.__class__.__name__]
  707. if self._sock is None:
  708. info.append('closed')
  709. elif self._closing:
  710. info.append('closing')
  711. info.append(f'fd={self._sock_fd}')
  712. # test if the transport was closed
  713. if self._loop is not None and not self._loop.is_closed():
  714. polling = _test_selector_event(self._loop._selector,
  715. self._sock_fd, selectors.EVENT_READ)
  716. if polling:
  717. info.append('read=polling')
  718. else:
  719. info.append('read=idle')
  720. polling = _test_selector_event(self._loop._selector,
  721. self._sock_fd,
  722. selectors.EVENT_WRITE)
  723. if polling:
  724. state = 'polling'
  725. else:
  726. state = 'idle'
  727. bufsize = self.get_write_buffer_size()
  728. info.append(f'write=<{state}, bufsize={bufsize}>')
  729. return '<{}>'.format(' '.join(info))
  730. def abort(self):
  731. self._force_close(None)
  732. def set_protocol(self, protocol):
  733. self._protocol = protocol
  734. self._protocol_connected = True
  735. def get_protocol(self):
  736. return self._protocol
  737. def is_closing(self):
  738. return self._closing
  739. def close(self):
  740. if self._closing:
  741. return
  742. self._closing = True
  743. self._loop._remove_reader(self._sock_fd)
  744. if not self._buffer:
  745. self._conn_lost += 1
  746. self._loop._remove_writer(self._sock_fd)
  747. self._loop.call_soon(self._call_connection_lost, None)
  748. def __del__(self, _warn=warnings.warn):
  749. if self._sock is not None:
  750. _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
  751. self._sock.close()
  752. def _fatal_error(self, exc, message='Fatal error on transport'):
  753. # Should be called from exception handler only.
  754. if isinstance(exc, OSError):
  755. if self._loop.get_debug():
  756. logger.debug("%r: %s", self, message, exc_info=True)
  757. else:
  758. self._loop.call_exception_handler({
  759. 'message': message,
  760. 'exception': exc,
  761. 'transport': self,
  762. 'protocol': self._protocol,
  763. })
  764. self._force_close(exc)
  765. def _force_close(self, exc):
  766. if self._conn_lost:
  767. return
  768. if self._buffer:
  769. self._buffer.clear()
  770. self._loop._remove_writer(self._sock_fd)
  771. if not self._closing:
  772. self._closing = True
  773. self._loop._remove_reader(self._sock_fd)
  774. self._conn_lost += 1
  775. self._loop.call_soon(self._call_connection_lost, exc)
  776. def _call_connection_lost(self, exc):
  777. try:
  778. if self._protocol_connected:
  779. self._protocol.connection_lost(exc)
  780. finally:
  781. self._sock.close()
  782. self._sock = None
  783. self._protocol = None
  784. self._loop = None
  785. server = self._server
  786. if server is not None:
  787. server._detach()
  788. self._server = None
  789. def get_write_buffer_size(self):
  790. return len(self._buffer)
  791. def _add_reader(self, fd, callback, *args):
  792. if self._closing:
  793. return
  794. self._loop._add_reader(fd, callback, *args)
  795. class _SelectorSocketTransport(_SelectorTransport):
  796. _start_tls_compatible = True
  797. _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
  798. def __init__(self, loop, sock, protocol, waiter=None,
  799. extra=None, server=None):
  800. self._read_ready_cb = None
  801. super().__init__(loop, sock, protocol, extra, server)
  802. self._eof = False
  803. self._paused = False
  804. self._empty_waiter = None
  805. # Disable the Nagle algorithm -- small writes will be
  806. # sent without waiting for the TCP ACK. This generally
  807. # decreases the latency (in some cases significantly.)
  808. base_events._set_nodelay(self._sock)
  809. self._loop.call_soon(self._protocol.connection_made, self)
  810. # only start reading when connection_made() has been called
  811. self._loop.call_soon(self._add_reader,
  812. self._sock_fd, self._read_ready)
  813. if waiter is not None:
  814. # only wake up the waiter when connection_made() has been called
  815. self._loop.call_soon(futures._set_result_unless_cancelled,
  816. waiter, None)
  817. def set_protocol(self, protocol):
  818. if isinstance(protocol, protocols.BufferedProtocol):
  819. self._read_ready_cb = self._read_ready__get_buffer
  820. else:
  821. self._read_ready_cb = self._read_ready__data_received
  822. super().set_protocol(protocol)
  823. def is_reading(self):
  824. return not self._paused and not self._closing
  825. def pause_reading(self):
  826. if self._closing or self._paused:
  827. return
  828. self._paused = True
  829. self._loop._remove_reader(self._sock_fd)
  830. if self._loop.get_debug():
  831. logger.debug("%r pauses reading", self)
  832. def resume_reading(self):
  833. if self._closing or not self._paused:
  834. return
  835. self._paused = False
  836. self._add_reader(self._sock_fd, self._read_ready)
  837. if self._loop.get_debug():
  838. logger.debug("%r resumes reading", self)
  839. def _read_ready(self):
  840. self._read_ready_cb()
  841. def _read_ready__get_buffer(self):
  842. if self._conn_lost:
  843. return
  844. try:
  845. buf = self._protocol.get_buffer(-1)
  846. if not len(buf):
  847. raise RuntimeError('get_buffer() returned an empty buffer')
  848. except (SystemExit, KeyboardInterrupt):
  849. raise
  850. except BaseException as exc:
  851. self._fatal_error(
  852. exc, 'Fatal error: protocol.get_buffer() call failed.')
  853. return
  854. try:
  855. nbytes = self._sock.recv_into(buf)
  856. except (BlockingIOError, InterruptedError):
  857. return
  858. except (SystemExit, KeyboardInterrupt):
  859. raise
  860. except BaseException as exc:
  861. self._fatal_error(exc, 'Fatal read error on socket transport')
  862. return
  863. if not nbytes:
  864. self._read_ready__on_eof()
  865. return
  866. try:
  867. self._protocol.buffer_updated(nbytes)
  868. except (SystemExit, KeyboardInterrupt):
  869. raise
  870. except BaseException as exc:
  871. self._fatal_error(
  872. exc, 'Fatal error: protocol.buffer_updated() call failed.')
  873. def _read_ready__data_received(self):
  874. if self._conn_lost:
  875. return
  876. try:
  877. data = self._sock.recv(self.max_size)
  878. except (BlockingIOError, InterruptedError):
  879. return
  880. except (SystemExit, KeyboardInterrupt):
  881. raise
  882. except BaseException as exc:
  883. self._fatal_error(exc, 'Fatal read error on socket transport')
  884. return
  885. if not data:
  886. self._read_ready__on_eof()
  887. return
  888. try:
  889. self._protocol.data_received(data)
  890. except (SystemExit, KeyboardInterrupt):
  891. raise
  892. except BaseException as exc:
  893. self._fatal_error(
  894. exc, 'Fatal error: protocol.data_received() call failed.')
  895. def _read_ready__on_eof(self):
  896. if self._loop.get_debug():
  897. logger.debug("%r received EOF", self)
  898. try:
  899. keep_open = self._protocol.eof_received()
  900. except (SystemExit, KeyboardInterrupt):
  901. raise
  902. except BaseException as exc:
  903. self._fatal_error(
  904. exc, 'Fatal error: protocol.eof_received() call failed.')
  905. return
  906. if keep_open:
  907. # We're keeping the connection open so the
  908. # protocol can write more, but we still can't
  909. # receive more, so remove the reader callback.
  910. self._loop._remove_reader(self._sock_fd)
  911. else:
  912. self.close()
  913. def write(self, data):
  914. if not isinstance(data, (bytes, bytearray, memoryview)):
  915. raise TypeError(f'data argument must be a bytes-like object, '
  916. f'not {type(data).__name__!r}')
  917. if self._eof:
  918. raise RuntimeError('Cannot call write() after write_eof()')
  919. if self._empty_waiter is not None:
  920. raise RuntimeError('unable to write; sendfile is in progress')
  921. if not data:
  922. return
  923. if self._conn_lost:
  924. if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
  925. logger.warning('socket.send() raised exception.')
  926. self._conn_lost += 1
  927. return
  928. if not self._buffer:
  929. # Optimization: try to send now.
  930. try:
  931. n = self._sock.send(data)
  932. except (BlockingIOError, InterruptedError):
  933. pass
  934. except (SystemExit, KeyboardInterrupt):
  935. raise
  936. except BaseException as exc:
  937. self._fatal_error(exc, 'Fatal write error on socket transport')
  938. return
  939. else:
  940. data = data[n:]
  941. if not data:
  942. return
  943. # Not all was written; register write handler.
  944. self._loop._add_writer(self._sock_fd, self._write_ready)
  945. # Add it to the buffer.
  946. self._buffer.extend(data)
  947. self._maybe_pause_protocol()
  948. def _write_ready(self):
  949. assert self._buffer, 'Data should not be empty'
  950. if self._conn_lost:
  951. return
  952. try:
  953. n = self._sock.send(self._buffer)
  954. except (BlockingIOError, InterruptedError):
  955. pass
  956. except (SystemExit, KeyboardInterrupt):
  957. raise
  958. except BaseException as exc:
  959. self._loop._remove_writer(self._sock_fd)
  960. self._buffer.clear()
  961. self._fatal_error(exc, 'Fatal write error on socket transport')
  962. if self._empty_waiter is not None:
  963. self._empty_waiter.set_exception(exc)
  964. else:
  965. if n:
  966. del self._buffer[:n]
  967. self._maybe_resume_protocol() # May append to buffer.
  968. if not self._buffer:
  969. self._loop._remove_writer(self._sock_fd)
  970. if self._empty_waiter is not None:
  971. self._empty_waiter.set_result(None)
  972. if self._closing:
  973. self._call_connection_lost(None)
  974. elif self._eof:
  975. self._sock.shutdown(socket.SHUT_WR)
  976. def write_eof(self):
  977. if self._closing or self._eof:
  978. return
  979. self._eof = True
  980. if not self._buffer:
  981. self._sock.shutdown(socket.SHUT_WR)
  982. def can_write_eof(self):
  983. return True
  984. def _call_connection_lost(self, exc):
  985. super()._call_connection_lost(exc)
  986. if self._empty_waiter is not None:
  987. self._empty_waiter.set_exception(
  988. ConnectionError("Connection is closed by peer"))
  989. def _make_empty_waiter(self):
  990. if self._empty_waiter is not None:
  991. raise RuntimeError("Empty waiter is already set")
  992. self._empty_waiter = self._loop.create_future()
  993. if not self._buffer:
  994. self._empty_waiter.set_result(None)
  995. return self._empty_waiter
  996. def _reset_empty_waiter(self):
  997. self._empty_waiter = None
  998. class _SelectorDatagramTransport(_SelectorTransport):
  999. _buffer_factory = collections.deque
  1000. def __init__(self, loop, sock, protocol, address=None,
  1001. waiter=None, extra=None):
  1002. super().__init__(loop, sock, protocol, extra)
  1003. self._address = address
  1004. self._buffer_size = 0
  1005. self._loop.call_soon(self._protocol.connection_made, self)
  1006. # only start reading when connection_made() has been called
  1007. self._loop.call_soon(self._add_reader,
  1008. self._sock_fd, self._read_ready)
  1009. if waiter is not None:
  1010. # only wake up the waiter when connection_made() has been called
  1011. self._loop.call_soon(futures._set_result_unless_cancelled,
  1012. waiter, None)
  1013. def get_write_buffer_size(self):
  1014. return self._buffer_size
  1015. def _read_ready(self):
  1016. if self._conn_lost:
  1017. return
  1018. try:
  1019. data, addr = self._sock.recvfrom(self.max_size)
  1020. except (BlockingIOError, InterruptedError):
  1021. pass
  1022. except OSError as exc:
  1023. self._protocol.error_received(exc)
  1024. except (SystemExit, KeyboardInterrupt):
  1025. raise
  1026. except BaseException as exc:
  1027. self._fatal_error(exc, 'Fatal read error on datagram transport')
  1028. else:
  1029. self._protocol.datagram_received(data, addr)
  1030. def sendto(self, data, addr=None):
  1031. if not isinstance(data, (bytes, bytearray, memoryview)):
  1032. raise TypeError(f'data argument must be a bytes-like object, '
  1033. f'not {type(data).__name__!r}')
  1034. if not data:
  1035. return
  1036. if self._address:
  1037. if addr not in (None, self._address):
  1038. raise ValueError(
  1039. f'Invalid address: must be None or {self._address}')
  1040. addr = self._address
  1041. if self._conn_lost and self._address:
  1042. if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
  1043. logger.warning('socket.send() raised exception.')
  1044. self._conn_lost += 1
  1045. return
  1046. if not self._buffer:
  1047. # Attempt to send it right away first.
  1048. try:
  1049. if self._extra['peername']:
  1050. self._sock.send(data)
  1051. else:
  1052. self._sock.sendto(data, addr)
  1053. return
  1054. except (BlockingIOError, InterruptedError):
  1055. self._loop._add_writer(self._sock_fd, self._sendto_ready)
  1056. except OSError as exc:
  1057. self._protocol.error_received(exc)
  1058. return
  1059. except (SystemExit, KeyboardInterrupt):
  1060. raise
  1061. except BaseException as exc:
  1062. self._fatal_error(
  1063. exc, 'Fatal write error on datagram transport')
  1064. return
  1065. # Ensure that what we buffer is immutable.
  1066. self._buffer.append((bytes(data), addr))
  1067. self._buffer_size += len(data)
  1068. self._maybe_pause_protocol()
  1069. def _sendto_ready(self):
  1070. while self._buffer:
  1071. data, addr = self._buffer.popleft()
  1072. self._buffer_size -= len(data)
  1073. try:
  1074. if self._extra['peername']:
  1075. self._sock.send(data)
  1076. else:
  1077. self._sock.sendto(data, addr)
  1078. except (BlockingIOError, InterruptedError):
  1079. self._buffer.appendleft((data, addr)) # Try again later.
  1080. self._buffer_size += len(data)
  1081. break
  1082. except OSError as exc:
  1083. self._protocol.error_received(exc)
  1084. return
  1085. except (SystemExit, KeyboardInterrupt):
  1086. raise
  1087. except BaseException as exc:
  1088. self._fatal_error(
  1089. exc, 'Fatal write error on datagram transport')
  1090. return
  1091. self._maybe_resume_protocol() # May append to buffer.
  1092. if not self._buffer:
  1093. self._loop._remove_writer(self._sock_fd)
  1094. if self._closing:
  1095. self._call_connection_lost(None)