test_shelve.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. import unittest
  2. import dbm
  3. import shelve
  4. import glob
  5. import pickle
  6. import os
  7. from test import support
  8. from test.support import os_helper
  9. from collections.abc import MutableMapping
  10. from test.test_dbm import dbm_iterator
  11. def L1(s):
  12. return s.decode("latin-1")
  13. class byteskeydict(MutableMapping):
  14. "Mapping that supports bytes keys"
  15. def __init__(self):
  16. self.d = {}
  17. def __getitem__(self, key):
  18. return self.d[L1(key)]
  19. def __setitem__(self, key, value):
  20. self.d[L1(key)] = value
  21. def __delitem__(self, key):
  22. del self.d[L1(key)]
  23. def __len__(self):
  24. return len(self.d)
  25. def iterkeys(self):
  26. for k in self.d.keys():
  27. yield k.encode("latin-1")
  28. __iter__ = iterkeys
  29. def keys(self):
  30. return list(self.iterkeys())
  31. def copy(self):
  32. return byteskeydict(self.d)
  33. class TestCase(unittest.TestCase):
  34. dirname = os_helper.TESTFN
  35. fn = os.path.join(os_helper.TESTFN, "shelftemp.db")
  36. def test_close(self):
  37. d1 = {}
  38. s = shelve.Shelf(d1, protocol=2, writeback=False)
  39. s['key1'] = [1,2,3,4]
  40. self.assertEqual(s['key1'], [1,2,3,4])
  41. self.assertEqual(len(s), 1)
  42. s.close()
  43. self.assertRaises(ValueError, len, s)
  44. try:
  45. s['key1']
  46. except ValueError:
  47. pass
  48. else:
  49. self.fail('Closed shelf should not find a key')
  50. def test_open_template(self, filename=None, protocol=None):
  51. os.mkdir(self.dirname)
  52. self.addCleanup(os_helper.rmtree, self.dirname)
  53. s = shelve.open(filename=filename if filename is not None else self.fn,
  54. protocol=protocol)
  55. try:
  56. s['key1'] = (1,2,3,4)
  57. self.assertEqual(s['key1'], (1,2,3,4))
  58. finally:
  59. s.close()
  60. def test_ascii_file_shelf(self):
  61. self.test_open_template(protocol=0)
  62. def test_binary_file_shelf(self):
  63. self.test_open_template(protocol=1)
  64. def test_proto2_file_shelf(self):
  65. self.test_open_template(protocol=2)
  66. def test_pathlib_path_file_shelf(self):
  67. self.test_open_template(filename=os_helper.FakePath(self.fn))
  68. def test_bytes_path_file_shelf(self):
  69. self.test_open_template(filename=os.fsencode(self.fn))
  70. def test_pathlib_bytes_path_file_shelf(self):
  71. self.test_open_template(filename=os_helper.FakePath(os.fsencode(self.fn)))
  72. def test_in_memory_shelf(self):
  73. d1 = byteskeydict()
  74. with shelve.Shelf(d1, protocol=0) as s:
  75. s['key1'] = (1,2,3,4)
  76. self.assertEqual(s['key1'], (1,2,3,4))
  77. d2 = byteskeydict()
  78. with shelve.Shelf(d2, protocol=1) as s:
  79. s['key1'] = (1,2,3,4)
  80. self.assertEqual(s['key1'], (1,2,3,4))
  81. self.assertEqual(len(d1), 1)
  82. self.assertEqual(len(d2), 1)
  83. self.assertNotEqual(d1.items(), d2.items())
  84. def test_mutable_entry(self):
  85. d1 = byteskeydict()
  86. with shelve.Shelf(d1, protocol=2, writeback=False) as s:
  87. s['key1'] = [1,2,3,4]
  88. self.assertEqual(s['key1'], [1,2,3,4])
  89. s['key1'].append(5)
  90. self.assertEqual(s['key1'], [1,2,3,4])
  91. d2 = byteskeydict()
  92. with shelve.Shelf(d2, protocol=2, writeback=True) as s:
  93. s['key1'] = [1,2,3,4]
  94. self.assertEqual(s['key1'], [1,2,3,4])
  95. s['key1'].append(5)
  96. self.assertEqual(s['key1'], [1,2,3,4,5])
  97. self.assertEqual(len(d1), 1)
  98. self.assertEqual(len(d2), 1)
  99. def test_keyencoding(self):
  100. d = {}
  101. key = 'Pöp'
  102. # the default keyencoding is utf-8
  103. shelve.Shelf(d)[key] = [1]
  104. self.assertIn(key.encode('utf-8'), d)
  105. # but a different one can be given
  106. shelve.Shelf(d, keyencoding='latin-1')[key] = [1]
  107. self.assertIn(key.encode('latin-1'), d)
  108. # with all consequences
  109. s = shelve.Shelf(d, keyencoding='ascii')
  110. self.assertRaises(UnicodeEncodeError, s.__setitem__, key, [1])
  111. def test_writeback_also_writes_immediately(self):
  112. # Issue 5754
  113. d = {}
  114. key = 'key'
  115. encodedkey = key.encode('utf-8')
  116. with shelve.Shelf(d, writeback=True) as s:
  117. s[key] = [1]
  118. p1 = d[encodedkey] # Will give a KeyError if backing store not updated
  119. s['key'].append(2)
  120. p2 = d[encodedkey]
  121. self.assertNotEqual(p1, p2) # Write creates new object in store
  122. def test_with(self):
  123. d1 = {}
  124. with shelve.Shelf(d1, protocol=2, writeback=False) as s:
  125. s['key1'] = [1,2,3,4]
  126. self.assertEqual(s['key1'], [1,2,3,4])
  127. self.assertEqual(len(s), 1)
  128. self.assertRaises(ValueError, len, s)
  129. try:
  130. s['key1']
  131. except ValueError:
  132. pass
  133. else:
  134. self.fail('Closed shelf should not find a key')
  135. def test_default_protocol(self):
  136. with shelve.Shelf({}) as s:
  137. self.assertEqual(s._protocol, pickle.DEFAULT_PROTOCOL)
  138. class TestShelveBase:
  139. type2test = shelve.Shelf
  140. def _reference(self):
  141. return {"key1":"value1", "key2":2, "key3":(1,2,3)}
  142. class TestShelveInMemBase(TestShelveBase):
  143. def _empty_mapping(self):
  144. return shelve.Shelf(byteskeydict(), **self._args)
  145. class TestShelveFileBase(TestShelveBase):
  146. counter = 0
  147. def _empty_mapping(self):
  148. self.counter += 1
  149. x = shelve.open(self.base_path + str(self.counter), **self._args)
  150. self.addCleanup(x.close)
  151. return x
  152. def setUp(self):
  153. dirname = os_helper.TESTFN
  154. os.mkdir(dirname)
  155. self.addCleanup(os_helper.rmtree, dirname)
  156. self.base_path = os.path.join(dirname, "shelftemp.db")
  157. self.addCleanup(setattr, dbm, '_defaultmod', dbm._defaultmod)
  158. dbm._defaultmod = self.dbm_mod
  159. from test import mapping_tests
  160. for proto in range(pickle.HIGHEST_PROTOCOL + 1):
  161. bases = (TestShelveInMemBase, mapping_tests.BasicTestMappingProtocol)
  162. name = f'TestProto{proto}MemShelve'
  163. globals()[name] = type(name, bases,
  164. {'_args': {'protocol': proto}})
  165. bases = (TestShelveFileBase, mapping_tests.BasicTestMappingProtocol)
  166. for dbm_mod in dbm_iterator():
  167. assert dbm_mod.__name__.startswith('dbm.')
  168. suffix = dbm_mod.__name__[4:]
  169. name = f'TestProto{proto}File_{suffix}Shelve'
  170. globals()[name] = type(name, bases,
  171. {'dbm_mod': dbm_mod, '_args': {'protocol': proto}})
  172. if __name__ == "__main__":
  173. unittest.main()