test_select.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. import errno
  2. import os
  3. import select
  4. import subprocess
  5. import sys
  6. import textwrap
  7. import unittest
  8. from test import support
  9. support.requires_working_socket(module=True)
  10. @unittest.skipIf((sys.platform[:3]=='win'),
  11. "can't easily test on this system")
  12. class SelectTestCase(unittest.TestCase):
  13. class Nope:
  14. pass
  15. class Almost:
  16. def fileno(self):
  17. return 'fileno'
  18. def test_error_conditions(self):
  19. self.assertRaises(TypeError, select.select, 1, 2, 3)
  20. self.assertRaises(TypeError, select.select, [self.Nope()], [], [])
  21. self.assertRaises(TypeError, select.select, [self.Almost()], [], [])
  22. self.assertRaises(TypeError, select.select, [], [], [], "not a number")
  23. self.assertRaises(ValueError, select.select, [], [], [], -1)
  24. # Issue #12367: http://www.freebsd.org/cgi/query-pr.cgi?pr=kern/155606
  25. @unittest.skipIf(sys.platform.startswith('freebsd'),
  26. 'skip because of a FreeBSD bug: kern/155606')
  27. def test_errno(self):
  28. with open(__file__, 'rb') as fp:
  29. fd = fp.fileno()
  30. fp.close()
  31. try:
  32. select.select([fd], [], [], 0)
  33. except OSError as err:
  34. self.assertEqual(err.errno, errno.EBADF)
  35. else:
  36. self.fail("exception not raised")
  37. def test_returned_list_identity(self):
  38. # See issue #8329
  39. r, w, x = select.select([], [], [], 1)
  40. self.assertIsNot(r, w)
  41. self.assertIsNot(r, x)
  42. self.assertIsNot(w, x)
  43. @support.requires_fork()
  44. def test_select(self):
  45. code = textwrap.dedent('''
  46. import time
  47. for i in range(10):
  48. print("testing...", flush=True)
  49. time.sleep(0.050)
  50. ''')
  51. cmd = [sys.executable, '-I', '-c', code]
  52. with subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc:
  53. pipe = proc.stdout
  54. for timeout in (0, 1, 2, 4, 8, 16) + (None,)*10:
  55. if support.verbose:
  56. print(f'timeout = {timeout}')
  57. rfd, wfd, xfd = select.select([pipe], [], [], timeout)
  58. self.assertEqual(wfd, [])
  59. self.assertEqual(xfd, [])
  60. if not rfd:
  61. continue
  62. if rfd == [pipe]:
  63. line = pipe.readline()
  64. if support.verbose:
  65. print(repr(line))
  66. if not line:
  67. if support.verbose:
  68. print('EOF')
  69. break
  70. continue
  71. self.fail('Unexpected return values from select():',
  72. rfd, wfd, xfd)
  73. # Issue 16230: Crash on select resized list
  74. @unittest.skipIf(
  75. support.is_emscripten, "Emscripten cannot select a fd multiple times."
  76. )
  77. def test_select_mutated(self):
  78. a = []
  79. class F:
  80. def fileno(self):
  81. del a[-1]
  82. return sys.__stdout__.fileno()
  83. a[:] = [F()] * 10
  84. self.assertEqual(select.select([], a, []), ([], a[:5], []))
  85. def test_disallow_instantiation(self):
  86. support.check_disallow_instantiation(self, type(select.poll()))
  87. if hasattr(select, 'devpoll'):
  88. support.check_disallow_instantiation(self, type(select.devpoll()))
  89. def tearDownModule():
  90. support.reap_children()
  91. if __name__ == "__main__":
  92. unittest.main()