# test_io.py
  1. """Unit tests for the io module."""
  2. # Tests of io are scattered over the test suite:
  3. # * test_bufio - tests file buffering
  4. # * test_memoryio - tests BytesIO and StringIO
  5. # * test_fileio - tests FileIO
  6. # * test_file - tests the file interface
  7. # * test_io - tests everything else in the io module
  8. # * test_univnewlines - tests universal newline support
  9. # * test_largefile - tests operations on a file greater than 2**32 bytes
  10. # (only enabled with -ulargefile)
  11. ################################################################################
  12. # ATTENTION TEST WRITERS!!!
  13. ################################################################################
  14. # When writing tests for io, it's important to test both the C and Python
  15. # implementations. This is usually done by writing a base test that refers to
  16. # the type it is testing as an attribute. Then it provides custom subclasses to
  17. # test both implementations. This file has lots of examples.
  18. ################################################################################
  19. import abc
  20. import array
  21. import errno
  22. import locale
  23. import os
  24. import pickle
  25. import random
  26. import signal
  27. import sys
  28. import textwrap
  29. import threading
  30. import time
  31. import unittest
  32. import warnings
  33. import weakref
  34. from collections import deque, UserList
  35. from itertools import cycle, count
  36. from test import support
  37. from test.support.script_helper import (
  38. assert_python_ok, assert_python_failure, run_python_until_end)
  39. from test.support import import_helper
  40. from test.support import os_helper
  41. from test.support import threading_helper
  42. from test.support import warnings_helper
  43. from test.support import skip_if_sanitizer
  44. from test.support.os_helper import FakePath
  45. import codecs
  46. import io # C implementation of io
  47. import _pyio as pyio # Python implementation of io
  48. try:
  49. import ctypes
  50. except ImportError:
  51. def byteslike(*pos, **kw):
  52. return array.array("b", bytes(*pos, **kw))
  53. else:
  54. def byteslike(*pos, **kw):
  55. """Create a bytes-like object having no string or sequence methods"""
  56. data = bytes(*pos, **kw)
  57. obj = EmptyStruct()
  58. ctypes.resize(obj, len(data))
  59. memoryview(obj).cast("B")[:] = data
  60. return obj
  61. class EmptyStruct(ctypes.Structure):
  62. pass
  63. # Does io.IOBase finalizer log the exception if the close() method fails?
  64. # The exception is ignored silently by default in release build.
  65. IOBASE_EMITS_UNRAISABLE = (hasattr(sys, "gettotalrefcount") or sys.flags.dev_mode)
  66. def _default_chunk_size():
  67. """Get the default TextIOWrapper chunk size"""
  68. with open(__file__, "r", encoding="latin-1") as f:
  69. return f._CHUNK_SIZE
  70. requires_alarm = unittest.skipUnless(
  71. hasattr(signal, "alarm"), "test requires signal.alarm()"
  72. )
  73. class MockRawIOWithoutRead:
  74. """A RawIO implementation without read(), so as to exercise the default
  75. RawIO.read() which calls readinto()."""
  76. def __init__(self, read_stack=()):
  77. self._read_stack = list(read_stack)
  78. self._write_stack = []
  79. self._reads = 0
  80. self._extraneous_reads = 0
  81. def write(self, b):
  82. self._write_stack.append(bytes(b))
  83. return len(b)
  84. def writable(self):
  85. return True
  86. def fileno(self):
  87. return 42
  88. def readable(self):
  89. return True
  90. def seekable(self):
  91. return True
  92. def seek(self, pos, whence):
  93. return 0 # wrong but we gotta return something
  94. def tell(self):
  95. return 0 # same comment as above
  96. def readinto(self, buf):
  97. self._reads += 1
  98. max_len = len(buf)
  99. try:
  100. data = self._read_stack[0]
  101. except IndexError:
  102. self._extraneous_reads += 1
  103. return 0
  104. if data is None:
  105. del self._read_stack[0]
  106. return None
  107. n = len(data)
  108. if len(data) <= max_len:
  109. del self._read_stack[0]
  110. buf[:n] = data
  111. return n
  112. else:
  113. buf[:] = data[:max_len]
  114. self._read_stack[0] = data[max_len:]
  115. return max_len
  116. def truncate(self, pos=None):
  117. return pos
  118. class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
  119. pass
  120. class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
  121. pass
  122. class MockRawIO(MockRawIOWithoutRead):
  123. def read(self, n=None):
  124. self._reads += 1
  125. try:
  126. return self._read_stack.pop(0)
  127. except:
  128. self._extraneous_reads += 1
  129. return b""
  130. class CMockRawIO(MockRawIO, io.RawIOBase):
  131. pass
  132. class PyMockRawIO(MockRawIO, pyio.RawIOBase):
  133. pass
  134. class MisbehavedRawIO(MockRawIO):
  135. def write(self, b):
  136. return super().write(b) * 2
  137. def read(self, n=None):
  138. return super().read(n) * 2
  139. def seek(self, pos, whence):
  140. return -123
  141. def tell(self):
  142. return -456
  143. def readinto(self, buf):
  144. super().readinto(buf)
  145. return len(buf) * 5
  146. class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
  147. pass
  148. class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
  149. pass
  150. class SlowFlushRawIO(MockRawIO):
  151. def __init__(self):
  152. super().__init__()
  153. self.in_flush = threading.Event()
  154. def flush(self):
  155. self.in_flush.set()
  156. time.sleep(0.25)
  157. class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
  158. pass
  159. class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
  160. pass
  161. class CloseFailureIO(MockRawIO):
  162. closed = 0
  163. def close(self):
  164. if not self.closed:
  165. self.closed = 1
  166. raise OSError
  167. class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
  168. pass
  169. class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
  170. pass
  171. class MockFileIO:
  172. def __init__(self, data):
  173. self.read_history = []
  174. super().__init__(data)
  175. def read(self, n=None):
  176. res = super().read(n)
  177. self.read_history.append(None if res is None else len(res))
  178. return res
  179. def readinto(self, b):
  180. res = super().readinto(b)
  181. self.read_history.append(res)
  182. return res
  183. class CMockFileIO(MockFileIO, io.BytesIO):
  184. pass
  185. class PyMockFileIO(MockFileIO, pyio.BytesIO):
  186. pass
  187. class MockUnseekableIO:
  188. def seekable(self):
  189. return False
  190. def seek(self, *args):
  191. raise self.UnsupportedOperation("not seekable")
  192. def tell(self, *args):
  193. raise self.UnsupportedOperation("not seekable")
  194. def truncate(self, *args):
  195. raise self.UnsupportedOperation("not seekable")
  196. class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
  197. UnsupportedOperation = io.UnsupportedOperation
  198. class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
  199. UnsupportedOperation = pyio.UnsupportedOperation
  200. class MockNonBlockWriterIO:
  201. def __init__(self):
  202. self._write_stack = []
  203. self._blocker_char = None
  204. def pop_written(self):
  205. s = b"".join(self._write_stack)
  206. self._write_stack[:] = []
  207. return s
  208. def block_on(self, char):
  209. """Block when a given char is encountered."""
  210. self._blocker_char = char
  211. def readable(self):
  212. return True
  213. def seekable(self):
  214. return True
  215. def seek(self, pos, whence=0):
  216. # naive implementation, enough for tests
  217. return 0
  218. def writable(self):
  219. return True
  220. def write(self, b):
  221. b = bytes(b)
  222. n = -1
  223. if self._blocker_char:
  224. try:
  225. n = b.index(self._blocker_char)
  226. except ValueError:
  227. pass
  228. else:
  229. if n > 0:
  230. # write data up to the first blocker
  231. self._write_stack.append(b[:n])
  232. return n
  233. else:
  234. # cancel blocker and indicate would block
  235. self._blocker_char = None
  236. return None
  237. self._write_stack.append(b)
  238. return len(b)
  239. class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
  240. BlockingIOError = io.BlockingIOError
  241. class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
  242. BlockingIOError = pyio.BlockingIOError
  243. class IOTest(unittest.TestCase):
  244. def setUp(self):
  245. os_helper.unlink(os_helper.TESTFN)
  246. def tearDown(self):
  247. os_helper.unlink(os_helper.TESTFN)
  248. def write_ops(self, f):
  249. self.assertEqual(f.write(b"blah."), 5)
  250. f.truncate(0)
  251. self.assertEqual(f.tell(), 5)
  252. f.seek(0)
  253. self.assertEqual(f.write(b"blah."), 5)
  254. self.assertEqual(f.seek(0), 0)
  255. self.assertEqual(f.write(b"Hello."), 6)
  256. self.assertEqual(f.tell(), 6)
  257. self.assertEqual(f.seek(-1, 1), 5)
  258. self.assertEqual(f.tell(), 5)
  259. buffer = bytearray(b" world\n\n\n")
  260. self.assertEqual(f.write(buffer), 9)
  261. buffer[:] = b"*" * 9 # Overwrite our copy of the data
  262. self.assertEqual(f.seek(0), 0)
  263. self.assertEqual(f.write(b"h"), 1)
  264. self.assertEqual(f.seek(-1, 2), 13)
  265. self.assertEqual(f.tell(), 13)
  266. self.assertEqual(f.truncate(12), 12)
  267. self.assertEqual(f.tell(), 13)
  268. self.assertRaises(TypeError, f.seek, 0.0)
  269. def read_ops(self, f, buffered=False):
  270. data = f.read(5)
  271. self.assertEqual(data, b"hello")
  272. data = byteslike(data)
  273. self.assertEqual(f.readinto(data), 5)
  274. self.assertEqual(bytes(data), b" worl")
  275. data = bytearray(5)
  276. self.assertEqual(f.readinto(data), 2)
  277. self.assertEqual(len(data), 5)
  278. self.assertEqual(data[:2], b"d\n")
  279. self.assertEqual(f.seek(0), 0)
  280. self.assertEqual(f.read(20), b"hello world\n")
  281. self.assertEqual(f.read(1), b"")
  282. self.assertEqual(f.readinto(byteslike(b"x")), 0)
  283. self.assertEqual(f.seek(-6, 2), 6)
  284. self.assertEqual(f.read(5), b"world")
  285. self.assertEqual(f.read(0), b"")
  286. self.assertEqual(f.readinto(byteslike()), 0)
  287. self.assertEqual(f.seek(-6, 1), 5)
  288. self.assertEqual(f.read(5), b" worl")
  289. self.assertEqual(f.tell(), 10)
  290. self.assertRaises(TypeError, f.seek, 0.0)
  291. if buffered:
  292. f.seek(0)
  293. self.assertEqual(f.read(), b"hello world\n")
  294. f.seek(6)
  295. self.assertEqual(f.read(), b"world\n")
  296. self.assertEqual(f.read(), b"")
  297. f.seek(0)
  298. data = byteslike(5)
  299. self.assertEqual(f.readinto1(data), 5)
  300. self.assertEqual(bytes(data), b"hello")
  301. LARGE = 2**31
  302. def large_file_ops(self, f):
  303. assert f.readable()
  304. assert f.writable()
  305. try:
  306. self.assertEqual(f.seek(self.LARGE), self.LARGE)
  307. except (OverflowError, ValueError):
  308. self.skipTest("no largefile support")
  309. self.assertEqual(f.tell(), self.LARGE)
  310. self.assertEqual(f.write(b"xxx"), 3)
  311. self.assertEqual(f.tell(), self.LARGE + 3)
  312. self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
  313. self.assertEqual(f.truncate(), self.LARGE + 2)
  314. self.assertEqual(f.tell(), self.LARGE + 2)
  315. self.assertEqual(f.seek(0, 2), self.LARGE + 2)
  316. self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
  317. self.assertEqual(f.tell(), self.LARGE + 2)
  318. self.assertEqual(f.seek(0, 2), self.LARGE + 1)
  319. self.assertEqual(f.seek(-1, 2), self.LARGE)
  320. self.assertEqual(f.read(2), b"x")
  321. def test_invalid_operations(self):
  322. # Try writing on a file opened in read mode and vice-versa.
  323. exc = self.UnsupportedOperation
  324. with self.open(os_helper.TESTFN, "w", encoding="utf-8") as fp:
  325. self.assertRaises(exc, fp.read)
  326. self.assertRaises(exc, fp.readline)
  327. with self.open(os_helper.TESTFN, "wb") as fp:
  328. self.assertRaises(exc, fp.read)
  329. self.assertRaises(exc, fp.readline)
  330. with self.open(os_helper.TESTFN, "wb", buffering=0) as fp:
  331. self.assertRaises(exc, fp.read)
  332. self.assertRaises(exc, fp.readline)
  333. with self.open(os_helper.TESTFN, "rb", buffering=0) as fp:
  334. self.assertRaises(exc, fp.write, b"blah")
  335. self.assertRaises(exc, fp.writelines, [b"blah\n"])
  336. with self.open(os_helper.TESTFN, "rb") as fp:
  337. self.assertRaises(exc, fp.write, b"blah")
  338. self.assertRaises(exc, fp.writelines, [b"blah\n"])
  339. with self.open(os_helper.TESTFN, "r", encoding="utf-8") as fp:
  340. self.assertRaises(exc, fp.write, "blah")
  341. self.assertRaises(exc, fp.writelines, ["blah\n"])
  342. # Non-zero seeking from current or end pos
  343. self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
  344. self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
  @unittest.skipIf(
      support.is_emscripten, "fstat() of a pipe fd is not supported"
  )
  @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
  def test_optional_abilities(self):
      # Test for OSError when optional APIs are not supported.
      # Each factory below builds a stream; the accompanying flag string
      # lists what that stream is expected to support: "f" = fileno(),
      # "r" = reading, "w" = writing, "s" = seeking.  Supported operations
      # are simply called, unsupported ones must raise OSError.

      def pipe_reader():
          read_fd, write_fd = os.pipe()
          os.close(write_fd)  # So that read() is harmless
          return self.FileIO(read_fd, "r")

      def pipe_writer():
          read_fd, write_fd = os.pipe()
          self.addCleanup(os.close, read_fd)
          # Guarantee that we can write into the pipe without blocking
          drain = threading.Thread(target=os.read, args=(read_fd, 100))
          drain.start()
          self.addCleanup(drain.join)
          return self.FileIO(write_fd, "w")

      def buffered_reader():
          return self.BufferedReader(self.MockUnseekableIO())

      def buffered_writer():
          return self.BufferedWriter(self.MockUnseekableIO())

      def buffered_random():
          return self.BufferedRandom(self.BytesIO())

      def buffered_rw_pair():
          return self.BufferedRWPair(self.MockUnseekableIO(),
                                     self.MockUnseekableIO())

      def text_reader():
          class UnseekableReader(self.MockUnseekableIO):
              writable = self.BufferedIOBase.writable
              write = self.BufferedIOBase.write
          return self.TextIOWrapper(UnseekableReader(), "ascii")

      def text_writer():
          class UnseekableWriter(self.MockUnseekableIO):
              readable = self.BufferedIOBase.readable
              read = self.BufferedIOBase.read
          return self.TextIOWrapper(UnseekableWriter(), "ascii")

      def exercise(supported, operation, *args):
          # Call the operation when it is supported, otherwise require
          # that calling it raises OSError.
          if supported:
              operation(*args)
          else:
              self.assertRaises(OSError, operation, *args)

      cases = (
          (pipe_reader, "fr"),
          (pipe_writer, "fw"),
          (buffered_reader, "r"),
          (buffered_writer, "w"),
          (buffered_random, "rws"),
          (buffered_rw_pair, "rw"),
          (text_reader, "r"),
          (text_writer, "w"),
          (self.BytesIO, "rws"),
          (self.StringIO, "rws"),
      )
      for factory, flags in cases:
          with self.subTest(factory), factory() as stream:
              readable = "r" in flags
              self.assertEqual(stream.readable(), readable)
              writable = "w" in flags
              self.assertEqual(stream.writable(), writable)

              # Text streams are written str, binary/raw streams bytes.
              if isinstance(stream, self.TextIOBase):
                  payload = "3"
              elif isinstance(stream,
                              (self.BufferedIOBase, self.RawIOBase)):
                  payload = b"3"
              else:
                  self.fail("Unknown base class")

              exercise("f" in flags, stream.fileno)

              exercise(readable, stream.read, 1)
              exercise(readable, stream.read)

              exercise(writable, stream.write, payload)

              if sys.platform.startswith("win") and factory in (
                      pipe_reader, pipe_writer):
                  # Pipes seem to appear as seekable on Windows
                  continue
              seekable = "s" in flags
              self.assertEqual(stream.seekable(), seekable)

              exercise(seekable, stream.tell)
              exercise(seekable, stream.seek, 0)

              can_truncate = writable and seekable
              exercise(can_truncate, stream.truncate)
              exercise(can_truncate, stream.truncate, 0)
  def test_open_handles_NUL_chars(self):
      # A file name containing an embedded NUL must be rejected with
      # ValueError, both as str and as bytes.
      name_with_nul = 'foo\0bar'
      with self.assertRaises(ValueError):
          self.open(name_with_nul, 'w', encoding="utf-8")

      encoded_name = name_with_nul.encode('ascii')
      with warnings.catch_warnings():
          # DeprecationWarning is ignored while the bytes name is tried.
          warnings.simplefilter("ignore", DeprecationWarning)
          with self.assertRaises(ValueError):
              self.open(encoded_name, 'w', encoding="utf-8")
  def test_raw_file_io(self):
      # Unbuffered binary file: check the capability flags of a write-only
      # and then a read-only handle before running the shared op helpers.
      with self.open(os_helper.TESTFN, "wb", buffering=0) as writer:
          self.assertEqual(writer.readable(), False)
          self.assertEqual(writer.writable(), True)
          self.assertEqual(writer.seekable(), True)
          self.write_ops(writer)
      with self.open(os_helper.TESTFN, "rb", buffering=0) as reader:
          self.assertEqual(reader.readable(), True)
          self.assertEqual(reader.writable(), False)
          self.assertEqual(reader.seekable(), True)
          self.read_ops(reader)
  def test_buffered_file_io(self):
      # Same checks as for the raw file, but with default buffering; the
      # read helper is invoked with its second argument set to True.
      with self.open(os_helper.TESTFN, "wb") as writer:
          self.assertEqual(writer.readable(), False)
          self.assertEqual(writer.writable(), True)
          self.assertEqual(writer.seekable(), True)
          self.write_ops(writer)
      with self.open(os_helper.TESTFN, "rb") as reader:
          self.assertEqual(reader.readable(), True)
          self.assertEqual(reader.writable(), False)
          self.assertEqual(reader.seekable(), True)
          self.read_ops(reader, True)
  def test_readline(self):
      # Write a few lines, then read them back with different size
      # arguments.  A float size must raise TypeError in both binary and
      # text mode.
      with self.open(os_helper.TESTFN, "wb") as out:
          out.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
      # (arguments passed to readline(), expected result)
      steps = (
          ((), b"abc\n"),
          ((10,), b"def\n"),
          ((2,), b"xy"),
          ((4,), b"zzy\n"),
          ((), b"foo\x00bar\n"),
          ((None,), b"another line"),
      )
      with self.open(os_helper.TESTFN, "rb") as src:
          for args, line in steps:
              self.assertEqual(src.readline(*args), line)
          with self.assertRaises(TypeError):
              src.readline(5.3)
      with self.open(os_helper.TESTFN, "r", encoding="utf-8") as src:
          with self.assertRaises(TypeError):
              src.readline(5.3)
  def test_readline_nonsizeable(self):
      # Issue #30061
      # Crash when readline() returns an object without __len__
      class NoneLineReader(self.IOBase):
          def readline(self):
              return None

      with self.assertRaises((TypeError, StopIteration)):
          next(NoneLineReader())
  def test_next_nonsizeable(self):
      # Issue #30061
      # Crash when __next__() returns an object without __len__
      class NoneNextReader(self.IOBase):
          def __next__(self):
              return None

      reader = NoneNextReader()
      with self.assertRaises(TypeError):
          reader.readlines(1)
  def test_raw_bytes_io(self):
      # Run the shared write helper on an empty BytesIO, check what it
      # stored, then run the shared read helper on a BytesIO holding it.
      sink = self.BytesIO()
      self.write_ops(sink)
      written = sink.getvalue()
      self.assertEqual(written, b"hello world\n")
      source = self.BytesIO(written)
      self.read_ops(source, True)
  def test_large_file_ops(self):
      # On Windows and Mac OSX this test consumes large resources; It takes
      # a long time to build the >2 GiB file and takes >2 GiB of disk space
      # therefore the resource must be enabled to run this test.
      resource_gated = (sys.platform.startswith('win')
                        or sys.platform == 'darwin')
      if resource_gated:
          support.requires(
              'largefile',
              'test requires %s bytes and a long time to run' % self.LARGE)
      # First unbuffered (third positional argument 0), then with the
      # default buffering.
      for buffering_args in ((0,), ()):
          with self.open(os_helper.TESTFN, "w+b", *buffering_args) as f:
              self.large_file_ops(f)
  def test_with_open(self):
      # The file must be closed when the with block ends, whether it ends
      # normally or through an exception.  Tried unbuffered and buffered.
      for bufsize in (0, 100):
          handle = None
          with self.open(os_helper.TESTFN, "wb", bufsize) as handle:
              handle.write(b"xxx")
          self.assertEqual(handle.closed, True)

          handle = None
          try:
              with self.open(os_helper.TESTFN, "wb", bufsize) as handle:
                  1 / 0
          except ZeroDivisionError:
              self.assertEqual(handle.closed, True)
          else:
              self.fail("1/0 didn't raise an exception")
  # issue 5008
  def test_append_mode_tell(self):
      # After three bytes are written, a file reopened in append mode must
      # report a non-zero position: exactly 3 for binary handles, and
      # greater than 0 for the text handle.
      with self.open(os_helper.TESTFN, "wb") as out:
          out.write(b"xxx")
      with self.open(os_helper.TESTFN, "ab", buffering=0) as raw:
          self.assertEqual(raw.tell(), 3)
      with self.open(os_helper.TESTFN, "ab") as buffered:
          self.assertEqual(buffered.tell(), 3)
      with self.open(os_helper.TESTFN, "a", encoding="utf-8") as text:
          self.assertGreater(text.tell(), 0)
  def test_destructor(self):
      # Dropping an open FileIO subclass must run __del__, close() and
      # flush() in that order (recorded as 1, 2, 3), and the written data
      # must be readable from the file afterwards.
      calls = []

      class MyFileIO(self.FileIO):
          def __del__(self):
              calls.append(1)
              # Chain to the base __del__ only if one exists.
              try:
                  parent_del = super().__del__
              except AttributeError:
                  pass
              else:
                  parent_del()

          def close(self):
              calls.append(2)
              super().close()

          def flush(self):
              calls.append(3)
              super().flush()

      with warnings_helper.check_warnings(('', ResourceWarning)):
          fileobj = MyFileIO(os_helper.TESTFN, "wb")
          fileobj.write(b"xxx")
          del fileobj
          support.gc_collect()
          self.assertEqual(calls, [1, 2, 3])
          with self.open(os_helper.TESTFN, "rb") as reader:
              self.assertEqual(reader.read(), b"xxx")
  def _check_base_destructor(self, base):
      # Subclass *base*, drop an instance, and check that __del__, close()
      # and flush() ran in that order.  The values recorded are read from
      # instance attributes.
      calls = []

      class MyIO(base):
          def __init__(self):
              # This exercises the availability of attributes on object
              # destruction.
              # (in the C version, close() is called by the tp_dealloc
              # function, not by __del__)
              self.on_del = 1
              self.on_close = 2
              self.on_flush = 3

          def __del__(self):
              calls.append(self.on_del)
              # Chain to the base __del__ only if one exists.
              try:
                  parent_del = super().__del__
              except AttributeError:
                  pass
              else:
                  parent_del()

          def close(self):
              calls.append(self.on_close)
              super().close()

          def flush(self):
              calls.append(self.on_flush)
              super().flush()

      instance = MyIO()
      del instance
      support.gc_collect()
      self.assertEqual(calls, [1, 2, 3])
  def test_IOBase_destructor(self):
      # Run the shared destructor check against IOBase.
      base = self.IOBase
      self._check_base_destructor(base)
  def test_RawIOBase_destructor(self):
      # Run the shared destructor check against RawIOBase.
      base = self.RawIOBase
      self._check_base_destructor(base)
  def test_BufferedIOBase_destructor(self):
      # Run the shared destructor check against BufferedIOBase.
      base = self.BufferedIOBase
      self._check_base_destructor(base)
  def test_TextIOBase_destructor(self):
      # Run the shared destructor check against TextIOBase.
      base = self.TextIOBase
      self._check_base_destructor(base)
  def test_close_flushes(self):
      # Data written through a buffered handle must be in the file once
      # the handle has been closed by the with block.
      with self.open(os_helper.TESTFN, "wb") as out:
          out.write(b"xxx")
      with self.open(os_helper.TESTFN, "rb") as src:
          contents = src.read()
          self.assertEqual(contents, b"xxx")
  def test_array_writes(self):
      # write() of an array must report the array's size in bytes, and
      # writelines() must accept the array too, on several stream types.
      values = array.array('i', range(10))
      byte_count = len(values.tobytes())

      def check(stream):
          with stream:
              self.assertEqual(stream.write(values), byte_count)
              stream.writelines((values,))

      # Factories, so each stream is only created right before its check.
      factories = (
          lambda: self.BytesIO(),
          lambda: self.FileIO(os_helper.TESTFN, "w"),
          lambda: self.BufferedWriter(self.MockRawIO()),
          lambda: self.BufferedRandom(self.MockRawIO()),
          lambda: self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()),
      )
      for make_stream in factories:
          check(make_stream())
  def test_closefd(self):
      # closefd=False together with a file name (write mode) must raise
      # ValueError.
      with self.assertRaises(ValueError):
          self.open(os_helper.TESTFN, 'w',
                    encoding="utf-8", closefd=False)
  def test_read_closed(self):
      # A second file object built on a borrowed descriptor
      # (closefd=False) must raise ValueError on reads after it is closed.
      with self.open(os_helper.TESTFN, "w", encoding="utf-8") as out:
          out.write("egg\n")
      with self.open(os_helper.TESTFN, "r", encoding="utf-8") as owner:
          borrowed = self.open(owner.fileno(), "r", encoding="utf-8",
                               closefd=False)
          self.assertEqual(borrowed.read(), "egg\n")
          borrowed.seek(0)
          borrowed.close()
          with self.assertRaises(ValueError):
              borrowed.read()
      with self.open(os_helper.TESTFN, "rb") as owner:
          borrowed = self.open(owner.fileno(), "rb", closefd=False)
          self.assertEqual(borrowed.read()[:3], b"egg")
          borrowed.close()
          with self.assertRaises(ValueError):
              borrowed.readinto(bytearray(1))
  def test_no_closefd_with_filename(self):
      # can't use closefd in combination with a file name
      with self.assertRaises(ValueError):
          self.open(os_helper.TESTFN, "r",
                    encoding="utf-8", closefd=False)
  def test_closefd_attr(self):
      # The raw layer's closefd attribute must be True for a file opened
      # by name and False for one opened on a borrowed descriptor.
      with self.open(os_helper.TESTFN, "wb") as out:
          out.write(b"egg\n")
      with self.open(os_helper.TESTFN, "r", encoding="utf-8") as owner:
          self.assertEqual(owner.buffer.raw.closefd, True)
          borrowed = self.open(owner.fileno(), "r", encoding="utf-8",
                               closefd=False)
          self.assertEqual(borrowed.buffer.raw.closefd, False)
  def test_garbage_collection(self):
      # FileIO objects are collected, and collecting them flushes
      # all data to disk.
      with warnings_helper.check_warnings(('', ResourceWarning)):
          fileobj = self.FileIO(os_helper.TESTFN, "wb")
          fileobj.write(b"abcxxx")
          # Self-reference, so the object is part of a reference cycle.
          fileobj.f = fileobj
          ref = weakref.ref(fileobj)
          del fileobj
          support.gc_collect()
      self.assertIsNone(ref(), ref)
      with self.open(os_helper.TESTFN, "rb") as reader:
          self.assertEqual(reader.read(), b"abcxxx")
  def test_unbounded_file(self):
      # Issue #1174606: reading from an unbounded stream such as /dev/zero.
      # An unlimited read() must raise OverflowError on raw, buffered and
      # text handles.  Skipped when /dev/zero is missing, when sys.maxsize
      # exceeds 0x7FFFFFFF, or when the configured memory limit is below
      # 2 GiB.
      zero = "/dev/zero"
      if not os.path.exists(zero):
          self.skipTest("{0} does not exist".format(zero))
      if sys.maxsize > 0x7FFFFFFF:
          self.skipTest("test can only run in a 32-bit address space")
      if support.real_max_memuse < support._2G:
          self.skipTest("test requires at least 2 GiB of memory")
      with self.open(zero, "rb", buffering=0) as f:
          self.assertRaises(OverflowError, f.read)
      with self.open(zero, "rb") as f:
          self.assertRaises(OverflowError, f.read)
      # Explicit encoding, like every other text-mode open in this file:
      # the result no longer depends on the locale and no EncodingWarning
      # is emitted when that opt-in warning is enabled.
      with self.open(zero, "r", encoding="utf-8") as f:
          self.assertRaises(OverflowError, f.read)
  def check_flush_error_on_close(self, *args, **kwargs):
      # Test that the file is closed despite failed flush
      # and that flush() is called before file closed.
      # *args and **kwargs are forwarded unchanged to self.open().
      fileobj = self.open(*args, **kwargs)
      closed_at_flush = []

      def failing_flush():
          # Remember whether the file was already closed, then fail.
          closed_at_flush[:] = [fileobj.closed]
          raise OSError()

      fileobj.flush = failing_flush
      with self.assertRaises(OSError):  # exception not swallowed
          fileobj.close()
      self.assertTrue(fileobj.closed)
      self.assertTrue(closed_at_flush)      # flush() called
      self.assertFalse(closed_at_flush[0])  # flush() called before file closed
      fileobj.flush = lambda: None  # break reference loop
  def test_flush_error_on_close(self):
      # Run the flush-failure check for each I/O layer, opening the file
      # by name, by descriptor, and by descriptor with closefd=False.
      # In the closefd=False case the descriptor is closed here afterwards.
      layers = (
          # raw file
          # Issue #5700: io.FileIO calls flush() after file closed
          ('wb', {'buffering': 0}),
          # buffered io
          ('wb', {}),
          # text io
          ('w', {'encoding': "utf-8"}),
      )
      for mode, options in layers:
          self.check_flush_error_on_close(os_helper.TESTFN, mode, **options)
          fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
          self.check_flush_error_on_close(fd, mode, **options)
          fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
          self.check_flush_error_on_close(fd, mode, closefd=False,
                                          **options)
          os.close(fd)
  def test_multi_close(self):
      # close() may be called repeatedly on a raw file; flush() on the
      # closed file must raise ValueError.
      raw = self.open(os_helper.TESTFN, "wb", buffering=0)
      for _ in range(3):
          raw.close()
      with self.assertRaises(ValueError):
          raw.flush()
  def test_RawIOBase_read(self):
      # Exercise the default limited RawIOBase.read(n) implementation (which
      # calls readinto() internally).
      rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
      # Expected results of eight consecutive read(2) calls.
      expected_chunks = (b"ab", b"c", b"d", None, b"ef", b"g", None, b"")
      for chunk in expected_chunks:
          self.assertEqual(rawio.read(2), chunk)
  def test_types_have_dict(self):
      # Instances of these types must all have a __dict__ attribute.
      instances = (
          self.IOBase(),
          self.RawIOBase(),
          self.TextIOBase(),
          self.StringIO(),
          self.BytesIO(),
      )
      for instance in instances:
          self.assertTrue(hasattr(instance, "__dict__"))
  def test_opener(self):
      # The descriptor returned by a custom opener is the one that gets
      # read: the name passed to open() is not the real file.
      with self.open(os_helper.TESTFN, "w", encoding="utf-8") as out:
          out.write("egg\n")
      fd = os.open(os_helper.TESTFN, os.O_RDONLY)
      with self.open("non-existent", "r", encoding="utf-8",
                     opener=lambda path, flags: fd) as src:
          self.assertEqual(src.read(), "egg\n")
  def test_bad_opener_negative_1(self):
      # Issue #27066.
      # An opener returning -1 must make the builtin open() raise
      # ValueError with a message naming that value.
      with self.assertRaises(ValueError) as caught:
          open('non-existent', 'r', opener=lambda fname, flags: -1)
      self.assertEqual(str(caught.exception), 'opener returned -1')
  def test_bad_opener_other_negative(self):
      # Issue #27066.
      # Same as the -1 case, for another negative return value.
      with self.assertRaises(ValueError) as caught:
          open('non-existent', 'r', opener=lambda fname, flags: -2)
      self.assertEqual(str(caught.exception), 'opener returned -2')
  def test_opener_invalid_fd(self):
      # Check that OSError is raised with error code EBADF if the
      # opener returns an invalid file descriptor (see gh-82212).
      bad_fd = os_helper.make_bad_fd()

      def opener(name, flags):
          return bad_fd

      with self.assertRaises(OSError) as caught:
          self.open('foo', opener=opener)
      self.assertEqual(caught.exception.errno, errno.EBADF)
  def test_fileio_closefd(self):
      # Issue #4841
      # A FileIO created with closefd=False is re-initialised on a second
      # descriptor; neither underlying file may end up closed.
      with self.open(__file__, 'rb') as first:
          with self.open(__file__, 'rb') as second:
              borrowed = self.FileIO(first.fileno(), closefd=False)
              # .__init__() must not close first
              borrowed.__init__(second.fileno(), closefd=False)
              first.readline()
              # .close() must not close second
              borrowed.close()
              second.readline()
  def test_nonbuffered_textio(self):
      # Text mode with buffering=0 must raise ValueError, and no
      # ResourceWarning may be emitted while it does.
      with warnings_helper.check_no_resource_warning(self), \
           self.assertRaises(ValueError):
          self.open(os_helper.TESTFN, 'w', encoding="utf-8", buffering=0)
  def test_invalid_newline(self):
      # An unrecognised newline argument must raise ValueError, and no
      # ResourceWarning may be emitted while it does.
      with warnings_helper.check_no_resource_warning(self), \
           self.assertRaises(ValueError):
          self.open(os_helper.TESTFN, 'w', encoding="utf-8",
                    newline='invalid')
  def test_buffered_readinto_mixin(self):
      # Test the implementation provided by BufferedIOBase
      class FixedStream(self.BufferedIOBase):
          # read() and read1() always return the same five bytes.
          def read(self, size):
              return b"12345"
          read1 = read

      source = FixedStream()
      for method_name in ("readinto", "readinto1"):
          with self.subTest(method_name):
              target = byteslike(5)
              fill = getattr(source, method_name)
              self.assertEqual(fill(target), 5)
              self.assertEqual(bytes(target), b"12345")
  def test_fspath_support(self):
      # open() must accept FakePath objects wrapping a str or bytes path.
      # FakePath objects wrapping a descriptor or None must raise
      # TypeError, and one wrapping an exception class must raise it.
      def round_trip(path):
          with self.open(path, "w", encoding="utf-8") as out:
              out.write("egg\n")

          with self.open(path, "r", encoding="utf-8") as src:
              self.assertEqual(src.read(), "egg\n")

      round_trip(FakePath(os_helper.TESTFN))
      round_trip(FakePath(os.fsencode(os_helper.TESTFN)))

      with self.open(os_helper.TESTFN, "w", encoding="utf-8") as out:
          fd_path = FakePath(out.fileno())
          with self.assertRaises(TypeError):
              self.open(fd_path, 'w', encoding="utf-8")

      none_path = FakePath(None)
      with self.assertRaises(TypeError):
          self.open(none_path, 'w', encoding="utf-8")

      raising_path = FakePath(FloatingPointError)
      with self.assertRaises(FloatingPointError):
          self.open(raising_path, 'w', encoding="utf-8")

      # ensure that refcounting is correct with some error conditions
      with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
          self.open(FakePath(os_helper.TESTFN), 'rwxa', encoding="utf-8")
  def test_RawIOBase_readall(self):
      # Exercise the default unlimited RawIOBase.read() and readall()
      # implementations.
      chunks = (b"abc", b"d", b"efg")
      for method_name in ("read", "readall"):
          rawio = self.MockRawIOWithoutRead(chunks)
          self.assertEqual(getattr(rawio, method_name)(), b"abcdefg")
  def test_BufferedIOBase_readinto(self):
      # Exercise the default BufferedIOBase.readinto() and readinto1()
      # implementations (which call read() or read1() internally).
      class Reader(self.BufferedIOBase):
          def __init__(self, avail):
              self.avail = avail

          def read(self, size):
              head, self.avail = self.avail[:size], self.avail[size:]
              return head

          def read1(self, size):
              """Returns no more than 5 bytes at once"""
              return self.read(min(size, 5))

      cases = (
          # (test method, total data available, read buffer size, expected
          # read size)
          ("readinto", 10, 5, 5),
          ("readinto", 10, 6, 6),  # More than read1() can return
          ("readinto", 5, 6, 5),  # Buffer larger than total available
          ("readinto", 6, 7, 6),
          ("readinto", 10, 0, 0),  # Empty buffer
          ("readinto1", 10, 5, 5),  # Result limited to single read1() call
          ("readinto1", 10, 6, 5),  # Buffer larger than read1() can return
          ("readinto1", 5, 6, 5),  # Buffer larger than total available
          ("readinto1", 6, 7, 5),
          ("readinto1", 10, 0, 0),  # Empty buffer
      )
      UNUSED_BYTE = 0x81
      for case in cases:
          with self.subTest(case):
              method_name, avail, request, result = case
              reader = Reader(bytes(range(avail)))
              # Pre-fill the target so untouched bytes can be recognised.
              target = bytearray((UNUSED_BYTE,) * request)
              fill = getattr(reader, method_name)
              self.assertEqual(fill(target), result)
              self.assertEqual(len(target), request)
              self.assertSequenceEqual(target[:result], range(result))
              untouched = (UNUSED_BYTE,) * (request - result)
              self.assertSequenceEqual(target[result:], untouched)
              self.assertEqual(len(reader.avail), avail - result)
  def test_close_assert(self):
      # close() must propagate the OSError raised by flush() on a
      # subclass whose __setattr__ does nothing.
      class Stubborn(self.IOBase):
          def __setattr__(self, name, value):
              pass

          def flush(self):
              raise OSError()

      stream = Stubborn()
      # This would cause an assertion failure.
      with self.assertRaises(OSError):
          stream.close()

      # Silence destructor error
      Stubborn.flush = lambda self: None
  class CIOTest(IOTest):
      # IOTest plus the test defined below.

      def test_IOBase_finalize(self):
          # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
          # class which inherits IOBase and an object of this class are caught
          # in a reference cycle and close() is already in the method cache.
          class MyIO(self.IOBase):
              def close(self):
                  pass

          # create an instance to populate the method cache
          MyIO()
          instance = MyIO()
          instance.obj = instance
          ref = weakref.ref(instance)
          # Drop both the class and the instance before collecting.
          del MyIO
          del instance
          support.gc_collect()
          self.assertIsNone(ref(), ref)
  class PyIOTest(IOTest):
      # Inherits every test from IOTest and adds none of its own.
      pass
  @support.cpython_only
  class APIMismatchTest(unittest.TestCase):
      # Compare the attributes of pyio.RawIOBase and io.RawIOBase in both
      # directions using support.detect_api_mismatch().

      def test_RawIOBase_io_in_pyio_match(self):
          """Test that pyio RawIOBase class has all c RawIOBase methods"""
          missing = support.detect_api_mismatch(
              pyio.RawIOBase, io.RawIOBase, ignore=('__weakref__',))
          self.assertEqual(
              missing, set(),
              msg='Python RawIOBase does not have all C RawIOBase methods')

      def test_RawIOBase_pyio_in_io_match(self):
          """Test that c RawIOBase class has all pyio RawIOBase methods"""
          missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
          self.assertEqual(
              missing, set(),
              msg='C RawIOBase does not have all Python RawIOBase methods')
  class CommonBufferedTests:
      # Tests common to BufferedReader, BufferedWriter and BufferedRandom
      # The class under test is read from self.tp.

      def test_detach(self):
          # detach() returns the raw stream once; a second call raises.
          raw = self.MockRawIO()
          buffered = self.tp(raw)
          self.assertIs(buffered.detach(), raw)
          with self.assertRaises(ValueError):
              buffered.detach()

          repr(buffered)  # Should still work

      def test_fileno(self):
          # fileno() on the buffered object must return 42 for a MockRawIO.
          raw = self.MockRawIO()
          buffered = self.tp(raw)
          self.assertEqual(42, buffered.fileno())

      def test_invalid_args(self):
          raw = self.MockRawIO()
          buffered = self.tp(raw)
          # Invalid whence
          for whence in (-1, 9):
              with self.assertRaises(ValueError):
                  buffered.seek(0, whence)

      def test_override_destructor(self):
          # Dropping a subclass instance must run __del__, close() and
          # flush() in that order (recorded as 1, 2, 3).
          base = self.tp
          calls = []

          class MyBufferedIO(base):
              def __del__(self):
                  calls.append(1)
                  # Chain to the base __del__ only if one exists.
                  try:
                      parent_del = super().__del__
                  except AttributeError:
                      pass
                  else:
                      parent_del()

              def close(self):
                  calls.append(2)
                  super().close()

              def flush(self):
                  calls.append(3)
                  super().flush()

          raw = self.MockRawIO()
          buffered = MyBufferedIO(raw)
          del buffered
          support.gc_collect()
          self.assertEqual(calls, [1, 2, 3])

      def test_context_manager(self):
          # Test usability as a context manager
          raw = self.MockRawIO()
          buffered = self.tp(raw)

          def enter_and_exit():
              with buffered:
                  pass

          enter_and_exit()
          # buffered should now be closed, and using it a second time should
          # raise a ValueError.
          with self.assertRaises(ValueError):
              enter_and_exit()

      def test_error_through_destructor(self):
          # Test that the exception state is not modified by a destructor,
          # even if close() fails.
          raw = self.CloseFailureIO()
          with support.catch_unraisable_exception() as cm:
              with self.assertRaises(AttributeError):
                  self.tp(raw).xyzzy

              if not IOBASE_EMITS_UNRAISABLE:
                  self.assertIsNone(cm.unraisable)
              elif cm.unraisable is not None:
                  self.assertEqual(cm.unraisable.exc_type, OSError)

      def test_repr(self):
          # repr() shows the class name, optionally module-qualified, and
          # the raw stream's name once one is set (str or bytes).
          raw = self.MockRawIO()
          buffered = self.tp(raw)
          clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
          self.assertRegex(repr(buffered), "<%s>" % clsname)
          raw.name = "dummy"
          self.assertRegex(repr(buffered), "<%s name='dummy'>" % clsname)
          raw.name = b"dummy"
          self.assertRegex(repr(buffered), "<%s name=b'dummy'>" % clsname)

      def test_recursive_repr(self):
          # Issue #25455
          raw = self.MockRawIO()
          buffered = self.tp(raw)
          # The raw stream's name temporarily refers back to the buffered
          # object; RuntimeError is an accepted outcome.
          with support.swap_attr(raw, 'name', buffered):
              try:
                  repr(buffered)  # Should not crash
              except RuntimeError:
                  pass

      def test_flush_error_on_close(self):
          # Test that buffered file is closed despite failed flush
          # and that flush() is called before file closed.
          raw = self.MockRawIO()
          closed_at_flush = []

          def failing_flush():
              # Remember the closed state of both layers, then fail.
              closed_at_flush[:] = [buffered.closed, raw.closed]
              raise OSError()

          raw.flush = failing_flush
          buffered = self.tp(raw)
          with self.assertRaises(OSError):  # exception not swallowed
              buffered.close()
          self.assertTrue(buffered.closed)
          self.assertTrue(raw.closed)
          self.assertTrue(closed_at_flush)      # flush() called
          self.assertFalse(closed_at_flush[0])  # flush() called before file closed
          self.assertFalse(closed_at_flush[1])
          raw.flush = lambda: None  # break reference loop

      def test_close_error_on_close(self):
          # When both flush() and the raw close() fail, the close error
          # propagates with the flush error as its __context__.
          raw = self.MockRawIO()

          def failing_flush():
              raise OSError('flush')

          def failing_close():
              raise OSError('close')

          raw.close = failing_close
          buffered = self.tp(raw)
          buffered.flush = failing_flush
          with self.assertRaises(OSError) as caught:  # exception not swallowed
              buffered.close()
          error = caught.exception
          self.assertEqual(error.args, ('close',))
          self.assertIsInstance(error.__context__, OSError)
          self.assertEqual(error.__context__.args, ('flush',))
          self.assertFalse(buffered.closed)
          # Silence destructor error
          raw.close = lambda: None
          buffered.flush = lambda: None

      def test_nonnormalized_close_error_on_close(self):
          # Issue #21677
          raw = self.MockRawIO()

          # Both callbacks fail with NameError by referencing an undefined
          # name.
          def failing_flush():
              raise non_existing_flush

          def failing_close():
              raise non_existing_close

          raw.close = failing_close
          buffered = self.tp(raw)
          buffered.flush = failing_flush
          with self.assertRaises(NameError) as caught:  # exception not swallowed
              buffered.close()
          error = caught.exception
          self.assertIn('non_existing_close', str(error))
          self.assertIsInstance(error.__context__, NameError)
          self.assertIn('non_existing_flush', str(error.__context__))
          self.assertFalse(buffered.closed)
          # Silence destructor error
          buffered.flush = lambda: None
          raw.close = lambda: None

      def test_multi_close(self):
          # close() may be called repeatedly; flush() afterwards raises.
          raw = self.MockRawIO()
          buffered = self.tp(raw)
          for _ in range(3):
              buffered.close()
          with self.assertRaises(ValueError):
              buffered.flush()

      def test_unseekable(self):
          # tell() and seek() on an unseekable raw stream must raise
          # UnsupportedOperation.
          buffered = self.tp(self.MockUnseekableIO(b"A" * 10))
          with self.assertRaises(self.UnsupportedOperation):
              buffered.tell()
          with self.assertRaises(self.UnsupportedOperation):
              buffered.seek(0)

      def test_readonly_attributes(self):
          # The raw attribute cannot be reassigned.
          raw = self.MockRawIO()
          buffered = self.tp(raw)
          replacement = self.MockRawIO()
          with self.assertRaises(AttributeError):
              buffered.raw = replacement
  class SizeofTest:
      # sys.getsizeof() checks for buffered objects; the class under test
      # is read from self.tp.

      @support.cpython_only
      def test_sizeof(self):
          # The reported size must grow by exactly the difference between
          # the two buffer sizes.
          small_buffer = 4096
          large_buffer = 8192
          buffered = self.tp(self.MockRawIO(), buffer_size=small_buffer)
          overhead = sys.getsizeof(buffered) - small_buffer
          buffered = self.tp(self.MockRawIO(), buffer_size=large_buffer)
          self.assertEqual(sys.getsizeof(buffered), overhead + large_buffer)

      @support.cpython_only
      def test_buffer_freeing(self):
          # After close() the reported size must no longer include the
          # buffer.
          bufsize = 4096
          buffered = self.tp(self.MockRawIO(), buffer_size=bufsize)
          overhead = sys.getsizeof(buffered) - bufsize
          buffered.close()
          self.assertEqual(sys.getsizeof(buffered), overhead)
  1075. class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
  1076. read_mode = "rb"
  def test_constructor(self):
      # __init__() may be called again on an existing object, with or
      # without a buffer size.  Buffer sizes of 0 or below must raise
      # ValueError, and the object must be usable after re-initialising.
      raw = self.MockRawIO([b"abc"])
      buffered = self.tp(raw)
      buffered.__init__(raw)
      for size in (1024, 16):
          buffered.__init__(raw, buffer_size=size)
      self.assertEqual(b"abc", buffered.read())
      for bad_size in (0, -16, -1):
          with self.assertRaises(ValueError):
              buffered.__init__(raw, buffer_size=bad_size)
      raw = self.MockRawIO([b"abc"])
      buffered.__init__(raw)
      self.assertEqual(b"abc", buffered.read())
  def test_uninitialized(self):
      # An object made with __new__() alone can be dropped.  Reading from
      # one must raise ValueError or AttributeError, and it works after
      # __init__() has been called.
      cls = self.tp
      blank = cls.__new__(cls)
      del blank
      blank = cls.__new__(cls)
      with self.assertRaisesRegex((ValueError, AttributeError),
                                  'uninitialized|has no attribute'):
          blank.read(0)
      blank.__init__(self.MockRawIO())
      self.assertEqual(blank.read(0), b'')
  def test_read(self):
      # read() with None and with the exact total size must both return
      # all the data from the raw chunks.
      for size in (None, 7):
          raw = self.MockRawIO((b"abc", b"d", b"efg"))
          buffered = self.tp(raw)
          self.assertEqual(b"abcdefg", buffered.read(size))
      # Invalid args
      with self.assertRaises(ValueError):
          buffered.read(-2)
  def test_read1(self):
      # read1() returns buffered data first; the raw read counter is
      # checked after the calls to see when the raw stream was read.
      raw = self.MockRawIO((b"abc", b"d", b"efg"))
      buffered = self.tp(raw)
      self.assertEqual(b"a", buffered.read(1))
      self.assertEqual(b"b", buffered.read1(1))
      self.assertEqual(raw._reads, 1)
      self.assertEqual(b"", buffered.read1(0))
      # (expected data from read1(100), expected raw read count afterwards)
      steps = ((b"c", 1), (b"d", 2), (b"efg", 3), (b"", 4))
      for data, raw_reads in steps:
          self.assertEqual(data, buffered.read1(100))
          self.assertEqual(raw._reads, raw_reads)
  def test_read1_arbitrary(self):
      # read1() without a size, or with -1, returns whatever is available
      # per call.
      raw = self.MockRawIO((b"abc", b"d", b"efg"))
      buffered = self.tp(raw)
      self.assertEqual(b"a", buffered.read(1))
      for data in (b"bc", b"d"):
          self.assertEqual(data, buffered.read1())
      self.assertEqual(b"efg", buffered.read1(-1))
      self.assertEqual(raw._reads, 3)
      self.assertEqual(b"", buffered.read1())
      self.assertEqual(raw._reads, 4)
  def test_readinto(self):
      # readinto() a two-byte buffer repeatedly.  Bytes not overwritten
      # by a short read keep their previous value.
      target = bytearray(2)

      def check(buffered, steps):
          # steps: (expected return value, expected buffer contents)
          for count, contents in steps:
              self.assertEqual(buffered.readinto(target), count)
              self.assertEqual(target, contents)

      raw = self.MockRawIO((b"abc", b"d", b"efg"))
      check(self.tp(raw),
            ((2, b"ab"), (2, b"cd"), (2, b"ef"), (1, b"gf"), (0, b"gf")))

      # Raw stream whose second read returns None.
      raw = self.MockRawIO((b"abc", None))
      check(self.tp(raw), ((2, b"ab"), (1, b"cb")))
  def test_readinto1(self):
      # readinto1() with a small and then a large target buffer.  The raw
      # read counter is checked after every step.
      buffer_size = 10
      raw = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
      buffered = self.tp(raw, buffer_size=buffer_size)

      target = bytearray(2)
      self.assertEqual(buffered.peek(3), b'abc')
      self.assertEqual(raw._reads, 1)
      self.assertEqual(buffered.readinto1(target), 2)
      self.assertEqual(target, b"ab")
      self.assertEqual(raw._reads, 1)
      self.assertEqual(buffered.readinto1(target), 1)
      self.assertEqual(target[:1], b"c")
      self.assertEqual(raw._reads, 1)
      self.assertEqual(buffered.readinto1(target), 2)
      self.assertEqual(target, b"de")
      self.assertEqual(raw._reads, 2)

      # Target twice as large as the internal buffer.
      target = bytearray(2*buffer_size)
      self.assertEqual(buffered.peek(3), b'fgh')
      self.assertEqual(raw._reads, 3)
      self.assertEqual(buffered.readinto1(target), 6)
      self.assertEqual(target[:6], b"fghjkl")
      self.assertEqual(raw._reads, 4)
  def test_readinto_array(self):
      # readinto() counts *bytes*, not elements, when the destination is an
      # array whose items are wider than one byte.
      buffer_size = 60
      data = b"a" * 26
      rawio = self.MockRawIO((data,))
      bufio = self.tp(rawio, buffer_size=buffer_size)

      # Create an array with element size > 1 byte
      b = array.array('i', b'x' * 32)
      # Precondition kept from the original test, expressed as a unittest
      # assertion so that it is not stripped when running under ``-O``.
      self.assertNotEqual(len(b), 16)

      # Read into it. We should get as many *bytes* as we can fit into b
      # (which is more than the number of elements)
      n = bufio.readinto(b)
      self.assertGreater(n, len(b))

      # Check that old contents of b are preserved
      bm = memoryview(b).cast('B')
      self.assertLess(n, len(bm))
      self.assertEqual(bm[:n], data[:n])
      self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
  def test_readinto1_array(self):
      # readinto1() counts *bytes*, not elements, when the destination is an
      # array whose items are wider than one byte.
      buffer_size = 60
      data = b"a" * 26
      rawio = self.MockRawIO((data,))
      bufio = self.tp(rawio, buffer_size=buffer_size)

      # Create an array with element size > 1 byte
      b = array.array('i', b'x' * 32)
      # Precondition kept from the original test, expressed as a unittest
      # assertion so that it is not stripped when running under ``-O``.
      self.assertNotEqual(len(b), 16)

      # Read into it. We should get as many *bytes* as we can fit into b
      # (which is more than the number of elements)
      n = bufio.readinto1(b)
      self.assertGreater(n, len(b))

      # Check that old contents of b are preserved
      bm = memoryview(b).cast('B')
      self.assertLess(n, len(bm))
      self.assertEqual(bm[:n], data[:n])
      self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
  def test_readlines(self):
      # readlines() with no hint, a small hint and an explicit None hint.
      def make_reader():
          # Each call needs its own stream because readlines() consumes it.
          return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))

      every_line = [b"abc\n", b"d\n", b"ef"]
      self.assertEqual(make_reader().readlines(), every_line)
      self.assertEqual(make_reader().readlines(5), [b"abc\n", b"d\n"])
      self.assertEqual(make_reader().readlines(None), every_line)
  def test_buffering(self):
      # Check which raw read sizes result from a series of buffered reads.
      payload = b"abcdefghi"
      total = len(payload)
      # Each case: [buffer size, sizes passed to read(), expected raw
      # read_history].
      cases = [
          [100, [3, 1, 4, 8], [total, 0]],
          [100, [3, 3, 3], [total]],
          [4, [1, 2, 4, 2], [4, 4, 1]],
      ]
      for bufsize, requested_sizes, expected_history in cases:
          raw = self.MockFileIO(payload)
          reader = self.tp(raw, buffer_size=bufsize)
          offset = 0
          for size in requested_sizes:
              expected = payload[offset:offset + size]
              self.assertEqual(reader.read(size), expected)
              offset += size
          # this is mildly implementation-dependent
          self.assertEqual(raw.read_history, expected_history)
  def test_read_non_blocking(self):
      # Inject some None's in there to simulate EWOULDBLOCK
      chunks = (b"abc", b"d", None, b"efg", None, None, None)
      reader = self.tp(self.MockRawIO(chunks))
      # A read stops early at the first None once some data was gathered.
      self.assertEqual(b"abcd", reader.read(6))
      self.assertEqual(b"e", reader.read(1))
      self.assertEqual(b"fg", reader.read())
      self.assertEqual(b"", reader.peek(1))
      # With nothing buffered, read() first reports None, then b"".
      self.assertIsNone(reader.read())
      self.assertEqual(b"", reader.read())

      # The raw object's own readall() shows the same data-then-None shape.
      raw = self.MockRawIO((b"a", None, None))
      self.assertEqual(b"a", raw.readall())
      self.assertIsNone(raw.readall())
  def test_read_past_eof(self):
      # Asking for far more than is available returns just what exists.
      reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
      everything = reader.read(9000)
      self.assertEqual(b"abcdefg", everything)
  def test_read_all(self):
      # read() without a size concatenates every raw chunk.
      reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
      everything = reader.read()
      self.assertEqual(b"abcdefg", everything)
  @support.requires_resource('cpu')
  @threading_helper.requires_working_threading()
  def test_threads(self):
      try:
          # Write out many bytes with exactly the same number of 0's,
          # 1's... 255's. This will help us check that concurrent reading
          # doesn't duplicate or forget contents.
          N = 1000
          values = list(range(256)) * N
          random.shuffle(values)
          payload = bytes(values)
          with self.open(os_helper.TESTFN, "wb") as out:
              out.write(payload)
          with self.open(os_helper.TESTFN, self.read_mode, buffering=0) as raw:
              bufio = self.tp(raw, 8)
              errors = []
              results = []

              def reader():
                  # Thread body: drain the shared buffered reader, recording
                  # every chunk read and any exception raised.
                  try:
                      # Intra-buffer read then buffer-flushing read
                      for size in cycle([1, 19]):
                          chunk = bufio.read(size)
                          if not chunk:
                              break
                          # list.append() is atomic
                          results.append(chunk)
                  except Exception as exc:
                      errors.append(exc)
                      raise

              threads = [threading.Thread(target=reader) for _ in range(20)]
              with threading_helper.start_threads(threads):
                  time.sleep(0.02) # yield
              self.assertFalse(errors,
                  "the following exceptions were caught: %r" % errors)
              # Every byte value must have been seen exactly N times overall.
              combined = b''.join(results)
              for i in range(256):
                  self.assertEqual(combined.count(bytes([i])), N)
      finally:
          os_helper.unlink(os_helper.TESTFN)
  def test_unseekable(self):
      # tell() and seek() must raise UnsupportedOperation on an unseekable
      # raw stream, both before and after some data has been read.
      unsupported = self.UnsupportedOperation
      reader = self.tp(self.MockUnseekableIO(b"A" * 10))
      with self.assertRaises(unsupported):
          reader.tell()
      with self.assertRaises(unsupported):
          reader.seek(0)
      reader.read(1)
      with self.assertRaises(unsupported):
          reader.seek(0)
      with self.assertRaises(unsupported):
          reader.tell()
  def test_misbehaved_io(self):
      # seek() and tell() must raise OSError on top of MisbehavedRawIO.
      reader = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
      with self.assertRaises(OSError):
          reader.seek(0)
      with self.assertRaises(OSError):
          reader.tell()

      # Silence destructor error
      reader.close = lambda: None
  def test_no_extraneous_read(self):
      # Issue #9550; when the raw IO object has satisfied the read request,
      # we should not issue any additional reads, otherwise it may block
      # (e.g. socket).
      bufsize = 16
      request_sizes = (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2)
      for n in request_sizes:
          wanted = b"x" * n

          # Simple case: one raw read is enough to satisfy the request.
          raw = self.MockRawIO([wanted])
          reader = self.tp(raw, bufsize)
          self.assertEqual(reader.read(n), wanted)
          self.assertEqual(raw._extraneous_reads, 0,
              "failed for {}: {} != 0".format(n, raw._extraneous_reads))

          # A more complex case where two raw reads are needed to satisfy
          # the request.
          raw = self.MockRawIO([b"x" * (n - 1), b"x"])
          reader = self.tp(raw, bufsize)
          self.assertEqual(reader.read(n), wanted)
          self.assertEqual(raw._extraneous_reads, 0,
              "failed for {}: {} != 0".format(n, raw._extraneous_reads))
  def test_read_on_closed(self):
      # Issue #23796
      # peek() and read1() on a closed reader must raise ValueError.
      reader = io.BufferedReader(io.BytesIO(b"12"))
      reader.read(1)
      reader.close()
      with self.assertRaises(ValueError):
          reader.peek()
      with self.assertRaises(ValueError):
          reader.read1(1)
  def test_truncate_on_read_only(self):
      # A reader is not writable, so truncate() is unsupported with or
      # without an explicit size.
      reader = self.tp(self.MockFileIO(b"abc"))
      self.assertFalse(reader.writable())
      with self.assertRaises(self.UnsupportedOperation):
          reader.truncate()
      with self.assertRaises(self.UnsupportedOperation):
          reader.truncate(0)
  class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
      # Runs the shared reader tests against io.BufferedReader and adds
      # checks specific to that implementation.
      tp = io.BufferedReader

      @skip_if_sanitizer(memory=True, address=True,
                         reason="sanitizer defaults to crashing "
                         "instead of returning NULL for malloc failure.")
      def test_constructor(self):
          BufferedReaderTest.test_constructor(self)
          # The allocation can succeed on 32-bit builds, e.g. with more
          # than 2 GiB RAM and a 64-bit kernel.
          if sys.maxsize <= 0x7FFFFFFF:
              return
          raw = self.MockRawIO()
          reader = self.tp(raw)
          with self.assertRaises((OverflowError, MemoryError, ValueError)):
              reader.__init__(raw, sys.maxsize)

      def test_initialization(self):
          # After a failed re-initialisation, read() must raise ValueError.
          raw = self.MockRawIO([b"abc"])
          reader = self.tp(raw)
          for bad_size in (0, -16, -1):
              with self.assertRaises(ValueError):
                  reader.__init__(raw, buffer_size=bad_size)
              with self.assertRaises(ValueError):
                  reader.read()

      def test_misbehaved_io_read(self):
          reader = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
          # _pyio.BufferedReader seems to implement reading different, so that
          # checking this is not so easy.
          with self.assertRaises(OSError):
              reader.read(10)

      def test_garbage_collection(self):
          # C BufferedReader objects are collected.
          # The Python version has __del__, so it ends into gc.garbage instead
          self.addCleanup(os_helper.unlink, os_helper.TESTFN)
          with warnings_helper.check_warnings(('', ResourceWarning)):
              raw = self.FileIO(os_helper.TESTFN, "w+b")
              reader = self.tp(raw)
              # Self-reference: only the cycle collector can free the object.
              reader.f = reader
              wr = weakref.ref(reader)
              del reader
              support.gc_collect()
          self.assertIsNone(wr(), wr)

      def test_args_error(self):
          # Issue #17275
          with self.assertRaisesRegex(TypeError, "BufferedReader"):
              self.tp(io.BytesIO(), 1024, 1024, 1024)

      def test_bad_readinto_value(self):
          # A readinto() returning -1 surfaces as OSError without a cause.
          def negative_readinto(buf):
              return -1

          raw = io.BufferedReader(io.BytesIO(b"12"))
          raw.readinto = negative_readinto
          reader = self.tp(raw)
          with self.assertRaises(OSError) as cm:
              reader.readline()
          self.assertIsNone(cm.exception.__cause__)

      def test_bad_readinto_type(self):
          # A readinto() returning a non-integer surfaces as OSError whose
          # __cause__ is a TypeError.
          def bytes_readinto(buf):
              return b''

          raw = io.BufferedReader(io.BytesIO(b"12"))
          raw.readinto = bytes_readinto
          reader = self.tp(raw)
          with self.assertRaises(OSError) as cm:
              reader.readline()
          self.assertIsInstance(cm.exception.__cause__, TypeError)
  class PyBufferedReaderTest(BufferedReaderTest):
      # Runs the shared reader tests against the pyio implementation.
      tp = pyio.BufferedReader
  class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
      # Shared tests for buffered writers; the class under test is read from
      # ``self.tp``.
      write_mode = "wb"

      def test_constructor(self):
          # Re-running __init__ with valid sizes keeps the writer usable;
          # invalid sizes raise ValueError.
          raw = self.MockRawIO()
          writer = self.tp(raw)
          writer.__init__(raw)
          for size in (1024, 16):
              writer.__init__(raw, buffer_size=size)
          self.assertEqual(3, writer.write(b"abc"))
          writer.flush()
          for bad_size in (0, -16, -1):
              with self.assertRaises(ValueError):
                  writer.__init__(raw, buffer_size=bad_size)
          writer.__init__(raw)
          self.assertEqual(3, writer.write(b"ghi"))
          writer.flush()
          self.assertEqual(b"".join(raw._write_stack), b"abcghi")

      def test_uninitialized(self):
          # An object made with __new__ only can be deleted, rejects write(),
          # and works once __init__ has run.
          writer = self.tp.__new__(self.tp)
          del writer
          writer = self.tp.__new__(self.tp)
          self.assertRaisesRegex((ValueError, AttributeError),
                                 'uninitialized|has no attribute',
                                 writer.write, b'')
          writer.__init__(self.MockRawIO())
          self.assertEqual(writer.write(b''), 0)

      def test_detach_flush(self):
          # detach() pushes buffered data to the raw stream.
          raw = self.MockRawIO()
          writer = self.tp(raw)
          writer.write(b"howdy!")
          self.assertFalse(raw._write_stack)
          writer.detach()
          self.assertEqual(raw._write_stack, [b"howdy!"])

      def test_write(self):
          # Write to the buffered IO but don't overflow the buffer.
          raw = self.MockRawIO()
          writer = self.tp(raw, 8)
          writer.write(b"abc")
          self.assertFalse(raw._write_stack)
          scratch = bytearray(b"def")
          writer.write(scratch)
          scratch[:] = b"***" # Overwrite our copy of the data
          writer.flush()
          self.assertEqual(b"".join(raw._write_stack), b"abcdef")

      def test_write_overflow(self):
          raw = self.MockRawIO()
          writer = self.tp(raw, 8)
          contents = b"abcdefghijklmnop"
          for start in range(0, len(contents), 3):
              writer.write(contents[start:start + 3])
          flushed = b"".join(raw._write_stack)
          # At least (total - 8) bytes were implicitly flushed, perhaps more
          # depending on the implementation.
          self.assertTrue(flushed.startswith(contents[:-8]), flushed)

      def check_writes(self, intermediate_func):
          # Lots of writes, test the flushed output is as expected.
          # ``intermediate_func(bufio)`` is invoked after every write.
          contents = bytes(range(256)) * 1000
          total = len(contents)
          raw = self.MockRawIO()
          bufio = self.tp(raw, 13)

          # Generator of write sizes: repeat each N 15 times then proceed to N+1
          def gen_sizes():
              for size in count(1):
                  yield from (size,) * 15

          sizes = gen_sizes()
          offset = 0
          while offset < total:
              size = min(next(sizes), total - offset)
              chunk = contents[offset:offset + size]
              self.assertEqual(bufio.write(chunk), size)
              intermediate_func(bufio)
              offset += size
          bufio.flush()
          self.assertEqual(contents, b"".join(raw._write_stack))

      def test_writes(self):
          def do_nothing(bufio):
              return None

          self.check_writes(do_nothing)

      def test_writes_and_flushes(self):
          def flush_now(bufio):
              return bufio.flush()

          self.check_writes(flush_now)

      def test_writes_and_seeks(self):
          # Wander around the current position with absolute seeks, then
          # with relative ones, always ending where the write left off.
          def _seekabs(bufio):
              here = bufio.tell()
              for target in (here + 1, here - 1, here):
                  bufio.seek(target, 0)

          self.check_writes(_seekabs)

          def _seekrel(bufio):
              here = bufio.seek(0, 1)
              bufio.seek(+1, 1)
              bufio.seek(-1, 1)
              bufio.seek(here, 0)

          self.check_writes(_seekrel)

      def test_writes_and_truncates(self):
          def truncate_here(bufio):
              return bufio.truncate(bufio.tell())

          self.check_writes(truncate_here)

      def test_write_non_blocking(self):
          raw = self.MockNonBlockWriterIO()
          writer = self.tp(raw, 8)

          self.assertEqual(writer.write(b"abcd"), 4)
          self.assertEqual(writer.write(b"efghi"), 5)
          # 1 byte will be written, the rest will be buffered
          raw.block_on(b"k")
          self.assertEqual(writer.write(b"jklmn"), 5)

          # 8 bytes will be written, 8 will be buffered and the rest will be lost
          raw.block_on(b"0")
          try:
              writer.write(b"opqrwxyz0123456789")
          except self.BlockingIOError as exc:
              written = exc.characters_written
          else:
              self.fail("BlockingIOError should have been raised")
          self.assertEqual(written, 16)
          self.assertEqual(raw.pop_written(),
              b"abcdefghijklmnopqrwxyz")

          self.assertEqual(writer.write(b"ABCDEFGHI"), 9)
          flushed = raw.pop_written()
          # Previously buffered bytes were flushed
          self.assertTrue(flushed.startswith(b"01234567A"), flushed)

      def test_write_and_rewind(self):
          # Overwrite the start after seeking back, then append at the end.
          raw = io.BytesIO()
          writer = self.tp(raw, 4)
          self.assertEqual(writer.write(b"abcdef"), 6)
          self.assertEqual(writer.tell(), 6)
          writer.seek(0, 0)
          self.assertEqual(writer.write(b"XY"), 2)
          writer.seek(6, 0)
          self.assertEqual(raw.getvalue(), b"XYcdef")
          self.assertEqual(writer.write(b"123456"), 6)
          writer.flush()
          self.assertEqual(raw.getvalue(), b"XYcdef123456")

      def test_flush(self):
          raw = self.MockRawIO()
          writer = self.tp(raw, 8)
          writer.write(b"abc")
          writer.flush()
          self.assertEqual(b"abc", raw._write_stack[0])

      def test_writelines(self):
          lines = [b'ab', b'cd', b'ef']
          raw = self.MockRawIO()
          writer = self.tp(raw, 8)
          writer.writelines(lines)
          writer.flush()
          self.assertEqual(b''.join(raw._write_stack), b'abcdef')

      def test_writelines_userlist(self):
          lines = UserList([b'ab', b'cd', b'ef'])
          raw = self.MockRawIO()
          writer = self.tp(raw, 8)
          writer.writelines(lines)
          writer.flush()
          self.assertEqual(b''.join(raw._write_stack), b'abcdef')

      def test_writelines_error(self):
          # Non-bytes items, a non-iterable and a str are all rejected.
          writer = self.tp(self.MockRawIO(), 8)
          for bad_lines in ([1, 2, 3], None, 'abc'):
              with self.assertRaises(TypeError):
                  writer.writelines(bad_lines)

      def test_destructor(self):
          # Dropping the last reference flushes pending data.
          raw = self.MockRawIO()
          writer = self.tp(raw, 8)
          writer.write(b"abc")
          del writer
          support.gc_collect()
          self.assertEqual(b"abc", raw._write_stack[0])

      def test_truncate(self):
          # Truncate implicitly flushes the buffer.
          self.addCleanup(os_helper.unlink, os_helper.TESTFN)
          with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
              writer = self.tp(raw, 8)
              writer.write(b"abcdef")
              self.assertEqual(writer.truncate(3), 3)
              self.assertEqual(writer.tell(), 6)
          with self.open(os_helper.TESTFN, "rb", buffering=0) as result:
              self.assertEqual(result.read(), b"abc")

      def test_truncate_after_write(self):
          # Ensure that truncate preserves the file position after
          # writes longer than the buffer size.
          # Issue: https://bugs.python.org/issue32228
          self.addCleanup(os_helper.unlink, os_helper.TESTFN)
          with self.open(os_helper.TESTFN, "wb") as out:
              # Fill with some buffer
              out.write(b'\x00' * 10000)

          for buffer_size in [8192, 4096, 200]:
              with self.open(os_helper.TESTFN, "r+b",
                             buffering=buffer_size) as stream:
                  stream.write(b'\x00' * (buffer_size + 1))
                  # After write write_pos and write_end are set to 0
                  stream.read(1)
                  # read operation makes sure that pos != raw_pos
                  stream.truncate()
                  self.assertEqual(stream.tell(), buffer_size + 2)

      @support.requires_resource('cpu')
      @threading_helper.requires_working_threading()
      def test_threads(self):
          try:
              # Write out many bytes from many threads and test they were
              # all flushed.
              N = 1000
              contents = bytes(range(256)) * N
              sizes = cycle([1, 19])
              offset = 0
              queue = deque()
              while offset < len(contents):
                  size = next(sizes)
                  queue.append(contents[offset:offset + size])
                  offset += size
              del contents
              # We use a real file object because it allows us to
              # exercise situations where the GIL is released before
              # writing the buffer to the raw streams. This is in addition
              # to concurrency issues due to switching threads in the middle
              # of Python code.
              with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
                  bufio = self.tp(raw, 8)
                  errors = []

                  def worker():
                      # Thread body: write queued chunks until none remain.
                      try:
                          while True:
                              try:
                                  chunk = queue.popleft()
                              except IndexError:
                                  return
                              bufio.write(chunk)
                      except Exception as exc:
                          errors.append(exc)
                          raise

                  threads = [threading.Thread(target=worker) for _ in range(20)]
                  with threading_helper.start_threads(threads):
                      time.sleep(0.02) # yield
                  self.assertFalse(errors,
                      "the following exceptions were caught: %r" % errors)
                  bufio.close()
              with self.open(os_helper.TESTFN, "rb") as result:
                  written = result.read()
              for i in range(256):
                  self.assertEqual(written.count(bytes([i])), N)
          finally:
              os_helper.unlink(os_helper.TESTFN)

      def test_misbehaved_io(self):
          # seek(), tell() and an overflowing write() must raise OSError on
          # top of MisbehavedRawIO.
          writer = self.tp(self.MisbehavedRawIO(), 5)
          with self.assertRaises(OSError):
              writer.seek(0)
          with self.assertRaises(OSError):
              writer.tell()
          with self.assertRaises(OSError):
              writer.write(b"abcdef")

          # Silence destructor error
          writer.close = lambda: None

      def test_max_buffer_size_removal(self):
          # A third positional argument is rejected.
          with self.assertRaises(TypeError):
              self.tp(self.MockRawIO(), 8, 12)

      def test_write_error_on_close(self):
          def bad_write(b):
              raise OSError()

          raw = self.MockRawIO()
          raw.write = bad_write
          writer = self.tp(raw)
          writer.write(b'spam')
          with self.assertRaises(OSError):
              writer.close() # exception not swallowed
          self.assertTrue(writer.closed)

      @threading_helper.requires_working_threading()
      def test_slow_close_from_thread(self):
          # Issue #31976
          raw = self.SlowFlushRawIO()
          writer = self.tp(raw, 8)
          closer = threading.Thread(target=writer.close)
          closer.start()
          # Wait until the closing thread is inside the raw flush.
          raw.in_flush.wait()
          with self.assertRaises(ValueError):
              writer.write(b'spam')
          self.assertTrue(writer.closed)
          closer.join()
  class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
      # Runs the shared writer tests against io.BufferedWriter and adds
      # checks specific to that implementation.
      tp = io.BufferedWriter

      @skip_if_sanitizer(memory=True, address=True,
                         reason="sanitizer defaults to crashing "
                         "instead of returning NULL for malloc failure.")
      def test_constructor(self):
          BufferedWriterTest.test_constructor(self)
          # The allocation can succeed on 32-bit builds, e.g. with more
          # than 2 GiB RAM and a 64-bit kernel.
          if sys.maxsize <= 0x7FFFFFFF:
              return
          raw = self.MockRawIO()
          writer = self.tp(raw)
          with self.assertRaises((OverflowError, MemoryError, ValueError)):
              writer.__init__(raw, sys.maxsize)

      def test_initialization(self):
          # After a failed re-initialisation, write() must raise ValueError.
          raw = self.MockRawIO()
          writer = self.tp(raw)
          for bad_size in (0, -16, -1):
              with self.assertRaises(ValueError):
                  writer.__init__(raw, buffer_size=bad_size)
              with self.assertRaises(ValueError):
                  writer.write(b"def")

      def test_garbage_collection(self):
          # C BufferedWriter objects are collected, and collecting them flushes
          # all data to disk.
          # The Python version has __del__, so it ends into gc.garbage instead
          self.addCleanup(os_helper.unlink, os_helper.TESTFN)
          with warnings_helper.check_warnings(('', ResourceWarning)):
              raw = self.FileIO(os_helper.TESTFN, "w+b")
              writer = self.tp(raw)
              writer.write(b"123xxx")
              # Self-reference: only the cycle collector can free the object.
              writer.x = writer
              wr = weakref.ref(writer)
              del writer
              support.gc_collect()
          self.assertIsNone(wr(), wr)
          with self.open(os_helper.TESTFN, "rb") as result:
              self.assertEqual(result.read(), b"123xxx")

      def test_args_error(self):
          # Issue #17275
          with self.assertRaisesRegex(TypeError, "BufferedWriter"):
              self.tp(io.BytesIO(), 1024, 1024, 1024)
  class PyBufferedWriterTest(BufferedWriterTest):
      # Runs the shared writer tests against the pyio implementation.
      tp = pyio.BufferedWriter
  class BufferedRWPairTest(unittest.TestCase):
      # Shared tests for BufferedRWPair; the class under test is read from
      # ``self.tp``.

      def test_constructor(self):
          pair = self.tp(self.MockRawIO(), self.MockRawIO())
          self.assertFalse(pair.closed)

      def test_uninitialized(self):
          # An object made with __new__ only can be deleted, rejects read()
          # and write(), and works once __init__ has run.
          pair = self.tp.__new__(self.tp)
          del pair
          pair = self.tp.__new__(self.tp)
          self.assertRaisesRegex((ValueError, AttributeError),
                                 'uninitialized|has no attribute',
                                 pair.read, 0)
          self.assertRaisesRegex((ValueError, AttributeError),
                                 'uninitialized|has no attribute',
                                 pair.write, b'')
          pair.__init__(self.MockRawIO(), self.MockRawIO())
          self.assertEqual(pair.read(0), b'')
          self.assertEqual(pair.write(b''), 0)

      def test_detach(self):
          pair = self.tp(self.MockRawIO(), self.MockRawIO())
          self.assertRaises(self.UnsupportedOperation, pair.detach)

      def test_constructor_max_buffer_size_removal(self):
          # A fourth positional argument is rejected.
          with self.assertRaises(TypeError):
              self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

      def test_constructor_with_not_readable(self):
          class NotReadable(MockRawIO):
              def readable(self):
                  return False

          self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())

      def test_constructor_with_not_writeable(self):
          class NotWriteable(MockRawIO):
              def writable(self):
                  return False

          self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())

      def test_read(self):
          pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

          self.assertEqual(pair.read(3), b"abc")
          self.assertEqual(pair.read(1), b"d")
          self.assertEqual(pair.read(), b"ef")
          pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
          self.assertEqual(pair.read(None), b"abc")

      def test_readlines(self):
          def make_pair():
              # Each call needs its own pair because readlines() consumes
              # the reader side.
              return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

          all_lines = [b"abc\n", b"def\n", b"h"]
          self.assertEqual(make_pair().readlines(), all_lines)
          # The original repeated the no-argument call here; use an explicit
          # None hint instead, as BufferedReaderTest.test_readlines does.
          self.assertEqual(make_pair().readlines(None), all_lines)
          self.assertEqual(make_pair().readlines(5), [b"abc\n", b"def\n"])

      def test_read1(self):
          # .read1() is delegated to the underlying reader object, so this test
          # can be shallow.
          pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

          self.assertEqual(pair.read1(3), b"abc")
          self.assertEqual(pair.read1(), b"def")

      def test_readinto(self):
          for method in ("readinto", "readinto1"):
              with self.subTest(method):
                  pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

                  data = byteslike(b'\0' * 5)
                  self.assertEqual(getattr(pair, method)(data), 5)
                  self.assertEqual(bytes(data), b"abcde")

      def test_write(self):
          w = self.MockRawIO()
          pair = self.tp(self.MockRawIO(), w)

          pair.write(b"abc")
          pair.flush()
          buffer = bytearray(b"def")
          pair.write(buffer)
          buffer[:] = b"***"  # Overwrite our copy of the data
          pair.flush()
          self.assertEqual(w._write_stack, [b"abc", b"def"])

      def test_peek(self):
          pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

          self.assertTrue(pair.peek(3).startswith(b"abc"))
          self.assertEqual(pair.read(3), b"abc")

      def test_readable(self):
          pair = self.tp(self.MockRawIO(), self.MockRawIO())
          self.assertTrue(pair.readable())

      def test_writeable(self):
          pair = self.tp(self.MockRawIO(), self.MockRawIO())
          self.assertTrue(pair.writable())

      def test_seekable(self):
          # BufferedRWPairs are never seekable, even if their readers and writers
          # are.
          pair = self.tp(self.MockRawIO(), self.MockRawIO())
          self.assertFalse(pair.seekable())

      # .flush() is delegated to the underlying writer object and has been
      # tested in the test_write method.

      def test_close_and_closed(self):
          pair = self.tp(self.MockRawIO(), self.MockRawIO())
          self.assertFalse(pair.closed)
          pair.close()
          self.assertTrue(pair.closed)

      def test_reader_close_error_on_close(self):
          # A failing reader.close() propagates, yet the writer side is
          # still closed.
          def reader_close():
              # Deliberately references an undefined name to raise NameError.
              reader_non_existing

          reader = self.MockRawIO()
          reader.close = reader_close
          writer = self.MockRawIO()
          pair = self.tp(reader, writer)
          with self.assertRaises(NameError) as err:
              pair.close()
          self.assertIn('reader_non_existing', str(err.exception))
          self.assertTrue(pair.closed)
          self.assertFalse(reader.closed)
          self.assertTrue(writer.closed)

          # Silence destructor error
          reader.close = lambda: None

      def test_writer_close_error_on_close(self):
          # A failing writer.close() propagates, yet the reader side is
          # still closed.
          def writer_close():
              # Deliberately references an undefined name to raise NameError.
              writer_non_existing

          reader = self.MockRawIO()
          writer = self.MockRawIO()
          writer.close = writer_close
          pair = self.tp(reader, writer)
          with self.assertRaises(NameError) as err:
              pair.close()
          self.assertIn('writer_non_existing', str(err.exception))
          self.assertFalse(pair.closed)
          self.assertTrue(reader.closed)
          self.assertFalse(writer.closed)

          # Silence destructor error
          writer.close = lambda: None
          writer = None

          # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
          with support.catch_unraisable_exception():
              # Ignore BufferedRWPair unraisable exception
              with support.catch_unraisable_exception():
                  pair = None
                  support.gc_collect()
              support.gc_collect()

      def test_reader_writer_close_error_on_close(self):
          # When both close() calls fail, the reader's error is raised with
          # the writer's error as its __context__.
          def reader_close():
              # Deliberately references an undefined name to raise NameError.
              reader_non_existing

          def writer_close():
              # Deliberately references an undefined name to raise NameError.
              writer_non_existing

          reader = self.MockRawIO()
          reader.close = reader_close
          writer = self.MockRawIO()
          writer.close = writer_close
          pair = self.tp(reader, writer)
          with self.assertRaises(NameError) as err:
              pair.close()
          self.assertIn('reader_non_existing', str(err.exception))
          self.assertIsInstance(err.exception.__context__, NameError)
          self.assertIn('writer_non_existing', str(err.exception.__context__))
          self.assertFalse(pair.closed)
          self.assertFalse(reader.closed)
          self.assertFalse(writer.closed)

          # Silence destructor error
          reader.close = lambda: None
          writer.close = lambda: None

      def test_isatty(self):
          # The pair reports a tty when either side does.
          class SelectableIsAtty(MockRawIO):
              def __init__(self, isatty):
                  MockRawIO.__init__(self)
                  self._isatty = isatty

              def isatty(self):
                  return self._isatty

          pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
          self.assertFalse(pair.isatty())

          pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
          self.assertTrue(pair.isatty())

          pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
          self.assertTrue(pair.isatty())

          pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
          self.assertTrue(pair.isatty())

      def test_weakref_clearing(self):
          brw = self.tp(self.MockRawIO(), self.MockRawIO())
          ref = weakref.ref(brw)
          brw = None
          ref = None  # Shouldn't segfault.
  class CBufferedRWPairTest(BufferedRWPairTest):
      # Runs the shared pair tests against io.BufferedRWPair.
      tp = io.BufferedRWPair
  class PyBufferedRWPairTest(BufferedRWPairTest):
      # Runs the shared pair tests against the pyio implementation.
      tp = pyio.BufferedRWPair
  1880. class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
  1881. read_mode = "rb+"
  1882. write_mode = "wb+"
  def test_constructor(self):
      # Run the constructor checks of both parent suites, reader first.
      for suite in (BufferedReaderTest, BufferedWriterTest):
          suite.test_constructor(self)
  def test_uninitialized(self):
      # Run the uninitialized-object checks of both parent suites, reader
      # first.
      for suite in (BufferedReaderTest, BufferedWriterTest):
          suite.test_uninitialized(self)
  def test_read_and_write(self):
      # Writes stay buffered until a later read; they then reach the raw
      # stream as one combined chunk.
      raw = self.MockRawIO((b"asdf", b"ghjk"))
      stream = self.tp(raw, 8)

      self.assertEqual(b"as", stream.read(2))
      for piece in (b"ddd", b"eee"):
          stream.write(piece)
      self.assertFalse(raw._write_stack) # Buffer writes
      self.assertEqual(b"ghjk", stream.read())
      self.assertEqual(b"dddeee", raw._write_stack[0])
  def test_seek_and_tell(self):
      # Interleave reads, a write and seeks with each whence value, checking
      # tell() along the way.
      from_start, from_current, from_end = 0, 1, 2
      raw = self.BytesIO(b"asdfghjkl")
      stream = self.tp(raw)

      self.assertEqual(b"as", stream.read(2))
      self.assertEqual(2, stream.tell())
      stream.seek(0, from_start)
      self.assertEqual(b"asdf", stream.read(4))

      # Overwrite four bytes in the middle, then re-read the whole stream.
      stream.write(b"123f")
      stream.seek(0, from_start)
      self.assertEqual(b"asdf123fl", stream.read())
      self.assertEqual(9, stream.tell())
      stream.seek(-4, from_end)
      self.assertEqual(5, stream.tell())
      stream.seek(2, from_current)
      self.assertEqual(7, stream.tell())
      self.assertEqual(b"fl", stream.read(11))
      stream.flush()
      self.assertEqual(b"asdf123fl", raw.getvalue())

      # A float offset is rejected.
      with self.assertRaises(TypeError):
          stream.seek(0.0)
  def check_flush_and_read(self, read_func):
      # Shared scenario for the test_flush_and_* tests. ``read_func`` is
      # called as read_func(bufio, n) or read_func(bufio) and must return
      # the bytes it consumed.
      raw = self.BytesIO(b"abcdefghi")
      stream = self.tp(raw)

      self.assertEqual(b"ab", read_func(stream, 2))
      stream.write(b"12")
      self.assertEqual(b"ef", read_func(stream, 2))
      # The position is the same before and after an explicit flush.
      self.assertEqual(6, stream.tell())
      stream.flush()
      self.assertEqual(6, stream.tell())
      self.assertEqual(b"ghi", read_func(stream))
      # Change the raw stream behind the buffered object's back.
      raw.seek(0, 0)
      raw.write(b"XYZ")
      # flush() resets the read buffer
      stream.flush()
      stream.seek(0, 0)
      self.assertEqual(b"XYZ", read_func(stream, 3))
  1933. def test_flush_and_read(self):
  1934. self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
  1935. def test_flush_and_readinto(self):
  1936. def _readinto(bufio, n=-1):
  1937. b = bytearray(n if n >= 0 else 9999)
  1938. n = bufio.readinto(b)
  1939. return bytes(b[:n])
  1940. self.check_flush_and_read(_readinto)
  1941. def test_flush_and_peek(self):
  1942. def _peek(bufio, n=-1):
  1943. # This relies on the fact that the buffer can contain the whole
  1944. # raw stream, otherwise peek() can return less.
  1945. b = bufio.peek(n)
  1946. if n != -1:
  1947. b = b[:n]
  1948. bufio.seek(len(b), 1)
  1949. return b
  1950. self.check_flush_and_read(_peek)
  1951. def test_flush_and_write(self):
  1952. raw = self.BytesIO(b"abcdefghi")
  1953. bufio = self.tp(raw)
  1954. bufio.write(b"123")
  1955. bufio.flush()
  1956. bufio.write(b"45")
  1957. bufio.flush()
  1958. bufio.seek(0, 0)
  1959. self.assertEqual(b"12345fghi", raw.getvalue())
  1960. self.assertEqual(b"12345fghi", bufio.read())
  1961. def test_threads(self):
  1962. BufferedReaderTest.test_threads(self)
  1963. BufferedWriterTest.test_threads(self)
  1964. def test_writes_and_peek(self):
  1965. def _peek(bufio):
  1966. bufio.peek(1)
  1967. self.check_writes(_peek)
  1968. def _peek(bufio):
  1969. pos = bufio.tell()
  1970. bufio.seek(-1, 1)
  1971. bufio.peek(1)
  1972. bufio.seek(pos, 0)
  1973. self.check_writes(_peek)
  1974. def test_writes_and_reads(self):
  1975. def _read(bufio):
  1976. bufio.seek(-1, 1)
  1977. bufio.read(1)
  1978. self.check_writes(_read)
  1979. def test_writes_and_read1s(self):
  1980. def _read1(bufio):
  1981. bufio.seek(-1, 1)
  1982. bufio.read1(1)
  1983. self.check_writes(_read1)
  1984. def test_writes_and_readintos(self):
  1985. def _read(bufio):
  1986. bufio.seek(-1, 1)
  1987. bufio.readinto(bytearray(1))
  1988. self.check_writes(_read)
  1989. def test_write_after_readahead(self):
  1990. # Issue #6629: writing after the buffer was filled by readahead should
  1991. # first rewind the raw stream.
  1992. for overwrite_size in [1, 5]:
  1993. raw = self.BytesIO(b"A" * 10)
  1994. bufio = self.tp(raw, 4)
  1995. # Trigger readahead
  1996. self.assertEqual(bufio.read(1), b"A")
  1997. self.assertEqual(bufio.tell(), 1)
  1998. # Overwriting should rewind the raw stream if it needs so
  1999. bufio.write(b"B" * overwrite_size)
  2000. self.assertEqual(bufio.tell(), overwrite_size + 1)
  2001. # If the write size was smaller than the buffer size, flush() and
  2002. # check that rewind happens.
  2003. bufio.flush()
  2004. self.assertEqual(bufio.tell(), overwrite_size + 1)
  2005. s = raw.getvalue()
  2006. self.assertEqual(s,
  2007. b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
  2008. def test_write_rewind_write(self):
  2009. # Various combinations of reading / writing / seeking backwards / writing again
  2010. def mutate(bufio, pos1, pos2):
  2011. assert pos2 >= pos1
  2012. # Fill the buffer
  2013. bufio.seek(pos1)
  2014. bufio.read(pos2 - pos1)
  2015. bufio.write(b'\x02')
  2016. # This writes earlier than the previous write, but still inside
  2017. # the buffer.
  2018. bufio.seek(pos1)
  2019. bufio.write(b'\x01')
  2020. b = b"\x80\x81\x82\x83\x84"
  2021. for i in range(0, len(b)):
  2022. for j in range(i, len(b)):
  2023. raw = self.BytesIO(b)
  2024. bufio = self.tp(raw, 100)
  2025. mutate(bufio, i, j)
  2026. bufio.flush()
  2027. expected = bytearray(b)
  2028. expected[j] = 2
  2029. expected[i] = 1
  2030. self.assertEqual(raw.getvalue(), expected,
  2031. "failed result for i=%d, j=%d" % (i, j))
  2032. def test_truncate_after_read_or_write(self):
  2033. raw = self.BytesIO(b"A" * 10)
  2034. bufio = self.tp(raw, 100)
  2035. self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
  2036. self.assertEqual(bufio.truncate(), 2)
  2037. self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
  2038. self.assertEqual(bufio.truncate(), 4)
  2039. def test_misbehaved_io(self):
  2040. BufferedReaderTest.test_misbehaved_io(self)
  2041. BufferedWriterTest.test_misbehaved_io(self)
  2042. def test_interleaved_read_write(self):
  2043. # Test for issue #12213
  2044. with self.BytesIO(b'abcdefgh') as raw:
  2045. with self.tp(raw, 100) as f:
  2046. f.write(b"1")
  2047. self.assertEqual(f.read(1), b'b')
  2048. f.write(b'2')
  2049. self.assertEqual(f.read1(1), b'd')
  2050. f.write(b'3')
  2051. buf = bytearray(1)
  2052. f.readinto(buf)
  2053. self.assertEqual(buf, b'f')
  2054. f.write(b'4')
  2055. self.assertEqual(f.peek(1), b'h')
  2056. f.flush()
  2057. self.assertEqual(raw.getvalue(), b'1b2d3f4h')
  2058. with self.BytesIO(b'abc') as raw:
  2059. with self.tp(raw, 100) as f:
  2060. self.assertEqual(f.read(1), b'a')
  2061. f.write(b"2")
  2062. self.assertEqual(f.read(1), b'c')
  2063. f.flush()
  2064. self.assertEqual(raw.getvalue(), b'a2c')
  2065. def test_interleaved_readline_write(self):
  2066. with self.BytesIO(b'ab\ncdef\ng\n') as raw:
  2067. with self.tp(raw) as f:
  2068. f.write(b'1')
  2069. self.assertEqual(f.readline(), b'b\n')
  2070. f.write(b'2')
  2071. self.assertEqual(f.readline(), b'def\n')
  2072. f.write(b'3')
  2073. self.assertEqual(f.readline(), b'\n')
  2074. f.flush()
  2075. self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
  2076. # You can't construct a BufferedRandom over a non-seekable stream.
  2077. test_unseekable = None
  2078. # writable() returns True, so there's no point to test it over
  2079. # a writable stream.
  2080. test_truncate_on_read_only = None
  2081. class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
  2082. tp = io.BufferedRandom
  2083. @skip_if_sanitizer(memory=True, address=True, reason= "sanitizer defaults to crashing "
  2084. "instead of returning NULL for malloc failure.")
  2085. def test_constructor(self):
  2086. BufferedRandomTest.test_constructor(self)
  2087. # The allocation can succeed on 32-bit builds, e.g. with more
  2088. # than 2 GiB RAM and a 64-bit kernel.
  2089. if sys.maxsize > 0x7FFFFFFF:
  2090. rawio = self.MockRawIO()
  2091. bufio = self.tp(rawio)
  2092. self.assertRaises((OverflowError, MemoryError, ValueError),
  2093. bufio.__init__, rawio, sys.maxsize)
  2094. def test_garbage_collection(self):
  2095. CBufferedReaderTest.test_garbage_collection(self)
  2096. CBufferedWriterTest.test_garbage_collection(self)
  2097. def test_args_error(self):
  2098. # Issue #17275
  2099. with self.assertRaisesRegex(TypeError, "BufferedRandom"):
  2100. self.tp(io.BytesIO(), 1024, 1024, 1024)
  2101. class PyBufferedRandomTest(BufferedRandomTest):
  2102. tp = pyio.BufferedRandom
  2103. # To fully exercise seek/tell, the StatefulIncrementalDecoder has these
  2104. # properties:
  2105. # - A single output character can correspond to many bytes of input.
  2106. # - The number of input bytes to complete the character can be
  2107. # undetermined until the last input byte is received.
  2108. # - The number of input bytes can vary depending on previous input.
  2109. # - A single input byte can correspond to many characters of output.
  2110. # - The number of output characters can be undetermined until the
  2111. # last input byte is received.
  2112. # - The number of output characters can vary depending on previous input.
  2113. class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
  2114. """
  2115. For testing seek/tell behavior with a stateful, buffering decoder.
  2116. Input is a sequence of words. Words may be fixed-length (length set
  2117. by input) or variable-length (period-terminated). In variable-length
  2118. mode, extra periods are ignored. Possible words are:
  2119. - 'i' followed by a number sets the input length, I (maximum 99).
  2120. When I is set to 0, words are space-terminated.
  2121. - 'o' followed by a number sets the output length, O (maximum 99).
  2122. - Any other word is converted into a word followed by a period on
  2123. the output. The output word consists of the input word truncated
  2124. or padded out with hyphens to make its length equal to O. If O
  2125. is 0, the word is output verbatim without truncating or padding.
  2126. I and O are initially set to 1. When I changes, any buffered input is
  2127. re-scanned according to the new I. EOF also terminates the last word.
  2128. """
  2129. def __init__(self, errors='strict'):
  2130. codecs.IncrementalDecoder.__init__(self, errors)
  2131. self.reset()
  2132. def __repr__(self):
  2133. return '<SID %x>' % id(self)
  2134. def reset(self):
  2135. self.i = 1
  2136. self.o = 1
  2137. self.buffer = bytearray()
  2138. def getstate(self):
  2139. i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
  2140. return bytes(self.buffer), i*100 + o
  2141. def setstate(self, state):
  2142. buffer, io = state
  2143. self.buffer = bytearray(buffer)
  2144. i, o = divmod(io, 100)
  2145. self.i, self.o = i ^ 1, o ^ 1
  2146. def decode(self, input, final=False):
  2147. output = ''
  2148. for b in input:
  2149. if self.i == 0: # variable-length, terminated with period
  2150. if b == ord('.'):
  2151. if self.buffer:
  2152. output += self.process_word()
  2153. else:
  2154. self.buffer.append(b)
  2155. else: # fixed-length, terminate after self.i bytes
  2156. self.buffer.append(b)
  2157. if len(self.buffer) == self.i:
  2158. output += self.process_word()
  2159. if final and self.buffer: # EOF terminates the last word
  2160. output += self.process_word()
  2161. return output
  2162. def process_word(self):
  2163. output = ''
  2164. if self.buffer[0] == ord('i'):
  2165. self.i = min(99, int(self.buffer[1:] or 0)) # set input length
  2166. elif self.buffer[0] == ord('o'):
  2167. self.o = min(99, int(self.buffer[1:] or 0)) # set output length
  2168. else:
  2169. output = self.buffer.decode('ascii')
  2170. if len(output) < self.o:
  2171. output += '-'*self.o # pad out with hyphens
  2172. if self.o:
  2173. output = output[:self.o] # truncate to output length
  2174. output += '.'
  2175. self.buffer = bytearray()
  2176. return output
  2177. codecEnabled = False
  2178. # bpo-41919: This method is separated from StatefulIncrementalDecoder to avoid a resource leak
  2179. # when registering codecs and cleanup functions.
  2180. def lookupTestDecoder(name):
  2181. if StatefulIncrementalDecoder.codecEnabled and name == 'test_decoder':
  2182. latin1 = codecs.lookup('latin-1')
  2183. return codecs.CodecInfo(
  2184. name='test_decoder', encode=latin1.encode, decode=None,
  2185. incrementalencoder=None,
  2186. streamreader=None, streamwriter=None,
  2187. incrementaldecoder=StatefulIncrementalDecoder)
  2188. class StatefulIncrementalDecoderTest(unittest.TestCase):
  2189. """
  2190. Make sure the StatefulIncrementalDecoder actually works.
  2191. """
  2192. test_cases = [
  2193. # I=1, O=1 (fixed-length input == fixed-length output)
  2194. (b'abcd', False, 'a.b.c.d.'),
  2195. # I=0, O=0 (variable-length input, variable-length output)
  2196. (b'oiabcd', True, 'abcd.'),
  2197. # I=0, O=0 (should ignore extra periods)
  2198. (b'oi...abcd...', True, 'abcd.'),
  2199. # I=0, O=6 (variable-length input, fixed-length output)
  2200. (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
  2201. # I=2, O=6 (fixed-length input < fixed-length output)
  2202. (b'i.i2.o6xyz', True, 'xy----.z-----.'),
  2203. # I=6, O=3 (fixed-length input > fixed-length output)
  2204. (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
  2205. # I=0, then 3; O=29, then 15 (with longer output)
  2206. (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
  2207. 'a----------------------------.' +
  2208. 'b----------------------------.' +
  2209. 'cde--------------------------.' +
  2210. 'abcdefghijabcde.' +
  2211. 'a.b------------.' +
  2212. '.c.------------.' +
  2213. 'd.e------------.' +
  2214. 'k--------------.' +
  2215. 'l--------------.' +
  2216. 'm--------------.')
  2217. ]
  2218. def test_decoder(self):
  2219. # Try a few one-shot test cases.
  2220. for input, eof, output in self.test_cases:
  2221. d = StatefulIncrementalDecoder()
  2222. self.assertEqual(d.decode(input, eof), output)
  2223. # Also test an unfinished decode, followed by forcing EOF.
  2224. d = StatefulIncrementalDecoder()
  2225. self.assertEqual(d.decode(b'oiabcd'), '')
  2226. self.assertEqual(d.decode(b'', 1), 'abcd.')
  2227. class TextIOWrapperTest(unittest.TestCase):
  2228. def setUp(self):
  2229. self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
  2230. self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
  2231. os_helper.unlink(os_helper.TESTFN)
  2232. codecs.register(lookupTestDecoder)
  2233. self.addCleanup(codecs.unregister, lookupTestDecoder)
  2234. def tearDown(self):
  2235. os_helper.unlink(os_helper.TESTFN)
  2236. def test_constructor(self):
  2237. r = self.BytesIO(b"\xc3\xa9\n\n")
  2238. b = self.BufferedReader(r, 1000)
  2239. t = self.TextIOWrapper(b, encoding="utf-8")
  2240. t.__init__(b, encoding="latin-1", newline="\r\n")
  2241. self.assertEqual(t.encoding, "latin-1")
  2242. self.assertEqual(t.line_buffering, False)
  2243. t.__init__(b, encoding="utf-8", line_buffering=True)
  2244. self.assertEqual(t.encoding, "utf-8")
  2245. self.assertEqual(t.line_buffering, True)
  2246. self.assertEqual("\xe9\n", t.readline())
  2247. self.assertRaises(TypeError, t.__init__, b, encoding="utf-8", newline=42)
  2248. self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
  2249. def test_uninitialized(self):
  2250. t = self.TextIOWrapper.__new__(self.TextIOWrapper)
  2251. del t
  2252. t = self.TextIOWrapper.__new__(self.TextIOWrapper)
  2253. self.assertRaises(Exception, repr, t)
  2254. self.assertRaisesRegex((ValueError, AttributeError),
  2255. 'uninitialized|has no attribute',
  2256. t.read, 0)
  2257. t.__init__(self.MockRawIO(), encoding="utf-8")
  2258. self.assertEqual(t.read(0), '')
  2259. def test_non_text_encoding_codecs_are_rejected(self):
  2260. # Ensure the constructor complains if passed a codec that isn't
  2261. # marked as a text encoding
  2262. # http://bugs.python.org/issue20404
  2263. r = self.BytesIO()
  2264. b = self.BufferedWriter(r)
  2265. with self.assertRaisesRegex(LookupError, "is not a text encoding"):
  2266. self.TextIOWrapper(b, encoding="hex")
  2267. def test_detach(self):
  2268. r = self.BytesIO()
  2269. b = self.BufferedWriter(r)
  2270. t = self.TextIOWrapper(b, encoding="ascii")
  2271. self.assertIs(t.detach(), b)
  2272. t = self.TextIOWrapper(b, encoding="ascii")
  2273. t.write("howdy")
  2274. self.assertFalse(r.getvalue())
  2275. t.detach()
  2276. self.assertEqual(r.getvalue(), b"howdy")
  2277. self.assertRaises(ValueError, t.detach)
  2278. # Operations independent of the detached stream should still work
  2279. repr(t)
  2280. self.assertEqual(t.encoding, "ascii")
  2281. self.assertEqual(t.errors, "strict")
  2282. self.assertFalse(t.line_buffering)
  2283. self.assertFalse(t.write_through)
  2284. def test_repr(self):
  2285. raw = self.BytesIO("hello".encode("utf-8"))
  2286. b = self.BufferedReader(raw)
  2287. t = self.TextIOWrapper(b, encoding="utf-8")
  2288. modname = self.TextIOWrapper.__module__
  2289. self.assertRegex(repr(t),
  2290. r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
  2291. raw.name = "dummy"
  2292. self.assertRegex(repr(t),
  2293. r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
  2294. t.mode = "r"
  2295. self.assertRegex(repr(t),
  2296. r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
  2297. raw.name = b"dummy"
  2298. self.assertRegex(repr(t),
  2299. r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
  2300. t.buffer.detach()
  2301. repr(t) # Should not raise an exception
  2302. def test_recursive_repr(self):
  2303. # Issue #25455
  2304. raw = self.BytesIO()
  2305. t = self.TextIOWrapper(raw, encoding="utf-8")
  2306. with support.swap_attr(raw, 'name', t):
  2307. try:
  2308. repr(t) # Should not crash
  2309. except RuntimeError:
  2310. pass
  2311. def test_line_buffering(self):
  2312. r = self.BytesIO()
  2313. b = self.BufferedWriter(r, 1000)
  2314. t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=True)
  2315. t.write("X")
  2316. self.assertEqual(r.getvalue(), b"") # No flush happened
  2317. t.write("Y\nZ")
  2318. self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
  2319. t.write("A\rB")
  2320. self.assertEqual(r.getvalue(), b"XY\nZA\rB")
  2321. def test_reconfigure_line_buffering(self):
  2322. r = self.BytesIO()
  2323. b = self.BufferedWriter(r, 1000)
  2324. t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=False)
  2325. t.write("AB\nC")
  2326. self.assertEqual(r.getvalue(), b"")
  2327. t.reconfigure(line_buffering=True) # implicit flush
  2328. self.assertEqual(r.getvalue(), b"AB\nC")
  2329. t.write("DEF\nG")
  2330. self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
  2331. t.write("H")
  2332. self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
  2333. t.reconfigure(line_buffering=False) # implicit flush
  2334. self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
  2335. t.write("IJ")
  2336. self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
  2337. # Keeping default value
  2338. t.reconfigure()
  2339. t.reconfigure(line_buffering=None)
  2340. self.assertEqual(t.line_buffering, False)
  2341. t.reconfigure(line_buffering=True)
  2342. t.reconfigure()
  2343. t.reconfigure(line_buffering=None)
  2344. self.assertEqual(t.line_buffering, True)
  2345. @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
  2346. def test_default_encoding(self):
  2347. old_environ = dict(os.environ)
  2348. try:
  2349. # try to get a user preferred encoding different than the current
  2350. # locale encoding to check that TextIOWrapper() uses the current
  2351. # locale encoding and not the user preferred encoding
  2352. for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
  2353. if key in os.environ:
  2354. del os.environ[key]
  2355. current_locale_encoding = locale.getencoding()
  2356. b = self.BytesIO()
  2357. with warnings.catch_warnings():
  2358. warnings.simplefilter("ignore", EncodingWarning)
  2359. t = self.TextIOWrapper(b)
  2360. self.assertEqual(t.encoding, current_locale_encoding)
  2361. finally:
  2362. os.environ.clear()
  2363. os.environ.update(old_environ)
  2364. def test_encoding(self):
  2365. # Check the encoding attribute is always set, and valid
  2366. b = self.BytesIO()
  2367. t = self.TextIOWrapper(b, encoding="utf-8")
  2368. self.assertEqual(t.encoding, "utf-8")
  2369. with warnings.catch_warnings():
  2370. warnings.simplefilter("ignore", EncodingWarning)
  2371. t = self.TextIOWrapper(b)
  2372. self.assertIsNotNone(t.encoding)
  2373. codecs.lookup(t.encoding)
  2374. def test_encoding_errors_reading(self):
  2375. # (1) default
  2376. b = self.BytesIO(b"abc\n\xff\n")
  2377. t = self.TextIOWrapper(b, encoding="ascii")
  2378. self.assertRaises(UnicodeError, t.read)
  2379. # (2) explicit strict
  2380. b = self.BytesIO(b"abc\n\xff\n")
  2381. t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
  2382. self.assertRaises(UnicodeError, t.read)
  2383. # (3) ignore
  2384. b = self.BytesIO(b"abc\n\xff\n")
  2385. t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
  2386. self.assertEqual(t.read(), "abc\n\n")
  2387. # (4) replace
  2388. b = self.BytesIO(b"abc\n\xff\n")
  2389. t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
  2390. self.assertEqual(t.read(), "abc\n\ufffd\n")
  2391. def test_encoding_errors_writing(self):
  2392. # (1) default
  2393. b = self.BytesIO()
  2394. t = self.TextIOWrapper(b, encoding="ascii")
  2395. self.assertRaises(UnicodeError, t.write, "\xff")
  2396. # (2) explicit strict
  2397. b = self.BytesIO()
  2398. t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
  2399. self.assertRaises(UnicodeError, t.write, "\xff")
  2400. # (3) ignore
  2401. b = self.BytesIO()
  2402. t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
  2403. newline="\n")
  2404. t.write("abc\xffdef\n")
  2405. t.flush()
  2406. self.assertEqual(b.getvalue(), b"abcdef\n")
  2407. # (4) replace
  2408. b = self.BytesIO()
  2409. t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
  2410. newline="\n")
  2411. t.write("abc\xffdef\n")
  2412. t.flush()
  2413. self.assertEqual(b.getvalue(), b"abc?def\n")
  2414. def test_newlines(self):
  2415. input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
  2416. tests = [
  2417. [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
  2418. [ '', input_lines ],
  2419. [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
  2420. [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
  2421. [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
  2422. ]
  2423. encodings = (
  2424. 'utf-8', 'latin-1',
  2425. 'utf-16', 'utf-16-le', 'utf-16-be',
  2426. 'utf-32', 'utf-32-le', 'utf-32-be',
  2427. )
  2428. # Try a range of buffer sizes to test the case where \r is the last
  2429. # character in TextIOWrapper._pending_line.
  2430. for encoding in encodings:
  2431. # XXX: str.encode() should return bytes
  2432. data = bytes(''.join(input_lines).encode(encoding))
  2433. for do_reads in (False, True):
  2434. for bufsize in range(1, 10):
  2435. for newline, exp_lines in tests:
  2436. bufio = self.BufferedReader(self.BytesIO(data), bufsize)
  2437. textio = self.TextIOWrapper(bufio, newline=newline,
  2438. encoding=encoding)
  2439. if do_reads:
  2440. got_lines = []
  2441. while True:
  2442. c2 = textio.read(2)
  2443. if c2 == '':
  2444. break
  2445. self.assertEqual(len(c2), 2)
  2446. got_lines.append(c2 + textio.readline())
  2447. else:
  2448. got_lines = list(textio)
  2449. for got_line, exp_line in zip(got_lines, exp_lines):
  2450. self.assertEqual(got_line, exp_line)
  2451. self.assertEqual(len(got_lines), len(exp_lines))
  2452. def test_newlines_input(self):
  2453. testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
  2454. normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
  2455. for newline, expected in [
  2456. (None, normalized.decode("ascii").splitlines(keepends=True)),
  2457. ("", testdata.decode("ascii").splitlines(keepends=True)),
  2458. ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
  2459. ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
  2460. ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
  2461. ]:
  2462. buf = self.BytesIO(testdata)
  2463. txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
  2464. self.assertEqual(txt.readlines(), expected)
  2465. txt.seek(0)
  2466. self.assertEqual(txt.read(), "".join(expected))
  2467. def test_newlines_output(self):
  2468. testdict = {
  2469. "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
  2470. "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
  2471. "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
  2472. "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
  2473. }
  2474. tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
  2475. for newline, expected in tests:
  2476. buf = self.BytesIO()
  2477. txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
  2478. txt.write("AAA\nB")
  2479. txt.write("BB\nCCC\n")
  2480. txt.write("X\rY\r\nZ")
  2481. txt.flush()
  2482. self.assertEqual(buf.closed, False)
  2483. self.assertEqual(buf.getvalue(), expected)
  2484. def test_destructor(self):
  2485. l = []
  2486. base = self.BytesIO
  2487. class MyBytesIO(base):
  2488. def close(self):
  2489. l.append(self.getvalue())
  2490. base.close(self)
  2491. b = MyBytesIO()
  2492. t = self.TextIOWrapper(b, encoding="ascii")
  2493. t.write("abc")
  2494. del t
  2495. support.gc_collect()
  2496. self.assertEqual([b"abc"], l)
  2497. def test_override_destructor(self):
  2498. record = []
  2499. class MyTextIO(self.TextIOWrapper):
  2500. def __del__(self):
  2501. record.append(1)
  2502. try:
  2503. f = super().__del__
  2504. except AttributeError:
  2505. pass
  2506. else:
  2507. f()
  2508. def close(self):
  2509. record.append(2)
  2510. super().close()
  2511. def flush(self):
  2512. record.append(3)
  2513. super().flush()
  2514. b = self.BytesIO()
  2515. t = MyTextIO(b, encoding="ascii")
  2516. del t
  2517. support.gc_collect()
  2518. self.assertEqual(record, [1, 2, 3])
  2519. def test_error_through_destructor(self):
  2520. # Test that the exception state is not modified by a destructor,
  2521. # even if close() fails.
  2522. rawio = self.CloseFailureIO()
  2523. with support.catch_unraisable_exception() as cm:
  2524. with self.assertRaises(AttributeError):
  2525. self.TextIOWrapper(rawio, encoding="utf-8").xyzzy
  2526. if not IOBASE_EMITS_UNRAISABLE:
  2527. self.assertIsNone(cm.unraisable)
  2528. elif cm.unraisable is not None:
  2529. self.assertEqual(cm.unraisable.exc_type, OSError)
  2530. # Systematic tests of the text I/O API
  2531. def test_basic_io(self):
  2532. for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
  2533. for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
  2534. f = self.open(os_helper.TESTFN, "w+", encoding=enc)
  2535. f._CHUNK_SIZE = chunksize
  2536. self.assertEqual(f.write("abc"), 3)
  2537. f.close()
  2538. f = self.open(os_helper.TESTFN, "r+", encoding=enc)
  2539. f._CHUNK_SIZE = chunksize
  2540. self.assertEqual(f.tell(), 0)
  2541. self.assertEqual(f.read(), "abc")
  2542. cookie = f.tell()
  2543. self.assertEqual(f.seek(0), 0)
  2544. self.assertEqual(f.read(None), "abc")
  2545. f.seek(0)
  2546. self.assertEqual(f.read(2), "ab")
  2547. self.assertEqual(f.read(1), "c")
  2548. self.assertEqual(f.read(1), "")
  2549. self.assertEqual(f.read(), "")
  2550. self.assertEqual(f.tell(), cookie)
  2551. self.assertEqual(f.seek(0), 0)
  2552. self.assertEqual(f.seek(0, 2), cookie)
  2553. self.assertEqual(f.write("def"), 3)
  2554. self.assertEqual(f.seek(cookie), cookie)
  2555. self.assertEqual(f.read(), "def")
  2556. if enc.startswith("utf"):
  2557. self.multi_line_test(f, enc)
  2558. f.close()
  2559. def multi_line_test(self, f, enc):
  2560. f.seek(0)
  2561. f.truncate()
  2562. sample = "s\xff\u0fff\uffff"
  2563. wlines = []
  2564. for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
  2565. chars = []
  2566. for i in range(size):
  2567. chars.append(sample[i % len(sample)])
  2568. line = "".join(chars) + "\n"
  2569. wlines.append((f.tell(), line))
  2570. f.write(line)
  2571. f.seek(0)
  2572. rlines = []
  2573. while True:
  2574. pos = f.tell()
  2575. line = f.readline()
  2576. if not line:
  2577. break
  2578. rlines.append((pos, line))
  2579. self.assertEqual(rlines, wlines)
  2580. def test_telling(self):
  2581. f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
  2582. p0 = f.tell()
  2583. f.write("\xff\n")
  2584. p1 = f.tell()
  2585. f.write("\xff\n")
  2586. p2 = f.tell()
  2587. f.seek(0)
  2588. self.assertEqual(f.tell(), p0)
  2589. self.assertEqual(f.readline(), "\xff\n")
  2590. self.assertEqual(f.tell(), p1)
  2591. self.assertEqual(f.readline(), "\xff\n")
  2592. self.assertEqual(f.tell(), p2)
  2593. f.seek(0)
  2594. for line in f:
  2595. self.assertEqual(line, "\xff\n")
  2596. self.assertRaises(OSError, f.tell)
  2597. self.assertEqual(f.tell(), p2)
  2598. f.close()
  2599. def test_seeking(self):
  2600. chunk_size = _default_chunk_size()
  2601. prefix_size = chunk_size - 2
  2602. u_prefix = "a" * prefix_size
  2603. prefix = bytes(u_prefix.encode("utf-8"))
  2604. self.assertEqual(len(u_prefix), len(prefix))
  2605. u_suffix = "\u8888\n"
  2606. suffix = bytes(u_suffix.encode("utf-8"))
  2607. line = prefix + suffix
  2608. with self.open(os_helper.TESTFN, "wb") as f:
  2609. f.write(line*2)
  2610. with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
  2611. s = f.read(prefix_size)
  2612. self.assertEqual(s, str(prefix, "ascii"))
  2613. self.assertEqual(f.tell(), prefix_size)
  2614. self.assertEqual(f.readline(), u_suffix)
  2615. def test_seeking_too(self):
  2616. # Regression test for a specific bug
  2617. data = b'\xe0\xbf\xbf\n'
  2618. with self.open(os_helper.TESTFN, "wb") as f:
  2619. f.write(data)
  2620. with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
  2621. f._CHUNK_SIZE # Just test that it exists
  2622. f._CHUNK_SIZE = 2
  2623. f.readline()
  2624. f.tell()
  2625. def test_seek_and_tell(self):
  2626. #Test seek/tell using the StatefulIncrementalDecoder.
  2627. # Make test faster by doing smaller seeks
  2628. CHUNK_SIZE = 128
  2629. def test_seek_and_tell_with_data(data, min_pos=0):
  2630. """Tell/seek to various points within a data stream and ensure
  2631. that the decoded data returned by read() is consistent."""
  2632. f = self.open(os_helper.TESTFN, 'wb')
  2633. f.write(data)
  2634. f.close()
  2635. f = self.open(os_helper.TESTFN, encoding='test_decoder')
  2636. f._CHUNK_SIZE = CHUNK_SIZE
  2637. decoded = f.read()
  2638. f.close()
  2639. for i in range(min_pos, len(decoded) + 1): # seek positions
  2640. for j in [1, 5, len(decoded) - i]: # read lengths
  2641. f = self.open(os_helper.TESTFN, encoding='test_decoder')
  2642. self.assertEqual(f.read(i), decoded[:i])
  2643. cookie = f.tell()
  2644. self.assertEqual(f.read(j), decoded[i:i + j])
  2645. f.seek(cookie)
  2646. self.assertEqual(f.read(), decoded[i:])
  2647. f.close()
  2648. # Enable the test decoder.
  2649. StatefulIncrementalDecoder.codecEnabled = 1
  2650. # Run the tests.
  2651. try:
  2652. # Try each test case.
  2653. for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
  2654. test_seek_and_tell_with_data(input)
  2655. # Position each test case so that it crosses a chunk boundary.
  2656. for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
  2657. offset = CHUNK_SIZE - len(input)//2
  2658. prefix = b'.'*offset
  2659. # Don't bother seeking into the prefix (takes too long).
  2660. min_pos = offset*2
  2661. test_seek_and_tell_with_data(prefix + input, min_pos)
  2662. # Ensure our test decoder won't interfere with subsequent tests.
  2663. finally:
  2664. StatefulIncrementalDecoder.codecEnabled = 0
  2665. def test_multibyte_seek_and_tell(self):
  2666. f = self.open(os_helper.TESTFN, "w", encoding="euc_jp")
  2667. f.write("AB\n\u3046\u3048\n")
  2668. f.close()
  2669. f = self.open(os_helper.TESTFN, "r", encoding="euc_jp")
  2670. self.assertEqual(f.readline(), "AB\n")
  2671. p0 = f.tell()
  2672. self.assertEqual(f.readline(), "\u3046\u3048\n")
  2673. p1 = f.tell()
  2674. f.seek(p0)
  2675. self.assertEqual(f.readline(), "\u3046\u3048\n")
  2676. self.assertEqual(f.tell(), p1)
  2677. f.close()
  2678. def test_seek_with_encoder_state(self):
  2679. f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004")
  2680. f.write("\u00e6\u0300")
  2681. p0 = f.tell()
  2682. f.write("\u00e6")
  2683. f.seek(p0)
  2684. f.write("\u0300")
  2685. f.close()
  2686. f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004")
  2687. self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
  2688. f.close()
  2689. def test_encoded_writes(self):
  2690. data = "1234567890"
  2691. tests = ("utf-16",
  2692. "utf-16-le",
  2693. "utf-16-be",
  2694. "utf-32",
  2695. "utf-32-le",
  2696. "utf-32-be")
  2697. for encoding in tests:
  2698. buf = self.BytesIO()
  2699. f = self.TextIOWrapper(buf, encoding=encoding)
  2700. # Check if the BOM is written only once (see issue1753).
  2701. f.write(data)
  2702. f.write(data)
  2703. f.seek(0)
  2704. self.assertEqual(f.read(), data * 2)
  2705. f.seek(0)
  2706. self.assertEqual(f.read(), data * 2)
  2707. self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
  2708. def test_unreadable(self):
  2709. class UnReadable(self.BytesIO):
  2710. def readable(self):
  2711. return False
  2712. txt = self.TextIOWrapper(UnReadable(), encoding="utf-8")
  2713. self.assertRaises(OSError, txt.read)
  2714. def test_read_one_by_one(self):
  2715. txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"), encoding="utf-8")
  2716. reads = ""
  2717. while True:
  2718. c = txt.read(1)
  2719. if not c:
  2720. break
  2721. reads += c
  2722. self.assertEqual(reads, "AA\nBB")
  2723. def test_readlines(self):
  2724. txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"), encoding="utf-8")
  2725. self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
  2726. txt.seek(0)
  2727. self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
  2728. txt.seek(0)
  2729. self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
  2730. # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
  2731. def test_read_by_chunk(self):
  2732. # make sure "\r\n" straddles 128 char boundary.
  2733. txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"), encoding="utf-8")
  2734. reads = ""
  2735. while True:
  2736. c = txt.read(128)
  2737. if not c:
  2738. break
  2739. reads += c
  2740. self.assertEqual(reads, "A"*127+"\nB")
  2741. def test_writelines(self):
  2742. l = ['ab', 'cd', 'ef']
  2743. buf = self.BytesIO()
  2744. txt = self.TextIOWrapper(buf, encoding="utf-8")
  2745. txt.writelines(l)
  2746. txt.flush()
  2747. self.assertEqual(buf.getvalue(), b'abcdef')
  2748. def test_writelines_userlist(self):
  2749. l = UserList(['ab', 'cd', 'ef'])
  2750. buf = self.BytesIO()
  2751. txt = self.TextIOWrapper(buf, encoding="utf-8")
  2752. txt.writelines(l)
  2753. txt.flush()
  2754. self.assertEqual(buf.getvalue(), b'abcdef')
  2755. def test_writelines_error(self):
  2756. txt = self.TextIOWrapper(self.BytesIO(), encoding="utf-8")
  2757. self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
  2758. self.assertRaises(TypeError, txt.writelines, None)
  2759. self.assertRaises(TypeError, txt.writelines, b'abc')
  2760. def test_issue1395_1(self):
  2761. txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
  2762. # read one char at a time
  2763. reads = ""
  2764. while True:
  2765. c = txt.read(1)
  2766. if not c:
  2767. break
  2768. reads += c
  2769. self.assertEqual(reads, self.normalized)
  2770. def test_issue1395_2(self):
  2771. txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
  2772. txt._CHUNK_SIZE = 4
  2773. reads = ""
  2774. while True:
  2775. c = txt.read(4)
  2776. if not c:
  2777. break
  2778. reads += c
  2779. self.assertEqual(reads, self.normalized)
  2780. def test_issue1395_3(self):
  2781. txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
  2782. txt._CHUNK_SIZE = 4
  2783. reads = txt.read(4)
  2784. reads += txt.read(4)
  2785. reads += txt.readline()
  2786. reads += txt.readline()
  2787. reads += txt.readline()
  2788. self.assertEqual(reads, self.normalized)
  2789. def test_issue1395_4(self):
  2790. txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
  2791. txt._CHUNK_SIZE = 4
  2792. reads = txt.read(4)
  2793. reads += txt.read()
  2794. self.assertEqual(reads, self.normalized)
  2795. def test_issue1395_5(self):
  2796. txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
  2797. txt._CHUNK_SIZE = 4
  2798. reads = txt.read(4)
  2799. pos = txt.tell()
  2800. txt.seek(0)
  2801. txt.seek(pos)
  2802. self.assertEqual(txt.read(4), "BBB\n")
  2803. def test_issue2282(self):
  2804. buffer = self.BytesIO(self.testdata)
  2805. txt = self.TextIOWrapper(buffer, encoding="ascii")
  2806. self.assertEqual(buffer.seekable(), txt.seekable())
  2807. def test_append_bom(self):
  2808. # The BOM is not written again when appending to a non-empty file
  2809. filename = os_helper.TESTFN
  2810. for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
  2811. with self.open(filename, 'w', encoding=charset) as f:
  2812. f.write('aaa')
  2813. pos = f.tell()
  2814. with self.open(filename, 'rb') as f:
  2815. self.assertEqual(f.read(), 'aaa'.encode(charset))
  2816. with self.open(filename, 'a', encoding=charset) as f:
  2817. f.write('xxx')
  2818. with self.open(filename, 'rb') as f:
  2819. self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
  2820. def test_seek_bom(self):
  2821. # Same test, but when seeking manually
  2822. filename = os_helper.TESTFN
  2823. for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
  2824. with self.open(filename, 'w', encoding=charset) as f:
  2825. f.write('aaa')
  2826. pos = f.tell()
  2827. with self.open(filename, 'r+', encoding=charset) as f:
  2828. f.seek(pos)
  2829. f.write('zzz')
  2830. f.seek(0)
  2831. f.write('bbb')
  2832. with self.open(filename, 'rb') as f:
  2833. self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
  2834. def test_seek_append_bom(self):
  2835. # Same test, but first seek to the start and then to the end
  2836. filename = os_helper.TESTFN
  2837. for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
  2838. with self.open(filename, 'w', encoding=charset) as f:
  2839. f.write('aaa')
  2840. with self.open(filename, 'a', encoding=charset) as f:
  2841. f.seek(0)
  2842. f.seek(0, self.SEEK_END)
  2843. f.write('xxx')
  2844. with self.open(filename, 'rb') as f:
  2845. self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
  2846. def test_errors_property(self):
  2847. with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
  2848. self.assertEqual(f.errors, "strict")
  2849. with self.open(os_helper.TESTFN, "w", encoding="utf-8", errors="replace") as f:
  2850. self.assertEqual(f.errors, "replace")
  2851. @support.no_tracing
  2852. @threading_helper.requires_working_threading()
  2853. def test_threads_write(self):
  2854. # Issue6750: concurrent writes could duplicate data
  2855. event = threading.Event()
  2856. with self.open(os_helper.TESTFN, "w", encoding="utf-8", buffering=1) as f:
  2857. def run(n):
  2858. text = "Thread%03d\n" % n
  2859. event.wait()
  2860. f.write(text)
  2861. threads = [threading.Thread(target=run, args=(x,))
  2862. for x in range(20)]
  2863. with threading_helper.start_threads(threads, event.set):
  2864. time.sleep(0.02)
  2865. with self.open(os_helper.TESTFN, encoding="utf-8") as f:
  2866. content = f.read()
  2867. for n in range(20):
  2868. self.assertEqual(content.count("Thread%03d\n" % n), 1)
  2869. def test_flush_error_on_close(self):
  2870. # Test that text file is closed despite failed flush
  2871. # and that flush() is called before file closed.
  2872. txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
  2873. closed = []
  2874. def bad_flush():
  2875. closed[:] = [txt.closed, txt.buffer.closed]
  2876. raise OSError()
  2877. txt.flush = bad_flush
  2878. self.assertRaises(OSError, txt.close) # exception not swallowed
  2879. self.assertTrue(txt.closed)
  2880. self.assertTrue(txt.buffer.closed)
  2881. self.assertTrue(closed) # flush() called
  2882. self.assertFalse(closed[0]) # flush() called before file closed
  2883. self.assertFalse(closed[1])
  2884. txt.flush = lambda: None # break reference loop
  2885. def test_close_error_on_close(self):
  2886. buffer = self.BytesIO(self.testdata)
  2887. def bad_flush():
  2888. raise OSError('flush')
  2889. def bad_close():
  2890. raise OSError('close')
  2891. buffer.close = bad_close
  2892. txt = self.TextIOWrapper(buffer, encoding="ascii")
  2893. txt.flush = bad_flush
  2894. with self.assertRaises(OSError) as err: # exception not swallowed
  2895. txt.close()
  2896. self.assertEqual(err.exception.args, ('close',))
  2897. self.assertIsInstance(err.exception.__context__, OSError)
  2898. self.assertEqual(err.exception.__context__.args, ('flush',))
  2899. self.assertFalse(txt.closed)
  2900. # Silence destructor error
  2901. buffer.close = lambda: None
  2902. txt.flush = lambda: None
  2903. def test_nonnormalized_close_error_on_close(self):
  2904. # Issue #21677
  2905. buffer = self.BytesIO(self.testdata)
  2906. def bad_flush():
  2907. raise non_existing_flush
  2908. def bad_close():
  2909. raise non_existing_close
  2910. buffer.close = bad_close
  2911. txt = self.TextIOWrapper(buffer, encoding="ascii")
  2912. txt.flush = bad_flush
  2913. with self.assertRaises(NameError) as err: # exception not swallowed
  2914. txt.close()
  2915. self.assertIn('non_existing_close', str(err.exception))
  2916. self.assertIsInstance(err.exception.__context__, NameError)
  2917. self.assertIn('non_existing_flush', str(err.exception.__context__))
  2918. self.assertFalse(txt.closed)
  2919. # Silence destructor error
  2920. buffer.close = lambda: None
  2921. txt.flush = lambda: None
  2922. def test_multi_close(self):
  2923. txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
  2924. txt.close()
  2925. txt.close()
  2926. txt.close()
  2927. self.assertRaises(ValueError, txt.flush)
  2928. def test_unseekable(self):
  2929. txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata), encoding="utf-8")
  2930. self.assertRaises(self.UnsupportedOperation, txt.tell)
  2931. self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
  2932. def test_readonly_attributes(self):
  2933. txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
  2934. buf = self.BytesIO(self.testdata)
  2935. with self.assertRaises(AttributeError):
  2936. txt.buffer = buf
  2937. def test_rawio(self):
  2938. # Issue #12591: TextIOWrapper must work with raw I/O objects, so
  2939. # that subprocess.Popen() can have the required unbuffered
  2940. # semantics with universal_newlines=True.
  2941. raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
  2942. txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
  2943. # Reads
  2944. self.assertEqual(txt.read(4), 'abcd')
  2945. self.assertEqual(txt.readline(), 'efghi\n')
  2946. self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
  2947. def test_rawio_write_through(self):
  2948. # Issue #12591: with write_through=True, writes don't need a flush
  2949. raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
  2950. txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
  2951. write_through=True)
  2952. txt.write('1')
  2953. txt.write('23\n4')
  2954. txt.write('5')
  2955. self.assertEqual(b''.join(raw._write_stack), b'123\n45')
  2956. def test_bufio_write_through(self):
  2957. # Issue #21396: write_through=True doesn't force a flush()
  2958. # on the underlying binary buffered object.
  2959. flush_called, write_called = [], []
  2960. class BufferedWriter(self.BufferedWriter):
  2961. def flush(self, *args, **kwargs):
  2962. flush_called.append(True)
  2963. return super().flush(*args, **kwargs)
  2964. def write(self, *args, **kwargs):
  2965. write_called.append(True)
  2966. return super().write(*args, **kwargs)
  2967. rawio = self.BytesIO()
  2968. data = b"a"
  2969. bufio = BufferedWriter(rawio, len(data)*2)
  2970. textio = self.TextIOWrapper(bufio, encoding='ascii',
  2971. write_through=True)
  2972. # write to the buffered io but don't overflow the buffer
  2973. text = data.decode('ascii')
  2974. textio.write(text)
  2975. # buffer.flush is not called with write_through=True
  2976. self.assertFalse(flush_called)
  2977. # buffer.write *is* called with write_through=True
  2978. self.assertTrue(write_called)
  2979. self.assertEqual(rawio.getvalue(), b"") # no flush
  2980. write_called = [] # reset
  2981. textio.write(text * 10) # total content is larger than bufio buffer
  2982. self.assertTrue(write_called)
  2983. self.assertEqual(rawio.getvalue(), data * 11) # all flushed
  2984. def test_reconfigure_write_through(self):
  2985. raw = self.MockRawIO([])
  2986. t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
  2987. t.write('1')
  2988. t.reconfigure(write_through=True) # implied flush
  2989. self.assertEqual(t.write_through, True)
  2990. self.assertEqual(b''.join(raw._write_stack), b'1')
  2991. t.write('23')
  2992. self.assertEqual(b''.join(raw._write_stack), b'123')
  2993. t.reconfigure(write_through=False)
  2994. self.assertEqual(t.write_through, False)
  2995. t.write('45')
  2996. t.flush()
  2997. self.assertEqual(b''.join(raw._write_stack), b'12345')
  2998. # Keeping default value
  2999. t.reconfigure()
  3000. t.reconfigure(write_through=None)
  3001. self.assertEqual(t.write_through, False)
  3002. t.reconfigure(write_through=True)
  3003. t.reconfigure()
  3004. t.reconfigure(write_through=None)
  3005. self.assertEqual(t.write_through, True)
  3006. def test_read_nonbytes(self):
  3007. # Issue #17106
  3008. # Crash when underlying read() returns non-bytes
  3009. t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
  3010. self.assertRaises(TypeError, t.read, 1)
  3011. t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
  3012. self.assertRaises(TypeError, t.readline)
  3013. t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
  3014. self.assertRaises(TypeError, t.read)
  3015. def test_illegal_encoder(self):
  3016. # Issue 31271: Calling write() while the return value of encoder's
  3017. # encode() is invalid shouldn't cause an assertion failure.
  3018. rot13 = codecs.lookup("rot13")
  3019. with support.swap_attr(rot13, '_is_text_encoding', True):
  3020. t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
  3021. self.assertRaises(TypeError, t.write, 'bar')
  3022. def test_illegal_decoder(self):
  3023. # Issue #17106
  3024. # Bypass the early encoding check added in issue 20404
  3025. def _make_illegal_wrapper():
  3026. quopri = codecs.lookup("quopri")
  3027. quopri._is_text_encoding = True
  3028. try:
  3029. t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
  3030. newline='\n', encoding="quopri")
  3031. finally:
  3032. quopri._is_text_encoding = False
  3033. return t
  3034. # Crash when decoder returns non-string
  3035. t = _make_illegal_wrapper()
  3036. self.assertRaises(TypeError, t.read, 1)
  3037. t = _make_illegal_wrapper()
  3038. self.assertRaises(TypeError, t.readline)
  3039. t = _make_illegal_wrapper()
  3040. self.assertRaises(TypeError, t.read)
  3041. # Issue 31243: calling read() while the return value of decoder's
  3042. # getstate() is invalid should neither crash the interpreter nor
  3043. # raise a SystemError.
  3044. def _make_very_illegal_wrapper(getstate_ret_val):
  3045. class BadDecoder:
  3046. def getstate(self):
  3047. return getstate_ret_val
  3048. def _get_bad_decoder(dummy):
  3049. return BadDecoder()
  3050. quopri = codecs.lookup("quopri")
  3051. with support.swap_attr(quopri, 'incrementaldecoder',
  3052. _get_bad_decoder):
  3053. return _make_illegal_wrapper()
  3054. t = _make_very_illegal_wrapper(42)
  3055. self.assertRaises(TypeError, t.read, 42)
  3056. t = _make_very_illegal_wrapper(())
  3057. self.assertRaises(TypeError, t.read, 42)
  3058. t = _make_very_illegal_wrapper((1, 2))
  3059. self.assertRaises(TypeError, t.read, 42)
  3060. def _check_create_at_shutdown(self, **kwargs):
  3061. # Issue #20037: creating a TextIOWrapper at shutdown
  3062. # shouldn't crash the interpreter.
  3063. iomod = self.io.__name__
  3064. code = """if 1:
  3065. import codecs
  3066. import {iomod} as io
  3067. # Avoid looking up codecs at shutdown
  3068. codecs.lookup('utf-8')
  3069. class C:
  3070. def __init__(self):
  3071. self.buf = io.BytesIO()
  3072. def __del__(self):
  3073. io.TextIOWrapper(self.buf, **{kwargs})
  3074. print("ok")
  3075. c = C()
  3076. """.format(iomod=iomod, kwargs=kwargs)
  3077. return assert_python_ok("-c", code)
  3078. def test_create_at_shutdown_without_encoding(self):
  3079. rc, out, err = self._check_create_at_shutdown()
  3080. if err:
  3081. # Can error out with a RuntimeError if the module state
  3082. # isn't found.
  3083. self.assertIn(self.shutdown_error, err.decode())
  3084. else:
  3085. self.assertEqual("ok", out.decode().strip())
  3086. def test_create_at_shutdown_with_encoding(self):
  3087. rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
  3088. errors='strict')
  3089. self.assertFalse(err)
  3090. self.assertEqual("ok", out.decode().strip())
  3091. def test_read_byteslike(self):
  3092. r = MemviewBytesIO(b'Just some random string\n')
  3093. t = self.TextIOWrapper(r, 'utf-8')
  3094. # TextIOwrapper will not read the full string, because
  3095. # we truncate it to a multiple of the native int size
  3096. # so that we can construct a more complex memoryview.
  3097. bytes_val = _to_memoryview(r.getvalue()).tobytes()
  3098. self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
  3099. def test_issue22849(self):
  3100. class F(object):
  3101. def readable(self): return True
  3102. def writable(self): return True
  3103. def seekable(self): return True
  3104. for i in range(10):
  3105. try:
  3106. self.TextIOWrapper(F(), encoding='utf-8')
  3107. except Exception:
  3108. pass
  3109. F.tell = lambda x: 0
  3110. t = self.TextIOWrapper(F(), encoding='utf-8')
  3111. def test_reconfigure_locale(self):
  3112. wrapper = io.TextIOWrapper(io.BytesIO(b"test"))
  3113. wrapper.reconfigure(encoding="locale")
  3114. def test_reconfigure_encoding_read(self):
  3115. # latin1 -> utf8
  3116. # (latin1 can decode utf-8 encoded string)
  3117. data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
  3118. raw = self.BytesIO(data)
  3119. txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
  3120. self.assertEqual(txt.readline(), 'abc\xe9\n')
  3121. with self.assertRaises(self.UnsupportedOperation):
  3122. txt.reconfigure(encoding='utf-8')
  3123. with self.assertRaises(self.UnsupportedOperation):
  3124. txt.reconfigure(newline=None)
  3125. def test_reconfigure_write_fromascii(self):
  3126. # ascii has a specific encodefunc in the C implementation,
  3127. # but utf-8-sig has not. Make sure that we get rid of the
  3128. # cached encodefunc when we switch encoders.
  3129. raw = self.BytesIO()
  3130. txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
  3131. txt.write('foo\n')
  3132. txt.reconfigure(encoding='utf-8-sig')
  3133. txt.write('\xe9\n')
  3134. txt.flush()
  3135. self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
  3136. def test_reconfigure_write(self):
  3137. # latin -> utf8
  3138. raw = self.BytesIO()
  3139. txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
  3140. txt.write('abc\xe9\n')
  3141. txt.reconfigure(encoding='utf-8')
  3142. self.assertEqual(raw.getvalue(), b'abc\xe9\n')
  3143. txt.write('d\xe9f\n')
  3144. txt.flush()
  3145. self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
  3146. # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
  3147. # the file
  3148. raw = self.BytesIO()
  3149. txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
  3150. txt.write('abc\n')
  3151. txt.reconfigure(encoding='utf-8-sig')
  3152. txt.write('d\xe9f\n')
  3153. txt.flush()
  3154. self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
  3155. def test_reconfigure_write_non_seekable(self):
  3156. raw = self.BytesIO()
  3157. raw.seekable = lambda: False
  3158. raw.seek = None
  3159. txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
  3160. txt.write('abc\n')
  3161. txt.reconfigure(encoding='utf-8-sig')
  3162. txt.write('d\xe9f\n')
  3163. txt.flush()
  3164. # If the raw stream is not seekable, there'll be a BOM
  3165. self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
  3166. def test_reconfigure_defaults(self):
  3167. txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
  3168. txt.reconfigure(encoding=None)
  3169. self.assertEqual(txt.encoding, 'ascii')
  3170. self.assertEqual(txt.errors, 'replace')
  3171. txt.write('LF\n')
  3172. txt.reconfigure(newline='\r\n')
  3173. self.assertEqual(txt.encoding, 'ascii')
  3174. self.assertEqual(txt.errors, 'replace')
  3175. txt.reconfigure(errors='ignore')
  3176. self.assertEqual(txt.encoding, 'ascii')
  3177. self.assertEqual(txt.errors, 'ignore')
  3178. txt.write('CRLF\n')
  3179. txt.reconfigure(encoding='utf-8', newline=None)
  3180. self.assertEqual(txt.errors, 'strict')
  3181. txt.seek(0)
  3182. self.assertEqual(txt.read(), 'LF\nCRLF\n')
  3183. self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
  3184. def test_reconfigure_newline(self):
  3185. raw = self.BytesIO(b'CR\rEOF')
  3186. txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
  3187. txt.reconfigure(newline=None)
  3188. self.assertEqual(txt.readline(), 'CR\n')
  3189. raw = self.BytesIO(b'CR\rEOF')
  3190. txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
  3191. txt.reconfigure(newline='')
  3192. self.assertEqual(txt.readline(), 'CR\r')
  3193. raw = self.BytesIO(b'CR\rLF\nEOF')
  3194. txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
  3195. txt.reconfigure(newline='\n')
  3196. self.assertEqual(txt.readline(), 'CR\rLF\n')
  3197. raw = self.BytesIO(b'LF\nCR\rEOF')
  3198. txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
  3199. txt.reconfigure(newline='\r')
  3200. self.assertEqual(txt.readline(), 'LF\nCR\r')
  3201. raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
  3202. txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
  3203. txt.reconfigure(newline='\r\n')
  3204. self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
  3205. txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
  3206. txt.reconfigure(newline=None)
  3207. txt.write('linesep\n')
  3208. txt.reconfigure(newline='')
  3209. txt.write('LF\n')
  3210. txt.reconfigure(newline='\n')
  3211. txt.write('LF\n')
  3212. txt.reconfigure(newline='\r')
  3213. txt.write('CR\n')
  3214. txt.reconfigure(newline='\r\n')
  3215. txt.write('CRLF\n')
  3216. expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
  3217. self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
  3218. def test_issue25862(self):
  3219. # Assertion failures occurred in tell() after read() and write().
  3220. t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
  3221. t.read(1)
  3222. t.read()
  3223. t.tell()
  3224. t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
  3225. t.read(1)
  3226. t.write('x')
  3227. t.tell()
  3228. class MemviewBytesIO(io.BytesIO):
  3229. '''A BytesIO object whose read method returns memoryviews
  3230. rather than bytes'''
  3231. def read1(self, len_):
  3232. return _to_memoryview(super().read1(len_))
  3233. def read(self, len_):
  3234. return _to_memoryview(super().read(len_))
  3235. def _to_memoryview(buf):
  3236. '''Convert bytes-object *buf* to a non-trivial memoryview'''
  3237. arr = array.array('i')
  3238. idx = len(buf) - len(buf) % arr.itemsize
  3239. arr.frombytes(buf[:idx])
  3240. return memoryview(arr)
  3241. class CTextIOWrapperTest(TextIOWrapperTest):
  3242. io = io
  3243. shutdown_error = "LookupError: unknown encoding: ascii"
  3244. def test_initialization(self):
  3245. r = self.BytesIO(b"\xc3\xa9\n\n")
  3246. b = self.BufferedReader(r, 1000)
  3247. t = self.TextIOWrapper(b, encoding="utf-8")
  3248. self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
  3249. self.assertRaises(ValueError, t.read)
  3250. t = self.TextIOWrapper.__new__(self.TextIOWrapper)
  3251. self.assertRaises(Exception, repr, t)
  3252. def test_garbage_collection(self):
  3253. # C TextIOWrapper objects are collected, and collecting them flushes
  3254. # all data to disk.
  3255. # The Python version has __del__, so it ends in gc.garbage instead.
  3256. with warnings_helper.check_warnings(('', ResourceWarning)):
  3257. rawio = io.FileIO(os_helper.TESTFN, "wb")
  3258. b = self.BufferedWriter(rawio)
  3259. t = self.TextIOWrapper(b, encoding="ascii")
  3260. t.write("456def")
  3261. t.x = t
  3262. wr = weakref.ref(t)
  3263. del t
  3264. support.gc_collect()
  3265. self.assertIsNone(wr(), wr)
  3266. with self.open(os_helper.TESTFN, "rb") as f:
  3267. self.assertEqual(f.read(), b"456def")
  3268. def test_rwpair_cleared_before_textio(self):
  3269. # Issue 13070: TextIOWrapper's finalization would crash when called
  3270. # after the reference to the underlying BufferedRWPair's writer got
  3271. # cleared by the GC.
  3272. for i in range(1000):
  3273. b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
  3274. t1 = self.TextIOWrapper(b1, encoding="ascii")
  3275. b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
  3276. t2 = self.TextIOWrapper(b2, encoding="ascii")
  3277. # circular references
  3278. t1.buddy = t2
  3279. t2.buddy = t1
  3280. support.gc_collect()
  3281. def test_del__CHUNK_SIZE_SystemError(self):
  3282. t = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
  3283. with self.assertRaises(AttributeError):
  3284. del t._CHUNK_SIZE
  3285. def test_internal_buffer_size(self):
  3286. # bpo-43260: TextIOWrapper's internal buffer should not store
  3287. # data larger than chunk size.
  3288. chunk_size = 8192 # default chunk size, updated later
  3289. class MockIO(self.MockRawIO):
  3290. def write(self, data):
  3291. if len(data) > chunk_size:
  3292. raise RuntimeError
  3293. return super().write(data)
  3294. buf = MockIO()
  3295. t = self.TextIOWrapper(buf, encoding="ascii")
  3296. chunk_size = t._CHUNK_SIZE
  3297. t.write("abc")
  3298. t.write("def")
  3299. # default chunk size is 8192 bytes so t don't write data to buf.
  3300. self.assertEqual([], buf._write_stack)
  3301. with self.assertRaises(RuntimeError):
  3302. t.write("x"*(chunk_size+1))
  3303. self.assertEqual([b"abcdef"], buf._write_stack)
  3304. t.write("ghi")
  3305. t.write("x"*chunk_size)
  3306. self.assertEqual([b"abcdef", b"ghi", b"x"*chunk_size], buf._write_stack)
  3307. class PyTextIOWrapperTest(TextIOWrapperTest):
  3308. io = pyio
  3309. shutdown_error = "LookupError: unknown encoding: ascii"
  3310. class IncrementalNewlineDecoderTest(unittest.TestCase):
  3311. def check_newline_decoding_utf8(self, decoder):
  3312. # UTF-8 specific tests for a newline decoder
  3313. def _check_decode(b, s, **kwargs):
  3314. # We exercise getstate() / setstate() as well as decode()
  3315. state = decoder.getstate()
  3316. self.assertEqual(decoder.decode(b, **kwargs), s)
  3317. decoder.setstate(state)
  3318. self.assertEqual(decoder.decode(b, **kwargs), s)
  3319. _check_decode(b'\xe8\xa2\x88', "\u8888")
  3320. _check_decode(b'\xe8', "")
  3321. _check_decode(b'\xa2', "")
  3322. _check_decode(b'\x88', "\u8888")
  3323. _check_decode(b'\xe8', "")
  3324. _check_decode(b'\xa2', "")
  3325. _check_decode(b'\x88', "\u8888")
  3326. _check_decode(b'\xe8', "")
  3327. self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
  3328. decoder.reset()
  3329. _check_decode(b'\n', "\n")
  3330. _check_decode(b'\r', "")
  3331. _check_decode(b'', "\n", final=True)
  3332. _check_decode(b'\r', "\n", final=True)
  3333. _check_decode(b'\r', "")
  3334. _check_decode(b'a', "\na")
  3335. _check_decode(b'\r\r\n', "\n\n")
  3336. _check_decode(b'\r', "")
  3337. _check_decode(b'\r', "\n")
  3338. _check_decode(b'\na', "\na")
  3339. _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
  3340. _check_decode(b'\xe8\xa2\x88', "\u8888")
  3341. _check_decode(b'\n', "\n")
  3342. _check_decode(b'\xe8\xa2\x88\r', "\u8888")
  3343. _check_decode(b'\n', "\n")
  3344. def check_newline_decoding(self, decoder, encoding):
  3345. result = []
  3346. if encoding is not None:
  3347. encoder = codecs.getincrementalencoder(encoding)()
  3348. def _decode_bytewise(s):
  3349. # Decode one byte at a time
  3350. for b in encoder.encode(s):
  3351. result.append(decoder.decode(bytes([b])))
  3352. else:
  3353. encoder = None
  3354. def _decode_bytewise(s):
  3355. # Decode one char at a time
  3356. for c in s:
  3357. result.append(decoder.decode(c))
  3358. self.assertEqual(decoder.newlines, None)
  3359. _decode_bytewise("abc\n\r")
  3360. self.assertEqual(decoder.newlines, '\n')
  3361. _decode_bytewise("\nabc")
  3362. self.assertEqual(decoder.newlines, ('\n', '\r\n'))
  3363. _decode_bytewise("abc\r")
  3364. self.assertEqual(decoder.newlines, ('\n', '\r\n'))
  3365. _decode_bytewise("abc")
  3366. self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
  3367. _decode_bytewise("abc\r")
  3368. self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
  3369. decoder.reset()
  3370. input = "abc"
  3371. if encoder is not None:
  3372. encoder.reset()
  3373. input = encoder.encode(input)
  3374. self.assertEqual(decoder.decode(input), "abc")
  3375. self.assertEqual(decoder.newlines, None)
  3376. def test_newline_decoder(self):
  3377. encodings = (
  3378. # None meaning the IncrementalNewlineDecoder takes unicode input
  3379. # rather than bytes input
  3380. None, 'utf-8', 'latin-1',
  3381. 'utf-16', 'utf-16-le', 'utf-16-be',
  3382. 'utf-32', 'utf-32-le', 'utf-32-be',
  3383. )
  3384. for enc in encodings:
  3385. decoder = enc and codecs.getincrementaldecoder(enc)()
  3386. decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
  3387. self.check_newline_decoding(decoder, enc)
  3388. decoder = codecs.getincrementaldecoder("utf-8")()
  3389. decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
  3390. self.check_newline_decoding_utf8(decoder)
  3391. self.assertRaises(TypeError, decoder.setstate, 42)
  3392. def test_newline_bytes(self):
  3393. # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
  3394. def _check(dec):
  3395. self.assertEqual(dec.newlines, None)
  3396. self.assertEqual(dec.decode("\u0D00"), "\u0D00")
  3397. self.assertEqual(dec.newlines, None)
  3398. self.assertEqual(dec.decode("\u0A00"), "\u0A00")
  3399. self.assertEqual(dec.newlines, None)
  3400. dec = self.IncrementalNewlineDecoder(None, translate=False)
  3401. _check(dec)
  3402. dec = self.IncrementalNewlineDecoder(None, translate=True)
  3403. _check(dec)
  3404. def test_translate(self):
  3405. # issue 35062
  3406. for translate in (-2, -1, 1, 2):
  3407. decoder = codecs.getincrementaldecoder("utf-8")()
  3408. decoder = self.IncrementalNewlineDecoder(decoder, translate)
  3409. self.check_newline_decoding_utf8(decoder)
  3410. decoder = codecs.getincrementaldecoder("utf-8")()
  3411. decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
  3412. self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
  3413. class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
  3414. @support.cpython_only
  3415. def test_uninitialized(self):
  3416. uninitialized = self.IncrementalNewlineDecoder.__new__(
  3417. self.IncrementalNewlineDecoder)
  3418. self.assertRaises(ValueError, uninitialized.decode, b'bar')
  3419. self.assertRaises(ValueError, uninitialized.getstate)
  3420. self.assertRaises(ValueError, uninitialized.setstate, (b'foo', 0))
  3421. self.assertRaises(ValueError, uninitialized.reset)
  3422. class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
  3423. pass
  3424. # XXX Tests for open()
  3425. class MiscIOTest(unittest.TestCase):
  3426. def tearDown(self):
  3427. os_helper.unlink(os_helper.TESTFN)
  3428. def test___all__(self):
  3429. for name in self.io.__all__:
  3430. obj = getattr(self.io, name, None)
  3431. self.assertIsNotNone(obj, name)
  3432. if name in ("open", "open_code"):
  3433. continue
  3434. elif "error" in name.lower() or name == "UnsupportedOperation":
  3435. self.assertTrue(issubclass(obj, Exception), name)
  3436. elif not name.startswith("SEEK_"):
  3437. self.assertTrue(issubclass(obj, self.IOBase))
  3438. def test_attributes(self):
  3439. f = self.open(os_helper.TESTFN, "wb", buffering=0)
  3440. self.assertEqual(f.mode, "wb")
  3441. f.close()
  3442. f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
  3443. self.assertEqual(f.mode, "w+")
  3444. self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
  3445. self.assertEqual(f.buffer.raw.mode, "rb+")
  3446. g = self.open(f.fileno(), "wb", closefd=False)
  3447. self.assertEqual(g.mode, "wb")
  3448. self.assertEqual(g.raw.mode, "wb")
  3449. self.assertEqual(g.name, f.fileno())
  3450. self.assertEqual(g.raw.name, f.fileno())
  3451. f.close()
  3452. g.close()
  3453. def test_removed_u_mode(self):
  3454. # bpo-37330: The "U" mode has been removed in Python 3.11
  3455. for mode in ("U", "rU", "r+U"):
  3456. with self.assertRaises(ValueError) as cm:
  3457. self.open(os_helper.TESTFN, mode)
  3458. self.assertIn('invalid mode', str(cm.exception))
  3459. @unittest.skipIf(
  3460. support.is_emscripten, "fstat() of a pipe fd is not supported"
  3461. )
  3462. @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
  3463. def test_open_pipe_with_append(self):
  3464. # bpo-27805: Ignore ESPIPE from lseek() in open().
  3465. r, w = os.pipe()
  3466. self.addCleanup(os.close, r)
  3467. f = self.open(w, 'a', encoding="utf-8")
  3468. self.addCleanup(f.close)
  3469. # Check that the file is marked non-seekable. On Windows, however, lseek
  3470. # somehow succeeds on pipes.
  3471. if sys.platform != 'win32':
  3472. self.assertFalse(f.seekable())
  3473. def test_io_after_close(self):
  3474. for kwargs in [
  3475. {"mode": "w"},
  3476. {"mode": "wb"},
  3477. {"mode": "w", "buffering": 1},
  3478. {"mode": "w", "buffering": 2},
  3479. {"mode": "wb", "buffering": 0},
  3480. {"mode": "r"},
  3481. {"mode": "rb"},
  3482. {"mode": "r", "buffering": 1},
  3483. {"mode": "r", "buffering": 2},
  3484. {"mode": "rb", "buffering": 0},
  3485. {"mode": "w+"},
  3486. {"mode": "w+b"},
  3487. {"mode": "w+", "buffering": 1},
  3488. {"mode": "w+", "buffering": 2},
  3489. {"mode": "w+b", "buffering": 0},
  3490. ]:
  3491. if "b" not in kwargs["mode"]:
  3492. kwargs["encoding"] = "utf-8"
  3493. f = self.open(os_helper.TESTFN, **kwargs)
  3494. f.close()
  3495. self.assertRaises(ValueError, f.flush)
  3496. self.assertRaises(ValueError, f.fileno)
  3497. self.assertRaises(ValueError, f.isatty)
  3498. self.assertRaises(ValueError, f.__iter__)
  3499. if hasattr(f, "peek"):
  3500. self.assertRaises(ValueError, f.peek, 1)
  3501. self.assertRaises(ValueError, f.read)
  3502. if hasattr(f, "read1"):
  3503. self.assertRaises(ValueError, f.read1, 1024)
  3504. self.assertRaises(ValueError, f.read1)
  3505. if hasattr(f, "readall"):
  3506. self.assertRaises(ValueError, f.readall)
  3507. if hasattr(f, "readinto"):
  3508. self.assertRaises(ValueError, f.readinto, bytearray(1024))
  3509. if hasattr(f, "readinto1"):
  3510. self.assertRaises(ValueError, f.readinto1, bytearray(1024))
  3511. self.assertRaises(ValueError, f.readline)
  3512. self.assertRaises(ValueError, f.readlines)
  3513. self.assertRaises(ValueError, f.readlines, 1)
  3514. self.assertRaises(ValueError, f.seek, 0)
  3515. self.assertRaises(ValueError, f.tell)
  3516. self.assertRaises(ValueError, f.truncate)
  3517. self.assertRaises(ValueError, f.write,
  3518. b"" if "b" in kwargs['mode'] else "")
  3519. self.assertRaises(ValueError, f.writelines, [])
  3520. self.assertRaises(ValueError, next, f)
  3521. def test_blockingioerror(self):
  3522. # Various BlockingIOError issues
  3523. class C(str):
  3524. pass
  3525. c = C("")
  3526. b = self.BlockingIOError(1, c)
  3527. c.b = b
  3528. b.c = c
  3529. wr = weakref.ref(c)
  3530. del c, b
  3531. support.gc_collect()
  3532. self.assertIsNone(wr(), wr)
  3533. def test_abcs(self):
  3534. # Test the visible base classes are ABCs.
  3535. self.assertIsInstance(self.IOBase, abc.ABCMeta)
  3536. self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
  3537. self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
  3538. self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
  3539. def _check_abc_inheritance(self, abcmodule):
  3540. with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
  3541. self.assertIsInstance(f, abcmodule.IOBase)
  3542. self.assertIsInstance(f, abcmodule.RawIOBase)
  3543. self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
  3544. self.assertNotIsInstance(f, abcmodule.TextIOBase)
  3545. with self.open(os_helper.TESTFN, "wb") as f:
  3546. self.assertIsInstance(f, abcmodule.IOBase)
  3547. self.assertNotIsInstance(f, abcmodule.RawIOBase)
  3548. self.assertIsInstance(f, abcmodule.BufferedIOBase)
  3549. self.assertNotIsInstance(f, abcmodule.TextIOBase)
  3550. with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
  3551. self.assertIsInstance(f, abcmodule.IOBase)
  3552. self.assertNotIsInstance(f, abcmodule.RawIOBase)
  3553. self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
  3554. self.assertIsInstance(f, abcmodule.TextIOBase)
  3555. def test_abc_inheritance(self):
  3556. # Test implementations inherit from their respective ABCs
  3557. self._check_abc_inheritance(self)
  3558. def test_abc_inheritance_official(self):
  3559. # Test implementations inherit from the official ABCs of the
  3560. # baseline "io" module.
  3561. self._check_abc_inheritance(io)
  3562. def _check_warn_on_dealloc(self, *args, **kwargs):
  3563. f = open(*args, **kwargs)
  3564. r = repr(f)
  3565. with self.assertWarns(ResourceWarning) as cm:
  3566. f = None
  3567. support.gc_collect()
  3568. self.assertIn(r, str(cm.warning.args[0]))
  3569. def test_warn_on_dealloc(self):
  3570. self._check_warn_on_dealloc(os_helper.TESTFN, "wb", buffering=0)
  3571. self._check_warn_on_dealloc(os_helper.TESTFN, "wb")
  3572. self._check_warn_on_dealloc(os_helper.TESTFN, "w", encoding="utf-8")
  3573. def _check_warn_on_dealloc_fd(self, *args, **kwargs):
  3574. fds = []
  3575. def cleanup_fds():
  3576. for fd in fds:
  3577. try:
  3578. os.close(fd)
  3579. except OSError as e:
  3580. if e.errno != errno.EBADF:
  3581. raise
  3582. self.addCleanup(cleanup_fds)
  3583. r, w = os.pipe()
  3584. fds += r, w
  3585. self._check_warn_on_dealloc(r, *args, **kwargs)
  3586. # When using closefd=False, there's no warning
  3587. r, w = os.pipe()
  3588. fds += r, w
  3589. with warnings_helper.check_no_resource_warning(self):
  3590. open(r, *args, closefd=False, **kwargs)
  3591. @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
  3592. def test_warn_on_dealloc_fd(self):
  3593. self._check_warn_on_dealloc_fd("rb", buffering=0)
  3594. self._check_warn_on_dealloc_fd("rb")
  3595. self._check_warn_on_dealloc_fd("r", encoding="utf-8")
  3596. def test_pickling(self):
  3597. # Pickling file objects is forbidden
  3598. for kwargs in [
  3599. {"mode": "w"},
  3600. {"mode": "wb"},
  3601. {"mode": "wb", "buffering": 0},
  3602. {"mode": "r"},
  3603. {"mode": "rb"},
  3604. {"mode": "rb", "buffering": 0},
  3605. {"mode": "w+"},
  3606. {"mode": "w+b"},
  3607. {"mode": "w+b", "buffering": 0},
  3608. ]:
  3609. if "b" not in kwargs["mode"]:
  3610. kwargs["encoding"] = "utf-8"
  3611. for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
  3612. with self.open(os_helper.TESTFN, **kwargs) as f:
  3613. self.assertRaises(TypeError, pickle.dumps, f, protocol)
  3614. @unittest.skipIf(
  3615. support.is_emscripten, "fstat() of a pipe fd is not supported"
  3616. )
  3617. def test_nonblock_pipe_write_bigbuf(self):
  3618. self._test_nonblock_pipe_write(16*1024)
  3619. @unittest.skipIf(
  3620. support.is_emscripten, "fstat() of a pipe fd is not supported"
  3621. )
  3622. def test_nonblock_pipe_write_smallbuf(self):
  3623. self._test_nonblock_pipe_write(1024)
  3624. @unittest.skipUnless(hasattr(os, 'set_blocking'),
  3625. 'os.set_blocking() required for this test')
  3626. @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
  3627. def _test_nonblock_pipe_write(self, bufsize):
  3628. sent = []
  3629. received = []
  3630. r, w = os.pipe()
  3631. os.set_blocking(r, False)
  3632. os.set_blocking(w, False)
  3633. # To exercise all code paths in the C implementation we need
  3634. # to play with buffer sizes. For instance, if we choose a
  3635. # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
  3636. # then we will never get a partial write of the buffer.
  3637. rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
  3638. wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
  3639. with rf, wf:
  3640. for N in 9999, 73, 7574:
  3641. try:
  3642. i = 0
  3643. while True:
  3644. msg = bytes([i % 26 + 97]) * N
  3645. sent.append(msg)
  3646. wf.write(msg)
  3647. i += 1
  3648. except self.BlockingIOError as e:
  3649. self.assertEqual(e.args[0], errno.EAGAIN)
  3650. self.assertEqual(e.args[2], e.characters_written)
  3651. sent[-1] = sent[-1][:e.characters_written]
  3652. received.append(rf.read())
  3653. msg = b'BLOCKED'
  3654. wf.write(msg)
  3655. sent.append(msg)
  3656. while True:
  3657. try:
  3658. wf.flush()
  3659. break
  3660. except self.BlockingIOError as e:
  3661. self.assertEqual(e.args[0], errno.EAGAIN)
  3662. self.assertEqual(e.args[2], e.characters_written)
  3663. self.assertEqual(e.characters_written, 0)
  3664. received.append(rf.read())
  3665. received += iter(rf.read, None)
  3666. sent, received = b''.join(sent), b''.join(received)
  3667. self.assertEqual(sent, received)
  3668. self.assertTrue(wf.closed)
  3669. self.assertTrue(rf.closed)
  3670. def test_create_fail(self):
  3671. # 'x' mode fails if file is existing
  3672. with self.open(os_helper.TESTFN, 'w', encoding="utf-8"):
  3673. pass
  3674. self.assertRaises(FileExistsError, self.open, os_helper.TESTFN, 'x', encoding="utf-8")
  3675. def test_create_writes(self):
  3676. # 'x' mode opens for writing
  3677. with self.open(os_helper.TESTFN, 'xb') as f:
  3678. f.write(b"spam")
  3679. with self.open(os_helper.TESTFN, 'rb') as f:
  3680. self.assertEqual(b"spam", f.read())
  3681. def test_open_allargs(self):
  3682. # there used to be a buffer overflow in the parser for rawmode
  3683. self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'rwax+', encoding="utf-8")
  3684. def test_check_encoding_errors(self):
  3685. # bpo-37388: open() and TextIOWrapper must check encoding and errors
  3686. # arguments in dev mode
  3687. mod = self.io.__name__
  3688. filename = __file__
  3689. invalid = 'Boom, Shaka Laka, Boom!'
  3690. code = textwrap.dedent(f'''
  3691. import sys
  3692. from {mod} import open, TextIOWrapper
  3693. try:
  3694. open({filename!r}, encoding={invalid!r})
  3695. except LookupError:
  3696. pass
  3697. else:
  3698. sys.exit(21)
  3699. try:
  3700. open({filename!r}, errors={invalid!r})
  3701. except LookupError:
  3702. pass
  3703. else:
  3704. sys.exit(22)
  3705. fp = open({filename!r}, "rb")
  3706. with fp:
  3707. try:
  3708. TextIOWrapper(fp, encoding={invalid!r})
  3709. except LookupError:
  3710. pass
  3711. else:
  3712. sys.exit(23)
  3713. try:
  3714. TextIOWrapper(fp, errors={invalid!r})
  3715. except LookupError:
  3716. pass
  3717. else:
  3718. sys.exit(24)
  3719. sys.exit(10)
  3720. ''')
  3721. proc = assert_python_failure('-X', 'dev', '-c', code)
  3722. self.assertEqual(proc.rc, 10, proc)
  3723. def test_check_encoding_warning(self):
  3724. # PEP 597: Raise warning when encoding is not specified
  3725. # and sys.flags.warn_default_encoding is set.
  3726. mod = self.io.__name__
  3727. filename = __file__
  3728. code = textwrap.dedent(f'''\
  3729. import sys
  3730. from {mod} import open, TextIOWrapper
  3731. import pathlib
  3732. with open({filename!r}) as f: # line 5
  3733. pass
  3734. pathlib.Path({filename!r}).read_text() # line 8
  3735. ''')
  3736. proc = assert_python_ok('-X', 'warn_default_encoding', '-c', code)
  3737. warnings = proc.err.splitlines()
  3738. self.assertEqual(len(warnings), 2)
  3739. self.assertTrue(
  3740. warnings[0].startswith(b"<string>:5: EncodingWarning: "))
  3741. self.assertTrue(
  3742. warnings[1].startswith(b"<string>:8: EncodingWarning: "))
  3743. def test_text_encoding(self):
  3744. # PEP 597, bpo-47000. io.text_encoding() returns "locale" or "utf-8"
  3745. # based on sys.flags.utf8_mode
  3746. code = "import io; print(io.text_encoding(None))"
  3747. proc = assert_python_ok('-X', 'utf8=0', '-c', code)
  3748. self.assertEqual(b"locale", proc.out.strip())
  3749. proc = assert_python_ok('-X', 'utf8=1', '-c', code)
  3750. self.assertEqual(b"utf-8", proc.out.strip())
  3751. @support.cpython_only
  3752. # Depending if OpenWrapper was already created or not, the warning is
  3753. # emitted or not. For example, the attribute is already created when this
  3754. # test is run multiple times.
  3755. @warnings_helper.ignore_warnings(category=DeprecationWarning)
  3756. def test_openwrapper(self):
  3757. self.assertIs(self.io.OpenWrapper, self.io.open)
  3758. class CMiscIOTest(MiscIOTest):
  3759. io = io
  3760. def test_readinto_buffer_overflow(self):
  3761. # Issue #18025
  3762. class BadReader(self.io.BufferedIOBase):
  3763. def read(self, n=-1):
  3764. return b'x' * 10**6
  3765. bufio = BadReader()
  3766. b = bytearray(2)
  3767. self.assertRaises(ValueError, bufio.readinto, b)
  3768. def check_daemon_threads_shutdown_deadlock(self, stream_name):
  3769. # Issue #23309: deadlocks at shutdown should be avoided when a
  3770. # daemon thread and the main thread both write to a file.
  3771. code = """if 1:
  3772. import sys
  3773. import time
  3774. import threading
  3775. from test.support import SuppressCrashReport
  3776. file = sys.{stream_name}
  3777. def run():
  3778. while True:
  3779. file.write('.')
  3780. file.flush()
  3781. crash = SuppressCrashReport()
  3782. crash.__enter__()
  3783. # don't call __exit__(): the crash occurs at Python shutdown
  3784. thread = threading.Thread(target=run)
  3785. thread.daemon = True
  3786. thread.start()
  3787. time.sleep(0.5)
  3788. file.write('!')
  3789. file.flush()
  3790. """.format_map(locals())
  3791. res, _ = run_python_until_end("-c", code)
  3792. err = res.err.decode()
  3793. if res.rc != 0:
  3794. # Failure: should be a fatal error
  3795. pattern = (r"Fatal Python error: _enter_buffered_busy: "
  3796. r"could not acquire lock "
  3797. r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
  3798. r"at interpreter shutdown, possibly due to "
  3799. r"daemon threads".format_map(locals()))
  3800. self.assertRegex(err, pattern)
  3801. else:
  3802. self.assertFalse(err.strip('.!'))
  3803. @threading_helper.requires_working_threading()
  3804. def test_daemon_threads_shutdown_stdout_deadlock(self):
  3805. self.check_daemon_threads_shutdown_deadlock('stdout')
  3806. @threading_helper.requires_working_threading()
  3807. def test_daemon_threads_shutdown_stderr_deadlock(self):
  3808. self.check_daemon_threads_shutdown_deadlock('stderr')
  3809. class PyMiscIOTest(MiscIOTest):
  3810. io = pyio
  3811. @unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
  3812. class SignalsTest(unittest.TestCase):
  3813. def setUp(self):
  3814. self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
  3815. def tearDown(self):
  3816. signal.signal(signal.SIGALRM, self.oldalrm)
  3817. def alarm_interrupt(self, sig, frame):
  3818. 1/0
  3819. def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
  3820. """Check that a partial write, when it gets interrupted, properly
  3821. invokes the signal handler, and bubbles up the exception raised
  3822. in the latter."""
  3823. # XXX This test has three flaws that appear when objects are
  3824. # XXX not reference counted.
  3825. # - if wio.write() happens to trigger a garbage collection,
  3826. # the signal exception may be raised when some __del__
  3827. # method is running; it will not reach the assertRaises()
  3828. # call.
  3829. # - more subtle, if the wio object is not destroyed at once
  3830. # and survives this function, the next opened file is likely
  3831. # to have the same fileno (since the file descriptor was
  3832. # actively closed). When wio.__del__ is finally called, it
  3833. # will close the other's test file... To trigger this with
  3834. # CPython, try adding "global wio" in this function.
  3835. # - This happens only for streams created by the _pyio module,
  3836. # because a wio.close() that fails still consider that the
  3837. # file needs to be closed again. You can try adding an
  3838. # "assert wio.closed" at the end of the function.
  3839. # Fortunately, a little gc.collect() seems to be enough to
  3840. # work around all these issues.
  3841. support.gc_collect() # For PyPy or other GCs.
  3842. read_results = []
  3843. def _read():
  3844. s = os.read(r, 1)
  3845. read_results.append(s)
  3846. t = threading.Thread(target=_read)
  3847. t.daemon = True
  3848. r, w = os.pipe()
  3849. fdopen_kwargs["closefd"] = False
  3850. large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
  3851. try:
  3852. wio = self.io.open(w, **fdopen_kwargs)
  3853. if hasattr(signal, 'pthread_sigmask'):
  3854. # create the thread with SIGALRM signal blocked
  3855. signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
  3856. t.start()
  3857. signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
  3858. else:
  3859. t.start()
  3860. # Fill the pipe enough that the write will be blocking.
  3861. # It will be interrupted by the timer armed above. Since the
  3862. # other thread has read one byte, the low-level write will
  3863. # return with a successful (partial) result rather than an EINTR.
  3864. # The buffered IO layer must check for pending signal
  3865. # handlers, which in this case will invoke alarm_interrupt().
  3866. signal.alarm(1)
  3867. try:
  3868. self.assertRaises(ZeroDivisionError, wio.write, large_data)
  3869. finally:
  3870. signal.alarm(0)
  3871. t.join()
  3872. # We got one byte, get another one and check that it isn't a
  3873. # repeat of the first one.
  3874. read_results.append(os.read(r, 1))
  3875. self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
  3876. finally:
  3877. os.close(w)
  3878. os.close(r)
  3879. # This is deliberate. If we didn't close the file descriptor
  3880. # before closing wio, wio would try to flush its internal
  3881. # buffer, and block again.
  3882. try:
  3883. wio.close()
  3884. except OSError as e:
  3885. if e.errno != errno.EBADF:
  3886. raise
  3887. @requires_alarm
  3888. @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
  3889. def test_interrupted_write_unbuffered(self):
  3890. self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
  3891. @requires_alarm
  3892. @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
  3893. def test_interrupted_write_buffered(self):
  3894. self.check_interrupted_write(b"xy", b"xy", mode="wb")
  3895. @requires_alarm
  3896. @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
  3897. def test_interrupted_write_text(self):
  3898. self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
  3899. @support.no_tracing
  3900. def check_reentrant_write(self, data, **fdopen_kwargs):
  3901. def on_alarm(*args):
  3902. # Will be called reentrantly from the same thread
  3903. wio.write(data)
  3904. 1/0
  3905. signal.signal(signal.SIGALRM, on_alarm)
  3906. r, w = os.pipe()
  3907. wio = self.io.open(w, **fdopen_kwargs)
  3908. try:
  3909. signal.alarm(1)
  3910. # Either the reentrant call to wio.write() fails with RuntimeError,
  3911. # or the signal handler raises ZeroDivisionError.
  3912. with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
  3913. while 1:
  3914. for i in range(100):
  3915. wio.write(data)
  3916. wio.flush()
  3917. # Make sure the buffer doesn't fill up and block further writes
  3918. os.read(r, len(data) * 100)
  3919. exc = cm.exception
  3920. if isinstance(exc, RuntimeError):
  3921. self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
  3922. finally:
  3923. signal.alarm(0)
  3924. wio.close()
  3925. os.close(r)
  3926. @requires_alarm
  3927. def test_reentrant_write_buffered(self):
  3928. self.check_reentrant_write(b"xy", mode="wb")
  3929. @requires_alarm
  3930. def test_reentrant_write_text(self):
  3931. self.check_reentrant_write("xy", mode="w", encoding="ascii")
  3932. def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
  3933. """Check that a buffered read, when it gets interrupted (either
  3934. returning a partial result or EINTR), properly invokes the signal
  3935. handler and retries if the latter returned successfully."""
  3936. r, w = os.pipe()
  3937. fdopen_kwargs["closefd"] = False
  3938. def alarm_handler(sig, frame):
  3939. os.write(w, b"bar")
  3940. signal.signal(signal.SIGALRM, alarm_handler)
  3941. try:
  3942. rio = self.io.open(r, **fdopen_kwargs)
  3943. os.write(w, b"foo")
  3944. signal.alarm(1)
  3945. # Expected behaviour:
  3946. # - first raw read() returns partial b"foo"
  3947. # - second raw read() returns EINTR
  3948. # - third raw read() returns b"bar"
  3949. self.assertEqual(decode(rio.read(6)), "foobar")
  3950. finally:
  3951. signal.alarm(0)
  3952. rio.close()
  3953. os.close(w)
  3954. os.close(r)
  3955. @requires_alarm
  3956. def test_interrupted_read_retry_buffered(self):
  3957. self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
  3958. mode="rb")
  3959. @requires_alarm
  3960. def test_interrupted_read_retry_text(self):
  3961. self.check_interrupted_read_retry(lambda x: x,
  3962. mode="r", encoding="latin1")
  3963. def check_interrupted_write_retry(self, item, **fdopen_kwargs):
  3964. """Check that a buffered write, when it gets interrupted (either
  3965. returning a partial result or EINTR), properly invokes the signal
  3966. handler and retries if the latter returned successfully."""
  3967. select = import_helper.import_module("select")
  3968. # A quantity that exceeds the buffer size of an anonymous pipe's
  3969. # write end.
  3970. N = support.PIPE_MAX_SIZE
  3971. r, w = os.pipe()
  3972. fdopen_kwargs["closefd"] = False
  3973. # We need a separate thread to read from the pipe and allow the
  3974. # write() to finish. This thread is started after the SIGALRM is
  3975. # received (forcing a first EINTR in write()).
  3976. read_results = []
  3977. write_finished = False
  3978. error = None
  3979. def _read():
  3980. try:
  3981. while not write_finished:
  3982. while r in select.select([r], [], [], 1.0)[0]:
  3983. s = os.read(r, 1024)
  3984. read_results.append(s)
  3985. except BaseException as exc:
  3986. nonlocal error
  3987. error = exc
  3988. t = threading.Thread(target=_read)
  3989. t.daemon = True
  3990. def alarm1(sig, frame):
  3991. signal.signal(signal.SIGALRM, alarm2)
  3992. signal.alarm(1)
  3993. def alarm2(sig, frame):
  3994. t.start()
  3995. large_data = item * N
  3996. signal.signal(signal.SIGALRM, alarm1)
  3997. try:
  3998. wio = self.io.open(w, **fdopen_kwargs)
  3999. signal.alarm(1)
  4000. # Expected behaviour:
  4001. # - first raw write() is partial (because of the limited pipe buffer
  4002. # and the first alarm)
  4003. # - second raw write() returns EINTR (because of the second alarm)
  4004. # - subsequent write()s are successful (either partial or complete)
  4005. written = wio.write(large_data)
  4006. self.assertEqual(N, written)
  4007. wio.flush()
  4008. write_finished = True
  4009. t.join()
  4010. self.assertIsNone(error)
  4011. self.assertEqual(N, sum(len(x) for x in read_results))
  4012. finally:
  4013. signal.alarm(0)
  4014. write_finished = True
  4015. os.close(w)
  4016. os.close(r)
  4017. # This is deliberate. If we didn't close the file descriptor
  4018. # before closing wio, wio would try to flush its internal
  4019. # buffer, and could block (in case of failure).
  4020. try:
  4021. wio.close()
  4022. except OSError as e:
  4023. if e.errno != errno.EBADF:
  4024. raise
  4025. @requires_alarm
  4026. def test_interrupted_write_retry_buffered(self):
  4027. self.check_interrupted_write_retry(b"x", mode="wb")
  4028. @requires_alarm
  4029. def test_interrupted_write_retry_text(self):
  4030. self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
  4031. class CSignalsTest(SignalsTest):
  4032. io = io
  4033. class PySignalsTest(SignalsTest):
  4034. io = pyio
  4035. # Handling reentrancy issues would slow down _pyio even more, so the
  4036. # tests are disabled.
  4037. test_reentrant_write_buffered = None
  4038. test_reentrant_write_text = None
  4039. def load_tests(loader, tests, pattern):
  4040. tests = (CIOTest, PyIOTest, APIMismatchTest,
  4041. CBufferedReaderTest, PyBufferedReaderTest,
  4042. CBufferedWriterTest, PyBufferedWriterTest,
  4043. CBufferedRWPairTest, PyBufferedRWPairTest,
  4044. CBufferedRandomTest, PyBufferedRandomTest,
  4045. StatefulIncrementalDecoderTest,
  4046. CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
  4047. CTextIOWrapperTest, PyTextIOWrapperTest,
  4048. CMiscIOTest, PyMiscIOTest,
  4049. CSignalsTest, PySignalsTest,
  4050. )
  4051. # Put the namespaces of the IO module we are testing and some useful mock
  4052. # classes in the __dict__ of each test.
  4053. mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
  4054. MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
  4055. SlowFlushRawIO)
  4056. all_members = io.__all__ + ["IncrementalNewlineDecoder"]
  4057. c_io_ns = {name : getattr(io, name) for name in all_members}
  4058. py_io_ns = {name : getattr(pyio, name) for name in all_members}
  4059. globs = globals()
  4060. c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
  4061. py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
  4062. for test in tests:
  4063. if test.__name__.startswith("C"):
  4064. for name, obj in c_io_ns.items():
  4065. setattr(test, name, obj)
  4066. elif test.__name__.startswith("Py"):
  4067. for name, obj in py_io_ns.items():
  4068. setattr(test, name, obj)
  4069. suite = loader.suiteClass()
  4070. for test in tests:
  4071. suite.addTest(loader.loadTestsFromTestCase(test))
  4072. return suite
  4073. if __name__ == "__main__":
  4074. unittest.main()