# test_sched.py -- unit tests for the sched module.

  1. import queue
  2. import sched
  3. import threading
  4. import time
  5. import unittest
  6. from test import support
  7. from test.support import threading_helper
  8. TIMEOUT = support.SHORT_TIMEOUT
  9. class Timer:
  10. def __init__(self):
  11. self._cond = threading.Condition()
  12. self._time = 0
  13. self._stop = 0
  14. def time(self):
  15. with self._cond:
  16. return self._time
  17. # increase the time but not beyond the established limit
  18. def sleep(self, t):
  19. assert t >= 0
  20. with self._cond:
  21. t += self._time
  22. while self._stop < t:
  23. self._time = self._stop
  24. self._cond.wait()
  25. self._time = t
  26. # advance time limit for user code
  27. def advance(self, t):
  28. assert t >= 0
  29. with self._cond:
  30. self._stop += t
  31. self._cond.notify_all()
  32. class TestCase(unittest.TestCase):
  33. def test_enter(self):
  34. l = []
  35. fun = lambda x: l.append(x)
  36. scheduler = sched.scheduler(time.time, time.sleep)
  37. for x in [0.5, 0.4, 0.3, 0.2, 0.1]:
  38. z = scheduler.enter(x, 1, fun, (x,))
  39. scheduler.run()
  40. self.assertEqual(l, [0.1, 0.2, 0.3, 0.4, 0.5])
  41. def test_enterabs(self):
  42. l = []
  43. fun = lambda x: l.append(x)
  44. scheduler = sched.scheduler(time.time, time.sleep)
  45. for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
  46. z = scheduler.enterabs(x, 1, fun, (x,))
  47. scheduler.run()
  48. self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
  49. @threading_helper.requires_working_threading()
  50. def test_enter_concurrent(self):
  51. q = queue.Queue()
  52. fun = q.put
  53. timer = Timer()
  54. scheduler = sched.scheduler(timer.time, timer.sleep)
  55. scheduler.enter(1, 1, fun, (1,))
  56. scheduler.enter(3, 1, fun, (3,))
  57. t = threading.Thread(target=scheduler.run)
  58. t.start()
  59. timer.advance(1)
  60. self.assertEqual(q.get(timeout=TIMEOUT), 1)
  61. self.assertTrue(q.empty())
  62. for x in [4, 5, 2]:
  63. z = scheduler.enter(x - 1, 1, fun, (x,))
  64. timer.advance(2)
  65. self.assertEqual(q.get(timeout=TIMEOUT), 2)
  66. self.assertEqual(q.get(timeout=TIMEOUT), 3)
  67. self.assertTrue(q.empty())
  68. timer.advance(1)
  69. self.assertEqual(q.get(timeout=TIMEOUT), 4)
  70. self.assertTrue(q.empty())
  71. timer.advance(1)
  72. self.assertEqual(q.get(timeout=TIMEOUT), 5)
  73. self.assertTrue(q.empty())
  74. timer.advance(1000)
  75. threading_helper.join_thread(t)
  76. self.assertTrue(q.empty())
  77. self.assertEqual(timer.time(), 5)
  78. def test_priority(self):
  79. l = []
  80. fun = lambda x: l.append(x)
  81. scheduler = sched.scheduler(time.time, time.sleep)
  82. cases = [
  83. ([1, 2, 3, 4, 5], [1, 2, 3, 4, 5]),
  84. ([5, 4, 3, 2, 1], [1, 2, 3, 4, 5]),
  85. ([2, 5, 3, 1, 4], [1, 2, 3, 4, 5]),
  86. ([1, 2, 3, 2, 1], [1, 1, 2, 2, 3]),
  87. ]
  88. for priorities, expected in cases:
  89. with self.subTest(priorities=priorities, expected=expected):
  90. for priority in priorities:
  91. scheduler.enterabs(0.01, priority, fun, (priority,))
  92. scheduler.run()
  93. self.assertEqual(l, expected)
  94. # Cleanup:
  95. self.assertTrue(scheduler.empty())
  96. l.clear()
  97. def test_cancel(self):
  98. l = []
  99. fun = lambda x: l.append(x)
  100. scheduler = sched.scheduler(time.time, time.sleep)
  101. now = time.time()
  102. event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,))
  103. event2 = scheduler.enterabs(now + 0.02, 1, fun, (0.02,))
  104. event3 = scheduler.enterabs(now + 0.03, 1, fun, (0.03,))
  105. event4 = scheduler.enterabs(now + 0.04, 1, fun, (0.04,))
  106. event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,))
  107. scheduler.cancel(event1)
  108. scheduler.cancel(event5)
  109. scheduler.run()
  110. self.assertEqual(l, [0.02, 0.03, 0.04])
  111. @threading_helper.requires_working_threading()
  112. def test_cancel_concurrent(self):
  113. q = queue.Queue()
  114. fun = q.put
  115. timer = Timer()
  116. scheduler = sched.scheduler(timer.time, timer.sleep)
  117. now = timer.time()
  118. event1 = scheduler.enterabs(now + 1, 1, fun, (1,))
  119. event2 = scheduler.enterabs(now + 2, 1, fun, (2,))
  120. event4 = scheduler.enterabs(now + 4, 1, fun, (4,))
  121. event5 = scheduler.enterabs(now + 5, 1, fun, (5,))
  122. event3 = scheduler.enterabs(now + 3, 1, fun, (3,))
  123. t = threading.Thread(target=scheduler.run)
  124. t.start()
  125. timer.advance(1)
  126. self.assertEqual(q.get(timeout=TIMEOUT), 1)
  127. self.assertTrue(q.empty())
  128. scheduler.cancel(event2)
  129. scheduler.cancel(event5)
  130. timer.advance(1)
  131. self.assertTrue(q.empty())
  132. timer.advance(1)
  133. self.assertEqual(q.get(timeout=TIMEOUT), 3)
  134. self.assertTrue(q.empty())
  135. timer.advance(1)
  136. self.assertEqual(q.get(timeout=TIMEOUT), 4)
  137. self.assertTrue(q.empty())
  138. timer.advance(1000)
  139. threading_helper.join_thread(t)
  140. self.assertTrue(q.empty())
  141. self.assertEqual(timer.time(), 4)
  142. def test_cancel_correct_event(self):
  143. # bpo-19270
  144. events = []
  145. scheduler = sched.scheduler()
  146. scheduler.enterabs(1, 1, events.append, ("a",))
  147. b = scheduler.enterabs(1, 1, events.append, ("b",))
  148. scheduler.enterabs(1, 1, events.append, ("c",))
  149. scheduler.cancel(b)
  150. scheduler.run()
  151. self.assertEqual(events, ["a", "c"])
  152. def test_empty(self):
  153. l = []
  154. fun = lambda x: l.append(x)
  155. scheduler = sched.scheduler(time.time, time.sleep)
  156. self.assertTrue(scheduler.empty())
  157. for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
  158. z = scheduler.enterabs(x, 1, fun, (x,))
  159. self.assertFalse(scheduler.empty())
  160. scheduler.run()
  161. self.assertTrue(scheduler.empty())
  162. def test_queue(self):
  163. l = []
  164. fun = lambda x: l.append(x)
  165. scheduler = sched.scheduler(time.time, time.sleep)
  166. now = time.time()
  167. e5 = scheduler.enterabs(now + 0.05, 1, fun)
  168. e1 = scheduler.enterabs(now + 0.01, 1, fun)
  169. e2 = scheduler.enterabs(now + 0.02, 1, fun)
  170. e4 = scheduler.enterabs(now + 0.04, 1, fun)
  171. e3 = scheduler.enterabs(now + 0.03, 1, fun)
  172. # queue property is supposed to return an order list of
  173. # upcoming events
  174. self.assertEqual(scheduler.queue, [e1, e2, e3, e4, e5])
  175. def test_args_kwargs(self):
  176. seq = []
  177. def fun(*a, **b):
  178. seq.append((a, b))
  179. now = time.time()
  180. scheduler = sched.scheduler(time.time, time.sleep)
  181. scheduler.enterabs(now, 1, fun)
  182. scheduler.enterabs(now, 1, fun, argument=(1, 2))
  183. scheduler.enterabs(now, 1, fun, argument=('a', 'b'))
  184. scheduler.enterabs(now, 1, fun, argument=(1, 2), kwargs={"foo": 3})
  185. scheduler.run()
  186. self.assertCountEqual(seq, [
  187. ((), {}),
  188. ((1, 2), {}),
  189. (('a', 'b'), {}),
  190. ((1, 2), {'foo': 3})
  191. ])
  192. def test_run_non_blocking(self):
  193. l = []
  194. fun = lambda x: l.append(x)
  195. scheduler = sched.scheduler(time.time, time.sleep)
  196. for x in [10, 9, 8, 7, 6]:
  197. scheduler.enter(x, 1, fun, (x,))
  198. scheduler.run(blocking=False)
  199. self.assertEqual(l, [])
  200. if __name__ == "__main__":
  201. unittest.main()