#
# test_multibytecodec.py
# Unit test for multibytecodec itself
#
  5. import _multibytecodec
  6. import codecs
  7. import io
  8. import sys
  9. import textwrap
  10. import unittest
  11. from test import support
  12. from test.support import os_helper
  13. from test.support.os_helper import TESTFN
  14. ALL_CJKENCODINGS = [
  15. # _codecs_cn
  16. 'gb2312', 'gbk', 'gb18030', 'hz',
  17. # _codecs_hk
  18. 'big5hkscs',
  19. # _codecs_jp
  20. 'cp932', 'shift_jis', 'euc_jp', 'euc_jisx0213', 'shift_jisx0213',
  21. 'euc_jis_2004', 'shift_jis_2004',
  22. # _codecs_kr
  23. 'cp949', 'euc_kr', 'johab',
  24. # _codecs_tw
  25. 'big5', 'cp950',
  26. # _codecs_iso2022
  27. 'iso2022_jp', 'iso2022_jp_1', 'iso2022_jp_2', 'iso2022_jp_2004',
  28. 'iso2022_jp_3', 'iso2022_jp_ext', 'iso2022_kr',
  29. ]
  30. class Test_MultibyteCodec(unittest.TestCase):
  31. def test_nullcoding(self):
  32. for enc in ALL_CJKENCODINGS:
  33. self.assertEqual(b''.decode(enc), '')
  34. self.assertEqual(str(b'', enc), '')
  35. self.assertEqual(''.encode(enc), b'')
  36. def test_str_decode(self):
  37. for enc in ALL_CJKENCODINGS:
  38. self.assertEqual('abcd'.encode(enc), b'abcd')
  39. def test_errorcallback_longindex(self):
  40. dec = codecs.getdecoder('euc-kr')
  41. myreplace = lambda exc: ('', sys.maxsize+1)
  42. codecs.register_error('test.cjktest', myreplace)
  43. self.assertRaises(IndexError, dec,
  44. b'apple\x92ham\x93spam', 'test.cjktest')
  45. def test_errorcallback_custom_ignore(self):
  46. # Issue #23215: MemoryError with custom error handlers and multibyte codecs
  47. data = 100 * "\udc00"
  48. codecs.register_error("test.ignore", codecs.ignore_errors)
  49. for enc in ALL_CJKENCODINGS:
  50. self.assertEqual(data.encode(enc, "test.ignore"), b'')
  51. def test_codingspec(self):
  52. try:
  53. for enc in ALL_CJKENCODINGS:
  54. code = '# coding: {}\n'.format(enc)
  55. exec(code)
  56. finally:
  57. os_helper.unlink(TESTFN)
  58. def test_init_segfault(self):
  59. # bug #3305: this used to segfault
  60. self.assertRaises(AttributeError,
  61. _multibytecodec.MultibyteStreamReader, None)
  62. self.assertRaises(AttributeError,
  63. _multibytecodec.MultibyteStreamWriter, None)
  64. def test_decode_unicode(self):
  65. # Trying to decode a unicode string should raise a TypeError
  66. for enc in ALL_CJKENCODINGS:
  67. self.assertRaises(TypeError, codecs.getdecoder(enc), "")
  68. class Test_IncrementalEncoder(unittest.TestCase):
  69. def test_stateless(self):
  70. # cp949 encoder isn't stateful at all.
  71. encoder = codecs.getincrementalencoder('cp949')()
  72. self.assertEqual(encoder.encode('\ud30c\uc774\uc36c \ub9c8\uc744'),
  73. b'\xc6\xc4\xc0\xcc\xbd\xe3 \xb8\xb6\xc0\xbb')
  74. self.assertEqual(encoder.reset(), None)
  75. self.assertEqual(encoder.encode('\u2606\u223c\u2606', True),
  76. b'\xa1\xd9\xa1\xad\xa1\xd9')
  77. self.assertEqual(encoder.reset(), None)
  78. self.assertEqual(encoder.encode('', True), b'')
  79. self.assertEqual(encoder.encode('', False), b'')
  80. self.assertEqual(encoder.reset(), None)
  81. def test_stateful(self):
  82. # jisx0213 encoder is stateful for a few code points. eg)
  83. # U+00E6 => A9DC
  84. # U+00E6 U+0300 => ABC4
  85. # U+0300 => ABDC
  86. encoder = codecs.getincrementalencoder('jisx0213')()
  87. self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4')
  88. self.assertEqual(encoder.encode('\u00e6'), b'')
  89. self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4')
  90. self.assertEqual(encoder.encode('\u00e6', True), b'\xa9\xdc')
  91. self.assertEqual(encoder.reset(), None)
  92. self.assertEqual(encoder.encode('\u0300'), b'\xab\xdc')
  93. self.assertEqual(encoder.encode('\u00e6'), b'')
  94. self.assertEqual(encoder.encode('', True), b'\xa9\xdc')
  95. self.assertEqual(encoder.encode('', True), b'')
  96. def test_stateful_keep_buffer(self):
  97. encoder = codecs.getincrementalencoder('jisx0213')()
  98. self.assertEqual(encoder.encode('\u00e6'), b'')
  99. self.assertRaises(UnicodeEncodeError, encoder.encode, '\u0123')
  100. self.assertEqual(encoder.encode('\u0300\u00e6'), b'\xab\xc4')
  101. self.assertRaises(UnicodeEncodeError, encoder.encode, '\u0123')
  102. self.assertEqual(encoder.reset(), None)
  103. self.assertEqual(encoder.encode('\u0300'), b'\xab\xdc')
  104. self.assertEqual(encoder.encode('\u00e6'), b'')
  105. self.assertRaises(UnicodeEncodeError, encoder.encode, '\u0123')
  106. self.assertEqual(encoder.encode('', True), b'\xa9\xdc')
  107. def test_state_methods_with_buffer_state(self):
  108. # euc_jis_2004 stores state as a buffer of pending bytes
  109. encoder = codecs.getincrementalencoder('euc_jis_2004')()
  110. initial_state = encoder.getstate()
  111. self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4')
  112. encoder.setstate(initial_state)
  113. self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4')
  114. self.assertEqual(encoder.encode('\u00e6'), b'')
  115. partial_state = encoder.getstate()
  116. self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4')
  117. encoder.setstate(partial_state)
  118. self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4')
  119. def test_state_methods_with_non_buffer_state(self):
  120. # iso2022_jp stores state without using a buffer
  121. encoder = codecs.getincrementalencoder('iso2022_jp')()
  122. self.assertEqual(encoder.encode('z'), b'z')
  123. en_state = encoder.getstate()
  124. self.assertEqual(encoder.encode('\u3042'), b'\x1b\x24\x42\x24\x22')
  125. jp_state = encoder.getstate()
  126. self.assertEqual(encoder.encode('z'), b'\x1b\x28\x42z')
  127. encoder.setstate(jp_state)
  128. self.assertEqual(encoder.encode('\u3042'), b'\x24\x22')
  129. encoder.setstate(en_state)
  130. self.assertEqual(encoder.encode('z'), b'z')
  131. def test_getstate_returns_expected_value(self):
  132. # Note: getstate is implemented such that these state values
  133. # are expected to be the same across all builds of Python,
  134. # regardless of x32/64 bit, endianness and compiler.
  135. # euc_jis_2004 stores state as a buffer of pending bytes
  136. buffer_state_encoder = codecs.getincrementalencoder('euc_jis_2004')()
  137. self.assertEqual(buffer_state_encoder.getstate(), 0)
  138. buffer_state_encoder.encode('\u00e6')
  139. self.assertEqual(buffer_state_encoder.getstate(),
  140. int.from_bytes(
  141. b"\x02"
  142. b"\xc3\xa6"
  143. b"\x00\x00\x00\x00\x00\x00\x00\x00",
  144. 'little'))
  145. buffer_state_encoder.encode('\u0300')
  146. self.assertEqual(buffer_state_encoder.getstate(), 0)
  147. # iso2022_jp stores state without using a buffer
  148. non_buffer_state_encoder = codecs.getincrementalencoder('iso2022_jp')()
  149. self.assertEqual(non_buffer_state_encoder.getstate(),
  150. int.from_bytes(
  151. b"\x00"
  152. b"\x42\x42\x00\x00\x00\x00\x00\x00",
  153. 'little'))
  154. non_buffer_state_encoder.encode('\u3042')
  155. self.assertEqual(non_buffer_state_encoder.getstate(),
  156. int.from_bytes(
  157. b"\x00"
  158. b"\xc2\x42\x00\x00\x00\x00\x00\x00",
  159. 'little'))
  160. def test_setstate_validates_input_size(self):
  161. encoder = codecs.getincrementalencoder('euc_jp')()
  162. pending_size_nine = int.from_bytes(
  163. b"\x09"
  164. b"\x00\x00\x00\x00\x00\x00\x00\x00"
  165. b"\x00\x00\x00\x00\x00\x00\x00\x00",
  166. 'little')
  167. self.assertRaises(UnicodeError, encoder.setstate, pending_size_nine)
  168. def test_setstate_validates_input_bytes(self):
  169. encoder = codecs.getincrementalencoder('euc_jp')()
  170. invalid_utf8 = int.from_bytes(
  171. b"\x01"
  172. b"\xff"
  173. b"\x00\x00\x00\x00\x00\x00\x00\x00",
  174. 'little')
  175. self.assertRaises(UnicodeDecodeError, encoder.setstate, invalid_utf8)
  176. def test_issue5640(self):
  177. encoder = codecs.getincrementalencoder('shift-jis')('backslashreplace')
  178. self.assertEqual(encoder.encode('\xff'), b'\\xff')
  179. self.assertEqual(encoder.encode('\n'), b'\n')
  180. @support.cpython_only
  181. def test_subinterp(self):
  182. # bpo-42846: Test a CJK codec in a subinterpreter
  183. import _testcapi
  184. encoding = 'cp932'
  185. text = "Python の開発は、1990 年ごろから開始されています。"
  186. code = textwrap.dedent("""
  187. import codecs
  188. encoding = %r
  189. text = %r
  190. encoder = codecs.getincrementalencoder(encoding)()
  191. text2 = encoder.encode(text).decode(encoding)
  192. if text2 != text:
  193. raise ValueError(f"encoding issue: {text2!a} != {text!a}")
  194. """) % (encoding, text)
  195. res = _testcapi.run_in_subinterp(code)
  196. self.assertEqual(res, 0)
  197. class Test_IncrementalDecoder(unittest.TestCase):
  198. def test_dbcs(self):
  199. # cp949 decoder is simple with only 1 or 2 bytes sequences.
  200. decoder = codecs.getincrementaldecoder('cp949')()
  201. self.assertEqual(decoder.decode(b'\xc6\xc4\xc0\xcc\xbd'),
  202. '\ud30c\uc774')
  203. self.assertEqual(decoder.decode(b'\xe3 \xb8\xb6\xc0\xbb'),
  204. '\uc36c \ub9c8\uc744')
  205. self.assertEqual(decoder.decode(b''), '')
  206. def test_dbcs_keep_buffer(self):
  207. decoder = codecs.getincrementaldecoder('cp949')()
  208. self.assertEqual(decoder.decode(b'\xc6\xc4\xc0'), '\ud30c')
  209. self.assertRaises(UnicodeDecodeError, decoder.decode, b'', True)
  210. self.assertEqual(decoder.decode(b'\xcc'), '\uc774')
  211. self.assertEqual(decoder.decode(b'\xc6\xc4\xc0'), '\ud30c')
  212. self.assertRaises(UnicodeDecodeError, decoder.decode,
  213. b'\xcc\xbd', True)
  214. self.assertEqual(decoder.decode(b'\xcc'), '\uc774')
  215. def test_iso2022(self):
  216. decoder = codecs.getincrementaldecoder('iso2022-jp')()
  217. ESC = b'\x1b'
  218. self.assertEqual(decoder.decode(ESC + b'('), '')
  219. self.assertEqual(decoder.decode(b'B', True), '')
  220. self.assertEqual(decoder.decode(ESC + b'$'), '')
  221. self.assertEqual(decoder.decode(b'B@$'), '\u4e16')
  222. self.assertEqual(decoder.decode(b'@$@'), '\u4e16')
  223. self.assertEqual(decoder.decode(b'$', True), '\u4e16')
  224. self.assertEqual(decoder.reset(), None)
  225. self.assertEqual(decoder.decode(b'@$'), '@$')
  226. self.assertEqual(decoder.decode(ESC + b'$'), '')
  227. self.assertRaises(UnicodeDecodeError, decoder.decode, b'', True)
  228. self.assertEqual(decoder.decode(b'B@$'), '\u4e16')
  229. def test_decode_unicode(self):
  230. # Trying to decode a unicode string should raise a TypeError
  231. for enc in ALL_CJKENCODINGS:
  232. decoder = codecs.getincrementaldecoder(enc)()
  233. self.assertRaises(TypeError, decoder.decode, "")
  234. def test_state_methods(self):
  235. decoder = codecs.getincrementaldecoder('euc_jp')()
  236. # Decode a complete input sequence
  237. self.assertEqual(decoder.decode(b'\xa4\xa6'), '\u3046')
  238. pending1, _ = decoder.getstate()
  239. self.assertEqual(pending1, b'')
  240. # Decode first half of a partial input sequence
  241. self.assertEqual(decoder.decode(b'\xa4'), '')
  242. pending2, flags2 = decoder.getstate()
  243. self.assertEqual(pending2, b'\xa4')
  244. # Decode second half of a partial input sequence
  245. self.assertEqual(decoder.decode(b'\xa6'), '\u3046')
  246. pending3, _ = decoder.getstate()
  247. self.assertEqual(pending3, b'')
  248. # Jump back and decode second half of partial input sequence again
  249. decoder.setstate((pending2, flags2))
  250. self.assertEqual(decoder.decode(b'\xa6'), '\u3046')
  251. pending4, _ = decoder.getstate()
  252. self.assertEqual(pending4, b'')
  253. # Ensure state values are preserved correctly
  254. decoder.setstate((b'abc', 123456789))
  255. self.assertEqual(decoder.getstate(), (b'abc', 123456789))
  256. def test_setstate_validates_input(self):
  257. decoder = codecs.getincrementaldecoder('euc_jp')()
  258. self.assertRaises(TypeError, decoder.setstate, 123)
  259. self.assertRaises(TypeError, decoder.setstate, ("invalid", 0))
  260. self.assertRaises(TypeError, decoder.setstate, (b"1234", "invalid"))
  261. self.assertRaises(UnicodeError, decoder.setstate, (b"123456789", 0))
  262. class Test_StreamReader(unittest.TestCase):
  263. def test_bug1728403(self):
  264. try:
  265. f = open(TESTFN, 'wb')
  266. try:
  267. f.write(b'\xa1')
  268. finally:
  269. f.close()
  270. f = codecs.open(TESTFN, encoding='cp949')
  271. try:
  272. self.assertRaises(UnicodeDecodeError, f.read, 2)
  273. finally:
  274. f.close()
  275. finally:
  276. os_helper.unlink(TESTFN)
  277. class Test_StreamWriter(unittest.TestCase):
  278. def test_gb18030(self):
  279. s= io.BytesIO()
  280. c = codecs.getwriter('gb18030')(s)
  281. c.write('123')
  282. self.assertEqual(s.getvalue(), b'123')
  283. c.write('\U00012345')
  284. self.assertEqual(s.getvalue(), b'123\x907\x959')
  285. c.write('\uac00\u00ac')
  286. self.assertEqual(s.getvalue(),
  287. b'123\x907\x959\x827\xcf5\x810\x851')
  288. def test_utf_8(self):
  289. s= io.BytesIO()
  290. c = codecs.getwriter('utf-8')(s)
  291. c.write('123')
  292. self.assertEqual(s.getvalue(), b'123')
  293. c.write('\U00012345')
  294. self.assertEqual(s.getvalue(), b'123\xf0\x92\x8d\x85')
  295. c.write('\uac00\u00ac')
  296. self.assertEqual(s.getvalue(),
  297. b'123\xf0\x92\x8d\x85'
  298. b'\xea\xb0\x80\xc2\xac')
  299. def test_streamwriter_strwrite(self):
  300. s = io.BytesIO()
  301. wr = codecs.getwriter('gb18030')(s)
  302. wr.write('abcd')
  303. self.assertEqual(s.getvalue(), b'abcd')
  304. class Test_ISO2022(unittest.TestCase):
  305. def test_g2(self):
  306. iso2022jp2 = b'\x1b(B:hu4:unit\x1b.A\x1bNi de famille'
  307. uni = ':hu4:unit\xe9 de famille'
  308. self.assertEqual(iso2022jp2.decode('iso2022-jp-2'), uni)
  309. def test_iso2022_jp_g0(self):
  310. self.assertNotIn(b'\x0e', '\N{SOFT HYPHEN}'.encode('iso-2022-jp-2'))
  311. for encoding in ('iso-2022-jp-2004', 'iso-2022-jp-3'):
  312. e = '\u3406'.encode(encoding)
  313. self.assertFalse(any(x > 0x80 for x in e))
  314. def test_bug1572832(self):
  315. for x in range(0x10000, 0x110000):
  316. # Any ISO 2022 codec will cause the segfault
  317. chr(x).encode('iso_2022_jp', 'ignore')
  318. class TestStateful(unittest.TestCase):
  319. text = '\u4E16\u4E16'
  320. encoding = 'iso-2022-jp'
  321. expected = b'\x1b$B@$@$'
  322. reset = b'\x1b(B'
  323. expected_reset = expected + reset
  324. def test_encode(self):
  325. self.assertEqual(self.text.encode(self.encoding), self.expected_reset)
  326. def test_incrementalencoder(self):
  327. encoder = codecs.getincrementalencoder(self.encoding)()
  328. output = b''.join(
  329. encoder.encode(char)
  330. for char in self.text)
  331. self.assertEqual(output, self.expected)
  332. self.assertEqual(encoder.encode('', final=True), self.reset)
  333. self.assertEqual(encoder.encode('', final=True), b'')
  334. def test_incrementalencoder_final(self):
  335. encoder = codecs.getincrementalencoder(self.encoding)()
  336. last_index = len(self.text) - 1
  337. output = b''.join(
  338. encoder.encode(char, index == last_index)
  339. for index, char in enumerate(self.text))
  340. self.assertEqual(output, self.expected_reset)
  341. self.assertEqual(encoder.encode('', final=True), b'')
  342. class TestHZStateful(TestStateful):
  343. text = '\u804a\u804a'
  344. encoding = 'hz'
  345. expected = b'~{ADAD'
  346. reset = b'~}'
  347. expected_reset = expected + reset
  348. if __name__ == "__main__":
  349. unittest.main()