| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316 |
- """Test script for the dumbdbm module
- Original by Roger E. Masse
- """
- import contextlib
- import io
- import operator
- import os
- import stat
- import unittest
- import dbm.dumb as dumbdbm
- from test import support
- from test.support import os_helper
- from functools import partial
- _fname = os_helper.TESTFN
- def _delete_files():
- for ext in [".dir", ".dat", ".bak"]:
- try:
- os.unlink(_fname + ext)
- except OSError:
- pass
- class DumbDBMTestCase(unittest.TestCase):
- _dict = {b'0': b'',
- b'a': b'Python:',
- b'b': b'Programming',
- b'c': b'the',
- b'd': b'way',
- b'f': b'Guido',
- b'g': b'intended',
- '\u00fc'.encode('utf-8') : b'!',
- }
- def test_dumbdbm_creation(self):
- with contextlib.closing(dumbdbm.open(_fname, 'c')) as f:
- self.assertEqual(list(f.keys()), [])
- for key in self._dict:
- f[key] = self._dict[key]
- self.read_helper(f)
- @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
- @os_helper.skip_unless_working_chmod
- def test_dumbdbm_creation_mode(self):
- try:
- old_umask = os.umask(0o002)
- f = dumbdbm.open(_fname, 'c', 0o637)
- f.close()
- finally:
- os.umask(old_umask)
- expected_mode = 0o635
- if os.name != 'posix':
- # Windows only supports setting the read-only attribute.
- # This shouldn't fail, but doesn't work like Unix either.
- expected_mode = 0o666
- import stat
- st = os.stat(_fname + '.dat')
- self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)
- st = os.stat(_fname + '.dir')
- self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)
- def test_close_twice(self):
- f = dumbdbm.open(_fname)
- f[b'a'] = b'b'
- self.assertEqual(f[b'a'], b'b')
- f.close()
- f.close()
- def test_dumbdbm_modification(self):
- self.init_db()
- with contextlib.closing(dumbdbm.open(_fname, 'w')) as f:
- self._dict[b'g'] = f[b'g'] = b"indented"
- self.read_helper(f)
- # setdefault() works as in the dict interface
- self.assertEqual(f.setdefault(b'xxx', b'foo'), b'foo')
- self.assertEqual(f[b'xxx'], b'foo')
- def test_dumbdbm_read(self):
- self.init_db()
- with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
- self.read_helper(f)
- with self.assertRaisesRegex(dumbdbm.error,
- 'The database is opened for reading only'):
- f[b'g'] = b'x'
- with self.assertRaisesRegex(dumbdbm.error,
- 'The database is opened for reading only'):
- del f[b'a']
- # get() works as in the dict interface
- self.assertEqual(f.get(b'a'), self._dict[b'a'])
- self.assertEqual(f.get(b'xxx', b'foo'), b'foo')
- self.assertIsNone(f.get(b'xxx'))
- with self.assertRaises(KeyError):
- f[b'xxx']
- def test_dumbdbm_keys(self):
- self.init_db()
- with contextlib.closing(dumbdbm.open(_fname)) as f:
- keys = self.keys_helper(f)
- def test_write_contains(self):
- with contextlib.closing(dumbdbm.open(_fname)) as f:
- f[b'1'] = b'hello'
- self.assertIn(b'1', f)
- def test_write_write_read(self):
- # test for bug #482460
- with contextlib.closing(dumbdbm.open(_fname)) as f:
- f[b'1'] = b'hello'
- f[b'1'] = b'hello2'
- with contextlib.closing(dumbdbm.open(_fname)) as f:
- self.assertEqual(f[b'1'], b'hello2')
- def test_str_read(self):
- self.init_db()
- with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
- self.assertEqual(f['\u00fc'], self._dict['\u00fc'.encode('utf-8')])
- def test_str_write_contains(self):
- self.init_db()
- with contextlib.closing(dumbdbm.open(_fname)) as f:
- f['\u00fc'] = b'!'
- f['1'] = 'a'
- with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
- self.assertIn('\u00fc', f)
- self.assertEqual(f['\u00fc'.encode('utf-8')],
- self._dict['\u00fc'.encode('utf-8')])
- self.assertEqual(f[b'1'], b'a')
- def test_line_endings(self):
- # test for bug #1172763: dumbdbm would die if the line endings
- # weren't what was expected.
- with contextlib.closing(dumbdbm.open(_fname)) as f:
- f[b'1'] = b'hello'
- f[b'2'] = b'hello2'
- # Mangle the file by changing the line separator to Windows or Unix
- with io.open(_fname + '.dir', 'rb') as file:
- data = file.read()
- if os.linesep == '\n':
- data = data.replace(b'\n', b'\r\n')
- else:
- data = data.replace(b'\r\n', b'\n')
- with io.open(_fname + '.dir', 'wb') as file:
- file.write(data)
- f = dumbdbm.open(_fname)
- self.assertEqual(f[b'1'], b'hello')
- self.assertEqual(f[b'2'], b'hello2')
- def read_helper(self, f):
- keys = self.keys_helper(f)
- for key in self._dict:
- self.assertEqual(self._dict[key], f[key])
- def init_db(self):
- with contextlib.closing(dumbdbm.open(_fname, 'n')) as f:
- for k in self._dict:
- f[k] = self._dict[k]
- def keys_helper(self, f):
- keys = sorted(f.keys())
- dkeys = sorted(self._dict.keys())
- self.assertEqual(keys, dkeys)
- return keys
- # Perform randomized operations. This doesn't make assumptions about
- # what *might* fail.
- def test_random(self):
- import random
- d = {} # mirror the database
- for dummy in range(5):
- with contextlib.closing(dumbdbm.open(_fname)) as f:
- for dummy in range(100):
- k = random.choice('abcdefghijklm')
- if random.random() < 0.2:
- if k in d:
- del d[k]
- del f[k]
- else:
- v = random.choice((b'a', b'b', b'c')) * random.randrange(10000)
- d[k] = v
- f[k] = v
- self.assertEqual(f[k], v)
- with contextlib.closing(dumbdbm.open(_fname)) as f:
- expected = sorted((k.encode("latin-1"), v) for k, v in d.items())
- got = sorted(f.items())
- self.assertEqual(expected, got)
- def test_context_manager(self):
- with dumbdbm.open(_fname, 'c') as db:
- db["dumbdbm context manager"] = "context manager"
- with dumbdbm.open(_fname, 'r') as db:
- self.assertEqual(list(db.keys()), [b"dumbdbm context manager"])
- with self.assertRaises(dumbdbm.error):
- db.keys()
- def test_check_closed(self):
- f = dumbdbm.open(_fname, 'c')
- f.close()
- for meth in (partial(operator.delitem, f),
- partial(operator.setitem, f, 'b'),
- partial(operator.getitem, f),
- partial(operator.contains, f)):
- with self.assertRaises(dumbdbm.error) as cm:
- meth('test')
- self.assertEqual(str(cm.exception),
- "DBM object has already been closed")
- for meth in (operator.methodcaller('keys'),
- operator.methodcaller('iterkeys'),
- operator.methodcaller('items'),
- len):
- with self.assertRaises(dumbdbm.error) as cm:
- meth(f)
- self.assertEqual(str(cm.exception),
- "DBM object has already been closed")
- def test_create_new(self):
- with dumbdbm.open(_fname, 'n') as f:
- for k in self._dict:
- f[k] = self._dict[k]
- with dumbdbm.open(_fname, 'n') as f:
- self.assertEqual(f.keys(), [])
- def test_eval(self):
- with open(_fname + '.dir', 'w', encoding="utf-8") as stream:
- stream.write("str(print('Hacked!')), 0\n")
- with support.captured_stdout() as stdout:
- with self.assertRaises(ValueError):
- with dumbdbm.open(_fname) as f:
- pass
- self.assertEqual(stdout.getvalue(), '')
- def test_missing_data(self):
- for value in ('r', 'w'):
- _delete_files()
- with self.assertRaises(FileNotFoundError):
- dumbdbm.open(_fname, value)
- self.assertFalse(os.path.exists(_fname + '.dir'))
- self.assertFalse(os.path.exists(_fname + '.bak'))
- def test_missing_index(self):
- with dumbdbm.open(_fname, 'n') as f:
- pass
- os.unlink(_fname + '.dir')
- for value in ('r', 'w'):
- with self.assertRaises(FileNotFoundError):
- dumbdbm.open(_fname, value)
- self.assertFalse(os.path.exists(_fname + '.dir'))
- self.assertFalse(os.path.exists(_fname + '.bak'))
- def test_invalid_flag(self):
- for flag in ('x', 'rf', None):
- with self.assertRaisesRegex(ValueError,
- "Flag must be one of "
- "'r', 'w', 'c', or 'n'"):
- dumbdbm.open(_fname, flag)
- @os_helper.skip_unless_working_chmod
- def test_readonly_files(self):
- with os_helper.temp_dir() as dir:
- fname = os.path.join(dir, 'db')
- with dumbdbm.open(fname, 'n') as f:
- self.assertEqual(list(f.keys()), [])
- for key in self._dict:
- f[key] = self._dict[key]
- os.chmod(fname + ".dir", stat.S_IRUSR)
- os.chmod(fname + ".dat", stat.S_IRUSR)
- os.chmod(dir, stat.S_IRUSR|stat.S_IXUSR)
- with dumbdbm.open(fname, 'r') as f:
- self.assertEqual(sorted(f.keys()), sorted(self._dict))
- f.close() # don't write
- @unittest.skipUnless(os_helper.TESTFN_NONASCII,
- 'requires OS support of non-ASCII encodings')
- def test_nonascii_filename(self):
- filename = os_helper.TESTFN_NONASCII
- for suffix in ['.dir', '.dat', '.bak']:
- self.addCleanup(os_helper.unlink, filename + suffix)
- with dumbdbm.open(filename, 'c') as db:
- db[b'key'] = b'value'
- self.assertTrue(os.path.exists(filename + '.dat'))
- self.assertTrue(os.path.exists(filename + '.dir'))
- with dumbdbm.open(filename, 'r') as db:
- self.assertEqual(list(db.keys()), [b'key'])
- self.assertTrue(b'key' in db)
- self.assertEqual(db[b'key'], b'value')
- def test_open_with_pathlib_path(self):
- dumbdbm.open(os_helper.FakePath(_fname), "c").close()
- def test_open_with_bytes_path(self):
- dumbdbm.open(os.fsencode(_fname), "c").close()
- def test_open_with_pathlib_bytes_path(self):
- dumbdbm.open(os_helper.FakePath(os.fsencode(_fname)), "c").close()
- def tearDown(self):
- _delete_files()
- def setUp(self):
- _delete_files()
- if __name__ == "__main__":
- unittest.main()
|