test_kqueue.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. """
  2. Tests for kqueue wrapper.
  3. """
  4. import errno
  5. import os
  6. import select
  7. import socket
  8. import time
  9. import unittest
  10. if not hasattr(select, "kqueue"):
  11. raise unittest.SkipTest("test works only on BSD")
  12. class TestKQueue(unittest.TestCase):
  13. def test_create_queue(self):
  14. kq = select.kqueue()
  15. self.assertTrue(kq.fileno() > 0, kq.fileno())
  16. self.assertTrue(not kq.closed)
  17. kq.close()
  18. self.assertTrue(kq.closed)
  19. self.assertRaises(ValueError, kq.fileno)
  20. def test_create_event(self):
  21. from operator import lt, le, gt, ge
  22. fd = os.open(os.devnull, os.O_WRONLY)
  23. self.addCleanup(os.close, fd)
  24. ev = select.kevent(fd)
  25. other = select.kevent(1000)
  26. self.assertEqual(ev.ident, fd)
  27. self.assertEqual(ev.filter, select.KQ_FILTER_READ)
  28. self.assertEqual(ev.flags, select.KQ_EV_ADD)
  29. self.assertEqual(ev.fflags, 0)
  30. self.assertEqual(ev.data, 0)
  31. self.assertEqual(ev.udata, 0)
  32. self.assertEqual(ev, ev)
  33. self.assertNotEqual(ev, other)
  34. self.assertTrue(ev < other)
  35. self.assertTrue(other >= ev)
  36. for op in lt, le, gt, ge:
  37. self.assertRaises(TypeError, op, ev, None)
  38. self.assertRaises(TypeError, op, ev, 1)
  39. self.assertRaises(TypeError, op, ev, "ev")
  40. ev = select.kevent(fd, select.KQ_FILTER_WRITE)
  41. self.assertEqual(ev.ident, fd)
  42. self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
  43. self.assertEqual(ev.flags, select.KQ_EV_ADD)
  44. self.assertEqual(ev.fflags, 0)
  45. self.assertEqual(ev.data, 0)
  46. self.assertEqual(ev.udata, 0)
  47. self.assertEqual(ev, ev)
  48. self.assertNotEqual(ev, other)
  49. ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
  50. self.assertEqual(ev.ident, fd)
  51. self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
  52. self.assertEqual(ev.flags, select.KQ_EV_ONESHOT)
  53. self.assertEqual(ev.fflags, 0)
  54. self.assertEqual(ev.data, 0)
  55. self.assertEqual(ev.udata, 0)
  56. self.assertEqual(ev, ev)
  57. self.assertNotEqual(ev, other)
  58. ev = select.kevent(1, 2, 3, 4, 5, 6)
  59. self.assertEqual(ev.ident, 1)
  60. self.assertEqual(ev.filter, 2)
  61. self.assertEqual(ev.flags, 3)
  62. self.assertEqual(ev.fflags, 4)
  63. self.assertEqual(ev.data, 5)
  64. self.assertEqual(ev.udata, 6)
  65. self.assertEqual(ev, ev)
  66. self.assertNotEqual(ev, other)
  67. bignum = 0x7fff
  68. ev = select.kevent(bignum, 1, 2, 3, bignum - 1, bignum)
  69. self.assertEqual(ev.ident, bignum)
  70. self.assertEqual(ev.filter, 1)
  71. self.assertEqual(ev.flags, 2)
  72. self.assertEqual(ev.fflags, 3)
  73. self.assertEqual(ev.data, bignum - 1)
  74. self.assertEqual(ev.udata, bignum)
  75. self.assertEqual(ev, ev)
  76. self.assertNotEqual(ev, other)
  77. # Issue 11973
  78. bignum = 0xffff
  79. ev = select.kevent(0, 1, bignum)
  80. self.assertEqual(ev.ident, 0)
  81. self.assertEqual(ev.filter, 1)
  82. self.assertEqual(ev.flags, bignum)
  83. self.assertEqual(ev.fflags, 0)
  84. self.assertEqual(ev.data, 0)
  85. self.assertEqual(ev.udata, 0)
  86. self.assertEqual(ev, ev)
  87. self.assertNotEqual(ev, other)
  88. # Issue 11973
  89. bignum = 0xffffffff
  90. ev = select.kevent(0, 1, 2, bignum)
  91. self.assertEqual(ev.ident, 0)
  92. self.assertEqual(ev.filter, 1)
  93. self.assertEqual(ev.flags, 2)
  94. self.assertEqual(ev.fflags, bignum)
  95. self.assertEqual(ev.data, 0)
  96. self.assertEqual(ev.udata, 0)
  97. self.assertEqual(ev, ev)
  98. self.assertNotEqual(ev, other)
  99. def test_queue_event(self):
  100. serverSocket = socket.create_server(('127.0.0.1', 0))
  101. client = socket.socket()
  102. client.setblocking(False)
  103. try:
  104. client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
  105. except OSError as e:
  106. self.assertEqual(e.args[0], errno.EINPROGRESS)
  107. else:
  108. #raise AssertionError("Connect should have raised EINPROGRESS")
  109. pass # FreeBSD doesn't raise an exception here
  110. server, addr = serverSocket.accept()
  111. kq = select.kqueue()
  112. kq2 = select.kqueue.fromfd(kq.fileno())
  113. ev = select.kevent(server.fileno(),
  114. select.KQ_FILTER_WRITE,
  115. select.KQ_EV_ADD | select.KQ_EV_ENABLE)
  116. kq.control([ev], 0)
  117. ev = select.kevent(server.fileno(),
  118. select.KQ_FILTER_READ,
  119. select.KQ_EV_ADD | select.KQ_EV_ENABLE)
  120. kq.control([ev], 0)
  121. ev = select.kevent(client.fileno(),
  122. select.KQ_FILTER_WRITE,
  123. select.KQ_EV_ADD | select.KQ_EV_ENABLE)
  124. kq2.control([ev], 0)
  125. ev = select.kevent(client.fileno(),
  126. select.KQ_FILTER_READ,
  127. select.KQ_EV_ADD | select.KQ_EV_ENABLE)
  128. kq2.control([ev], 0)
  129. events = kq.control(None, 4, 1)
  130. events = set((e.ident, e.filter) for e in events)
  131. self.assertEqual(events, set([
  132. (client.fileno(), select.KQ_FILTER_WRITE),
  133. (server.fileno(), select.KQ_FILTER_WRITE)]))
  134. client.send(b"Hello!")
  135. server.send(b"world!!!")
  136. # We may need to call it several times
  137. for i in range(10):
  138. events = kq.control(None, 4, 1)
  139. if len(events) == 4:
  140. break
  141. time.sleep(1.0)
  142. else:
  143. self.fail('timeout waiting for event notifications')
  144. events = set((e.ident, e.filter) for e in events)
  145. self.assertEqual(events, set([
  146. (client.fileno(), select.KQ_FILTER_WRITE),
  147. (client.fileno(), select.KQ_FILTER_READ),
  148. (server.fileno(), select.KQ_FILTER_WRITE),
  149. (server.fileno(), select.KQ_FILTER_READ)]))
  150. # Remove completely client, and server read part
  151. ev = select.kevent(client.fileno(),
  152. select.KQ_FILTER_WRITE,
  153. select.KQ_EV_DELETE)
  154. kq.control([ev], 0)
  155. ev = select.kevent(client.fileno(),
  156. select.KQ_FILTER_READ,
  157. select.KQ_EV_DELETE)
  158. kq.control([ev], 0)
  159. ev = select.kevent(server.fileno(),
  160. select.KQ_FILTER_READ,
  161. select.KQ_EV_DELETE)
  162. kq.control([ev], 0, 0)
  163. events = kq.control([], 4, 0.99)
  164. events = set((e.ident, e.filter) for e in events)
  165. self.assertEqual(events, set([
  166. (server.fileno(), select.KQ_FILTER_WRITE)]))
  167. client.close()
  168. server.close()
  169. serverSocket.close()
  170. def testPair(self):
  171. kq = select.kqueue()
  172. a, b = socket.socketpair()
  173. a.send(b'foo')
  174. event1 = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
  175. event2 = select.kevent(b, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
  176. r = kq.control([event1, event2], 1, 1)
  177. self.assertTrue(r)
  178. self.assertFalse(r[0].flags & select.KQ_EV_ERROR)
  179. self.assertEqual(b.recv(r[0].data), b'foo')
  180. a.close()
  181. b.close()
  182. kq.close()
  183. def test_issue30058(self):
  184. # changelist must be an iterable
  185. kq = select.kqueue()
  186. a, b = socket.socketpair()
  187. ev = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
  188. kq.control([ev], 0)
  189. # not a list
  190. kq.control((ev,), 0)
  191. # __len__ is not consistent with __iter__
  192. class BadList:
  193. def __len__(self):
  194. return 0
  195. def __iter__(self):
  196. for i in range(100):
  197. yield ev
  198. kq.control(BadList(), 0)
  199. # doesn't have __len__
  200. kq.control(iter([ev]), 0)
  201. a.close()
  202. b.close()
  203. kq.close()
  204. def test_close(self):
  205. open_file = open(__file__, "rb")
  206. self.addCleanup(open_file.close)
  207. fd = open_file.fileno()
  208. kqueue = select.kqueue()
  209. # test fileno() method and closed attribute
  210. self.assertIsInstance(kqueue.fileno(), int)
  211. self.assertFalse(kqueue.closed)
  212. # test close()
  213. kqueue.close()
  214. self.assertTrue(kqueue.closed)
  215. self.assertRaises(ValueError, kqueue.fileno)
  216. # close() can be called more than once
  217. kqueue.close()
  218. # operations must fail with ValueError("I/O operation on closed ...")
  219. self.assertRaises(ValueError, kqueue.control, None, 4)
  220. def test_fd_non_inheritable(self):
  221. kqueue = select.kqueue()
  222. self.addCleanup(kqueue.close)
  223. self.assertEqual(os.get_inheritable(kqueue.fileno()), False)
  224. if __name__ == "__main__":
  225. unittest.main()