| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292 |
- """Test largefile support on system where this makes sense.
- """
- import os
- import stat
- import sys
- import unittest
- import socket
- import shutil
- import threading
- from test.support import requires, bigmemtest
- from test.support import SHORT_TIMEOUT
- from test.support import socket_helper
- from test.support.os_helper import TESTFN, unlink
- import io # C implementation of io
- import _pyio as pyio # Python implementation of io
- # size of file to create (>2 GiB; 2 GiB == 2,147,483,648 bytes)
- size = 2_500_000_000
- TESTFN2 = TESTFN + '2'
- class LargeFileTest:
- def setUp(self):
- if os.path.exists(TESTFN):
- mode = 'r+b'
- else:
- mode = 'w+b'
- with self.open(TESTFN, mode) as f:
- current_size = os.fstat(f.fileno())[stat.ST_SIZE]
- if current_size == size+1:
- return
- if current_size == 0:
- f.write(b'z')
- f.seek(0)
- f.seek(size)
- f.write(b'a')
- f.flush()
- self.assertEqual(os.fstat(f.fileno())[stat.ST_SIZE], size+1)
- @classmethod
- def tearDownClass(cls):
- with cls.open(TESTFN, 'wb'):
- pass
- if not os.stat(TESTFN)[stat.ST_SIZE] == 0:
- raise cls.failureException('File was not truncated by opening '
- 'with mode "wb"')
- unlink(TESTFN2)
- class TestFileMethods(LargeFileTest):
- """Test that each file function works as expected for large
- (i.e. > 2 GiB) files.
- """
- # _pyio.FileIO.readall() uses a temporary bytearray then casted to bytes,
- # so memuse=2 is needed
- @bigmemtest(size=size, memuse=2, dry_run=False)
- def test_large_read(self, _size):
- # bpo-24658: Test that a read greater than 2GB does not fail.
- with self.open(TESTFN, "rb") as f:
- self.assertEqual(len(f.read()), size + 1)
- self.assertEqual(f.tell(), size + 1)
- def test_osstat(self):
- self.assertEqual(os.stat(TESTFN)[stat.ST_SIZE], size+1)
- def test_seek_read(self):
- with self.open(TESTFN, 'rb') as f:
- self.assertEqual(f.tell(), 0)
- self.assertEqual(f.read(1), b'z')
- self.assertEqual(f.tell(), 1)
- f.seek(0)
- self.assertEqual(f.tell(), 0)
- f.seek(0, 0)
- self.assertEqual(f.tell(), 0)
- f.seek(42)
- self.assertEqual(f.tell(), 42)
- f.seek(42, 0)
- self.assertEqual(f.tell(), 42)
- f.seek(42, 1)
- self.assertEqual(f.tell(), 84)
- f.seek(0, 1)
- self.assertEqual(f.tell(), 84)
- f.seek(0, 2) # seek from the end
- self.assertEqual(f.tell(), size + 1 + 0)
- f.seek(-10, 2)
- self.assertEqual(f.tell(), size + 1 - 10)
- f.seek(-size-1, 2)
- self.assertEqual(f.tell(), 0)
- f.seek(size)
- self.assertEqual(f.tell(), size)
- # the 'a' that was written at the end of file above
- self.assertEqual(f.read(1), b'a')
- f.seek(-size-1, 1)
- self.assertEqual(f.read(1), b'z')
- self.assertEqual(f.tell(), 1)
- def test_lseek(self):
- with self.open(TESTFN, 'rb') as f:
- self.assertEqual(os.lseek(f.fileno(), 0, 0), 0)
- self.assertEqual(os.lseek(f.fileno(), 42, 0), 42)
- self.assertEqual(os.lseek(f.fileno(), 42, 1), 84)
- self.assertEqual(os.lseek(f.fileno(), 0, 1), 84)
- self.assertEqual(os.lseek(f.fileno(), 0, 2), size+1+0)
- self.assertEqual(os.lseek(f.fileno(), -10, 2), size+1-10)
- self.assertEqual(os.lseek(f.fileno(), -size-1, 2), 0)
- self.assertEqual(os.lseek(f.fileno(), size, 0), size)
- # the 'a' that was written at the end of file above
- self.assertEqual(f.read(1), b'a')
- def test_truncate(self):
- with self.open(TESTFN, 'r+b') as f:
- if not hasattr(f, 'truncate'):
- raise unittest.SkipTest("open().truncate() not available "
- "on this system")
- f.seek(0, 2)
- # else we've lost track of the true size
- self.assertEqual(f.tell(), size+1)
- # Cut it back via seek + truncate with no argument.
- newsize = size - 10
- f.seek(newsize)
- f.truncate()
- self.assertEqual(f.tell(), newsize) # else pointer moved
- f.seek(0, 2)
- self.assertEqual(f.tell(), newsize) # else wasn't truncated
- # Ensure that truncate(smaller than true size) shrinks
- # the file.
- newsize -= 1
- f.seek(42)
- f.truncate(newsize)
- self.assertEqual(f.tell(), 42)
- f.seek(0, 2)
- self.assertEqual(f.tell(), newsize)
- # XXX truncate(larger than true size) is ill-defined
- # across platform; cut it waaaaay back
- f.seek(0)
- f.truncate(1)
- self.assertEqual(f.tell(), 0) # else pointer moved
- f.seek(0)
- self.assertEqual(len(f.read()), 1) # else wasn't truncated
- def test_seekable(self):
- # Issue #5016; seekable() can return False when the current position
- # is negative when truncated to an int.
- for pos in (2**31-1, 2**31, 2**31+1):
- with self.open(TESTFN, 'rb') as f:
- f.seek(pos)
- self.assertTrue(f.seekable())
- def skip_no_disk_space(path, required):
- def decorator(fun):
- def wrapper(*args, **kwargs):
- if not hasattr(shutil, "disk_usage"):
- raise unittest.SkipTest("requires shutil.disk_usage")
- if shutil.disk_usage(os.path.realpath(path)).free < required:
- hsize = int(required / 1024 / 1024)
- raise unittest.SkipTest(
- f"required {hsize} MiB of free disk space")
- return fun(*args, **kwargs)
- return wrapper
- return decorator
- class TestCopyfile(LargeFileTest, unittest.TestCase):
- open = staticmethod(io.open)
- # Exact required disk space would be (size * 2), but let's give it a
- # bit more tolerance.
- @skip_no_disk_space(TESTFN, size * 2.5)
- def test_it(self):
- # Internally shutil.copyfile() can use "fast copy" methods like
- # os.sendfile().
- size = os.path.getsize(TESTFN)
- shutil.copyfile(TESTFN, TESTFN2)
- self.assertEqual(os.path.getsize(TESTFN2), size)
- with open(TESTFN2, 'rb') as f:
- self.assertEqual(f.read(5), b'z\x00\x00\x00\x00')
- f.seek(size - 5)
- self.assertEqual(f.read(), b'\x00\x00\x00\x00a')
- @unittest.skipIf(not hasattr(os, 'sendfile'), 'sendfile not supported')
- class TestSocketSendfile(LargeFileTest, unittest.TestCase):
- open = staticmethod(io.open)
- timeout = SHORT_TIMEOUT
- def setUp(self):
- super().setUp()
- self.thread = None
- def tearDown(self):
- super().tearDown()
- if self.thread is not None:
- self.thread.join(self.timeout)
- self.thread = None
- def tcp_server(self, sock):
- def run(sock):
- with sock:
- conn, _ = sock.accept()
- conn.settimeout(self.timeout)
- with conn, open(TESTFN2, 'wb') as f:
- event.wait(self.timeout)
- while True:
- chunk = conn.recv(65536)
- if not chunk:
- return
- f.write(chunk)
- event = threading.Event()
- sock.settimeout(self.timeout)
- self.thread = threading.Thread(target=run, args=(sock, ))
- self.thread.start()
- event.set()
- # Exact required disk space would be (size * 2), but let's give it a
- # bit more tolerance.
- @skip_no_disk_space(TESTFN, size * 2.5)
- def test_it(self):
- port = socket_helper.find_unused_port()
- with socket.create_server(("", port)) as sock:
- self.tcp_server(sock)
- with socket.create_connection(("127.0.0.1", port)) as client:
- with open(TESTFN, 'rb') as f:
- client.sendfile(f)
- self.tearDown()
- size = os.path.getsize(TESTFN)
- self.assertEqual(os.path.getsize(TESTFN2), size)
- with open(TESTFN2, 'rb') as f:
- self.assertEqual(f.read(5), b'z\x00\x00\x00\x00')
- f.seek(size - 5)
- self.assertEqual(f.read(), b'\x00\x00\x00\x00a')
- def setUpModule():
- try:
- import signal
- # The default handler for SIGXFSZ is to abort the process.
- # By ignoring it, system calls exceeding the file size resource
- # limit will raise OSError instead of crashing the interpreter.
- signal.signal(signal.SIGXFSZ, signal.SIG_IGN)
- except (ImportError, AttributeError):
- pass
- # On Windows and Mac OSX this test consumes large resources; It
- # takes a long time to build the >2 GiB file and takes >2 GiB of disk
- # space therefore the resource must be enabled to run this test.
- # If not, nothing after this line stanza will be executed.
- if sys.platform[:3] == 'win' or sys.platform == 'darwin':
- requires('largefile',
- 'test requires %s bytes and a long time to run' % str(size))
- else:
- # Only run if the current filesystem supports large files.
- # (Skip this test on Windows, since we now always support
- # large files.)
- f = open(TESTFN, 'wb', buffering=0)
- try:
- # 2**31 == 2147483648
- f.seek(2147483649)
- # Seeking is not enough of a test: you must write and flush, too!
- f.write(b'x')
- f.flush()
- except (OSError, OverflowError):
- raise unittest.SkipTest("filesystem does not have "
- "largefile support")
- finally:
- f.close()
- unlink(TESTFN)
- class CLargeFileTest(TestFileMethods, unittest.TestCase):
- open = staticmethod(io.open)
- class PyLargeFileTest(TestFileMethods, unittest.TestCase):
- open = staticmethod(pyio.open)
- def tearDownModule():
- unlink(TESTFN)
- unlink(TESTFN2)
- if __name__ == '__main__':
- unittest.main()
|