#
# multibytecodec_support.py
# Common Unittest Routines for CJK codecs
#
import codecs
import os
import re
import sys
import unittest
from http.client import HTTPException
from io import BytesIO

from test import support
  13. class TestBase:
  14. encoding = '' # codec name
  15. codec = None # codec tuple (with 4 elements)
  16. tstring = None # must set. 2 strings to test StreamReader
  17. codectests = None # must set. codec test tuple
  18. roundtriptest = 1 # set if roundtrip is possible with unicode
  19. has_iso10646 = 0 # set if this encoding contains whole iso10646 map
  20. xmlcharnametest = None # string to test xmlcharrefreplace
  21. unmappedunicode = '\udeee' # a unicode code point that is not mapped.
  22. def setUp(self):
  23. if self.codec is None:
  24. self.codec = codecs.lookup(self.encoding)
  25. self.encode = self.codec.encode
  26. self.decode = self.codec.decode
  27. self.reader = self.codec.streamreader
  28. self.writer = self.codec.streamwriter
  29. self.incrementalencoder = self.codec.incrementalencoder
  30. self.incrementaldecoder = self.codec.incrementaldecoder
  31. def test_chunkcoding(self):
  32. tstring_lines = []
  33. for b in self.tstring:
  34. lines = b.split(b"\n")
  35. last = lines.pop()
  36. assert last == b""
  37. lines = [line + b"\n" for line in lines]
  38. tstring_lines.append(lines)
  39. for native, utf8 in zip(*tstring_lines):
  40. u = self.decode(native)[0]
  41. self.assertEqual(u, utf8.decode('utf-8'))
  42. if self.roundtriptest:
  43. self.assertEqual(native, self.encode(u)[0])
  44. def test_errorhandle(self):
  45. for source, scheme, expected in self.codectests:
  46. if isinstance(source, bytes):
  47. func = self.decode
  48. else:
  49. func = self.encode
  50. if expected:
  51. result = func(source, scheme)[0]
  52. if func is self.decode:
  53. self.assertTrue(type(result) is str, type(result))
  54. self.assertEqual(result, expected,
  55. '%a.decode(%r, %r)=%a != %a'
  56. % (source, self.encoding, scheme, result,
  57. expected))
  58. else:
  59. self.assertTrue(type(result) is bytes, type(result))
  60. self.assertEqual(result, expected,
  61. '%a.encode(%r, %r)=%a != %a'
  62. % (source, self.encoding, scheme, result,
  63. expected))
  64. else:
  65. self.assertRaises(UnicodeError, func, source, scheme)
  66. def test_xmlcharrefreplace(self):
  67. if self.has_iso10646:
  68. self.skipTest('encoding contains full ISO 10646 map')
  69. s = "\u0b13\u0b23\u0b60 nd eggs"
  70. self.assertEqual(
  71. self.encode(s, "xmlcharrefreplace")[0],
  72. b"ଓଣୠ nd eggs"
  73. )
  74. def test_customreplace_encode(self):
  75. if self.has_iso10646:
  76. self.skipTest('encoding contains full ISO 10646 map')
  77. from html.entities import codepoint2name
  78. def xmlcharnamereplace(exc):
  79. if not isinstance(exc, UnicodeEncodeError):
  80. raise TypeError("don't know how to handle %r" % exc)
  81. l = []
  82. for c in exc.object[exc.start:exc.end]:
  83. if ord(c) in codepoint2name:
  84. l.append("&%s;" % codepoint2name[ord(c)])
  85. else:
  86. l.append("&#%d;" % ord(c))
  87. return ("".join(l), exc.end)
  88. codecs.register_error("test.xmlcharnamereplace", xmlcharnamereplace)
  89. if self.xmlcharnametest:
  90. sin, sout = self.xmlcharnametest
  91. else:
  92. sin = "\xab\u211c\xbb = \u2329\u1234\u232a"
  93. sout = b"«ℜ» = ⟨ሴ⟩"
  94. self.assertEqual(self.encode(sin,
  95. "test.xmlcharnamereplace")[0], sout)
  96. def test_callback_returns_bytes(self):
  97. def myreplace(exc):
  98. return (b"1234", exc.end)
  99. codecs.register_error("test.cjktest", myreplace)
  100. enc = self.encode("abc" + self.unmappedunicode + "def", "test.cjktest")[0]
  101. self.assertEqual(enc, b"abc1234def")
  102. def test_callback_wrong_objects(self):
  103. def myreplace(exc):
  104. return (ret, exc.end)
  105. codecs.register_error("test.cjktest", myreplace)
  106. for ret in ([1, 2, 3], [], None, object()):
  107. self.assertRaises(TypeError, self.encode, self.unmappedunicode,
  108. 'test.cjktest')
  109. def test_callback_long_index(self):
  110. def myreplace(exc):
  111. return ('x', int(exc.end))
  112. codecs.register_error("test.cjktest", myreplace)
  113. self.assertEqual(self.encode('abcd' + self.unmappedunicode + 'efgh',
  114. 'test.cjktest'), (b'abcdxefgh', 9))
  115. def myreplace(exc):
  116. return ('x', sys.maxsize + 1)
  117. codecs.register_error("test.cjktest", myreplace)
  118. self.assertRaises(IndexError, self.encode, self.unmappedunicode,
  119. 'test.cjktest')
  120. def test_callback_None_index(self):
  121. def myreplace(exc):
  122. return ('x', None)
  123. codecs.register_error("test.cjktest", myreplace)
  124. self.assertRaises(TypeError, self.encode, self.unmappedunicode,
  125. 'test.cjktest')
  126. def test_callback_backward_index(self):
  127. def myreplace(exc):
  128. if myreplace.limit > 0:
  129. myreplace.limit -= 1
  130. return ('REPLACED', 0)
  131. else:
  132. return ('TERMINAL', exc.end)
  133. myreplace.limit = 3
  134. codecs.register_error("test.cjktest", myreplace)
  135. self.assertEqual(self.encode('abcd' + self.unmappedunicode + 'efgh',
  136. 'test.cjktest'),
  137. (b'abcdREPLACEDabcdREPLACEDabcdREPLACEDabcdTERMINALefgh', 9))
  138. def test_callback_forward_index(self):
  139. def myreplace(exc):
  140. return ('REPLACED', exc.end + 2)
  141. codecs.register_error("test.cjktest", myreplace)
  142. self.assertEqual(self.encode('abcd' + self.unmappedunicode + 'efgh',
  143. 'test.cjktest'), (b'abcdREPLACEDgh', 9))
  144. def test_callback_index_outofbound(self):
  145. def myreplace(exc):
  146. return ('TERM', 100)
  147. codecs.register_error("test.cjktest", myreplace)
  148. self.assertRaises(IndexError, self.encode, self.unmappedunicode,
  149. 'test.cjktest')
  150. def test_incrementalencoder(self):
  151. UTF8Reader = codecs.getreader('utf-8')
  152. for sizehint in [None] + list(range(1, 33)) + \
  153. [64, 128, 256, 512, 1024]:
  154. istream = UTF8Reader(BytesIO(self.tstring[1]))
  155. ostream = BytesIO()
  156. encoder = self.incrementalencoder()
  157. while 1:
  158. if sizehint is not None:
  159. data = istream.read(sizehint)
  160. else:
  161. data = istream.read()
  162. if not data:
  163. break
  164. e = encoder.encode(data)
  165. ostream.write(e)
  166. self.assertEqual(ostream.getvalue(), self.tstring[0])
  167. def test_incrementaldecoder(self):
  168. UTF8Writer = codecs.getwriter('utf-8')
  169. for sizehint in [None, -1] + list(range(1, 33)) + \
  170. [64, 128, 256, 512, 1024]:
  171. istream = BytesIO(self.tstring[0])
  172. ostream = UTF8Writer(BytesIO())
  173. decoder = self.incrementaldecoder()
  174. while 1:
  175. data = istream.read(sizehint)
  176. if not data:
  177. break
  178. else:
  179. u = decoder.decode(data)
  180. ostream.write(u)
  181. self.assertEqual(ostream.getvalue(), self.tstring[1])
  182. def test_incrementalencoder_error_callback(self):
  183. inv = self.unmappedunicode
  184. e = self.incrementalencoder()
  185. self.assertRaises(UnicodeEncodeError, e.encode, inv, True)
  186. e.errors = 'ignore'
  187. self.assertEqual(e.encode(inv, True), b'')
  188. e.reset()
  189. def tempreplace(exc):
  190. return ('called', exc.end)
  191. codecs.register_error('test.incremental_error_callback', tempreplace)
  192. e.errors = 'test.incremental_error_callback'
  193. self.assertEqual(e.encode(inv, True), b'called')
  194. # again
  195. e.errors = 'ignore'
  196. self.assertEqual(e.encode(inv, True), b'')
  197. def test_streamreader(self):
  198. UTF8Writer = codecs.getwriter('utf-8')
  199. for name in ["read", "readline", "readlines"]:
  200. for sizehint in [None, -1] + list(range(1, 33)) + \
  201. [64, 128, 256, 512, 1024]:
  202. istream = self.reader(BytesIO(self.tstring[0]))
  203. ostream = UTF8Writer(BytesIO())
  204. func = getattr(istream, name)
  205. while 1:
  206. data = func(sizehint)
  207. if not data:
  208. break
  209. if name == "readlines":
  210. ostream.writelines(data)
  211. else:
  212. ostream.write(data)
  213. self.assertEqual(ostream.getvalue(), self.tstring[1])
  214. def test_streamwriter(self):
  215. readfuncs = ('read', 'readline', 'readlines')
  216. UTF8Reader = codecs.getreader('utf-8')
  217. for name in readfuncs:
  218. for sizehint in [None] + list(range(1, 33)) + \
  219. [64, 128, 256, 512, 1024]:
  220. istream = UTF8Reader(BytesIO(self.tstring[1]))
  221. ostream = self.writer(BytesIO())
  222. func = getattr(istream, name)
  223. while 1:
  224. if sizehint is not None:
  225. data = func(sizehint)
  226. else:
  227. data = func()
  228. if not data:
  229. break
  230. if name == "readlines":
  231. ostream.writelines(data)
  232. else:
  233. ostream.write(data)
  234. self.assertEqual(ostream.getvalue(), self.tstring[0])
  235. def test_streamwriter_reset_no_pending(self):
  236. # Issue #23247: Calling reset() on a fresh StreamWriter instance
  237. # (without pending data) must not crash
  238. stream = BytesIO()
  239. writer = self.writer(stream)
  240. writer.reset()
  241. def test_incrementalencoder_del_segfault(self):
  242. e = self.incrementalencoder()
  243. with self.assertRaises(AttributeError):
  244. del e.errors
  245. class TestBase_Mapping(unittest.TestCase):
  246. pass_enctest = []
  247. pass_dectest = []
  248. supmaps = []
  249. codectests = []
  250. def setUp(self):
  251. try:
  252. self.open_mapping_file().close() # test it to report the error early
  253. except (OSError, HTTPException):
  254. self.skipTest("Could not retrieve "+self.mapfileurl)
  255. def open_mapping_file(self):
  256. return support.open_urlresource(self.mapfileurl, encoding="utf-8")
  257. def test_mapping_file(self):
  258. if self.mapfileurl.endswith('.xml'):
  259. self._test_mapping_file_ucm()
  260. else:
  261. self._test_mapping_file_plain()
  262. def _test_mapping_file_plain(self):
  263. def unichrs(s):
  264. return ''.join(chr(int(x, 16)) for x in s.split('+'))
  265. urt_wa = {}
  266. with self.open_mapping_file() as f:
  267. for line in f:
  268. if not line:
  269. break
  270. data = line.split('#')[0].split()
  271. if len(data) != 2:
  272. continue
  273. if data[0][:2] != '0x':
  274. self.fail(f"Invalid line: {line!r}")
  275. csetch = bytes.fromhex(data[0][2:])
  276. if len(csetch) == 1 and 0x80 <= csetch[0]:
  277. continue
  278. unich = unichrs(data[1])
  279. if ord(unich) == 0xfffd or unich in urt_wa:
  280. continue
  281. urt_wa[unich] = csetch
  282. self._testpoint(csetch, unich)
  283. def _test_mapping_file_ucm(self):
  284. with self.open_mapping_file() as f:
  285. ucmdata = f.read()
  286. uc = re.findall('<a u="([A-F0-9]{4})" b="([0-9A-F ]+)"/>', ucmdata)
  287. for uni, coded in uc:
  288. unich = chr(int(uni, 16))
  289. codech = bytes.fromhex(coded)
  290. self._testpoint(codech, unich)
  291. def test_mapping_supplemental(self):
  292. for mapping in self.supmaps:
  293. self._testpoint(*mapping)
  294. def _testpoint(self, csetch, unich):
  295. if (csetch, unich) not in self.pass_enctest:
  296. self.assertEqual(unich.encode(self.encoding), csetch)
  297. if (csetch, unich) not in self.pass_dectest:
  298. self.assertEqual(str(csetch, self.encoding), unich)
  299. def test_errorhandle(self):
  300. for source, scheme, expected in self.codectests:
  301. if isinstance(source, bytes):
  302. func = source.decode
  303. else:
  304. func = source.encode
  305. if expected:
  306. if isinstance(source, bytes):
  307. result = func(self.encoding, scheme)
  308. self.assertTrue(type(result) is str, type(result))
  309. self.assertEqual(result, expected,
  310. '%a.decode(%r, %r)=%a != %a'
  311. % (source, self.encoding, scheme, result,
  312. expected))
  313. else:
  314. result = func(self.encoding, scheme)
  315. self.assertTrue(type(result) is bytes, type(result))
  316. self.assertEqual(result, expected,
  317. '%a.encode(%r, %r)=%a != %a'
  318. % (source, self.encoding, scheme, result,
  319. expected))
  320. else:
  321. self.assertRaises(UnicodeError, func, self.encoding, scheme)
  322. def load_teststring(name):
  323. dir = os.path.join(os.path.dirname(__file__), 'cjkencodings')
  324. with open(os.path.join(dir, name + '.txt'), 'rb') as f:
  325. encoded = f.read()
  326. with open(os.path.join(dir, name + '-utf8.txt'), 'rb') as f:
  327. utf8 = f.read()
  328. return encoded, utf8