| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582 |
- import errno
- import os
- import random
- import selectors
- import signal
- import socket
- import sys
- from test import support
- from test.support import os_helper
- from test.support import socket_helper
- from time import sleep
- import unittest
- import unittest.mock
- import tempfile
- from time import monotonic as time
- try:
- import resource
- except ImportError:
- resource = None
- if support.is_emscripten or support.is_wasi:
- raise unittest.SkipTest("Cannot create socketpair on Emscripten/WASI.")
- if hasattr(socket, 'socketpair'):
- socketpair = socket.socketpair
- else:
- def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
- with socket.socket(family, type, proto) as l:
- l.bind((socket_helper.HOST, 0))
- l.listen()
- c = socket.socket(family, type, proto)
- try:
- c.connect(l.getsockname())
- caddr = c.getsockname()
- while True:
- a, addr = l.accept()
- # check that we've got the correct client
- if addr == caddr:
- return c, a
- a.close()
- except OSError:
- c.close()
- raise
- def find_ready_matching(ready, flag):
- match = []
- for key, events in ready:
- if events & flag:
- match.append(key.fileobj)
- return match
- class BaseSelectorTestCase:
- def make_socketpair(self):
- rd, wr = socketpair()
- self.addCleanup(rd.close)
- self.addCleanup(wr.close)
- return rd, wr
- def test_register(self):
- s = self.SELECTOR()
- self.addCleanup(s.close)
- rd, wr = self.make_socketpair()
- key = s.register(rd, selectors.EVENT_READ, "data")
- self.assertIsInstance(key, selectors.SelectorKey)
- self.assertEqual(key.fileobj, rd)
- self.assertEqual(key.fd, rd.fileno())
- self.assertEqual(key.events, selectors.EVENT_READ)
- self.assertEqual(key.data, "data")
- # register an unknown event
- self.assertRaises(ValueError, s.register, 0, 999999)
- # register an invalid FD
- self.assertRaises(ValueError, s.register, -10, selectors.EVENT_READ)
- # register twice
- self.assertRaises(KeyError, s.register, rd, selectors.EVENT_READ)
- # register the same FD, but with a different object
- self.assertRaises(KeyError, s.register, rd.fileno(),
- selectors.EVENT_READ)
- def test_unregister(self):
- s = self.SELECTOR()
- self.addCleanup(s.close)
- rd, wr = self.make_socketpair()
- s.register(rd, selectors.EVENT_READ)
- s.unregister(rd)
- # unregister an unknown file obj
- self.assertRaises(KeyError, s.unregister, 999999)
- # unregister twice
- self.assertRaises(KeyError, s.unregister, rd)
- def test_unregister_after_fd_close(self):
- s = self.SELECTOR()
- self.addCleanup(s.close)
- rd, wr = self.make_socketpair()
- r, w = rd.fileno(), wr.fileno()
- s.register(r, selectors.EVENT_READ)
- s.register(w, selectors.EVENT_WRITE)
- rd.close()
- wr.close()
- s.unregister(r)
- s.unregister(w)
- @unittest.skipUnless(os.name == 'posix', "requires posix")
- def test_unregister_after_fd_close_and_reuse(self):
- s = self.SELECTOR()
- self.addCleanup(s.close)
- rd, wr = self.make_socketpair()
- r, w = rd.fileno(), wr.fileno()
- s.register(r, selectors.EVENT_READ)
- s.register(w, selectors.EVENT_WRITE)
- rd2, wr2 = self.make_socketpair()
- rd.close()
- wr.close()
- os.dup2(rd2.fileno(), r)
- os.dup2(wr2.fileno(), w)
- self.addCleanup(os.close, r)
- self.addCleanup(os.close, w)
- s.unregister(r)
- s.unregister(w)
- def test_unregister_after_socket_close(self):
- s = self.SELECTOR()
- self.addCleanup(s.close)
- rd, wr = self.make_socketpair()
- s.register(rd, selectors.EVENT_READ)
- s.register(wr, selectors.EVENT_WRITE)
- rd.close()
- wr.close()
- s.unregister(rd)
- s.unregister(wr)
- def test_modify(self):
- s = self.SELECTOR()
- self.addCleanup(s.close)
- rd, wr = self.make_socketpair()
- key = s.register(rd, selectors.EVENT_READ)
- # modify events
- key2 = s.modify(rd, selectors.EVENT_WRITE)
- self.assertNotEqual(key.events, key2.events)
- self.assertEqual(key2, s.get_key(rd))
- s.unregister(rd)
- # modify data
- d1 = object()
- d2 = object()
- key = s.register(rd, selectors.EVENT_READ, d1)
- key2 = s.modify(rd, selectors.EVENT_READ, d2)
- self.assertEqual(key.events, key2.events)
- self.assertNotEqual(key.data, key2.data)
- self.assertEqual(key2, s.get_key(rd))
- self.assertEqual(key2.data, d2)
- # modify unknown file obj
- self.assertRaises(KeyError, s.modify, 999999, selectors.EVENT_READ)
- # modify use a shortcut
- d3 = object()
- s.register = unittest.mock.Mock()
- s.unregister = unittest.mock.Mock()
- s.modify(rd, selectors.EVENT_READ, d3)
- self.assertFalse(s.register.called)
- self.assertFalse(s.unregister.called)
- def test_modify_unregister(self):
- # Make sure the fd is unregister()ed in case of error on
- # modify(): http://bugs.python.org/issue30014
- if self.SELECTOR.__name__ == 'EpollSelector':
- patch = unittest.mock.patch(
- 'selectors.EpollSelector._selector_cls')
- elif self.SELECTOR.__name__ == 'PollSelector':
- patch = unittest.mock.patch(
- 'selectors.PollSelector._selector_cls')
- elif self.SELECTOR.__name__ == 'DevpollSelector':
- patch = unittest.mock.patch(
- 'selectors.DevpollSelector._selector_cls')
- else:
- raise self.skipTest("")
- with patch as m:
- m.return_value.modify = unittest.mock.Mock(
- side_effect=ZeroDivisionError)
- s = self.SELECTOR()
- self.addCleanup(s.close)
- rd, wr = self.make_socketpair()
- s.register(rd, selectors.EVENT_READ)
- self.assertEqual(len(s._map), 1)
- with self.assertRaises(ZeroDivisionError):
- s.modify(rd, selectors.EVENT_WRITE)
- self.assertEqual(len(s._map), 0)
- def test_close(self):
- s = self.SELECTOR()
- self.addCleanup(s.close)
- mapping = s.get_map()
- rd, wr = self.make_socketpair()
- s.register(rd, selectors.EVENT_READ)
- s.register(wr, selectors.EVENT_WRITE)
- s.close()
- self.assertRaises(RuntimeError, s.get_key, rd)
- self.assertRaises(RuntimeError, s.get_key, wr)
- self.assertRaises(KeyError, mapping.__getitem__, rd)
- self.assertRaises(KeyError, mapping.__getitem__, wr)
- def test_get_key(self):
- s = self.SELECTOR()
- self.addCleanup(s.close)
- rd, wr = self.make_socketpair()
- key = s.register(rd, selectors.EVENT_READ, "data")
- self.assertEqual(key, s.get_key(rd))
- # unknown file obj
- self.assertRaises(KeyError, s.get_key, 999999)
- def test_get_map(self):
- s = self.SELECTOR()
- self.addCleanup(s.close)
- rd, wr = self.make_socketpair()
- keys = s.get_map()
- self.assertFalse(keys)
- self.assertEqual(len(keys), 0)
- self.assertEqual(list(keys), [])
- key = s.register(rd, selectors.EVENT_READ, "data")
- self.assertIn(rd, keys)
- self.assertEqual(key, keys[rd])
- self.assertEqual(len(keys), 1)
- self.assertEqual(list(keys), [rd.fileno()])
- self.assertEqual(list(keys.values()), [key])
- # unknown file obj
- with self.assertRaises(KeyError):
- keys[999999]
- # Read-only mapping
- with self.assertRaises(TypeError):
- del keys[rd]
- def test_select(self):
- s = self.SELECTOR()
- self.addCleanup(s.close)
- rd, wr = self.make_socketpair()
- s.register(rd, selectors.EVENT_READ)
- wr_key = s.register(wr, selectors.EVENT_WRITE)
- result = s.select()
- for key, events in result:
- self.assertTrue(isinstance(key, selectors.SelectorKey))
- self.assertTrue(events)
- self.assertFalse(events & ~(selectors.EVENT_READ |
- selectors.EVENT_WRITE))
- self.assertEqual([(wr_key, selectors.EVENT_WRITE)], result)
- def test_context_manager(self):
- s = self.SELECTOR()
- self.addCleanup(s.close)
- rd, wr = self.make_socketpair()
- with s as sel:
- sel.register(rd, selectors.EVENT_READ)
- sel.register(wr, selectors.EVENT_WRITE)
- self.assertRaises(RuntimeError, s.get_key, rd)
- self.assertRaises(RuntimeError, s.get_key, wr)
- def test_fileno(self):
- s = self.SELECTOR()
- self.addCleanup(s.close)
- if hasattr(s, 'fileno'):
- fd = s.fileno()
- self.assertTrue(isinstance(fd, int))
- self.assertGreaterEqual(fd, 0)
- def test_selector(self):
- s = self.SELECTOR()
- self.addCleanup(s.close)
- NUM_SOCKETS = 12
- MSG = b" This is a test."
- MSG_LEN = len(MSG)
- readers = []
- writers = []
- r2w = {}
- w2r = {}
- for i in range(NUM_SOCKETS):
- rd, wr = self.make_socketpair()
- s.register(rd, selectors.EVENT_READ)
- s.register(wr, selectors.EVENT_WRITE)
- readers.append(rd)
- writers.append(wr)
- r2w[rd] = wr
- w2r[wr] = rd
- bufs = []
- while writers:
- ready = s.select()
- ready_writers = find_ready_matching(ready, selectors.EVENT_WRITE)
- if not ready_writers:
- self.fail("no sockets ready for writing")
- wr = random.choice(ready_writers)
- wr.send(MSG)
- for i in range(10):
- ready = s.select()
- ready_readers = find_ready_matching(ready,
- selectors.EVENT_READ)
- if ready_readers:
- break
- # there might be a delay between the write to the write end and
- # the read end is reported ready
- sleep(0.1)
- else:
- self.fail("no sockets ready for reading")
- self.assertEqual([w2r[wr]], ready_readers)
- rd = ready_readers[0]
- buf = rd.recv(MSG_LEN)
- self.assertEqual(len(buf), MSG_LEN)
- bufs.append(buf)
- s.unregister(r2w[rd])
- s.unregister(rd)
- writers.remove(r2w[rd])
- self.assertEqual(bufs, [MSG] * NUM_SOCKETS)
- @unittest.skipIf(sys.platform == 'win32',
- 'select.select() cannot be used with empty fd sets')
- def test_empty_select(self):
- # Issue #23009: Make sure EpollSelector.select() works when no FD is
- # registered.
- s = self.SELECTOR()
- self.addCleanup(s.close)
- self.assertEqual(s.select(timeout=0), [])
- def test_timeout(self):
- s = self.SELECTOR()
- self.addCleanup(s.close)
- rd, wr = self.make_socketpair()
- s.register(wr, selectors.EVENT_WRITE)
- t = time()
- self.assertEqual(1, len(s.select(0)))
- self.assertEqual(1, len(s.select(-1)))
- self.assertLess(time() - t, 0.5)
- s.unregister(wr)
- s.register(rd, selectors.EVENT_READ)
- t = time()
- self.assertFalse(s.select(0))
- self.assertFalse(s.select(-1))
- self.assertLess(time() - t, 0.5)
- t0 = time()
- self.assertFalse(s.select(1))
- t1 = time()
- dt = t1 - t0
- # Tolerate 2.0 seconds for very slow buildbots
- self.assertTrue(0.8 <= dt <= 2.0, dt)
- @unittest.skipUnless(hasattr(signal, "alarm"),
- "signal.alarm() required for this test")
- def test_select_interrupt_exc(self):
- s = self.SELECTOR()
- self.addCleanup(s.close)
- rd, wr = self.make_socketpair()
- class InterruptSelect(Exception):
- pass
- def handler(*args):
- raise InterruptSelect
- orig_alrm_handler = signal.signal(signal.SIGALRM, handler)
- self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
- try:
- signal.alarm(1)
- s.register(rd, selectors.EVENT_READ)
- t = time()
- # select() is interrupted by a signal which raises an exception
- with self.assertRaises(InterruptSelect):
- s.select(30)
- # select() was interrupted before the timeout of 30 seconds
- self.assertLess(time() - t, 5.0)
- finally:
- signal.alarm(0)
- @unittest.skipUnless(hasattr(signal, "alarm"),
- "signal.alarm() required for this test")
- def test_select_interrupt_noraise(self):
- s = self.SELECTOR()
- self.addCleanup(s.close)
- rd, wr = self.make_socketpair()
- orig_alrm_handler = signal.signal(signal.SIGALRM, lambda *args: None)
- self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
- try:
- signal.alarm(1)
- s.register(rd, selectors.EVENT_READ)
- t = time()
- # select() is interrupted by a signal, but the signal handler doesn't
- # raise an exception, so select() should by retries with a recomputed
- # timeout
- self.assertFalse(s.select(1.5))
- self.assertGreaterEqual(time() - t, 1.0)
- finally:
- signal.alarm(0)
- class ScalableSelectorMixIn:
- # see issue #18963 for why it's skipped on older OS X versions
- @support.requires_mac_ver(10, 5)
- @unittest.skipUnless(resource, "Test needs resource module")
- def test_above_fd_setsize(self):
- # A scalable implementation should have no problem with more than
- # FD_SETSIZE file descriptors. Since we don't know the value, we just
- # try to set the soft RLIMIT_NOFILE to the hard RLIMIT_NOFILE ceiling.
- soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
- try:
- resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
- self.addCleanup(resource.setrlimit, resource.RLIMIT_NOFILE,
- (soft, hard))
- NUM_FDS = min(hard, 2**16)
- except (OSError, ValueError):
- NUM_FDS = soft
- # guard for already allocated FDs (stdin, stdout...)
- NUM_FDS -= 32
- s = self.SELECTOR()
- self.addCleanup(s.close)
- for i in range(NUM_FDS // 2):
- try:
- rd, wr = self.make_socketpair()
- except OSError:
- # too many FDs, skip - note that we should only catch EMFILE
- # here, but apparently *BSD and Solaris can fail upon connect()
- # or bind() with EADDRNOTAVAIL, so let's be safe
- self.skipTest("FD limit reached")
- try:
- s.register(rd, selectors.EVENT_READ)
- s.register(wr, selectors.EVENT_WRITE)
- except OSError as e:
- if e.errno == errno.ENOSPC:
- # this can be raised by epoll if we go over
- # fs.epoll.max_user_watches sysctl
- self.skipTest("FD limit reached")
- raise
- try:
- fds = s.select()
- except OSError as e:
- if e.errno == errno.EINVAL and sys.platform == 'darwin':
- # unexplainable errors on macOS don't need to fail the test
- self.skipTest("Invalid argument error calling poll()")
- raise
- self.assertEqual(NUM_FDS // 2, len(fds))
- class DefaultSelectorTestCase(BaseSelectorTestCase, unittest.TestCase):
- SELECTOR = selectors.DefaultSelector
- class SelectSelectorTestCase(BaseSelectorTestCase, unittest.TestCase):
- SELECTOR = selectors.SelectSelector
- @unittest.skipUnless(hasattr(selectors, 'PollSelector'),
- "Test needs selectors.PollSelector")
- class PollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn,
- unittest.TestCase):
- SELECTOR = getattr(selectors, 'PollSelector', None)
- @unittest.skipUnless(hasattr(selectors, 'EpollSelector'),
- "Test needs selectors.EpollSelector")
- class EpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn,
- unittest.TestCase):
- SELECTOR = getattr(selectors, 'EpollSelector', None)
- def test_register_file(self):
- # epoll(7) returns EPERM when given a file to watch
- s = self.SELECTOR()
- with tempfile.NamedTemporaryFile() as f:
- with self.assertRaises(IOError):
- s.register(f, selectors.EVENT_READ)
- # the SelectorKey has been removed
- with self.assertRaises(KeyError):
- s.get_key(f)
- @unittest.skipUnless(hasattr(selectors, 'KqueueSelector'),
- "Test needs selectors.KqueueSelector)")
- class KqueueSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn,
- unittest.TestCase):
- SELECTOR = getattr(selectors, 'KqueueSelector', None)
- def test_register_bad_fd(self):
- # a file descriptor that's been closed should raise an OSError
- # with EBADF
- s = self.SELECTOR()
- bad_f = os_helper.make_bad_fd()
- with self.assertRaises(OSError) as cm:
- s.register(bad_f, selectors.EVENT_READ)
- self.assertEqual(cm.exception.errno, errno.EBADF)
- # the SelectorKey has been removed
- with self.assertRaises(KeyError):
- s.get_key(bad_f)
- def test_empty_select_timeout(self):
- # Issues #23009, #29255: Make sure timeout is applied when no fds
- # are registered.
- s = self.SELECTOR()
- self.addCleanup(s.close)
- t0 = time()
- self.assertEqual(s.select(1), [])
- t1 = time()
- dt = t1 - t0
- # Tolerate 2.0 seconds for very slow buildbots
- self.assertTrue(0.8 <= dt <= 2.0, dt)
- @unittest.skipUnless(hasattr(selectors, 'DevpollSelector'),
- "Test needs selectors.DevpollSelector")
- class DevpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn,
- unittest.TestCase):
- SELECTOR = getattr(selectors, 'DevpollSelector', None)
- def tearDownModule():
- support.reap_children()
- if __name__ == "__main__":
- unittest.main()
|