# test_mmap.py (34 KB) -- unit tests for the mmap module.
  1. from test.support import (
  2. requires, _2G, _4G, gc_collect, cpython_only, is_emscripten
  3. )
  4. from test.support.import_helper import import_module
  5. from test.support.os_helper import TESTFN, unlink
  6. import unittest
  7. import os
  8. import re
  9. import itertools
  10. import random
  11. import socket
  12. import string
  13. import sys
  14. import weakref
  15. # Skip test if we can't import mmap.
  16. mmap = import_module('mmap')
  17. PAGESIZE = mmap.PAGESIZE
  18. tagname_prefix = f'python_{os.getpid()}_test_mmap'
  def random_tagname(length=10):
      """Return tagname_prefix, an underscore and *length* random letters.

      The letters are drawn (with replacement) from the uppercase ASCII
      alphabet.
      """
      letters = random.choices(string.ascii_uppercase, k=length)
      return '{}_{}'.format(tagname_prefix, ''.join(letters))
  # Python's mmap module dup()s the file descriptor. Emscripten's FS layer
  # does not materialize file changes through a dupped fd to a new mmap,
  # so the whole module is skipped on that platform.
  if is_emscripten:
      raise unittest.SkipTest("incompatible with Emscripten's mmap emulation.")
  26. class MmapTests(unittest.TestCase):
  27. def setUp(self):
  28. if os.path.exists(TESTFN):
  29. os.unlink(TESTFN)
  30. def tearDown(self):
  31. try:
  32. os.unlink(TESTFN)
  33. except OSError:
  34. pass
  35. def test_basic(self):
  36. # Test mmap module on Unix systems and Windows
  37. # Create a file to be mmap'ed.
  38. f = open(TESTFN, 'bw+')
  39. try:
  40. # Write 2 pages worth of data to the file
  41. f.write(b'\0'* PAGESIZE)
  42. f.write(b'foo')
  43. f.write(b'\0'* (PAGESIZE-3) )
  44. f.flush()
  45. m = mmap.mmap(f.fileno(), 2 * PAGESIZE)
  46. finally:
  47. f.close()
  48. # Simple sanity checks
  49. tp = str(type(m)) # SF bug 128713: segfaulted on Linux
  50. self.assertEqual(m.find(b'foo'), PAGESIZE)
  51. self.assertEqual(len(m), 2*PAGESIZE)
  52. self.assertEqual(m[0], 0)
  53. self.assertEqual(m[0:3], b'\0\0\0')
  54. # Shouldn't crash on boundary (Issue #5292)
  55. self.assertRaises(IndexError, m.__getitem__, len(m))
  56. self.assertRaises(IndexError, m.__setitem__, len(m), b'\0')
  57. # Modify the file's content
  58. m[0] = b'3'[0]
  59. m[PAGESIZE +3: PAGESIZE +3+3] = b'bar'
  60. # Check that the modification worked
  61. self.assertEqual(m[0], b'3'[0])
  62. self.assertEqual(m[0:3], b'3\0\0')
  63. self.assertEqual(m[PAGESIZE-1 : PAGESIZE + 7], b'\0foobar\0')
  64. m.flush()
  65. # Test doing a regular expression match in an mmap'ed file
  66. match = re.search(b'[A-Za-z]+', m)
  67. if match is None:
  68. self.fail('regex match on mmap failed!')
  69. else:
  70. start, end = match.span(0)
  71. length = end - start
  72. self.assertEqual(start, PAGESIZE)
  73. self.assertEqual(end, PAGESIZE + 6)
  74. # test seeking around (try to overflow the seek implementation)
  75. m.seek(0,0)
  76. self.assertEqual(m.tell(), 0)
  77. m.seek(42,1)
  78. self.assertEqual(m.tell(), 42)
  79. m.seek(0,2)
  80. self.assertEqual(m.tell(), len(m))
  81. # Try to seek to negative position...
  82. self.assertRaises(ValueError, m.seek, -1)
  83. # Try to seek beyond end of mmap...
  84. self.assertRaises(ValueError, m.seek, 1, 2)
  85. # Try to seek to negative position...
  86. self.assertRaises(ValueError, m.seek, -len(m)-1, 2)
  87. # Try resizing map
  88. try:
  89. m.resize(512)
  90. except SystemError:
  91. # resize() not supported
  92. # No messages are printed, since the output of this test suite
  93. # would then be different across platforms.
  94. pass
  95. else:
  96. # resize() is supported
  97. self.assertEqual(len(m), 512)
  98. # Check that we can no longer seek beyond the new size.
  99. self.assertRaises(ValueError, m.seek, 513, 0)
  100. # Check that the underlying file is truncated too
  101. # (bug #728515)
  102. f = open(TESTFN, 'rb')
  103. try:
  104. f.seek(0, 2)
  105. self.assertEqual(f.tell(), 512)
  106. finally:
  107. f.close()
  108. self.assertEqual(m.size(), 512)
  109. m.close()
  110. def test_access_parameter(self):
  111. # Test for "access" keyword parameter
  112. mapsize = 10
  113. with open(TESTFN, "wb") as fp:
  114. fp.write(b"a"*mapsize)
  115. with open(TESTFN, "rb") as f:
  116. m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_READ)
  117. self.assertEqual(m[:], b'a'*mapsize, "Readonly memory map data incorrect.")
  118. # Ensuring that readonly mmap can't be slice assigned
  119. try:
  120. m[:] = b'b'*mapsize
  121. except TypeError:
  122. pass
  123. else:
  124. self.fail("Able to write to readonly memory map")
  125. # Ensuring that readonly mmap can't be item assigned
  126. try:
  127. m[0] = b'b'
  128. except TypeError:
  129. pass
  130. else:
  131. self.fail("Able to write to readonly memory map")
  132. # Ensuring that readonly mmap can't be write() to
  133. try:
  134. m.seek(0,0)
  135. m.write(b'abc')
  136. except TypeError:
  137. pass
  138. else:
  139. self.fail("Able to write to readonly memory map")
  140. # Ensuring that readonly mmap can't be write_byte() to
  141. try:
  142. m.seek(0,0)
  143. m.write_byte(b'd')
  144. except TypeError:
  145. pass
  146. else:
  147. self.fail("Able to write to readonly memory map")
  148. # Ensuring that readonly mmap can't be resized
  149. try:
  150. m.resize(2*mapsize)
  151. except SystemError: # resize is not universally supported
  152. pass
  153. except TypeError:
  154. pass
  155. else:
  156. self.fail("Able to resize readonly memory map")
  157. with open(TESTFN, "rb") as fp:
  158. self.assertEqual(fp.read(), b'a'*mapsize,
  159. "Readonly memory map data file was modified")
  160. # Opening mmap with size too big
  161. with open(TESTFN, "r+b") as f:
  162. try:
  163. m = mmap.mmap(f.fileno(), mapsize+1)
  164. except ValueError:
  165. # we do not expect a ValueError on Windows
  166. # CAUTION: This also changes the size of the file on disk, and
  167. # later tests assume that the length hasn't changed. We need to
  168. # repair that.
  169. if sys.platform.startswith('win'):
  170. self.fail("Opening mmap with size+1 should work on Windows.")
  171. else:
  172. # we expect a ValueError on Unix, but not on Windows
  173. if not sys.platform.startswith('win'):
  174. self.fail("Opening mmap with size+1 should raise ValueError.")
  175. m.close()
  176. if sys.platform.startswith('win'):
  177. # Repair damage from the resizing test.
  178. with open(TESTFN, 'r+b') as f:
  179. f.truncate(mapsize)
  180. # Opening mmap with access=ACCESS_WRITE
  181. with open(TESTFN, "r+b") as f:
  182. m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_WRITE)
  183. # Modifying write-through memory map
  184. m[:] = b'c'*mapsize
  185. self.assertEqual(m[:], b'c'*mapsize,
  186. "Write-through memory map memory not updated properly.")
  187. m.flush()
  188. m.close()
  189. with open(TESTFN, 'rb') as f:
  190. stuff = f.read()
  191. self.assertEqual(stuff, b'c'*mapsize,
  192. "Write-through memory map data file not updated properly.")
  193. # Opening mmap with access=ACCESS_COPY
  194. with open(TESTFN, "r+b") as f:
  195. m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_COPY)
  196. # Modifying copy-on-write memory map
  197. m[:] = b'd'*mapsize
  198. self.assertEqual(m[:], b'd' * mapsize,
  199. "Copy-on-write memory map data not written correctly.")
  200. m.flush()
  201. with open(TESTFN, "rb") as fp:
  202. self.assertEqual(fp.read(), b'c'*mapsize,
  203. "Copy-on-write test data file should not be modified.")
  204. # Ensuring copy-on-write maps cannot be resized
  205. self.assertRaises(TypeError, m.resize, 2*mapsize)
  206. m.close()
  207. # Ensuring invalid access parameter raises exception
  208. with open(TESTFN, "r+b") as f:
  209. self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, access=4)
  210. if os.name == "posix":
  211. # Try incompatible flags, prot and access parameters.
  212. with open(TESTFN, "r+b") as f:
  213. self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize,
  214. flags=mmap.MAP_PRIVATE,
  215. prot=mmap.PROT_READ, access=mmap.ACCESS_WRITE)
  216. # Try writing with PROT_EXEC and without PROT_WRITE
  217. prot = mmap.PROT_READ | getattr(mmap, 'PROT_EXEC', 0)
  218. with open(TESTFN, "r+b") as f:
  219. m = mmap.mmap(f.fileno(), mapsize, prot=prot)
  220. self.assertRaises(TypeError, m.write, b"abcdef")
  221. self.assertRaises(TypeError, m.write_byte, 0)
  222. m.close()
  223. def test_bad_file_desc(self):
  224. # Try opening a bad file descriptor...
  225. self.assertRaises(OSError, mmap.mmap, -2, 4096)
  226. def test_tougher_find(self):
  227. # Do a tougher .find() test. SF bug 515943 pointed out that, in 2.2,
  228. # searching for data with embedded \0 bytes didn't work.
  229. with open(TESTFN, 'wb+') as f:
  230. data = b'aabaac\x00deef\x00\x00aa\x00'
  231. n = len(data)
  232. f.write(data)
  233. f.flush()
  234. m = mmap.mmap(f.fileno(), n)
  235. for start in range(n+1):
  236. for finish in range(start, n+1):
  237. slice = data[start : finish]
  238. self.assertEqual(m.find(slice), data.find(slice))
  239. self.assertEqual(m.find(slice + b'x'), -1)
  240. m.close()
  241. def test_find_end(self):
  242. # test the new 'end' parameter works as expected
  243. with open(TESTFN, 'wb+') as f:
  244. data = b'one two ones'
  245. n = len(data)
  246. f.write(data)
  247. f.flush()
  248. m = mmap.mmap(f.fileno(), n)
  249. self.assertEqual(m.find(b'one'), 0)
  250. self.assertEqual(m.find(b'ones'), 8)
  251. self.assertEqual(m.find(b'one', 0, -1), 0)
  252. self.assertEqual(m.find(b'one', 1), 8)
  253. self.assertEqual(m.find(b'one', 1, -1), 8)
  254. self.assertEqual(m.find(b'one', 1, -2), -1)
  255. self.assertEqual(m.find(bytearray(b'one')), 0)
  256. def test_rfind(self):
  257. # test the new 'end' parameter works as expected
  258. with open(TESTFN, 'wb+') as f:
  259. data = b'one two ones'
  260. n = len(data)
  261. f.write(data)
  262. f.flush()
  263. m = mmap.mmap(f.fileno(), n)
  264. self.assertEqual(m.rfind(b'one'), 8)
  265. self.assertEqual(m.rfind(b'one '), 0)
  266. self.assertEqual(m.rfind(b'one', 0, -1), 8)
  267. self.assertEqual(m.rfind(b'one', 0, -2), 0)
  268. self.assertEqual(m.rfind(b'one', 1, -1), 8)
  269. self.assertEqual(m.rfind(b'one', 1, -2), -1)
  270. self.assertEqual(m.rfind(bytearray(b'one')), 8)
  271. def test_double_close(self):
  272. # make sure a double close doesn't crash on Solaris (Bug# 665913)
  273. with open(TESTFN, 'wb+') as f:
  274. f.write(2**16 * b'a') # Arbitrary character
  275. with open(TESTFN, 'rb') as f:
  276. mf = mmap.mmap(f.fileno(), 2**16, access=mmap.ACCESS_READ)
  277. mf.close()
  278. mf.close()
  279. def test_entire_file(self):
  280. # test mapping of entire file by passing 0 for map length
  281. with open(TESTFN, "wb+") as f:
  282. f.write(2**16 * b'm') # Arbitrary character
  283. with open(TESTFN, "rb+") as f, \
  284. mmap.mmap(f.fileno(), 0) as mf:
  285. self.assertEqual(len(mf), 2**16, "Map size should equal file size.")
  286. self.assertEqual(mf.read(2**16), 2**16 * b"m")
  287. def test_length_0_offset(self):
  288. # Issue #10916: test mapping of remainder of file by passing 0 for
  289. # map length with an offset doesn't cause a segfault.
  290. # NOTE: allocation granularity is currently 65536 under Win64,
  291. # and therefore the minimum offset alignment.
  292. with open(TESTFN, "wb") as f:
  293. f.write((65536 * 2) * b'm') # Arbitrary character
  294. with open(TESTFN, "rb") as f:
  295. with mmap.mmap(f.fileno(), 0, offset=65536, access=mmap.ACCESS_READ) as mf:
  296. self.assertRaises(IndexError, mf.__getitem__, 80000)
  297. def test_length_0_large_offset(self):
  298. # Issue #10959: test mapping of a file by passing 0 for
  299. # map length with a large offset doesn't cause a segfault.
  300. with open(TESTFN, "wb") as f:
  301. f.write(115699 * b'm') # Arbitrary character
  302. with open(TESTFN, "w+b") as f:
  303. self.assertRaises(ValueError, mmap.mmap, f.fileno(), 0,
  304. offset=2147418112)
  305. def test_move(self):
  306. # make move works everywhere (64-bit format problem earlier)
  307. with open(TESTFN, 'wb+') as f:
  308. f.write(b"ABCDEabcde") # Arbitrary character
  309. f.flush()
  310. mf = mmap.mmap(f.fileno(), 10)
  311. mf.move(5, 0, 5)
  312. self.assertEqual(mf[:], b"ABCDEABCDE", "Map move should have duplicated front 5")
  313. mf.close()
  314. # more excessive test
  315. data = b"0123456789"
  316. for dest in range(len(data)):
  317. for src in range(len(data)):
  318. for count in range(len(data) - max(dest, src)):
  319. expected = data[:dest] + data[src:src+count] + data[dest+count:]
  320. m = mmap.mmap(-1, len(data))
  321. m[:] = data
  322. m.move(dest, src, count)
  323. self.assertEqual(m[:], expected)
  324. m.close()
  325. # segfault test (Issue 5387)
  326. m = mmap.mmap(-1, 100)
  327. offsets = [-100, -1, 0, 1, 100]
  328. for source, dest, size in itertools.product(offsets, offsets, offsets):
  329. try:
  330. m.move(source, dest, size)
  331. except ValueError:
  332. pass
  333. offsets = [(-1, -1, -1), (-1, -1, 0), (-1, 0, -1), (0, -1, -1),
  334. (-1, 0, 0), (0, -1, 0), (0, 0, -1)]
  335. for source, dest, size in offsets:
  336. self.assertRaises(ValueError, m.move, source, dest, size)
  337. m.close()
  338. m = mmap.mmap(-1, 1) # single byte
  339. self.assertRaises(ValueError, m.move, 0, 0, 2)
  340. self.assertRaises(ValueError, m.move, 1, 0, 1)
  341. self.assertRaises(ValueError, m.move, 0, 1, 1)
  342. m.move(0, 0, 1)
  343. m.move(0, 0, 0)
  344. def test_anonymous(self):
  345. # anonymous mmap.mmap(-1, PAGE)
  346. m = mmap.mmap(-1, PAGESIZE)
  347. for x in range(PAGESIZE):
  348. self.assertEqual(m[x], 0,
  349. "anonymously mmap'ed contents should be zero")
  350. for x in range(PAGESIZE):
  351. b = x & 0xff
  352. m[x] = b
  353. self.assertEqual(m[x], b)
  354. def test_read_all(self):
  355. m = mmap.mmap(-1, 16)
  356. self.addCleanup(m.close)
  357. # With no parameters, or None or a negative argument, reads all
  358. m.write(bytes(range(16)))
  359. m.seek(0)
  360. self.assertEqual(m.read(), bytes(range(16)))
  361. m.seek(8)
  362. self.assertEqual(m.read(), bytes(range(8, 16)))
  363. m.seek(16)
  364. self.assertEqual(m.read(), b'')
  365. m.seek(3)
  366. self.assertEqual(m.read(None), bytes(range(3, 16)))
  367. m.seek(4)
  368. self.assertEqual(m.read(-1), bytes(range(4, 16)))
  369. m.seek(5)
  370. self.assertEqual(m.read(-2), bytes(range(5, 16)))
  371. m.seek(9)
  372. self.assertEqual(m.read(-42), bytes(range(9, 16)))
  373. def test_read_invalid_arg(self):
  374. m = mmap.mmap(-1, 16)
  375. self.addCleanup(m.close)
  376. self.assertRaises(TypeError, m.read, 'foo')
  377. self.assertRaises(TypeError, m.read, 5.5)
  378. self.assertRaises(TypeError, m.read, [1, 2, 3])
  379. def test_extended_getslice(self):
  380. # Test extended slicing by comparing with list slicing.
  381. s = bytes(reversed(range(256)))
  382. m = mmap.mmap(-1, len(s))
  383. m[:] = s
  384. self.assertEqual(m[:], s)
  385. indices = (0, None, 1, 3, 19, 300, sys.maxsize, -1, -2, -31, -300)
  386. for start in indices:
  387. for stop in indices:
  388. # Skip step 0 (invalid)
  389. for step in indices[1:]:
  390. self.assertEqual(m[start:stop:step],
  391. s[start:stop:step])
  392. def test_extended_set_del_slice(self):
  393. # Test extended slicing by comparing with list slicing.
  394. s = bytes(reversed(range(256)))
  395. m = mmap.mmap(-1, len(s))
  396. indices = (0, None, 1, 3, 19, 300, sys.maxsize, -1, -2, -31, -300)
  397. for start in indices:
  398. for stop in indices:
  399. # Skip invalid step 0
  400. for step in indices[1:]:
  401. m[:] = s
  402. self.assertEqual(m[:], s)
  403. L = list(s)
  404. # Make sure we have a slice of exactly the right length,
  405. # but with different data.
  406. data = L[start:stop:step]
  407. data = bytes(reversed(data))
  408. L[start:stop:step] = data
  409. m[start:stop:step] = data
  410. self.assertEqual(m[:], bytes(L))
  411. def make_mmap_file (self, f, halfsize):
  412. # Write 2 pages worth of data to the file
  413. f.write (b'\0' * halfsize)
  414. f.write (b'foo')
  415. f.write (b'\0' * (halfsize - 3))
  416. f.flush ()
  417. return mmap.mmap (f.fileno(), 0)
  418. def test_empty_file (self):
  419. f = open (TESTFN, 'w+b')
  420. f.close()
  421. with open(TESTFN, "rb") as f :
  422. self.assertRaisesRegex(ValueError,
  423. "cannot mmap an empty file",
  424. mmap.mmap, f.fileno(), 0,
  425. access=mmap.ACCESS_READ)
  426. def test_offset (self):
  427. f = open (TESTFN, 'w+b')
  428. try: # unlink TESTFN no matter what
  429. halfsize = mmap.ALLOCATIONGRANULARITY
  430. m = self.make_mmap_file (f, halfsize)
  431. m.close ()
  432. f.close ()
  433. mapsize = halfsize * 2
  434. # Try invalid offset
  435. f = open(TESTFN, "r+b")
  436. for offset in [-2, -1, None]:
  437. try:
  438. m = mmap.mmap(f.fileno(), mapsize, offset=offset)
  439. self.assertEqual(0, 1)
  440. except (ValueError, TypeError, OverflowError):
  441. pass
  442. else:
  443. self.assertEqual(0, 0)
  444. f.close()
  445. # Try valid offset, hopefully 8192 works on all OSes
  446. f = open(TESTFN, "r+b")
  447. m = mmap.mmap(f.fileno(), mapsize - halfsize, offset=halfsize)
  448. self.assertEqual(m[0:3], b'foo')
  449. f.close()
  450. # Try resizing map
  451. try:
  452. m.resize(512)
  453. except SystemError:
  454. pass
  455. else:
  456. # resize() is supported
  457. self.assertEqual(len(m), 512)
  458. # Check that we can no longer seek beyond the new size.
  459. self.assertRaises(ValueError, m.seek, 513, 0)
  460. # Check that the content is not changed
  461. self.assertEqual(m[0:3], b'foo')
  462. # Check that the underlying file is truncated too
  463. f = open(TESTFN, 'rb')
  464. f.seek(0, 2)
  465. self.assertEqual(f.tell(), halfsize + 512)
  466. f.close()
  467. self.assertEqual(m.size(), halfsize + 512)
  468. m.close()
  469. finally:
  470. f.close()
  471. try:
  472. os.unlink(TESTFN)
  473. except OSError:
  474. pass
  475. def test_subclass(self):
  476. class anon_mmap(mmap.mmap):
  477. def __new__(klass, *args, **kwargs):
  478. return mmap.mmap.__new__(klass, -1, *args, **kwargs)
  479. anon_mmap(PAGESIZE)
  480. @unittest.skipUnless(hasattr(mmap, 'PROT_READ'), "needs mmap.PROT_READ")
  481. def test_prot_readonly(self):
  482. mapsize = 10
  483. with open(TESTFN, "wb") as fp:
  484. fp.write(b"a"*mapsize)
  485. with open(TESTFN, "rb") as f:
  486. m = mmap.mmap(f.fileno(), mapsize, prot=mmap.PROT_READ)
  487. self.assertRaises(TypeError, m.write, "foo")
  488. def test_error(self):
  489. self.assertIs(mmap.error, OSError)
  490. def test_io_methods(self):
  491. data = b"0123456789"
  492. with open(TESTFN, "wb") as fp:
  493. fp.write(b"x"*len(data))
  494. with open(TESTFN, "r+b") as f:
  495. m = mmap.mmap(f.fileno(), len(data))
  496. # Test write_byte()
  497. for i in range(len(data)):
  498. self.assertEqual(m.tell(), i)
  499. m.write_byte(data[i])
  500. self.assertEqual(m.tell(), i+1)
  501. self.assertRaises(ValueError, m.write_byte, b"x"[0])
  502. self.assertEqual(m[:], data)
  503. # Test read_byte()
  504. m.seek(0)
  505. for i in range(len(data)):
  506. self.assertEqual(m.tell(), i)
  507. self.assertEqual(m.read_byte(), data[i])
  508. self.assertEqual(m.tell(), i+1)
  509. self.assertRaises(ValueError, m.read_byte)
  510. # Test read()
  511. m.seek(3)
  512. self.assertEqual(m.read(3), b"345")
  513. self.assertEqual(m.tell(), 6)
  514. # Test write()
  515. m.seek(3)
  516. m.write(b"bar")
  517. self.assertEqual(m.tell(), 6)
  518. self.assertEqual(m[:], b"012bar6789")
  519. m.write(bytearray(b"baz"))
  520. self.assertEqual(m.tell(), 9)
  521. self.assertEqual(m[:], b"012barbaz9")
  522. self.assertRaises(ValueError, m.write, b"ba")
  523. def test_non_ascii_byte(self):
  524. for b in (129, 200, 255): # > 128
  525. m = mmap.mmap(-1, 1)
  526. m.write_byte(b)
  527. self.assertEqual(m[0], b)
  528. m.seek(0)
  529. self.assertEqual(m.read_byte(), b)
  530. m.close()
  531. @unittest.skipUnless(os.name == 'nt', 'requires Windows')
  532. def test_tagname(self):
  533. data1 = b"0123456789"
  534. data2 = b"abcdefghij"
  535. assert len(data1) == len(data2)
  536. tagname1 = random_tagname()
  537. tagname2 = random_tagname()
  538. # Test same tag
  539. m1 = mmap.mmap(-1, len(data1), tagname=tagname1)
  540. m1[:] = data1
  541. m2 = mmap.mmap(-1, len(data2), tagname=tagname1)
  542. m2[:] = data2
  543. self.assertEqual(m1[:], data2)
  544. self.assertEqual(m2[:], data2)
  545. m2.close()
  546. m1.close()
  547. # Test different tag
  548. m1 = mmap.mmap(-1, len(data1), tagname=tagname1)
  549. m1[:] = data1
  550. m2 = mmap.mmap(-1, len(data2), tagname=tagname2)
  551. m2[:] = data2
  552. self.assertEqual(m1[:], data1)
  553. self.assertEqual(m2[:], data2)
  554. m2.close()
  555. m1.close()
  556. @cpython_only
  557. @unittest.skipUnless(os.name == 'nt', 'requires Windows')
  558. def test_sizeof(self):
  559. m1 = mmap.mmap(-1, 100)
  560. tagname = random_tagname()
  561. m2 = mmap.mmap(-1, 100, tagname=tagname)
  562. self.assertEqual(sys.getsizeof(m2),
  563. sys.getsizeof(m1) + len(tagname) + 1)
  564. @unittest.skipUnless(os.name == 'nt', 'requires Windows')
  565. def test_crasher_on_windows(self):
  566. # Should not crash (Issue 1733986)
  567. tagname = random_tagname()
  568. m = mmap.mmap(-1, 1000, tagname=tagname)
  569. try:
  570. mmap.mmap(-1, 5000, tagname=tagname)[:] # same tagname, but larger size
  571. except:
  572. pass
  573. m.close()
  574. # Should not crash (Issue 5385)
  575. with open(TESTFN, "wb") as fp:
  576. fp.write(b"x"*10)
  577. f = open(TESTFN, "r+b")
  578. m = mmap.mmap(f.fileno(), 0)
  579. f.close()
  580. try:
  581. m.resize(0) # will raise OSError
  582. except:
  583. pass
  584. try:
  585. m[:]
  586. except:
  587. pass
  588. m.close()
  589. @unittest.skipUnless(os.name == 'nt', 'requires Windows')
  590. def test_invalid_descriptor(self):
  591. # socket file descriptors are valid, but out of range
  592. # for _get_osfhandle, causing a crash when validating the
  593. # parameters to _get_osfhandle.
  594. s = socket.socket()
  595. try:
  596. with self.assertRaises(OSError):
  597. m = mmap.mmap(s.fileno(), 10)
  598. finally:
  599. s.close()
  600. def test_context_manager(self):
  601. with mmap.mmap(-1, 10) as m:
  602. self.assertFalse(m.closed)
  603. self.assertTrue(m.closed)
  604. def test_context_manager_exception(self):
  605. # Test that the OSError gets passed through
  606. with self.assertRaises(Exception) as exc:
  607. with mmap.mmap(-1, 10) as m:
  608. raise OSError
  609. self.assertIsInstance(exc.exception, OSError,
  610. "wrong exception raised in context manager")
  611. self.assertTrue(m.closed, "context manager failed")
  612. def test_weakref(self):
  613. # Check mmap objects are weakrefable
  614. mm = mmap.mmap(-1, 16)
  615. wr = weakref.ref(mm)
  616. self.assertIs(wr(), mm)
  617. del mm
  618. gc_collect()
  619. self.assertIs(wr(), None)
  620. def test_write_returning_the_number_of_bytes_written(self):
  621. mm = mmap.mmap(-1, 16)
  622. self.assertEqual(mm.write(b""), 0)
  623. self.assertEqual(mm.write(b"x"), 1)
  624. self.assertEqual(mm.write(b"yz"), 2)
  625. self.assertEqual(mm.write(b"python"), 6)
  626. def test_resize_past_pos(self):
  627. m = mmap.mmap(-1, 8192)
  628. self.addCleanup(m.close)
  629. m.read(5000)
  630. try:
  631. m.resize(4096)
  632. except SystemError:
  633. self.skipTest("resizing not supported")
  634. self.assertEqual(m.read(14), b'')
  635. self.assertRaises(ValueError, m.read_byte)
  636. self.assertRaises(ValueError, m.write_byte, 42)
  637. self.assertRaises(ValueError, m.write, b'abc')
  638. def test_concat_repeat_exception(self):
  639. m = mmap.mmap(-1, 16)
  640. with self.assertRaises(TypeError):
  641. m + m
  642. with self.assertRaises(TypeError):
  643. m * 2
  644. def test_flush_return_value(self):
  645. # mm.flush() should return None on success, raise an
  646. # exception on error under all platforms.
  647. mm = mmap.mmap(-1, 16)
  648. self.addCleanup(mm.close)
  649. mm.write(b'python')
  650. result = mm.flush()
  651. self.assertIsNone(result)
  652. if sys.platform.startswith('linux'):
  653. # 'offset' must be a multiple of mmap.PAGESIZE on Linux.
  654. # See bpo-34754 for details.
  655. self.assertRaises(OSError, mm.flush, 1, len(b'python'))
  656. def test_repr(self):
  657. open_mmap_repr_pat = re.compile(
  658. r"<mmap.mmap closed=False, "
  659. r"access=(?P<access>\S+), "
  660. r"length=(?P<length>\d+), "
  661. r"pos=(?P<pos>\d+), "
  662. r"offset=(?P<offset>\d+)>")
  663. closed_mmap_repr_pat = re.compile(r"<mmap.mmap closed=True>")
  664. mapsizes = (50, 100, 1_000, 1_000_000, 10_000_000)
  665. offsets = tuple((mapsize // 2 // mmap.ALLOCATIONGRANULARITY)
  666. * mmap.ALLOCATIONGRANULARITY for mapsize in mapsizes)
  667. for offset, mapsize in zip(offsets, mapsizes):
  668. data = b'a' * mapsize
  669. length = mapsize - offset
  670. accesses = ('ACCESS_DEFAULT', 'ACCESS_READ',
  671. 'ACCESS_COPY', 'ACCESS_WRITE')
  672. positions = (0, length//10, length//5, length//4)
  673. with open(TESTFN, "wb+") as fp:
  674. fp.write(data)
  675. fp.flush()
  676. for access, pos in itertools.product(accesses, positions):
  677. accint = getattr(mmap, access)
  678. with mmap.mmap(fp.fileno(),
  679. length,
  680. access=accint,
  681. offset=offset) as mm:
  682. mm.seek(pos)
  683. match = open_mmap_repr_pat.match(repr(mm))
  684. self.assertIsNotNone(match)
  685. self.assertEqual(match.group('access'), access)
  686. self.assertEqual(match.group('length'), str(length))
  687. self.assertEqual(match.group('pos'), str(pos))
  688. self.assertEqual(match.group('offset'), str(offset))
  689. match = closed_mmap_repr_pat.match(repr(mm))
  690. self.assertIsNotNone(match)
  691. @unittest.skipUnless(hasattr(mmap.mmap, 'madvise'), 'needs madvise')
  692. def test_madvise(self):
  693. size = 2 * PAGESIZE
  694. m = mmap.mmap(-1, size)
  695. with self.assertRaisesRegex(ValueError, "madvise start out of bounds"):
  696. m.madvise(mmap.MADV_NORMAL, size)
  697. with self.assertRaisesRegex(ValueError, "madvise start out of bounds"):
  698. m.madvise(mmap.MADV_NORMAL, -1)
  699. with self.assertRaisesRegex(ValueError, "madvise length invalid"):
  700. m.madvise(mmap.MADV_NORMAL, 0, -1)
  701. with self.assertRaisesRegex(OverflowError, "madvise length too large"):
  702. m.madvise(mmap.MADV_NORMAL, PAGESIZE, sys.maxsize)
  703. self.assertEqual(m.madvise(mmap.MADV_NORMAL), None)
  704. self.assertEqual(m.madvise(mmap.MADV_NORMAL, PAGESIZE), None)
  705. self.assertEqual(m.madvise(mmap.MADV_NORMAL, PAGESIZE, size), None)
  706. self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, 2), None)
  707. self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, size), None)
  708. @unittest.skipUnless(os.name == 'nt', 'requires Windows')
  709. def test_resize_up_when_mapped_to_pagefile(self):
  710. """If the mmap is backed by the pagefile ensure a resize up can happen
  711. and that the original data is still in place
  712. """
  713. start_size = PAGESIZE
  714. new_size = 2 * start_size
  715. data = bytes(random.getrandbits(8) for _ in range(start_size))
  716. m = mmap.mmap(-1, start_size)
  717. m[:] = data
  718. m.resize(new_size)
  719. self.assertEqual(len(m), new_size)
  720. self.assertEqual(m[:start_size], data[:start_size])
  721. @unittest.skipUnless(os.name == 'nt', 'requires Windows')
  722. def test_resize_down_when_mapped_to_pagefile(self):
  723. """If the mmap is backed by the pagefile ensure a resize down up can happen
  724. and that a truncated form of the original data is still in place
  725. """
  726. start_size = PAGESIZE
  727. new_size = start_size // 2
  728. data = bytes(random.getrandbits(8) for _ in range(start_size))
  729. m = mmap.mmap(-1, start_size)
  730. m[:] = data
  731. m.resize(new_size)
  732. self.assertEqual(len(m), new_size)
  733. self.assertEqual(m[:new_size], data[:new_size])
  734. @unittest.skipUnless(os.name == 'nt', 'requires Windows')
  735. def test_resize_fails_if_mapping_held_elsewhere(self):
  736. """If more than one mapping is held against a named file on Windows, neither
  737. mapping can be resized
  738. """
  739. start_size = 2 * PAGESIZE
  740. reduced_size = PAGESIZE
  741. f = open(TESTFN, 'wb+')
  742. f.truncate(start_size)
  743. try:
  744. m1 = mmap.mmap(f.fileno(), start_size)
  745. m2 = mmap.mmap(f.fileno(), start_size)
  746. with self.assertRaises(OSError):
  747. m1.resize(reduced_size)
  748. with self.assertRaises(OSError):
  749. m2.resize(reduced_size)
  750. m2.close()
  751. m1.resize(reduced_size)
  752. self.assertEqual(m1.size(), reduced_size)
  753. self.assertEqual(os.stat(f.fileno()).st_size, reduced_size)
  754. finally:
  755. f.close()
  756. @unittest.skipUnless(os.name == 'nt', 'requires Windows')
  757. def test_resize_succeeds_with_error_for_second_named_mapping(self):
  758. """If a more than one mapping exists of the same name, none of them can
  759. be resized: they'll raise an Exception and leave the original mapping intact
  760. """
  761. start_size = 2 * PAGESIZE
  762. reduced_size = PAGESIZE
  763. tagname = random_tagname()
  764. data_length = 8
  765. data = bytes(random.getrandbits(8) for _ in range(data_length))
  766. m1 = mmap.mmap(-1, start_size, tagname=tagname)
  767. m2 = mmap.mmap(-1, start_size, tagname=tagname)
  768. m1[:data_length] = data
  769. self.assertEqual(m2[:data_length], data)
  770. with self.assertRaises(OSError):
  771. m1.resize(reduced_size)
  772. self.assertEqual(m1.size(), start_size)
  773. self.assertEqual(m1[:data_length], data)
  774. self.assertEqual(m2[:data_length], data)
  class LargeMmapTests(unittest.TestCase):
      """Tests that map regions of sparse files several GiB in size."""

      def setUp(self):
          unlink(TESTFN)

      def tearDown(self):
          unlink(TESTFN)

      def _make_test_file(self, num_zeroes, tail):
          # Create TESTFN with *tail* written at offset *num_zeroes* and
          # return the open file object; the caller closes it.  The test is
          # skipped when the file cannot be written.
          if sys.platform == 'darwin' or sys.platform.startswith('win'):
              needed = str(0x180000000)
              requires('largefile',
                       'test requires %s bytes and a long time to run' % needed)
          fobj = open(TESTFN, 'w+b')
          try:
              fobj.seek(num_zeroes)
              fobj.write(tail)
              fobj.flush()
          except (OSError, OverflowError, ValueError):
              try:
                  fobj.close()
              except (OSError, OverflowError):
                  pass
              raise unittest.SkipTest("filesystem does not have largefile support")
          else:
              return fobj

      def test_large_offset(self):
          # Map the tail of the file, starting at an offset of 0x140000000.
          map_offset = 0x140000000
          with self._make_test_file(0x14FFFFFFF, b" ") as f:
              with mmap.mmap(f.fileno(), 0, offset=map_offset,
                             access=mmap.ACCESS_READ) as m:
                  self.assertEqual(m[0xFFFFFFF], ord(" "))

      def test_large_filesize(self):
          file_size = 0x180000000
          with self._make_test_file(file_size - 1, b" ") as f:
              if sys.maxsize < file_size:
                  # On 32 bit platforms the file is larger than sys.maxsize so
                  # mapping the whole file should fail -- Issue #16743
                  self.assertRaises(OverflowError, mmap.mmap, f.fileno(),
                                    file_size, access=mmap.ACCESS_READ)
                  self.assertRaises(ValueError, mmap.mmap, f.fileno(), 0,
                                    access=mmap.ACCESS_READ)
              with mmap.mmap(f.fileno(), 0x10000,
                             access=mmap.ACCESS_READ) as m:
                  self.assertEqual(m.size(), file_size)

      # Issue 11277: mmap() with large (~4 GiB) sparse files crashes on OS X.

      def _test_around_boundary(self, boundary):
          # Write a marker centred on *boundary* and read it back through a
          # map of the whole file.
          marker = b'  DEARdear  '
          first = boundary - len(marker) // 2
          last = first + len(marker)
          with self._make_test_file(first, marker) as f:
              with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
                  self.assertEqual(m[first:last], marker)

      @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems")
      def test_around_2GB(self):
          self._test_around_boundary(_2G)

      @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems")
      def test_around_4GB(self):
          self._test_around_boundary(_4G)
  # Run the tests in this module when the file is executed as a script.
  if __name__ == '__main__':
      unittest.main()