| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 |
- import queue
- import sched
- import threading
- import time
- import unittest
- from test import support
- from test.support import threading_helper
- TIMEOUT = support.SHORT_TIMEOUT
- class Timer:
- def __init__(self):
- self._cond = threading.Condition()
- self._time = 0
- self._stop = 0
- def time(self):
- with self._cond:
- return self._time
- # increase the time but not beyond the established limit
- def sleep(self, t):
- assert t >= 0
- with self._cond:
- t += self._time
- while self._stop < t:
- self._time = self._stop
- self._cond.wait()
- self._time = t
- # advance time limit for user code
- def advance(self, t):
- assert t >= 0
- with self._cond:
- self._stop += t
- self._cond.notify_all()
- class TestCase(unittest.TestCase):
- def test_enter(self):
- l = []
- fun = lambda x: l.append(x)
- scheduler = sched.scheduler(time.time, time.sleep)
- for x in [0.5, 0.4, 0.3, 0.2, 0.1]:
- z = scheduler.enter(x, 1, fun, (x,))
- scheduler.run()
- self.assertEqual(l, [0.1, 0.2, 0.3, 0.4, 0.5])
- def test_enterabs(self):
- l = []
- fun = lambda x: l.append(x)
- scheduler = sched.scheduler(time.time, time.sleep)
- for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
- z = scheduler.enterabs(x, 1, fun, (x,))
- scheduler.run()
- self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
- @threading_helper.requires_working_threading()
- def test_enter_concurrent(self):
- q = queue.Queue()
- fun = q.put
- timer = Timer()
- scheduler = sched.scheduler(timer.time, timer.sleep)
- scheduler.enter(1, 1, fun, (1,))
- scheduler.enter(3, 1, fun, (3,))
- t = threading.Thread(target=scheduler.run)
- t.start()
- timer.advance(1)
- self.assertEqual(q.get(timeout=TIMEOUT), 1)
- self.assertTrue(q.empty())
- for x in [4, 5, 2]:
- z = scheduler.enter(x - 1, 1, fun, (x,))
- timer.advance(2)
- self.assertEqual(q.get(timeout=TIMEOUT), 2)
- self.assertEqual(q.get(timeout=TIMEOUT), 3)
- self.assertTrue(q.empty())
- timer.advance(1)
- self.assertEqual(q.get(timeout=TIMEOUT), 4)
- self.assertTrue(q.empty())
- timer.advance(1)
- self.assertEqual(q.get(timeout=TIMEOUT), 5)
- self.assertTrue(q.empty())
- timer.advance(1000)
- threading_helper.join_thread(t)
- self.assertTrue(q.empty())
- self.assertEqual(timer.time(), 5)
- def test_priority(self):
- l = []
- fun = lambda x: l.append(x)
- scheduler = sched.scheduler(time.time, time.sleep)
- cases = [
- ([1, 2, 3, 4, 5], [1, 2, 3, 4, 5]),
- ([5, 4, 3, 2, 1], [1, 2, 3, 4, 5]),
- ([2, 5, 3, 1, 4], [1, 2, 3, 4, 5]),
- ([1, 2, 3, 2, 1], [1, 1, 2, 2, 3]),
- ]
- for priorities, expected in cases:
- with self.subTest(priorities=priorities, expected=expected):
- for priority in priorities:
- scheduler.enterabs(0.01, priority, fun, (priority,))
- scheduler.run()
- self.assertEqual(l, expected)
- # Cleanup:
- self.assertTrue(scheduler.empty())
- l.clear()
- def test_cancel(self):
- l = []
- fun = lambda x: l.append(x)
- scheduler = sched.scheduler(time.time, time.sleep)
- now = time.time()
- event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,))
- event2 = scheduler.enterabs(now + 0.02, 1, fun, (0.02,))
- event3 = scheduler.enterabs(now + 0.03, 1, fun, (0.03,))
- event4 = scheduler.enterabs(now + 0.04, 1, fun, (0.04,))
- event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,))
- scheduler.cancel(event1)
- scheduler.cancel(event5)
- scheduler.run()
- self.assertEqual(l, [0.02, 0.03, 0.04])
- @threading_helper.requires_working_threading()
- def test_cancel_concurrent(self):
- q = queue.Queue()
- fun = q.put
- timer = Timer()
- scheduler = sched.scheduler(timer.time, timer.sleep)
- now = timer.time()
- event1 = scheduler.enterabs(now + 1, 1, fun, (1,))
- event2 = scheduler.enterabs(now + 2, 1, fun, (2,))
- event4 = scheduler.enterabs(now + 4, 1, fun, (4,))
- event5 = scheduler.enterabs(now + 5, 1, fun, (5,))
- event3 = scheduler.enterabs(now + 3, 1, fun, (3,))
- t = threading.Thread(target=scheduler.run)
- t.start()
- timer.advance(1)
- self.assertEqual(q.get(timeout=TIMEOUT), 1)
- self.assertTrue(q.empty())
- scheduler.cancel(event2)
- scheduler.cancel(event5)
- timer.advance(1)
- self.assertTrue(q.empty())
- timer.advance(1)
- self.assertEqual(q.get(timeout=TIMEOUT), 3)
- self.assertTrue(q.empty())
- timer.advance(1)
- self.assertEqual(q.get(timeout=TIMEOUT), 4)
- self.assertTrue(q.empty())
- timer.advance(1000)
- threading_helper.join_thread(t)
- self.assertTrue(q.empty())
- self.assertEqual(timer.time(), 4)
- def test_cancel_correct_event(self):
- # bpo-19270
- events = []
- scheduler = sched.scheduler()
- scheduler.enterabs(1, 1, events.append, ("a",))
- b = scheduler.enterabs(1, 1, events.append, ("b",))
- scheduler.enterabs(1, 1, events.append, ("c",))
- scheduler.cancel(b)
- scheduler.run()
- self.assertEqual(events, ["a", "c"])
- def test_empty(self):
- l = []
- fun = lambda x: l.append(x)
- scheduler = sched.scheduler(time.time, time.sleep)
- self.assertTrue(scheduler.empty())
- for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
- z = scheduler.enterabs(x, 1, fun, (x,))
- self.assertFalse(scheduler.empty())
- scheduler.run()
- self.assertTrue(scheduler.empty())
- def test_queue(self):
- l = []
- fun = lambda x: l.append(x)
- scheduler = sched.scheduler(time.time, time.sleep)
- now = time.time()
- e5 = scheduler.enterabs(now + 0.05, 1, fun)
- e1 = scheduler.enterabs(now + 0.01, 1, fun)
- e2 = scheduler.enterabs(now + 0.02, 1, fun)
- e4 = scheduler.enterabs(now + 0.04, 1, fun)
- e3 = scheduler.enterabs(now + 0.03, 1, fun)
- # queue property is supposed to return an order list of
- # upcoming events
- self.assertEqual(scheduler.queue, [e1, e2, e3, e4, e5])
- def test_args_kwargs(self):
- seq = []
- def fun(*a, **b):
- seq.append((a, b))
- now = time.time()
- scheduler = sched.scheduler(time.time, time.sleep)
- scheduler.enterabs(now, 1, fun)
- scheduler.enterabs(now, 1, fun, argument=(1, 2))
- scheduler.enterabs(now, 1, fun, argument=('a', 'b'))
- scheduler.enterabs(now, 1, fun, argument=(1, 2), kwargs={"foo": 3})
- scheduler.run()
- self.assertCountEqual(seq, [
- ((), {}),
- ((1, 2), {}),
- (('a', 'b'), {}),
- ((1, 2), {'foo': 3})
- ])
- def test_run_non_blocking(self):
- l = []
- fun = lambda x: l.append(x)
- scheduler = sched.scheduler(time.time, time.sleep)
- for x in [10, 9, 8, 7, 6]:
- scheduler.enter(x, 1, fun, (x,))
- scheduler.run(blocking=False)
- self.assertEqual(l, [])
- if __name__ == "__main__":
- unittest.main()
|