test_queue.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652
  1. # Some simple queue module tests, plus some failure conditions
  2. # to ensure the Queue locks remain stable.
  3. import itertools
  4. import random
  5. import threading
  6. import time
  7. import unittest
  8. import weakref
  9. from test.support import gc_collect
  10. from test.support import import_helper
  11. from test.support import threading_helper
  12. # queue module depends on threading primitives
  13. threading_helper.requires_working_threading(module=True)
  14. py_queue = import_helper.import_fresh_module('queue', blocked=['_queue'])
  15. c_queue = import_helper.import_fresh_module('queue', fresh=['_queue'])
  16. need_c_queue = unittest.skipUnless(c_queue, "No _queue module found")
  17. QUEUE_SIZE = 5
  18. def qfull(q):
  19. return q.maxsize > 0 and q.qsize() == q.maxsize
  20. # A thread to run a function that unclogs a blocked Queue.
  21. class _TriggerThread(threading.Thread):
  22. def __init__(self, fn, args):
  23. self.fn = fn
  24. self.args = args
  25. self.startedEvent = threading.Event()
  26. threading.Thread.__init__(self)
  27. def run(self):
  28. # The sleep isn't necessary, but is intended to give the blocking
  29. # function in the main thread a chance at actually blocking before
  30. # we unclog it. But if the sleep is longer than the timeout-based
  31. # tests wait in their blocking functions, those tests will fail.
  32. # So we give them much longer timeout values compared to the
  33. # sleep here (I aimed at 10 seconds for blocking functions --
  34. # they should never actually wait that long - they should make
  35. # progress as soon as we call self.fn()).
  36. time.sleep(0.1)
  37. self.startedEvent.set()
  38. self.fn(*self.args)
  39. # Execute a function that blocks, and in a separate thread, a function that
  40. # triggers the release. Returns the result of the blocking function. Caution:
  41. # block_func must guarantee to block until trigger_func is called, and
  42. # trigger_func must guarantee to change queue state so that block_func can make
  43. # enough progress to return. In particular, a block_func that just raises an
  44. # exception regardless of whether trigger_func is called will lead to
  45. # timing-dependent sporadic failures, and one of those went rarely seen but
  46. # undiagnosed for years. Now block_func must be unexceptional. If block_func
  47. # is supposed to raise an exception, call do_exceptional_blocking_test()
  48. # instead.
  49. class BlockingTestMixin:
  50. def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
  51. thread = _TriggerThread(trigger_func, trigger_args)
  52. thread.start()
  53. try:
  54. self.result = block_func(*block_args)
  55. # If block_func returned before our thread made the call, we failed!
  56. if not thread.startedEvent.is_set():
  57. self.fail("blocking function %r appeared not to block" %
  58. block_func)
  59. return self.result
  60. finally:
  61. threading_helper.join_thread(thread) # make sure the thread terminates
  62. # Call this instead if block_func is supposed to raise an exception.
  63. def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
  64. trigger_args, expected_exception_class):
  65. thread = _TriggerThread(trigger_func, trigger_args)
  66. thread.start()
  67. try:
  68. try:
  69. block_func(*block_args)
  70. except expected_exception_class:
  71. raise
  72. else:
  73. self.fail("expected exception of kind %r" %
  74. expected_exception_class)
  75. finally:
  76. threading_helper.join_thread(thread) # make sure the thread terminates
  77. if not thread.startedEvent.is_set():
  78. self.fail("trigger thread ended but event never set")
  79. class BaseQueueTestMixin(BlockingTestMixin):
  80. def setUp(self):
  81. self.cum = 0
  82. self.cumlock = threading.Lock()
  83. def basic_queue_test(self, q):
  84. if q.qsize():
  85. raise RuntimeError("Call this function with an empty queue")
  86. self.assertTrue(q.empty())
  87. self.assertFalse(q.full())
  88. # I guess we better check things actually queue correctly a little :)
  89. q.put(111)
  90. q.put(333)
  91. q.put(222)
  92. target_order = dict(Queue = [111, 333, 222],
  93. LifoQueue = [222, 333, 111],
  94. PriorityQueue = [111, 222, 333])
  95. actual_order = [q.get(), q.get(), q.get()]
  96. self.assertEqual(actual_order, target_order[q.__class__.__name__],
  97. "Didn't seem to queue the correct data!")
  98. for i in range(QUEUE_SIZE-1):
  99. q.put(i)
  100. self.assertTrue(q.qsize(), "Queue should not be empty")
  101. self.assertTrue(not qfull(q), "Queue should not be full")
  102. last = 2 * QUEUE_SIZE
  103. full = 3 * 2 * QUEUE_SIZE
  104. q.put(last)
  105. self.assertTrue(qfull(q), "Queue should be full")
  106. self.assertFalse(q.empty())
  107. self.assertTrue(q.full())
  108. try:
  109. q.put(full, block=0)
  110. self.fail("Didn't appear to block with a full queue")
  111. except self.queue.Full:
  112. pass
  113. try:
  114. q.put(full, timeout=0.01)
  115. self.fail("Didn't appear to time-out with a full queue")
  116. except self.queue.Full:
  117. pass
  118. # Test a blocking put
  119. self.do_blocking_test(q.put, (full,), q.get, ())
  120. self.do_blocking_test(q.put, (full, True, 10), q.get, ())
  121. # Empty it
  122. for i in range(QUEUE_SIZE):
  123. q.get()
  124. self.assertTrue(not q.qsize(), "Queue should be empty")
  125. try:
  126. q.get(block=0)
  127. self.fail("Didn't appear to block with an empty queue")
  128. except self.queue.Empty:
  129. pass
  130. try:
  131. q.get(timeout=0.01)
  132. self.fail("Didn't appear to time-out with an empty queue")
  133. except self.queue.Empty:
  134. pass
  135. # Test a blocking get
  136. self.do_blocking_test(q.get, (), q.put, ('empty',))
  137. self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))
  138. def worker(self, q):
  139. while True:
  140. x = q.get()
  141. if x < 0:
  142. q.task_done()
  143. return
  144. with self.cumlock:
  145. self.cum += x
  146. q.task_done()
  147. def queue_join_test(self, q):
  148. self.cum = 0
  149. threads = []
  150. for i in (0,1):
  151. thread = threading.Thread(target=self.worker, args=(q,))
  152. thread.start()
  153. threads.append(thread)
  154. for i in range(100):
  155. q.put(i)
  156. q.join()
  157. self.assertEqual(self.cum, sum(range(100)),
  158. "q.join() did not block until all tasks were done")
  159. for i in (0,1):
  160. q.put(-1) # instruct the threads to close
  161. q.join() # verify that you can join twice
  162. for thread in threads:
  163. thread.join()
  164. def test_queue_task_done(self):
  165. # Test to make sure a queue task completed successfully.
  166. q = self.type2test()
  167. try:
  168. q.task_done()
  169. except ValueError:
  170. pass
  171. else:
  172. self.fail("Did not detect task count going negative")
  173. def test_queue_join(self):
  174. # Test that a queue join()s successfully, and before anything else
  175. # (done twice for insurance).
  176. q = self.type2test()
  177. self.queue_join_test(q)
  178. self.queue_join_test(q)
  179. try:
  180. q.task_done()
  181. except ValueError:
  182. pass
  183. else:
  184. self.fail("Did not detect task count going negative")
  185. def test_basic(self):
  186. # Do it a couple of times on the same queue.
  187. # Done twice to make sure works with same instance reused.
  188. q = self.type2test(QUEUE_SIZE)
  189. self.basic_queue_test(q)
  190. self.basic_queue_test(q)
  191. def test_negative_timeout_raises_exception(self):
  192. q = self.type2test(QUEUE_SIZE)
  193. with self.assertRaises(ValueError):
  194. q.put(1, timeout=-1)
  195. with self.assertRaises(ValueError):
  196. q.get(1, timeout=-1)
  197. def test_nowait(self):
  198. q = self.type2test(QUEUE_SIZE)
  199. for i in range(QUEUE_SIZE):
  200. q.put_nowait(1)
  201. with self.assertRaises(self.queue.Full):
  202. q.put_nowait(1)
  203. for i in range(QUEUE_SIZE):
  204. q.get_nowait()
  205. with self.assertRaises(self.queue.Empty):
  206. q.get_nowait()
  207. def test_shrinking_queue(self):
  208. # issue 10110
  209. q = self.type2test(3)
  210. q.put(1)
  211. q.put(2)
  212. q.put(3)
  213. with self.assertRaises(self.queue.Full):
  214. q.put_nowait(4)
  215. self.assertEqual(q.qsize(), 3)
  216. q.maxsize = 2 # shrink the queue
  217. with self.assertRaises(self.queue.Full):
  218. q.put_nowait(4)
  219. class QueueTest(BaseQueueTestMixin):
  220. def setUp(self):
  221. self.type2test = self.queue.Queue
  222. super().setUp()
  223. class PyQueueTest(QueueTest, unittest.TestCase):
  224. queue = py_queue
  225. @need_c_queue
  226. class CQueueTest(QueueTest, unittest.TestCase):
  227. queue = c_queue
  228. class LifoQueueTest(BaseQueueTestMixin):
  229. def setUp(self):
  230. self.type2test = self.queue.LifoQueue
  231. super().setUp()
  232. class PyLifoQueueTest(LifoQueueTest, unittest.TestCase):
  233. queue = py_queue
  234. @need_c_queue
  235. class CLifoQueueTest(LifoQueueTest, unittest.TestCase):
  236. queue = c_queue
  237. class PriorityQueueTest(BaseQueueTestMixin):
  238. def setUp(self):
  239. self.type2test = self.queue.PriorityQueue
  240. super().setUp()
  241. class PyPriorityQueueTest(PriorityQueueTest, unittest.TestCase):
  242. queue = py_queue
  243. @need_c_queue
  244. class CPriorityQueueTest(PriorityQueueTest, unittest.TestCase):
  245. queue = c_queue
  246. # A Queue subclass that can provoke failure at a moment's notice :)
  247. class FailingQueueException(Exception): pass
  248. class FailingQueueTest(BlockingTestMixin):
  249. def setUp(self):
  250. Queue = self.queue.Queue
  251. class FailingQueue(Queue):
  252. def __init__(self, *args):
  253. self.fail_next_put = False
  254. self.fail_next_get = False
  255. Queue.__init__(self, *args)
  256. def _put(self, item):
  257. if self.fail_next_put:
  258. self.fail_next_put = False
  259. raise FailingQueueException("You Lose")
  260. return Queue._put(self, item)
  261. def _get(self):
  262. if self.fail_next_get:
  263. self.fail_next_get = False
  264. raise FailingQueueException("You Lose")
  265. return Queue._get(self)
  266. self.FailingQueue = FailingQueue
  267. super().setUp()
  268. def failing_queue_test(self, q):
  269. if q.qsize():
  270. raise RuntimeError("Call this function with an empty queue")
  271. for i in range(QUEUE_SIZE-1):
  272. q.put(i)
  273. # Test a failing non-blocking put.
  274. q.fail_next_put = True
  275. try:
  276. q.put("oops", block=0)
  277. self.fail("The queue didn't fail when it should have")
  278. except FailingQueueException:
  279. pass
  280. q.fail_next_put = True
  281. try:
  282. q.put("oops", timeout=0.1)
  283. self.fail("The queue didn't fail when it should have")
  284. except FailingQueueException:
  285. pass
  286. q.put("last")
  287. self.assertTrue(qfull(q), "Queue should be full")
  288. # Test a failing blocking put
  289. q.fail_next_put = True
  290. try:
  291. self.do_blocking_test(q.put, ("full",), q.get, ())
  292. self.fail("The queue didn't fail when it should have")
  293. except FailingQueueException:
  294. pass
  295. # Check the Queue isn't damaged.
  296. # put failed, but get succeeded - re-add
  297. q.put("last")
  298. # Test a failing timeout put
  299. q.fail_next_put = True
  300. try:
  301. self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
  302. FailingQueueException)
  303. self.fail("The queue didn't fail when it should have")
  304. except FailingQueueException:
  305. pass
  306. # Check the Queue isn't damaged.
  307. # put failed, but get succeeded - re-add
  308. q.put("last")
  309. self.assertTrue(qfull(q), "Queue should be full")
  310. q.get()
  311. self.assertTrue(not qfull(q), "Queue should not be full")
  312. q.put("last")
  313. self.assertTrue(qfull(q), "Queue should be full")
  314. # Test a blocking put
  315. self.do_blocking_test(q.put, ("full",), q.get, ())
  316. # Empty it
  317. for i in range(QUEUE_SIZE):
  318. q.get()
  319. self.assertTrue(not q.qsize(), "Queue should be empty")
  320. q.put("first")
  321. q.fail_next_get = True
  322. try:
  323. q.get()
  324. self.fail("The queue didn't fail when it should have")
  325. except FailingQueueException:
  326. pass
  327. self.assertTrue(q.qsize(), "Queue should not be empty")
  328. q.fail_next_get = True
  329. try:
  330. q.get(timeout=0.1)
  331. self.fail("The queue didn't fail when it should have")
  332. except FailingQueueException:
  333. pass
  334. self.assertTrue(q.qsize(), "Queue should not be empty")
  335. q.get()
  336. self.assertTrue(not q.qsize(), "Queue should be empty")
  337. q.fail_next_get = True
  338. try:
  339. self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
  340. FailingQueueException)
  341. self.fail("The queue didn't fail when it should have")
  342. except FailingQueueException:
  343. pass
  344. # put succeeded, but get failed.
  345. self.assertTrue(q.qsize(), "Queue should not be empty")
  346. q.get()
  347. self.assertTrue(not q.qsize(), "Queue should be empty")
  348. def test_failing_queue(self):
  349. # Test to make sure a queue is functioning correctly.
  350. # Done twice to the same instance.
  351. q = self.FailingQueue(QUEUE_SIZE)
  352. self.failing_queue_test(q)
  353. self.failing_queue_test(q)
  354. class PyFailingQueueTest(FailingQueueTest, unittest.TestCase):
  355. queue = py_queue
  356. @need_c_queue
  357. class CFailingQueueTest(FailingQueueTest, unittest.TestCase):
  358. queue = c_queue
  359. class BaseSimpleQueueTest:
  360. def setUp(self):
  361. self.q = self.type2test()
  362. def feed(self, q, seq, rnd, sentinel):
  363. while True:
  364. try:
  365. val = seq.pop()
  366. except IndexError:
  367. q.put(sentinel)
  368. return
  369. q.put(val)
  370. if rnd.random() > 0.5:
  371. time.sleep(rnd.random() * 1e-3)
  372. def consume(self, q, results, sentinel):
  373. while True:
  374. val = q.get()
  375. if val == sentinel:
  376. return
  377. results.append(val)
  378. def consume_nonblock(self, q, results, sentinel):
  379. while True:
  380. while True:
  381. try:
  382. val = q.get(block=False)
  383. except self.queue.Empty:
  384. time.sleep(1e-5)
  385. else:
  386. break
  387. if val == sentinel:
  388. return
  389. results.append(val)
  390. def consume_timeout(self, q, results, sentinel):
  391. while True:
  392. while True:
  393. try:
  394. val = q.get(timeout=1e-5)
  395. except self.queue.Empty:
  396. pass
  397. else:
  398. break
  399. if val == sentinel:
  400. return
  401. results.append(val)
  402. def run_threads(self, n_threads, q, inputs, feed_func, consume_func):
  403. results = []
  404. sentinel = None
  405. seq = inputs.copy()
  406. seq.reverse()
  407. rnd = random.Random(42)
  408. exceptions = []
  409. def log_exceptions(f):
  410. def wrapper(*args, **kwargs):
  411. try:
  412. f(*args, **kwargs)
  413. except BaseException as e:
  414. exceptions.append(e)
  415. return wrapper
  416. feeders = [threading.Thread(target=log_exceptions(feed_func),
  417. args=(q, seq, rnd, sentinel))
  418. for i in range(n_threads)]
  419. consumers = [threading.Thread(target=log_exceptions(consume_func),
  420. args=(q, results, sentinel))
  421. for i in range(n_threads)]
  422. with threading_helper.start_threads(feeders + consumers):
  423. pass
  424. self.assertFalse(exceptions)
  425. self.assertTrue(q.empty())
  426. self.assertEqual(q.qsize(), 0)
  427. return results
  428. def test_basic(self):
  429. # Basic tests for get(), put() etc.
  430. q = self.q
  431. self.assertTrue(q.empty())
  432. self.assertEqual(q.qsize(), 0)
  433. q.put(1)
  434. self.assertFalse(q.empty())
  435. self.assertEqual(q.qsize(), 1)
  436. q.put(2)
  437. q.put_nowait(3)
  438. q.put(4)
  439. self.assertFalse(q.empty())
  440. self.assertEqual(q.qsize(), 4)
  441. self.assertEqual(q.get(), 1)
  442. self.assertEqual(q.qsize(), 3)
  443. self.assertEqual(q.get_nowait(), 2)
  444. self.assertEqual(q.qsize(), 2)
  445. self.assertEqual(q.get(block=False), 3)
  446. self.assertFalse(q.empty())
  447. self.assertEqual(q.qsize(), 1)
  448. self.assertEqual(q.get(timeout=0.1), 4)
  449. self.assertTrue(q.empty())
  450. self.assertEqual(q.qsize(), 0)
  451. with self.assertRaises(self.queue.Empty):
  452. q.get(block=False)
  453. with self.assertRaises(self.queue.Empty):
  454. q.get(timeout=1e-3)
  455. with self.assertRaises(self.queue.Empty):
  456. q.get_nowait()
  457. self.assertTrue(q.empty())
  458. self.assertEqual(q.qsize(), 0)
  459. def test_negative_timeout_raises_exception(self):
  460. q = self.q
  461. q.put(1)
  462. with self.assertRaises(ValueError):
  463. q.get(timeout=-1)
  464. def test_order(self):
  465. # Test a pair of concurrent put() and get()
  466. q = self.q
  467. inputs = list(range(100))
  468. results = self.run_threads(1, q, inputs, self.feed, self.consume)
  469. # One producer, one consumer => results appended in well-defined order
  470. self.assertEqual(results, inputs)
  471. def test_many_threads(self):
  472. # Test multiple concurrent put() and get()
  473. N = 50
  474. q = self.q
  475. inputs = list(range(10000))
  476. results = self.run_threads(N, q, inputs, self.feed, self.consume)
  477. # Multiple consumers without synchronization append the
  478. # results in random order
  479. self.assertEqual(sorted(results), inputs)
  480. def test_many_threads_nonblock(self):
  481. # Test multiple concurrent put() and get(block=False)
  482. N = 50
  483. q = self.q
  484. inputs = list(range(10000))
  485. results = self.run_threads(N, q, inputs,
  486. self.feed, self.consume_nonblock)
  487. self.assertEqual(sorted(results), inputs)
  488. def test_many_threads_timeout(self):
  489. # Test multiple concurrent put() and get(timeout=...)
  490. N = 50
  491. q = self.q
  492. inputs = list(range(1000))
  493. results = self.run_threads(N, q, inputs,
  494. self.feed, self.consume_timeout)
  495. self.assertEqual(sorted(results), inputs)
  496. def test_references(self):
  497. # The queue should lose references to each item as soon as
  498. # it leaves the queue.
  499. class C:
  500. pass
  501. N = 20
  502. q = self.q
  503. for i in range(N):
  504. q.put(C())
  505. for i in range(N):
  506. wr = weakref.ref(q.get())
  507. gc_collect() # For PyPy or other GCs.
  508. self.assertIsNone(wr())
  509. class PySimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase):
  510. queue = py_queue
  511. def setUp(self):
  512. self.type2test = self.queue._PySimpleQueue
  513. super().setUp()
  514. @need_c_queue
  515. class CSimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase):
  516. queue = c_queue
  517. def setUp(self):
  518. self.type2test = self.queue.SimpleQueue
  519. super().setUp()
  520. def test_is_default(self):
  521. self.assertIs(self.type2test, self.queue.SimpleQueue)
  522. self.assertIs(self.type2test, self.queue.SimpleQueue)
  523. def test_reentrancy(self):
  524. # bpo-14976: put() may be called reentrantly in an asynchronous
  525. # callback.
  526. q = self.q
  527. gen = itertools.count()
  528. N = 10000
  529. results = []
  530. # This test exploits the fact that __del__ in a reference cycle
  531. # can be called any time the GC may run.
  532. class Circular(object):
  533. def __init__(self):
  534. self.circular = self
  535. def __del__(self):
  536. q.put(next(gen))
  537. while True:
  538. o = Circular()
  539. q.put(next(gen))
  540. del o
  541. results.append(q.get())
  542. if results[-1] >= N:
  543. break
  544. self.assertEqual(results, list(range(N + 1)))
  545. if __name__ == "__main__":
  546. unittest.main()