test_fcntl.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. """Test program for the fcntl C module.
  2. """
  3. import platform
  4. import os
  5. import struct
  6. import sys
  7. import unittest
  8. from multiprocessing import Process
  9. from test.support import verbose, cpython_only
  10. from test.support.import_helper import import_module
  11. from test.support.os_helper import TESTFN, unlink
  12. # Skip test if no fcntl module.
  13. fcntl = import_module('fcntl')
  14. def get_lockdata():
  15. try:
  16. os.O_LARGEFILE
  17. except AttributeError:
  18. start_len = "ll"
  19. else:
  20. start_len = "qq"
  21. if (sys.platform.startswith(('netbsd', 'freebsd', 'openbsd'))
  22. or sys.platform == 'darwin'):
  23. if struct.calcsize('l') == 8:
  24. off_t = 'l'
  25. pid_t = 'i'
  26. else:
  27. off_t = 'lxxxx'
  28. pid_t = 'l'
  29. lockdata = struct.pack(off_t + off_t + pid_t + 'hh', 0, 0, 0,
  30. fcntl.F_WRLCK, 0)
  31. elif sys.platform.startswith('gnukfreebsd'):
  32. lockdata = struct.pack('qqihhi', 0, 0, 0, fcntl.F_WRLCK, 0, 0)
  33. elif sys.platform in ['hp-uxB', 'unixware7']:
  34. lockdata = struct.pack('hhlllii', fcntl.F_WRLCK, 0, 0, 0, 0, 0, 0)
  35. else:
  36. lockdata = struct.pack('hh'+start_len+'hh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
  37. if lockdata:
  38. if verbose:
  39. print('struct.pack: ', repr(lockdata))
  40. return lockdata
  41. lockdata = get_lockdata()
  42. class BadFile:
  43. def __init__(self, fn):
  44. self.fn = fn
  45. def fileno(self):
  46. return self.fn
  47. def try_lockf_on_other_process_fail(fname, cmd):
  48. f = open(fname, 'wb+')
  49. try:
  50. fcntl.lockf(f, cmd)
  51. except BlockingIOError:
  52. pass
  53. finally:
  54. f.close()
  55. def try_lockf_on_other_process(fname, cmd):
  56. f = open(fname, 'wb+')
  57. fcntl.lockf(f, cmd)
  58. fcntl.lockf(f, fcntl.LOCK_UN)
  59. f.close()
  60. class TestFcntl(unittest.TestCase):
  61. def setUp(self):
  62. self.f = None
  63. def tearDown(self):
  64. if self.f and not self.f.closed:
  65. self.f.close()
  66. unlink(TESTFN)
  67. def test_fcntl_fileno(self):
  68. # the example from the library docs
  69. self.f = open(TESTFN, 'wb')
  70. rv = fcntl.fcntl(self.f.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
  71. if verbose:
  72. print('Status from fcntl with O_NONBLOCK: ', rv)
  73. rv = fcntl.fcntl(self.f.fileno(), fcntl.F_SETLKW, lockdata)
  74. if verbose:
  75. print('String from fcntl with F_SETLKW: ', repr(rv))
  76. self.f.close()
  77. def test_fcntl_file_descriptor(self):
  78. # again, but pass the file rather than numeric descriptor
  79. self.f = open(TESTFN, 'wb')
  80. rv = fcntl.fcntl(self.f, fcntl.F_SETFL, os.O_NONBLOCK)
  81. if verbose:
  82. print('Status from fcntl with O_NONBLOCK: ', rv)
  83. rv = fcntl.fcntl(self.f, fcntl.F_SETLKW, lockdata)
  84. if verbose:
  85. print('String from fcntl with F_SETLKW: ', repr(rv))
  86. self.f.close()
  87. def test_fcntl_bad_file(self):
  88. with self.assertRaises(ValueError):
  89. fcntl.fcntl(-1, fcntl.F_SETFL, os.O_NONBLOCK)
  90. with self.assertRaises(ValueError):
  91. fcntl.fcntl(BadFile(-1), fcntl.F_SETFL, os.O_NONBLOCK)
  92. with self.assertRaises(TypeError):
  93. fcntl.fcntl('spam', fcntl.F_SETFL, os.O_NONBLOCK)
  94. with self.assertRaises(TypeError):
  95. fcntl.fcntl(BadFile('spam'), fcntl.F_SETFL, os.O_NONBLOCK)
  96. @cpython_only
  97. def test_fcntl_bad_file_overflow(self):
  98. from _testcapi import INT_MAX, INT_MIN
  99. # Issue 15989
  100. with self.assertRaises(OverflowError):
  101. fcntl.fcntl(INT_MAX + 1, fcntl.F_SETFL, os.O_NONBLOCK)
  102. with self.assertRaises(OverflowError):
  103. fcntl.fcntl(BadFile(INT_MAX + 1), fcntl.F_SETFL, os.O_NONBLOCK)
  104. with self.assertRaises(OverflowError):
  105. fcntl.fcntl(INT_MIN - 1, fcntl.F_SETFL, os.O_NONBLOCK)
  106. with self.assertRaises(OverflowError):
  107. fcntl.fcntl(BadFile(INT_MIN - 1), fcntl.F_SETFL, os.O_NONBLOCK)
  108. @unittest.skipIf(
  109. platform.machine().startswith('arm') and platform.system() == 'Linux',
  110. "ARM Linux returns EINVAL for F_NOTIFY DN_MULTISHOT")
  111. def test_fcntl_64_bit(self):
  112. # Issue #1309352: fcntl shouldn't fail when the third arg fits in a
  113. # C 'long' but not in a C 'int'.
  114. try:
  115. cmd = fcntl.F_NOTIFY
  116. # This flag is larger than 2**31 in 64-bit builds
  117. flags = fcntl.DN_MULTISHOT
  118. except AttributeError:
  119. self.skipTest("F_NOTIFY or DN_MULTISHOT unavailable")
  120. fd = os.open(os.path.dirname(os.path.abspath(TESTFN)), os.O_RDONLY)
  121. try:
  122. fcntl.fcntl(fd, cmd, flags)
  123. finally:
  124. os.close(fd)
  125. def test_flock(self):
  126. # Solaris needs readable file for shared lock
  127. self.f = open(TESTFN, 'wb+')
  128. fileno = self.f.fileno()
  129. fcntl.flock(fileno, fcntl.LOCK_SH)
  130. fcntl.flock(fileno, fcntl.LOCK_UN)
  131. fcntl.flock(self.f, fcntl.LOCK_SH | fcntl.LOCK_NB)
  132. fcntl.flock(self.f, fcntl.LOCK_UN)
  133. fcntl.flock(fileno, fcntl.LOCK_EX)
  134. fcntl.flock(fileno, fcntl.LOCK_UN)
  135. self.assertRaises(ValueError, fcntl.flock, -1, fcntl.LOCK_SH)
  136. self.assertRaises(TypeError, fcntl.flock, 'spam', fcntl.LOCK_SH)
  137. @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
  138. def test_lockf_exclusive(self):
  139. self.f = open(TESTFN, 'wb+')
  140. cmd = fcntl.LOCK_EX | fcntl.LOCK_NB
  141. fcntl.lockf(self.f, cmd)
  142. p = Process(target=try_lockf_on_other_process_fail, args=(TESTFN, cmd))
  143. p.start()
  144. p.join()
  145. fcntl.lockf(self.f, fcntl.LOCK_UN)
  146. self.assertEqual(p.exitcode, 0)
  147. @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
  148. def test_lockf_share(self):
  149. self.f = open(TESTFN, 'wb+')
  150. cmd = fcntl.LOCK_SH | fcntl.LOCK_NB
  151. fcntl.lockf(self.f, cmd)
  152. p = Process(target=try_lockf_on_other_process, args=(TESTFN, cmd))
  153. p.start()
  154. p.join()
  155. fcntl.lockf(self.f, fcntl.LOCK_UN)
  156. self.assertEqual(p.exitcode, 0)
  157. @cpython_only
  158. def test_flock_overflow(self):
  159. import _testcapi
  160. self.assertRaises(OverflowError, fcntl.flock, _testcapi.INT_MAX+1,
  161. fcntl.LOCK_SH)
  162. @unittest.skipIf(sys.platform != 'darwin', "F_GETPATH is only available on macos")
  163. def test_fcntl_f_getpath(self):
  164. self.f = open(TESTFN, 'wb')
  165. expected = os.path.abspath(TESTFN).encode('utf-8')
  166. res = fcntl.fcntl(self.f.fileno(), fcntl.F_GETPATH, bytes(len(expected)))
  167. self.assertEqual(expected, res)
  168. @unittest.skipUnless(
  169. hasattr(fcntl, "F_SETPIPE_SZ") and hasattr(fcntl, "F_GETPIPE_SZ"),
  170. "F_SETPIPE_SZ and F_GETPIPE_SZ are not available on all platforms.")
  171. def test_fcntl_f_pipesize(self):
  172. test_pipe_r, test_pipe_w = os.pipe()
  173. try:
  174. # Get the default pipesize with F_GETPIPE_SZ
  175. pipesize_default = fcntl.fcntl(test_pipe_w, fcntl.F_GETPIPE_SZ)
  176. pipesize = pipesize_default // 2 # A new value to detect change.
  177. if pipesize < 512: # the POSIX minimum
  178. raise unittest.SkitTest(
  179. 'default pipesize too small to perform test.')
  180. fcntl.fcntl(test_pipe_w, fcntl.F_SETPIPE_SZ, pipesize)
  181. self.assertEqual(fcntl.fcntl(test_pipe_w, fcntl.F_GETPIPE_SZ),
  182. pipesize)
  183. finally:
  184. os.close(test_pipe_r)
  185. os.close(test_pipe_w)
  186. if __name__ == '__main__':
  187. unittest.main()