test_asynchat.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. # test asynchat
  2. from test import support
  3. from test.support import socket_helper
  4. from test.support import threading_helper
  5. from test.support import warnings_helper
  6. import errno
  7. import socket
  8. import sys
  9. import threading
  10. import time
  11. import unittest
  12. import unittest.mock
  13. asynchat = warnings_helper.import_deprecated('asynchat')
  14. asyncore = warnings_helper.import_deprecated('asyncore')
  15. support.requires_working_socket(module=True)
  16. HOST = socket_helper.HOST
  17. SERVER_QUIT = b'QUIT\n'
  18. class echo_server(threading.Thread):
  19. # parameter to determine the number of bytes passed back to the
  20. # client each send
  21. chunk_size = 1
  22. def __init__(self, event):
  23. threading.Thread.__init__(self)
  24. self.event = event
  25. self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  26. self.port = socket_helper.bind_port(self.sock)
  27. # This will be set if the client wants us to wait before echoing
  28. # data back.
  29. self.start_resend_event = None
  30. def run(self):
  31. self.sock.listen()
  32. self.event.set()
  33. conn, client = self.sock.accept()
  34. self.buffer = b""
  35. # collect data until quit message is seen
  36. while SERVER_QUIT not in self.buffer:
  37. data = conn.recv(1)
  38. if not data:
  39. break
  40. self.buffer = self.buffer + data
  41. # remove the SERVER_QUIT message
  42. self.buffer = self.buffer.replace(SERVER_QUIT, b'')
  43. if self.start_resend_event:
  44. self.start_resend_event.wait()
  45. # re-send entire set of collected data
  46. try:
  47. # this may fail on some tests, such as test_close_when_done,
  48. # since the client closes the channel when it's done sending
  49. while self.buffer:
  50. n = conn.send(self.buffer[:self.chunk_size])
  51. time.sleep(0.001)
  52. self.buffer = self.buffer[n:]
  53. except:
  54. pass
  55. conn.close()
  56. self.sock.close()
  57. class echo_client(asynchat.async_chat):
  58. def __init__(self, terminator, server_port):
  59. asynchat.async_chat.__init__(self)
  60. self.contents = []
  61. self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
  62. self.connect((HOST, server_port))
  63. self.set_terminator(terminator)
  64. self.buffer = b""
  65. def handle_connect(self):
  66. pass
  67. if sys.platform == 'darwin':
  68. # select.poll returns a select.POLLHUP at the end of the tests
  69. # on darwin, so just ignore it
  70. def handle_expt(self):
  71. pass
  72. def collect_incoming_data(self, data):
  73. self.buffer += data
  74. def found_terminator(self):
  75. self.contents.append(self.buffer)
  76. self.buffer = b""
  77. def start_echo_server():
  78. event = threading.Event()
  79. s = echo_server(event)
  80. s.start()
  81. event.wait()
  82. event.clear()
  83. time.sleep(0.01) # Give server time to start accepting.
  84. return s, event
  85. class TestAsynchat(unittest.TestCase):
  86. usepoll = False
  87. def setUp(self):
  88. self._threads = threading_helper.threading_setup()
  89. def tearDown(self):
  90. threading_helper.threading_cleanup(*self._threads)
  91. def line_terminator_check(self, term, server_chunk):
  92. event = threading.Event()
  93. s = echo_server(event)
  94. s.chunk_size = server_chunk
  95. s.start()
  96. event.wait()
  97. event.clear()
  98. time.sleep(0.01) # Give server time to start accepting.
  99. c = echo_client(term, s.port)
  100. c.push(b"hello ")
  101. c.push(b"world" + term)
  102. c.push(b"I'm not dead yet!" + term)
  103. c.push(SERVER_QUIT)
  104. asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  105. threading_helper.join_thread(s)
  106. self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
  107. # the line terminator tests below check receiving variously-sized
  108. # chunks back from the server in order to exercise all branches of
  109. # async_chat.handle_read
  110. def test_line_terminator1(self):
  111. # test one-character terminator
  112. for l in (1, 2, 3):
  113. self.line_terminator_check(b'\n', l)
  114. def test_line_terminator2(self):
  115. # test two-character terminator
  116. for l in (1, 2, 3):
  117. self.line_terminator_check(b'\r\n', l)
  118. def test_line_terminator3(self):
  119. # test three-character terminator
  120. for l in (1, 2, 3):
  121. self.line_terminator_check(b'qqq', l)
  122. def numeric_terminator_check(self, termlen):
  123. # Try reading a fixed number of bytes
  124. s, event = start_echo_server()
  125. c = echo_client(termlen, s.port)
  126. data = b"hello world, I'm not dead yet!\n"
  127. c.push(data)
  128. c.push(SERVER_QUIT)
  129. asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  130. threading_helper.join_thread(s)
  131. self.assertEqual(c.contents, [data[:termlen]])
  132. def test_numeric_terminator1(self):
  133. # check that ints & longs both work (since type is
  134. # explicitly checked in async_chat.handle_read)
  135. self.numeric_terminator_check(1)
  136. def test_numeric_terminator2(self):
  137. self.numeric_terminator_check(6)
  138. def test_none_terminator(self):
  139. # Try reading a fixed number of bytes
  140. s, event = start_echo_server()
  141. c = echo_client(None, s.port)
  142. data = b"hello world, I'm not dead yet!\n"
  143. c.push(data)
  144. c.push(SERVER_QUIT)
  145. asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  146. threading_helper.join_thread(s)
  147. self.assertEqual(c.contents, [])
  148. self.assertEqual(c.buffer, data)
  149. def test_simple_producer(self):
  150. s, event = start_echo_server()
  151. c = echo_client(b'\n', s.port)
  152. data = b"hello world\nI'm not dead yet!\n"
  153. p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
  154. c.push_with_producer(p)
  155. asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  156. threading_helper.join_thread(s)
  157. self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
  158. def test_string_producer(self):
  159. s, event = start_echo_server()
  160. c = echo_client(b'\n', s.port)
  161. data = b"hello world\nI'm not dead yet!\n"
  162. c.push_with_producer(data+SERVER_QUIT)
  163. asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  164. threading_helper.join_thread(s)
  165. self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
  166. def test_empty_line(self):
  167. # checks that empty lines are handled correctly
  168. s, event = start_echo_server()
  169. c = echo_client(b'\n', s.port)
  170. c.push(b"hello world\n\nI'm not dead yet!\n")
  171. c.push(SERVER_QUIT)
  172. asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  173. threading_helper.join_thread(s)
  174. self.assertEqual(c.contents,
  175. [b"hello world", b"", b"I'm not dead yet!"])
  176. def test_close_when_done(self):
  177. s, event = start_echo_server()
  178. s.start_resend_event = threading.Event()
  179. c = echo_client(b'\n', s.port)
  180. c.push(b"hello world\nI'm not dead yet!\n")
  181. c.push(SERVER_QUIT)
  182. c.close_when_done()
  183. asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  184. # Only allow the server to start echoing data back to the client after
  185. # the client has closed its connection. This prevents a race condition
  186. # where the server echoes all of its data before we can check that it
  187. # got any down below.
  188. s.start_resend_event.set()
  189. threading_helper.join_thread(s)
  190. self.assertEqual(c.contents, [])
  191. # the server might have been able to send a byte or two back, but this
  192. # at least checks that it received something and didn't just fail
  193. # (which could still result in the client not having received anything)
  194. self.assertGreater(len(s.buffer), 0)
  195. def test_push(self):
  196. # Issue #12523: push() should raise a TypeError if it doesn't get
  197. # a bytes string
  198. s, event = start_echo_server()
  199. c = echo_client(b'\n', s.port)
  200. data = b'bytes\n'
  201. c.push(data)
  202. c.push(bytearray(data))
  203. c.push(memoryview(data))
  204. self.assertRaises(TypeError, c.push, 10)
  205. self.assertRaises(TypeError, c.push, 'unicode')
  206. c.push(SERVER_QUIT)
  207. asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  208. threading_helper.join_thread(s)
  209. self.assertEqual(c.contents, [b'bytes', b'bytes', b'bytes'])
  210. class TestAsynchat_WithPoll(TestAsynchat):
  211. usepoll = True
  212. class TestAsynchatMocked(unittest.TestCase):
  213. def test_blockingioerror(self):
  214. # Issue #16133: handle_read() must ignore BlockingIOError
  215. sock = unittest.mock.Mock()
  216. sock.recv.side_effect = BlockingIOError(errno.EAGAIN)
  217. dispatcher = asynchat.async_chat()
  218. dispatcher.set_socket(sock)
  219. self.addCleanup(dispatcher.del_channel)
  220. with unittest.mock.patch.object(dispatcher, 'handle_error') as error:
  221. dispatcher.handle_read()
  222. self.assertFalse(error.called)
  223. class TestHelperFunctions(unittest.TestCase):
  224. def test_find_prefix_at_end(self):
  225. self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
  226. self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)
  227. class TestNotConnected(unittest.TestCase):
  228. def test_disallow_negative_terminator(self):
  229. # Issue #11259
  230. client = asynchat.async_chat()
  231. self.assertRaises(ValueError, client.set_terminator, -1)
  232. if __name__ == "__main__":
  233. unittest.main()