test_poll.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. # Test case for the os.poll() function
  2. import os
  3. import subprocess
  4. import random
  5. import select
  6. import threading
  7. import time
  8. import unittest
  9. from test.support import (
  10. cpython_only, requires_subprocess, requires_working_socket
  11. )
  12. from test.support import threading_helper
  13. from test.support.os_helper import TESTFN
  14. try:
  15. select.poll
  16. except AttributeError:
  17. raise unittest.SkipTest("select.poll not defined")
  18. requires_working_socket(module=True)
  19. def find_ready_matching(ready, flag):
  20. match = []
  21. for fd, mode in ready:
  22. if mode & flag:
  23. match.append(fd)
  24. return match
  25. class PollTests(unittest.TestCase):
  26. def test_poll1(self):
  27. # Basic functional test of poll object
  28. # Create a bunch of pipe and test that poll works with them.
  29. p = select.poll()
  30. NUM_PIPES = 12
  31. MSG = b" This is a test."
  32. MSG_LEN = len(MSG)
  33. readers = []
  34. writers = []
  35. r2w = {}
  36. w2r = {}
  37. for i in range(NUM_PIPES):
  38. rd, wr = os.pipe()
  39. p.register(rd)
  40. p.modify(rd, select.POLLIN)
  41. p.register(wr, select.POLLOUT)
  42. readers.append(rd)
  43. writers.append(wr)
  44. r2w[rd] = wr
  45. w2r[wr] = rd
  46. bufs = []
  47. while writers:
  48. ready = p.poll()
  49. ready_writers = find_ready_matching(ready, select.POLLOUT)
  50. if not ready_writers:
  51. raise RuntimeError("no pipes ready for writing")
  52. wr = random.choice(ready_writers)
  53. os.write(wr, MSG)
  54. ready = p.poll()
  55. ready_readers = find_ready_matching(ready, select.POLLIN)
  56. if not ready_readers:
  57. raise RuntimeError("no pipes ready for reading")
  58. rd = random.choice(ready_readers)
  59. buf = os.read(rd, MSG_LEN)
  60. self.assertEqual(len(buf), MSG_LEN)
  61. bufs.append(buf)
  62. os.close(r2w[rd]) ; os.close( rd )
  63. p.unregister( r2w[rd] )
  64. p.unregister( rd )
  65. writers.remove(r2w[rd])
  66. self.assertEqual(bufs, [MSG] * NUM_PIPES)
  67. def test_poll_unit_tests(self):
  68. # returns NVAL for invalid file descriptor
  69. FD, w = os.pipe()
  70. os.close(FD)
  71. os.close(w)
  72. p = select.poll()
  73. p.register(FD)
  74. r = p.poll()
  75. self.assertEqual(r[0], (FD, select.POLLNVAL))
  76. with open(TESTFN, 'w') as f:
  77. fd = f.fileno()
  78. p = select.poll()
  79. p.register(f)
  80. r = p.poll()
  81. self.assertEqual(r[0][0], fd)
  82. r = p.poll()
  83. self.assertEqual(r[0], (fd, select.POLLNVAL))
  84. os.unlink(TESTFN)
  85. # type error for invalid arguments
  86. p = select.poll()
  87. self.assertRaises(TypeError, p.register, p)
  88. self.assertRaises(TypeError, p.unregister, p)
  89. # can't unregister non-existent object
  90. p = select.poll()
  91. self.assertRaises(KeyError, p.unregister, 3)
  92. # Test error cases
  93. pollster = select.poll()
  94. class Nope:
  95. pass
  96. class Almost:
  97. def fileno(self):
  98. return 'fileno'
  99. self.assertRaises(TypeError, pollster.register, Nope(), 0)
  100. self.assertRaises(TypeError, pollster.register, Almost(), 0)
  101. # Another test case for poll(). This is copied from the test case for
  102. # select(), modified to use poll() instead.
  103. @requires_subprocess()
  104. def test_poll2(self):
  105. cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
  106. proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
  107. bufsize=0)
  108. self.enterContext(proc)
  109. p = proc.stdout
  110. pollster = select.poll()
  111. pollster.register( p, select.POLLIN )
  112. for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
  113. fdlist = pollster.poll(tout)
  114. if (fdlist == []):
  115. continue
  116. fd, flags = fdlist[0]
  117. if flags & select.POLLHUP:
  118. line = p.readline()
  119. if line != b"":
  120. self.fail('error: pipe seems to be closed, but still returns data')
  121. continue
  122. elif flags & select.POLLIN:
  123. line = p.readline()
  124. if not line:
  125. break
  126. self.assertEqual(line, b'testing...\n')
  127. continue
  128. else:
  129. self.fail('Unexpected return value from select.poll: %s' % fdlist)
  130. def test_poll3(self):
  131. # test int overflow
  132. pollster = select.poll()
  133. pollster.register(1)
  134. self.assertRaises(OverflowError, pollster.poll, 1 << 64)
  135. x = 2 + 3
  136. if x != 5:
  137. self.fail('Overflow must have occurred')
  138. # Issues #15989, #17919
  139. self.assertRaises(ValueError, pollster.register, 0, -1)
  140. self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
  141. self.assertRaises(ValueError, pollster.modify, 1, -1)
  142. self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)
  143. @cpython_only
  144. def test_poll_c_limits(self):
  145. from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX
  146. pollster = select.poll()
  147. pollster.register(1)
  148. # Issues #15989, #17919
  149. self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
  150. self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
  151. self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1)
  152. self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1)
  153. @threading_helper.reap_threads
  154. def test_threaded_poll(self):
  155. r, w = os.pipe()
  156. self.addCleanup(os.close, r)
  157. self.addCleanup(os.close, w)
  158. rfds = []
  159. for i in range(10):
  160. fd = os.dup(r)
  161. self.addCleanup(os.close, fd)
  162. rfds.append(fd)
  163. pollster = select.poll()
  164. for fd in rfds:
  165. pollster.register(fd, select.POLLIN)
  166. t = threading.Thread(target=pollster.poll)
  167. t.start()
  168. try:
  169. time.sleep(0.5)
  170. # trigger ufds array reallocation
  171. for fd in rfds:
  172. pollster.unregister(fd)
  173. pollster.register(w, select.POLLOUT)
  174. self.assertRaises(RuntimeError, pollster.poll)
  175. finally:
  176. # and make the call to poll() from the thread return
  177. os.write(w, b'spam')
  178. t.join()
  179. @unittest.skipUnless(threading, 'Threading required for this test.')
  180. @threading_helper.reap_threads
  181. def test_poll_blocks_with_negative_ms(self):
  182. for timeout_ms in [None, -1000, -1, -1.0, -0.1, -1e-100]:
  183. # Create two file descriptors. This will be used to unlock
  184. # the blocking call to poll.poll inside the thread
  185. r, w = os.pipe()
  186. pollster = select.poll()
  187. pollster.register(r, select.POLLIN)
  188. poll_thread = threading.Thread(target=pollster.poll, args=(timeout_ms,))
  189. poll_thread.start()
  190. poll_thread.join(timeout=0.1)
  191. self.assertTrue(poll_thread.is_alive())
  192. # Write to the pipe so pollster.poll unblocks and the thread ends.
  193. os.write(w, b'spam')
  194. poll_thread.join()
  195. self.assertFalse(poll_thread.is_alive())
  196. os.close(r)
  197. os.close(w)
  198. if __name__ == '__main__':
  199. unittest.main()