lock_tests.py 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046
  1. """
  2. Various tests for synchronization primitives.
  3. """
  4. import os
  5. import gc
  6. import sys
  7. import time
  8. from _thread import start_new_thread, TIMEOUT_MAX
  9. import threading
  10. import unittest
  11. import weakref
  12. from test import support
  13. from test.support import threading_helper
  14. requires_fork = unittest.skipUnless(support.has_fork_support,
  15. "platform doesn't support fork "
  16. "(no _at_fork_reinit method)")
  17. def _wait():
  18. # A crude wait/yield function not relying on synchronization primitives.
  19. time.sleep(0.01)
  20. class Bunch(object):
  21. """
  22. A bunch of threads.
  23. """
  24. def __init__(self, f, n, wait_before_exit=False):
  25. """
  26. Construct a bunch of `n` threads running the same function `f`.
  27. If `wait_before_exit` is True, the threads won't terminate until
  28. do_finish() is called.
  29. """
  30. self.f = f
  31. self.n = n
  32. self.started = []
  33. self.finished = []
  34. self._can_exit = not wait_before_exit
  35. self.wait_thread = threading_helper.wait_threads_exit()
  36. self.wait_thread.__enter__()
  37. def task():
  38. tid = threading.get_ident()
  39. self.started.append(tid)
  40. try:
  41. f()
  42. finally:
  43. self.finished.append(tid)
  44. while not self._can_exit:
  45. _wait()
  46. try:
  47. for i in range(n):
  48. start_new_thread(task, ())
  49. except:
  50. self._can_exit = True
  51. raise
  52. def wait_for_started(self):
  53. while len(self.started) < self.n:
  54. _wait()
  55. def wait_for_finished(self):
  56. while len(self.finished) < self.n:
  57. _wait()
  58. # Wait for threads exit
  59. self.wait_thread.__exit__(None, None, None)
  60. def do_finish(self):
  61. self._can_exit = True
  62. class BaseTestCase(unittest.TestCase):
  63. def setUp(self):
  64. self._threads = threading_helper.threading_setup()
  65. def tearDown(self):
  66. threading_helper.threading_cleanup(*self._threads)
  67. support.reap_children()
  68. def assertTimeout(self, actual, expected):
  69. # The waiting and/or time.monotonic() can be imprecise, which
  70. # is why comparing to the expected value would sometimes fail
  71. # (especially under Windows).
  72. self.assertGreaterEqual(actual, expected * 0.6)
  73. # Test nothing insane happened
  74. self.assertLess(actual, expected * 10.0)
  75. class BaseLockTests(BaseTestCase):
  76. """
  77. Tests for both recursive and non-recursive locks.
  78. """
  79. def test_constructor(self):
  80. lock = self.locktype()
  81. del lock
  82. def test_repr(self):
  83. lock = self.locktype()
  84. self.assertRegex(repr(lock), "<unlocked .* object (.*)?at .*>")
  85. del lock
  86. def test_locked_repr(self):
  87. lock = self.locktype()
  88. lock.acquire()
  89. self.assertRegex(repr(lock), "<locked .* object (.*)?at .*>")
  90. del lock
  91. def test_acquire_destroy(self):
  92. lock = self.locktype()
  93. lock.acquire()
  94. del lock
  95. def test_acquire_release(self):
  96. lock = self.locktype()
  97. lock.acquire()
  98. lock.release()
  99. del lock
  100. def test_try_acquire(self):
  101. lock = self.locktype()
  102. self.assertTrue(lock.acquire(False))
  103. lock.release()
  104. def test_try_acquire_contended(self):
  105. lock = self.locktype()
  106. lock.acquire()
  107. result = []
  108. def f():
  109. result.append(lock.acquire(False))
  110. Bunch(f, 1).wait_for_finished()
  111. self.assertFalse(result[0])
  112. lock.release()
  113. def test_acquire_contended(self):
  114. lock = self.locktype()
  115. lock.acquire()
  116. N = 5
  117. def f():
  118. lock.acquire()
  119. lock.release()
  120. b = Bunch(f, N)
  121. b.wait_for_started()
  122. _wait()
  123. self.assertEqual(len(b.finished), 0)
  124. lock.release()
  125. b.wait_for_finished()
  126. self.assertEqual(len(b.finished), N)
  127. def test_with(self):
  128. lock = self.locktype()
  129. def f():
  130. lock.acquire()
  131. lock.release()
  132. def _with(err=None):
  133. with lock:
  134. if err is not None:
  135. raise err
  136. _with()
  137. # Check the lock is unacquired
  138. Bunch(f, 1).wait_for_finished()
  139. self.assertRaises(TypeError, _with, TypeError)
  140. # Check the lock is unacquired
  141. Bunch(f, 1).wait_for_finished()
  142. def test_thread_leak(self):
  143. # The lock shouldn't leak a Thread instance when used from a foreign
  144. # (non-threading) thread.
  145. lock = self.locktype()
  146. def f():
  147. lock.acquire()
  148. lock.release()
  149. n = len(threading.enumerate())
  150. # We run many threads in the hope that existing threads ids won't
  151. # be recycled.
  152. Bunch(f, 15).wait_for_finished()
  153. if len(threading.enumerate()) != n:
  154. # There is a small window during which a Thread instance's
  155. # target function has finished running, but the Thread is still
  156. # alive and registered. Avoid spurious failures by waiting a
  157. # bit more (seen on a buildbot).
  158. time.sleep(0.4)
  159. self.assertEqual(n, len(threading.enumerate()))
  160. def test_timeout(self):
  161. lock = self.locktype()
  162. # Can't set timeout if not blocking
  163. self.assertRaises(ValueError, lock.acquire, False, 1)
  164. # Invalid timeout values
  165. self.assertRaises(ValueError, lock.acquire, timeout=-100)
  166. self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
  167. self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
  168. # TIMEOUT_MAX is ok
  169. lock.acquire(timeout=TIMEOUT_MAX)
  170. lock.release()
  171. t1 = time.monotonic()
  172. self.assertTrue(lock.acquire(timeout=5))
  173. t2 = time.monotonic()
  174. # Just a sanity test that it didn't actually wait for the timeout.
  175. self.assertLess(t2 - t1, 5)
  176. results = []
  177. def f():
  178. t1 = time.monotonic()
  179. results.append(lock.acquire(timeout=0.5))
  180. t2 = time.monotonic()
  181. results.append(t2 - t1)
  182. Bunch(f, 1).wait_for_finished()
  183. self.assertFalse(results[0])
  184. self.assertTimeout(results[1], 0.5)
  185. def test_weakref_exists(self):
  186. lock = self.locktype()
  187. ref = weakref.ref(lock)
  188. self.assertIsNotNone(ref())
  189. def test_weakref_deleted(self):
  190. lock = self.locktype()
  191. ref = weakref.ref(lock)
  192. del lock
  193. gc.collect() # For PyPy or other GCs.
  194. self.assertIsNone(ref())
  195. class LockTests(BaseLockTests):
  196. """
  197. Tests for non-recursive, weak locks
  198. (which can be acquired and released from different threads).
  199. """
  200. def test_reacquire(self):
  201. # Lock needs to be released before re-acquiring.
  202. lock = self.locktype()
  203. phase = []
  204. def f():
  205. lock.acquire()
  206. phase.append(None)
  207. lock.acquire()
  208. phase.append(None)
  209. with threading_helper.wait_threads_exit():
  210. start_new_thread(f, ())
  211. while len(phase) == 0:
  212. _wait()
  213. _wait()
  214. self.assertEqual(len(phase), 1)
  215. lock.release()
  216. while len(phase) == 1:
  217. _wait()
  218. self.assertEqual(len(phase), 2)
  219. def test_different_thread(self):
  220. # Lock can be released from a different thread.
  221. lock = self.locktype()
  222. lock.acquire()
  223. def f():
  224. lock.release()
  225. b = Bunch(f, 1)
  226. b.wait_for_finished()
  227. lock.acquire()
  228. lock.release()
  229. def test_state_after_timeout(self):
  230. # Issue #11618: check that lock is in a proper state after a
  231. # (non-zero) timeout.
  232. lock = self.locktype()
  233. lock.acquire()
  234. self.assertFalse(lock.acquire(timeout=0.01))
  235. lock.release()
  236. self.assertFalse(lock.locked())
  237. self.assertTrue(lock.acquire(blocking=False))
  238. @requires_fork
  239. def test_at_fork_reinit(self):
  240. def use_lock(lock):
  241. # make sure that the lock still works normally
  242. # after _at_fork_reinit()
  243. lock.acquire()
  244. lock.release()
  245. # unlocked
  246. lock = self.locktype()
  247. lock._at_fork_reinit()
  248. use_lock(lock)
  249. # locked: _at_fork_reinit() resets the lock to the unlocked state
  250. lock2 = self.locktype()
  251. lock2.acquire()
  252. lock2._at_fork_reinit()
  253. use_lock(lock2)
  254. class RLockTests(BaseLockTests):
  255. """
  256. Tests for recursive locks.
  257. """
  258. def test_reacquire(self):
  259. lock = self.locktype()
  260. lock.acquire()
  261. lock.acquire()
  262. lock.release()
  263. lock.acquire()
  264. lock.release()
  265. lock.release()
  266. def test_release_unacquired(self):
  267. # Cannot release an unacquired lock
  268. lock = self.locktype()
  269. self.assertRaises(RuntimeError, lock.release)
  270. lock.acquire()
  271. lock.acquire()
  272. lock.release()
  273. lock.acquire()
  274. lock.release()
  275. lock.release()
  276. self.assertRaises(RuntimeError, lock.release)
  277. def test_release_save_unacquired(self):
  278. # Cannot _release_save an unacquired lock
  279. lock = self.locktype()
  280. self.assertRaises(RuntimeError, lock._release_save)
  281. lock.acquire()
  282. lock.acquire()
  283. lock.release()
  284. lock.acquire()
  285. lock.release()
  286. lock.release()
  287. self.assertRaises(RuntimeError, lock._release_save)
  288. def test_different_thread(self):
  289. # Cannot release from a different thread
  290. lock = self.locktype()
  291. def f():
  292. lock.acquire()
  293. b = Bunch(f, 1, True)
  294. try:
  295. self.assertRaises(RuntimeError, lock.release)
  296. finally:
  297. b.do_finish()
  298. b.wait_for_finished()
  299. def test__is_owned(self):
  300. lock = self.locktype()
  301. self.assertFalse(lock._is_owned())
  302. lock.acquire()
  303. self.assertTrue(lock._is_owned())
  304. lock.acquire()
  305. self.assertTrue(lock._is_owned())
  306. result = []
  307. def f():
  308. result.append(lock._is_owned())
  309. Bunch(f, 1).wait_for_finished()
  310. self.assertFalse(result[0])
  311. lock.release()
  312. self.assertTrue(lock._is_owned())
  313. lock.release()
  314. self.assertFalse(lock._is_owned())
  315. class EventTests(BaseTestCase):
  316. """
  317. Tests for Event objects.
  318. """
  319. def test_is_set(self):
  320. evt = self.eventtype()
  321. self.assertFalse(evt.is_set())
  322. evt.set()
  323. self.assertTrue(evt.is_set())
  324. evt.set()
  325. self.assertTrue(evt.is_set())
  326. evt.clear()
  327. self.assertFalse(evt.is_set())
  328. evt.clear()
  329. self.assertFalse(evt.is_set())
  330. def _check_notify(self, evt):
  331. # All threads get notified
  332. N = 5
  333. results1 = []
  334. results2 = []
  335. def f():
  336. results1.append(evt.wait())
  337. results2.append(evt.wait())
  338. b = Bunch(f, N)
  339. b.wait_for_started()
  340. _wait()
  341. self.assertEqual(len(results1), 0)
  342. evt.set()
  343. b.wait_for_finished()
  344. self.assertEqual(results1, [True] * N)
  345. self.assertEqual(results2, [True] * N)
  346. def test_notify(self):
  347. evt = self.eventtype()
  348. self._check_notify(evt)
  349. # Another time, after an explicit clear()
  350. evt.set()
  351. evt.clear()
  352. self._check_notify(evt)
  353. def test_timeout(self):
  354. evt = self.eventtype()
  355. results1 = []
  356. results2 = []
  357. N = 5
  358. def f():
  359. results1.append(evt.wait(0.0))
  360. t1 = time.monotonic()
  361. r = evt.wait(0.5)
  362. t2 = time.monotonic()
  363. results2.append((r, t2 - t1))
  364. Bunch(f, N).wait_for_finished()
  365. self.assertEqual(results1, [False] * N)
  366. for r, dt in results2:
  367. self.assertFalse(r)
  368. self.assertTimeout(dt, 0.5)
  369. # The event is set
  370. results1 = []
  371. results2 = []
  372. evt.set()
  373. Bunch(f, N).wait_for_finished()
  374. self.assertEqual(results1, [True] * N)
  375. for r, dt in results2:
  376. self.assertTrue(r)
  377. def test_set_and_clear(self):
  378. # Issue #13502: check that wait() returns true even when the event is
  379. # cleared before the waiting thread is woken up.
  380. evt = self.eventtype()
  381. results = []
  382. timeout = 0.250
  383. N = 5
  384. def f():
  385. results.append(evt.wait(timeout * 4))
  386. b = Bunch(f, N)
  387. b.wait_for_started()
  388. time.sleep(timeout)
  389. evt.set()
  390. evt.clear()
  391. b.wait_for_finished()
  392. self.assertEqual(results, [True] * N)
  393. @requires_fork
  394. def test_at_fork_reinit(self):
  395. # ensure that condition is still using a Lock after reset
  396. evt = self.eventtype()
  397. with evt._cond:
  398. self.assertFalse(evt._cond.acquire(False))
  399. evt._at_fork_reinit()
  400. with evt._cond:
  401. self.assertFalse(evt._cond.acquire(False))
  402. def test_repr(self):
  403. evt = self.eventtype()
  404. self.assertRegex(repr(evt), r"<\w+\.Event at .*: unset>")
  405. evt.set()
  406. self.assertRegex(repr(evt), r"<\w+\.Event at .*: set>")
  407. class ConditionTests(BaseTestCase):
  408. """
  409. Tests for condition variables.
  410. """
  411. def test_acquire(self):
  412. cond = self.condtype()
  413. # Be default we have an RLock: the condition can be acquired multiple
  414. # times.
  415. cond.acquire()
  416. cond.acquire()
  417. cond.release()
  418. cond.release()
  419. lock = threading.Lock()
  420. cond = self.condtype(lock)
  421. cond.acquire()
  422. self.assertFalse(lock.acquire(False))
  423. cond.release()
  424. self.assertTrue(lock.acquire(False))
  425. self.assertFalse(cond.acquire(False))
  426. lock.release()
  427. with cond:
  428. self.assertFalse(lock.acquire(False))
  429. def test_unacquired_wait(self):
  430. cond = self.condtype()
  431. self.assertRaises(RuntimeError, cond.wait)
  432. def test_unacquired_notify(self):
  433. cond = self.condtype()
  434. self.assertRaises(RuntimeError, cond.notify)
  435. def _check_notify(self, cond):
  436. # Note that this test is sensitive to timing. If the worker threads
  437. # don't execute in a timely fashion, the main thread may think they
  438. # are further along then they are. The main thread therefore issues
  439. # _wait() statements to try to make sure that it doesn't race ahead
  440. # of the workers.
  441. # Secondly, this test assumes that condition variables are not subject
  442. # to spurious wakeups. The absence of spurious wakeups is an implementation
  443. # detail of Condition Variables in current CPython, but in general, not
  444. # a guaranteed property of condition variables as a programming
  445. # construct. In particular, it is possible that this can no longer
  446. # be conveniently guaranteed should their implementation ever change.
  447. N = 5
  448. ready = []
  449. results1 = []
  450. results2 = []
  451. phase_num = 0
  452. def f():
  453. cond.acquire()
  454. ready.append(phase_num)
  455. result = cond.wait()
  456. cond.release()
  457. results1.append((result, phase_num))
  458. cond.acquire()
  459. ready.append(phase_num)
  460. result = cond.wait()
  461. cond.release()
  462. results2.append((result, phase_num))
  463. b = Bunch(f, N)
  464. b.wait_for_started()
  465. # first wait, to ensure all workers settle into cond.wait() before
  466. # we continue. See issues #8799 and #30727.
  467. while len(ready) < 5:
  468. _wait()
  469. ready.clear()
  470. self.assertEqual(results1, [])
  471. # Notify 3 threads at first
  472. cond.acquire()
  473. cond.notify(3)
  474. _wait()
  475. phase_num = 1
  476. cond.release()
  477. while len(results1) < 3:
  478. _wait()
  479. self.assertEqual(results1, [(True, 1)] * 3)
  480. self.assertEqual(results2, [])
  481. # make sure all awaken workers settle into cond.wait()
  482. while len(ready) < 3:
  483. _wait()
  484. # Notify 5 threads: they might be in their first or second wait
  485. cond.acquire()
  486. cond.notify(5)
  487. _wait()
  488. phase_num = 2
  489. cond.release()
  490. while len(results1) + len(results2) < 8:
  491. _wait()
  492. self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
  493. self.assertEqual(results2, [(True, 2)] * 3)
  494. # make sure all workers settle into cond.wait()
  495. while len(ready) < 5:
  496. _wait()
  497. # Notify all threads: they are all in their second wait
  498. cond.acquire()
  499. cond.notify_all()
  500. _wait()
  501. phase_num = 3
  502. cond.release()
  503. while len(results2) < 5:
  504. _wait()
  505. self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2)
  506. self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
  507. b.wait_for_finished()
  508. def test_notify(self):
  509. cond = self.condtype()
  510. self._check_notify(cond)
  511. # A second time, to check internal state is still ok.
  512. self._check_notify(cond)
  513. def test_timeout(self):
  514. cond = self.condtype()
  515. results = []
  516. N = 5
  517. def f():
  518. cond.acquire()
  519. t1 = time.monotonic()
  520. result = cond.wait(0.5)
  521. t2 = time.monotonic()
  522. cond.release()
  523. results.append((t2 - t1, result))
  524. Bunch(f, N).wait_for_finished()
  525. self.assertEqual(len(results), N)
  526. for dt, result in results:
  527. self.assertTimeout(dt, 0.5)
  528. # Note that conceptually (that"s the condition variable protocol)
  529. # a wait() may succeed even if no one notifies us and before any
  530. # timeout occurs. Spurious wakeups can occur.
  531. # This makes it hard to verify the result value.
  532. # In practice, this implementation has no spurious wakeups.
  533. self.assertFalse(result)
  534. def test_waitfor(self):
  535. cond = self.condtype()
  536. state = 0
  537. def f():
  538. with cond:
  539. result = cond.wait_for(lambda : state==4)
  540. self.assertTrue(result)
  541. self.assertEqual(state, 4)
  542. b = Bunch(f, 1)
  543. b.wait_for_started()
  544. for i in range(4):
  545. time.sleep(0.01)
  546. with cond:
  547. state += 1
  548. cond.notify()
  549. b.wait_for_finished()
  550. def test_waitfor_timeout(self):
  551. cond = self.condtype()
  552. state = 0
  553. success = []
  554. def f():
  555. with cond:
  556. dt = time.monotonic()
  557. result = cond.wait_for(lambda : state==4, timeout=0.1)
  558. dt = time.monotonic() - dt
  559. self.assertFalse(result)
  560. self.assertTimeout(dt, 0.1)
  561. success.append(None)
  562. b = Bunch(f, 1)
  563. b.wait_for_started()
  564. # Only increment 3 times, so state == 4 is never reached.
  565. for i in range(3):
  566. time.sleep(0.01)
  567. with cond:
  568. state += 1
  569. cond.notify()
  570. b.wait_for_finished()
  571. self.assertEqual(len(success), 1)
  572. class BaseSemaphoreTests(BaseTestCase):
  573. """
  574. Common tests for {bounded, unbounded} semaphore objects.
  575. """
  576. def test_constructor(self):
  577. self.assertRaises(ValueError, self.semtype, value = -1)
  578. self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)
  579. def test_acquire(self):
  580. sem = self.semtype(1)
  581. sem.acquire()
  582. sem.release()
  583. sem = self.semtype(2)
  584. sem.acquire()
  585. sem.acquire()
  586. sem.release()
  587. sem.release()
  588. def test_acquire_destroy(self):
  589. sem = self.semtype()
  590. sem.acquire()
  591. del sem
  592. def test_acquire_contended(self):
  593. sem = self.semtype(7)
  594. sem.acquire()
  595. N = 10
  596. sem_results = []
  597. results1 = []
  598. results2 = []
  599. phase_num = 0
  600. def f():
  601. sem_results.append(sem.acquire())
  602. results1.append(phase_num)
  603. sem_results.append(sem.acquire())
  604. results2.append(phase_num)
  605. b = Bunch(f, 10)
  606. b.wait_for_started()
  607. while len(results1) + len(results2) < 6:
  608. _wait()
  609. self.assertEqual(results1 + results2, [0] * 6)
  610. phase_num = 1
  611. for i in range(7):
  612. sem.release()
  613. while len(results1) + len(results2) < 13:
  614. _wait()
  615. self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
  616. phase_num = 2
  617. for i in range(6):
  618. sem.release()
  619. while len(results1) + len(results2) < 19:
  620. _wait()
  621. self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
  622. # The semaphore is still locked
  623. self.assertFalse(sem.acquire(False))
  624. # Final release, to let the last thread finish
  625. sem.release()
  626. b.wait_for_finished()
  627. self.assertEqual(sem_results, [True] * (6 + 7 + 6 + 1))
  628. def test_multirelease(self):
  629. sem = self.semtype(7)
  630. sem.acquire()
  631. results1 = []
  632. results2 = []
  633. phase_num = 0
  634. def f():
  635. sem.acquire()
  636. results1.append(phase_num)
  637. sem.acquire()
  638. results2.append(phase_num)
  639. b = Bunch(f, 10)
  640. b.wait_for_started()
  641. while len(results1) + len(results2) < 6:
  642. _wait()
  643. self.assertEqual(results1 + results2, [0] * 6)
  644. phase_num = 1
  645. sem.release(7)
  646. while len(results1) + len(results2) < 13:
  647. _wait()
  648. self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
  649. phase_num = 2
  650. sem.release(6)
  651. while len(results1) + len(results2) < 19:
  652. _wait()
  653. self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
  654. # The semaphore is still locked
  655. self.assertFalse(sem.acquire(False))
  656. # Final release, to let the last thread finish
  657. sem.release()
  658. b.wait_for_finished()
  659. def test_try_acquire(self):
  660. sem = self.semtype(2)
  661. self.assertTrue(sem.acquire(False))
  662. self.assertTrue(sem.acquire(False))
  663. self.assertFalse(sem.acquire(False))
  664. sem.release()
  665. self.assertTrue(sem.acquire(False))
  666. def test_try_acquire_contended(self):
  667. sem = self.semtype(4)
  668. sem.acquire()
  669. results = []
  670. def f():
  671. results.append(sem.acquire(False))
  672. results.append(sem.acquire(False))
  673. Bunch(f, 5).wait_for_finished()
  674. # There can be a thread switch between acquiring the semaphore and
  675. # appending the result, therefore results will not necessarily be
  676. # ordered.
  677. self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )
  678. def test_acquire_timeout(self):
  679. sem = self.semtype(2)
  680. self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
  681. self.assertTrue(sem.acquire(timeout=0.005))
  682. self.assertTrue(sem.acquire(timeout=0.005))
  683. self.assertFalse(sem.acquire(timeout=0.005))
  684. sem.release()
  685. self.assertTrue(sem.acquire(timeout=0.005))
  686. t = time.monotonic()
  687. self.assertFalse(sem.acquire(timeout=0.5))
  688. dt = time.monotonic() - t
  689. self.assertTimeout(dt, 0.5)
  690. def test_default_value(self):
  691. # The default initial value is 1.
  692. sem = self.semtype()
  693. sem.acquire()
  694. def f():
  695. sem.acquire()
  696. sem.release()
  697. b = Bunch(f, 1)
  698. b.wait_for_started()
  699. _wait()
  700. self.assertFalse(b.finished)
  701. sem.release()
  702. b.wait_for_finished()
  703. def test_with(self):
  704. sem = self.semtype(2)
  705. def _with(err=None):
  706. with sem:
  707. self.assertTrue(sem.acquire(False))
  708. sem.release()
  709. with sem:
  710. self.assertFalse(sem.acquire(False))
  711. if err:
  712. raise err
  713. _with()
  714. self.assertTrue(sem.acquire(False))
  715. sem.release()
  716. self.assertRaises(TypeError, _with, TypeError)
  717. self.assertTrue(sem.acquire(False))
  718. sem.release()
  719. class SemaphoreTests(BaseSemaphoreTests):
  720. """
  721. Tests for unbounded semaphores.
  722. """
  723. def test_release_unacquired(self):
  724. # Unbounded releases are allowed and increment the semaphore's value
  725. sem = self.semtype(1)
  726. sem.release()
  727. sem.acquire()
  728. sem.acquire()
  729. sem.release()
  730. def test_repr(self):
  731. sem = self.semtype(3)
  732. self.assertRegex(repr(sem), r"<\w+\.Semaphore at .*: value=3>")
  733. sem.acquire()
  734. self.assertRegex(repr(sem), r"<\w+\.Semaphore at .*: value=2>")
  735. sem.release()
  736. sem.release()
  737. self.assertRegex(repr(sem), r"<\w+\.Semaphore at .*: value=4>")
  738. class BoundedSemaphoreTests(BaseSemaphoreTests):
  739. """
  740. Tests for bounded semaphores.
  741. """
  742. def test_release_unacquired(self):
  743. # Cannot go past the initial value
  744. sem = self.semtype()
  745. self.assertRaises(ValueError, sem.release)
  746. sem.acquire()
  747. sem.release()
  748. self.assertRaises(ValueError, sem.release)
  749. def test_repr(self):
  750. sem = self.semtype(3)
  751. self.assertRegex(repr(sem), r"<\w+\.BoundedSemaphore at .*: value=3/3>")
  752. sem.acquire()
  753. self.assertRegex(repr(sem), r"<\w+\.BoundedSemaphore at .*: value=2/3>")
  754. class BarrierTests(BaseTestCase):
  755. """
  756. Tests for Barrier objects.
  757. """
  758. N = 5
  759. defaultTimeout = 2.0
  760. def setUp(self):
  761. self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)
  762. def tearDown(self):
  763. self.barrier.abort()
  764. def run_threads(self, f):
  765. b = Bunch(f, self.N-1)
  766. f()
  767. b.wait_for_finished()
  768. def multipass(self, results, n):
  769. m = self.barrier.parties
  770. self.assertEqual(m, self.N)
  771. for i in range(n):
  772. results[0].append(True)
  773. self.assertEqual(len(results[1]), i * m)
  774. self.barrier.wait()
  775. results[1].append(True)
  776. self.assertEqual(len(results[0]), (i + 1) * m)
  777. self.barrier.wait()
  778. self.assertEqual(self.barrier.n_waiting, 0)
  779. self.assertFalse(self.barrier.broken)
  780. def test_barrier(self, passes=1):
  781. """
  782. Test that a barrier is passed in lockstep
  783. """
  784. results = [[],[]]
  785. def f():
  786. self.multipass(results, passes)
  787. self.run_threads(f)
  788. def test_barrier_10(self):
  789. """
  790. Test that a barrier works for 10 consecutive runs
  791. """
  792. return self.test_barrier(10)
  793. def test_wait_return(self):
  794. """
  795. test the return value from barrier.wait
  796. """
  797. results = []
  798. def f():
  799. r = self.barrier.wait()
  800. results.append(r)
  801. self.run_threads(f)
  802. self.assertEqual(sum(results), sum(range(self.N)))
  803. def test_action(self):
  804. """
  805. Test the 'action' callback
  806. """
  807. results = []
  808. def action():
  809. results.append(True)
  810. barrier = self.barriertype(self.N, action)
  811. def f():
  812. barrier.wait()
  813. self.assertEqual(len(results), 1)
  814. self.run_threads(f)
  815. def test_abort(self):
  816. """
  817. Test that an abort will put the barrier in a broken state
  818. """
  819. results1 = []
  820. results2 = []
  821. def f():
  822. try:
  823. i = self.barrier.wait()
  824. if i == self.N//2:
  825. raise RuntimeError
  826. self.barrier.wait()
  827. results1.append(True)
  828. except threading.BrokenBarrierError:
  829. results2.append(True)
  830. except RuntimeError:
  831. self.barrier.abort()
  832. pass
  833. self.run_threads(f)
  834. self.assertEqual(len(results1), 0)
  835. self.assertEqual(len(results2), self.N-1)
  836. self.assertTrue(self.barrier.broken)
  837. def test_reset(self):
  838. """
  839. Test that a 'reset' on a barrier frees the waiting threads
  840. """
  841. results1 = []
  842. results2 = []
  843. results3 = []
  844. def f():
  845. i = self.barrier.wait()
  846. if i == self.N//2:
  847. # Wait until the other threads are all in the barrier.
  848. while self.barrier.n_waiting < self.N-1:
  849. time.sleep(0.001)
  850. self.barrier.reset()
  851. else:
  852. try:
  853. self.barrier.wait()
  854. results1.append(True)
  855. except threading.BrokenBarrierError:
  856. results2.append(True)
  857. # Now, pass the barrier again
  858. self.barrier.wait()
  859. results3.append(True)
  860. self.run_threads(f)
  861. self.assertEqual(len(results1), 0)
  862. self.assertEqual(len(results2), self.N-1)
  863. self.assertEqual(len(results3), self.N)
  864. def test_abort_and_reset(self):
  865. """
  866. Test that a barrier can be reset after being broken.
  867. """
  868. results1 = []
  869. results2 = []
  870. results3 = []
  871. barrier2 = self.barriertype(self.N)
  872. def f():
  873. try:
  874. i = self.barrier.wait()
  875. if i == self.N//2:
  876. raise RuntimeError
  877. self.barrier.wait()
  878. results1.append(True)
  879. except threading.BrokenBarrierError:
  880. results2.append(True)
  881. except RuntimeError:
  882. self.barrier.abort()
  883. pass
  884. # Synchronize and reset the barrier. Must synchronize first so
  885. # that everyone has left it when we reset, and after so that no
  886. # one enters it before the reset.
  887. if barrier2.wait() == self.N//2:
  888. self.barrier.reset()
  889. barrier2.wait()
  890. self.barrier.wait()
  891. results3.append(True)
  892. self.run_threads(f)
  893. self.assertEqual(len(results1), 0)
  894. self.assertEqual(len(results2), self.N-1)
  895. self.assertEqual(len(results3), self.N)
  896. def test_timeout(self):
  897. """
  898. Test wait(timeout)
  899. """
  900. def f():
  901. i = self.barrier.wait()
  902. if i == self.N // 2:
  903. # One thread is late!
  904. time.sleep(1.0)
  905. # Default timeout is 2.0, so this is shorter.
  906. self.assertRaises(threading.BrokenBarrierError,
  907. self.barrier.wait, 0.5)
  908. self.run_threads(f)
  909. def test_default_timeout(self):
  910. """
  911. Test the barrier's default timeout
  912. """
  913. # create a barrier with a low default timeout
  914. barrier = self.barriertype(self.N, timeout=0.3)
  915. def f():
  916. i = barrier.wait()
  917. if i == self.N // 2:
  918. # One thread is later than the default timeout of 0.3s.
  919. time.sleep(1.0)
  920. self.assertRaises(threading.BrokenBarrierError, barrier.wait)
  921. self.run_threads(f)
  922. def test_single_thread(self):
  923. b = self.barriertype(1)
  924. b.wait()
  925. b.wait()
  926. def test_repr(self):
  927. b = self.barriertype(3)
  928. self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=0/3>")
  929. def f():
  930. b.wait(3)
  931. bunch = Bunch(f, 2)
  932. bunch.wait_for_started()
  933. time.sleep(0.2)
  934. self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=2/3>")
  935. b.wait(3)
  936. bunch.wait_for_finished()
  937. self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=0/3>")
  938. b.abort()
  939. self.assertRegex(repr(b), r"<\w+\.Barrier at .*: broken>")