| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254 |
- import enum
- import errno
- from http import client, HTTPStatus
- import io
- import itertools
- import os
- import array
- import re
- import socket
- import threading
- import warnings
- import unittest
- from unittest import mock
- TestCase = unittest.TestCase
- from test import support
- from test.support import os_helper
- from test.support import socket_helper
- from test.support import warnings_helper
- support.requires_working_socket(module=True)
- here = os.path.dirname(__file__)
- # Self-signed cert file for 'localhost'
- CERT_localhost = os.path.join(here, 'keycert.pem')
- # Self-signed cert file for 'fakehostname'
- CERT_fakehostname = os.path.join(here, 'keycert2.pem')
- # Self-signed cert file for self-signed.pythontest.net
- CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem')
- # constants for testing chunked encoding
- chunked_start = (
- 'HTTP/1.1 200 OK\r\n'
- 'Transfer-Encoding: chunked\r\n\r\n'
- 'a\r\n'
- 'hello worl\r\n'
- '3\r\n'
- 'd! \r\n'
- '8\r\n'
- 'and now \r\n'
- '22\r\n'
- 'for something completely different\r\n'
- )
- chunked_expected = b'hello world! and now for something completely different'
- chunk_extension = ";foo=bar"
- last_chunk = "0\r\n"
- last_chunk_extended = "0" + chunk_extension + "\r\n"
- trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n"
- chunked_end = "\r\n"
- HOST = socket_helper.HOST
- class FakeSocket:
- def __init__(self, text, fileclass=io.BytesIO, host=None, port=None):
- if isinstance(text, str):
- text = text.encode("ascii")
- self.text = text
- self.fileclass = fileclass
- self.data = b''
- self.sendall_calls = 0
- self.file_closed = False
- self.host = host
- self.port = port
- def sendall(self, data):
- self.sendall_calls += 1
- self.data += data
- def makefile(self, mode, bufsize=None):
- if mode != 'r' and mode != 'rb':
- raise client.UnimplementedFileMode()
- # keep the file around so we can check how much was read from it
- self.file = self.fileclass(self.text)
- self.file.close = self.file_close #nerf close ()
- return self.file
- def file_close(self):
- self.file_closed = True
- def close(self):
- pass
- def setsockopt(self, level, optname, value):
- pass
- class EPipeSocket(FakeSocket):
- def __init__(self, text, pipe_trigger):
- # When sendall() is called with pipe_trigger, raise EPIPE.
- FakeSocket.__init__(self, text)
- self.pipe_trigger = pipe_trigger
- def sendall(self, data):
- if self.pipe_trigger in data:
- raise OSError(errno.EPIPE, "gotcha")
- self.data += data
- def close(self):
- pass
- class NoEOFBytesIO(io.BytesIO):
- """Like BytesIO, but raises AssertionError on EOF.
- This is used below to test that http.client doesn't try to read
- more from the underlying file than it should.
- """
- def read(self, n=-1):
- data = io.BytesIO.read(self, n)
- if data == b'':
- raise AssertionError('caller tried to read past EOF')
- return data
- def readline(self, length=None):
- data = io.BytesIO.readline(self, length)
- if data == b'':
- raise AssertionError('caller tried to read past EOF')
- return data
- class FakeSocketHTTPConnection(client.HTTPConnection):
- """HTTPConnection subclass using FakeSocket; counts connect() calls"""
- def __init__(self, *args):
- self.connections = 0
- super().__init__('example.com')
- self.fake_socket_args = args
- self._create_connection = self.create_connection
- def connect(self):
- """Count the number of times connect() is invoked"""
- self.connections += 1
- return super().connect()
- def create_connection(self, *pos, **kw):
- return FakeSocket(*self.fake_socket_args)
- class HeaderTests(TestCase):
- def test_auto_headers(self):
- # Some headers are added automatically, but should not be added by
- # .request() if they are explicitly set.
- class HeaderCountingBuffer(list):
- def __init__(self):
- self.count = {}
- def append(self, item):
- kv = item.split(b':')
- if len(kv) > 1:
- # item is a 'Key: Value' header string
- lcKey = kv[0].decode('ascii').lower()
- self.count.setdefault(lcKey, 0)
- self.count[lcKey] += 1
- list.append(self, item)
- for explicit_header in True, False:
- for header in 'Content-length', 'Host', 'Accept-encoding':
- conn = client.HTTPConnection('example.com')
- conn.sock = FakeSocket('blahblahblah')
- conn._buffer = HeaderCountingBuffer()
- body = 'spamspamspam'
- headers = {}
- if explicit_header:
- headers[header] = str(len(body))
- conn.request('POST', '/', body, headers)
- self.assertEqual(conn._buffer.count[header.lower()], 1)
- def test_content_length_0(self):
- class ContentLengthChecker(list):
- def __init__(self):
- list.__init__(self)
- self.content_length = None
- def append(self, item):
- kv = item.split(b':', 1)
- if len(kv) > 1 and kv[0].lower() == b'content-length':
- self.content_length = kv[1].strip()
- list.append(self, item)
- # Here, we're testing that methods expecting a body get a
- # content-length set to zero if the body is empty (either None or '')
- bodies = (None, '')
- methods_with_body = ('PUT', 'POST', 'PATCH')
- for method, body in itertools.product(methods_with_body, bodies):
- conn = client.HTTPConnection('example.com')
- conn.sock = FakeSocket(None)
- conn._buffer = ContentLengthChecker()
- conn.request(method, '/', body)
- self.assertEqual(
- conn._buffer.content_length, b'0',
- 'Header Content-Length incorrect on {}'.format(method)
- )
- # For these methods, we make sure that content-length is not set when
- # the body is None because it might cause unexpected behaviour on the
- # server.
- methods_without_body = (
- 'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE',
- )
- for method in methods_without_body:
- conn = client.HTTPConnection('example.com')
- conn.sock = FakeSocket(None)
- conn._buffer = ContentLengthChecker()
- conn.request(method, '/', None)
- self.assertEqual(
- conn._buffer.content_length, None,
- 'Header Content-Length set for empty body on {}'.format(method)
- )
- # If the body is set to '', that's considered to be "present but
- # empty" rather than "missing", so content length would be set, even
- # for methods that don't expect a body.
- for method in methods_without_body:
- conn = client.HTTPConnection('example.com')
- conn.sock = FakeSocket(None)
- conn._buffer = ContentLengthChecker()
- conn.request(method, '/', '')
- self.assertEqual(
- conn._buffer.content_length, b'0',
- 'Header Content-Length incorrect on {}'.format(method)
- )
- # If the body is set, make sure Content-Length is set.
- for method in itertools.chain(methods_without_body, methods_with_body):
- conn = client.HTTPConnection('example.com')
- conn.sock = FakeSocket(None)
- conn._buffer = ContentLengthChecker()
- conn.request(method, '/', ' ')
- self.assertEqual(
- conn._buffer.content_length, b'1',
- 'Header Content-Length incorrect on {}'.format(method)
- )
- def test_putheader(self):
- conn = client.HTTPConnection('example.com')
- conn.sock = FakeSocket(None)
- conn.putrequest('GET','/')
- conn.putheader('Content-length', 42)
- self.assertIn(b'Content-length: 42', conn._buffer)
- conn.putheader('Foo', ' bar ')
- self.assertIn(b'Foo: bar ', conn._buffer)
- conn.putheader('Bar', '\tbaz\t')
- self.assertIn(b'Bar: \tbaz\t', conn._buffer)
- conn.putheader('Authorization', 'Bearer mytoken')
- self.assertIn(b'Authorization: Bearer mytoken', conn._buffer)
- conn.putheader('IterHeader', 'IterA', 'IterB')
- self.assertIn(b'IterHeader: IterA\r\n\tIterB', conn._buffer)
- conn.putheader('LatinHeader', b'\xFF')
- self.assertIn(b'LatinHeader: \xFF', conn._buffer)
- conn.putheader('Utf8Header', b'\xc3\x80')
- self.assertIn(b'Utf8Header: \xc3\x80', conn._buffer)
- conn.putheader('C1-Control', b'next\x85line')
- self.assertIn(b'C1-Control: next\x85line', conn._buffer)
- conn.putheader('Embedded-Fold-Space', 'is\r\n allowed')
- self.assertIn(b'Embedded-Fold-Space: is\r\n allowed', conn._buffer)
- conn.putheader('Embedded-Fold-Tab', 'is\r\n\tallowed')
- self.assertIn(b'Embedded-Fold-Tab: is\r\n\tallowed', conn._buffer)
- conn.putheader('Key Space', 'value')
- self.assertIn(b'Key Space: value', conn._buffer)
- conn.putheader('KeySpace ', 'value')
- self.assertIn(b'KeySpace : value', conn._buffer)
- conn.putheader(b'Nonbreak\xa0Space', 'value')
- self.assertIn(b'Nonbreak\xa0Space: value', conn._buffer)
- conn.putheader(b'\xa0NonbreakSpace', 'value')
- self.assertIn(b'\xa0NonbreakSpace: value', conn._buffer)
- def test_ipv6host_header(self):
- # Default host header on IPv6 transaction should be wrapped by [] if
- # it is an IPv6 address
- expected = b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
- b'Accept-Encoding: identity\r\n\r\n'
- conn = client.HTTPConnection('[2001::]:81')
- sock = FakeSocket('')
- conn.sock = sock
- conn.request('GET', '/foo')
- self.assertTrue(sock.data.startswith(expected))
- expected = b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \
- b'Accept-Encoding: identity\r\n\r\n'
- conn = client.HTTPConnection('[2001:102A::]')
- sock = FakeSocket('')
- conn.sock = sock
- conn.request('GET', '/foo')
- self.assertTrue(sock.data.startswith(expected))
- def test_malformed_headers_coped_with(self):
- # Issue 19996
- body = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n"
- sock = FakeSocket(body)
- resp = client.HTTPResponse(sock)
- resp.begin()
- self.assertEqual(resp.getheader('First'), 'val')
- self.assertEqual(resp.getheader('Second'), 'val')
- def test_parse_all_octets(self):
- # Ensure no valid header field octet breaks the parser
- body = (
- b'HTTP/1.1 200 OK\r\n'
- b"!#$%&'*+-.^_`|~: value\r\n" # Special token characters
- b'VCHAR: ' + bytes(range(0x21, 0x7E + 1)) + b'\r\n'
- b'obs-text: ' + bytes(range(0x80, 0xFF + 1)) + b'\r\n'
- b'obs-fold: text\r\n'
- b' folded with space\r\n'
- b'\tfolded with tab\r\n'
- b'Content-Length: 0\r\n'
- b'\r\n'
- )
- sock = FakeSocket(body)
- resp = client.HTTPResponse(sock)
- resp.begin()
- self.assertEqual(resp.getheader('Content-Length'), '0')
- self.assertEqual(resp.msg['Content-Length'], '0')
- self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value')
- self.assertEqual(resp.msg["!#$%&'*+-.^_`|~"], 'value')
- vchar = ''.join(map(chr, range(0x21, 0x7E + 1)))
- self.assertEqual(resp.getheader('VCHAR'), vchar)
- self.assertEqual(resp.msg['VCHAR'], vchar)
- self.assertIsNotNone(resp.getheader('obs-text'))
- self.assertIn('obs-text', resp.msg)
- for folded in (resp.getheader('obs-fold'), resp.msg['obs-fold']):
- self.assertTrue(folded.startswith('text'))
- self.assertIn(' folded with space', folded)
- self.assertTrue(folded.endswith('folded with tab'))
- def test_invalid_headers(self):
- conn = client.HTTPConnection('example.com')
- conn.sock = FakeSocket('')
- conn.putrequest('GET', '/')
- # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no
- # longer allowed in header names
- cases = (
- (b'Invalid\r\nName', b'ValidValue'),
- (b'Invalid\rName', b'ValidValue'),
- (b'Invalid\nName', b'ValidValue'),
- (b'\r\nInvalidName', b'ValidValue'),
- (b'\rInvalidName', b'ValidValue'),
- (b'\nInvalidName', b'ValidValue'),
- (b' InvalidName', b'ValidValue'),
- (b'\tInvalidName', b'ValidValue'),
- (b'Invalid:Name', b'ValidValue'),
- (b':InvalidName', b'ValidValue'),
- (b'ValidName', b'Invalid\r\nValue'),
- (b'ValidName', b'Invalid\rValue'),
- (b'ValidName', b'Invalid\nValue'),
- (b'ValidName', b'InvalidValue\r\n'),
- (b'ValidName', b'InvalidValue\r'),
- (b'ValidName', b'InvalidValue\n'),
- )
- for name, value in cases:
- with self.subTest((name, value)):
- with self.assertRaisesRegex(ValueError, 'Invalid header'):
- conn.putheader(name, value)
- def test_headers_debuglevel(self):
- body = (
- b'HTTP/1.1 200 OK\r\n'
- b'First: val\r\n'
- b'Second: val1\r\n'
- b'Second: val2\r\n'
- )
- sock = FakeSocket(body)
- resp = client.HTTPResponse(sock, debuglevel=1)
- with support.captured_stdout() as output:
- resp.begin()
- lines = output.getvalue().splitlines()
- self.assertEqual(lines[0], "reply: 'HTTP/1.1 200 OK\\r\\n'")
- self.assertEqual(lines[1], "header: First: val")
- self.assertEqual(lines[2], "header: Second: val1")
- self.assertEqual(lines[3], "header: Second: val2")
- class HttpMethodTests(TestCase):
- def test_invalid_method_names(self):
- methods = (
- 'GET\r',
- 'POST\n',
- 'PUT\n\r',
- 'POST\nValue',
- 'POST\nHOST:abc',
- 'GET\nrHost:abc\n',
- 'POST\rRemainder:\r',
- 'GET\rHOST:\n',
- '\nPUT'
- )
- for method in methods:
- with self.assertRaisesRegex(
- ValueError, "method can't contain control characters"):
- conn = client.HTTPConnection('example.com')
- conn.sock = FakeSocket(None)
- conn.request(method=method, url="/")
- class TransferEncodingTest(TestCase):
- expected_body = b"It's just a flesh wound"
- def test_endheaders_chunked(self):
- conn = client.HTTPConnection('example.com')
- conn.sock = FakeSocket(b'')
- conn.putrequest('POST', '/')
- conn.endheaders(self._make_body(), encode_chunked=True)
- _, _, body = self._parse_request(conn.sock.data)
- body = self._parse_chunked(body)
- self.assertEqual(body, self.expected_body)
- def test_explicit_headers(self):
- # explicit chunked
- conn = client.HTTPConnection('example.com')
- conn.sock = FakeSocket(b'')
- # this shouldn't actually be automatically chunk-encoded because the
- # calling code has explicitly stated that it's taking care of it
- conn.request(
- 'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'})
- _, headers, body = self._parse_request(conn.sock.data)
- self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
- self.assertEqual(headers['Transfer-Encoding'], 'chunked')
- self.assertEqual(body, self.expected_body)
- # explicit chunked, string body
- conn = client.HTTPConnection('example.com')
- conn.sock = FakeSocket(b'')
- conn.request(
- 'POST', '/', self.expected_body.decode('latin-1'),
- {'Transfer-Encoding': 'chunked'})
- _, headers, body = self._parse_request(conn.sock.data)
- self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
- self.assertEqual(headers['Transfer-Encoding'], 'chunked')
- self.assertEqual(body, self.expected_body)
- # User-specified TE, but request() does the chunk encoding
- conn = client.HTTPConnection('example.com')
- conn.sock = FakeSocket(b'')
- conn.request('POST', '/',
- headers={'Transfer-Encoding': 'gzip, chunked'},
- encode_chunked=True,
- body=self._make_body())
- _, headers, body = self._parse_request(conn.sock.data)
- self.assertNotIn('content-length', [k.lower() for k in headers])
- self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked')
- self.assertEqual(self._parse_chunked(body), self.expected_body)
- def test_request(self):
- for empty_lines in (False, True,):
- conn = client.HTTPConnection('example.com')
- conn.sock = FakeSocket(b'')
- conn.request(
- 'POST', '/', self._make_body(empty_lines=empty_lines))
- _, headers, body = self._parse_request(conn.sock.data)
- body = self._parse_chunked(body)
- self.assertEqual(body, self.expected_body)
- self.assertEqual(headers['Transfer-Encoding'], 'chunked')
- # Content-Length and Transfer-Encoding SHOULD not be sent in the
- # same request
- self.assertNotIn('content-length', [k.lower() for k in headers])
- def test_empty_body(self):
- # Zero-length iterable should be treated like any other iterable
- conn = client.HTTPConnection('example.com')
- conn.sock = FakeSocket(b'')
- conn.request('POST', '/', ())
- _, headers, body = self._parse_request(conn.sock.data)
- self.assertEqual(headers['Transfer-Encoding'], 'chunked')
- self.assertNotIn('content-length', [k.lower() for k in headers])
- self.assertEqual(body, b"0\r\n\r\n")
- def _make_body(self, empty_lines=False):
- lines = self.expected_body.split(b' ')
- for idx, line in enumerate(lines):
- # for testing handling empty lines
- if empty_lines and idx % 2:
- yield b''
- if idx < len(lines) - 1:
- yield line + b' '
- else:
- yield line
- def _parse_request(self, data):
- lines = data.split(b'\r\n')
- request = lines[0]
- headers = {}
- n = 1
- while n < len(lines) and len(lines[n]) > 0:
- key, val = lines[n].split(b':')
- key = key.decode('latin-1').strip()
- headers[key] = val.decode('latin-1').strip()
- n += 1
- return request, headers, b'\r\n'.join(lines[n + 1:])
- def _parse_chunked(self, data):
- body = []
- trailers = {}
- n = 0
- lines = data.split(b'\r\n')
- # parse body
- while True:
- size, chunk = lines[n:n+2]
- size = int(size, 16)
- if size == 0:
- n += 1
- break
- self.assertEqual(size, len(chunk))
- body.append(chunk)
- n += 2
- # we /should/ hit the end chunk, but check against the size of
- # lines so we're not stuck in an infinite loop should we get
- # malformed data
- if n > len(lines):
- break
- return b''.join(body)
- class BasicTest(TestCase):
- def test_dir_with_added_behavior_on_status(self):
- # see issue40084
- self.assertTrue({'description', 'name', 'phrase', 'value'} <= set(dir(HTTPStatus(404))))
- def test_simple_httpstatus(self):
- class CheckedHTTPStatus(enum.IntEnum):
- """HTTP status codes and reason phrases
- Status codes from the following RFCs are all observed:
- * RFC 7231: Hypertext Transfer Protocol (HTTP/1.1), obsoletes 2616
- * RFC 6585: Additional HTTP Status Codes
- * RFC 3229: Delta encoding in HTTP
- * RFC 4918: HTTP Extensions for WebDAV, obsoletes 2518
- * RFC 5842: Binding Extensions to WebDAV
- * RFC 7238: Permanent Redirect
- * RFC 2295: Transparent Content Negotiation in HTTP
- * RFC 2774: An HTTP Extension Framework
- * RFC 7725: An HTTP Status Code to Report Legal Obstacles
- * RFC 7540: Hypertext Transfer Protocol Version 2 (HTTP/2)
- * RFC 2324: Hyper Text Coffee Pot Control Protocol (HTCPCP/1.0)
- * RFC 8297: An HTTP Status Code for Indicating Hints
- * RFC 8470: Using Early Data in HTTP
- """
- def __new__(cls, value, phrase, description=''):
- obj = int.__new__(cls, value)
- obj._value_ = value
- obj.phrase = phrase
- obj.description = description
- return obj
- # informational
- CONTINUE = 100, 'Continue', 'Request received, please continue'
- SWITCHING_PROTOCOLS = (101, 'Switching Protocols',
- 'Switching to new protocol; obey Upgrade header')
- PROCESSING = 102, 'Processing'
- EARLY_HINTS = 103, 'Early Hints'
- # success
- OK = 200, 'OK', 'Request fulfilled, document follows'
- CREATED = 201, 'Created', 'Document created, URL follows'
- ACCEPTED = (202, 'Accepted',
- 'Request accepted, processing continues off-line')
- NON_AUTHORITATIVE_INFORMATION = (203,
- 'Non-Authoritative Information', 'Request fulfilled from cache')
- NO_CONTENT = 204, 'No Content', 'Request fulfilled, nothing follows'
- RESET_CONTENT = 205, 'Reset Content', 'Clear input form for further input'
- PARTIAL_CONTENT = 206, 'Partial Content', 'Partial content follows'
- MULTI_STATUS = 207, 'Multi-Status'
- ALREADY_REPORTED = 208, 'Already Reported'
- IM_USED = 226, 'IM Used'
- # redirection
- MULTIPLE_CHOICES = (300, 'Multiple Choices',
- 'Object has several resources -- see URI list')
- MOVED_PERMANENTLY = (301, 'Moved Permanently',
- 'Object moved permanently -- see URI list')
- FOUND = 302, 'Found', 'Object moved temporarily -- see URI list'
- SEE_OTHER = 303, 'See Other', 'Object moved -- see Method and URL list'
- NOT_MODIFIED = (304, 'Not Modified',
- 'Document has not changed since given time')
- USE_PROXY = (305, 'Use Proxy',
- 'You must use proxy specified in Location to access this resource')
- TEMPORARY_REDIRECT = (307, 'Temporary Redirect',
- 'Object moved temporarily -- see URI list')
- PERMANENT_REDIRECT = (308, 'Permanent Redirect',
- 'Object moved permanently -- see URI list')
- # client error
- BAD_REQUEST = (400, 'Bad Request',
- 'Bad request syntax or unsupported method')
- UNAUTHORIZED = (401, 'Unauthorized',
- 'No permission -- see authorization schemes')
- PAYMENT_REQUIRED = (402, 'Payment Required',
- 'No payment -- see charging schemes')
- FORBIDDEN = (403, 'Forbidden',
- 'Request forbidden -- authorization will not help')
- NOT_FOUND = (404, 'Not Found',
- 'Nothing matches the given URI')
- METHOD_NOT_ALLOWED = (405, 'Method Not Allowed',
- 'Specified method is invalid for this resource')
- NOT_ACCEPTABLE = (406, 'Not Acceptable',
- 'URI not available in preferred format')
- PROXY_AUTHENTICATION_REQUIRED = (407,
- 'Proxy Authentication Required',
- 'You must authenticate with this proxy before proceeding')
- REQUEST_TIMEOUT = (408, 'Request Timeout',
- 'Request timed out; try again later')
- CONFLICT = 409, 'Conflict', 'Request conflict'
- GONE = (410, 'Gone',
- 'URI no longer exists and has been permanently removed')
- LENGTH_REQUIRED = (411, 'Length Required',
- 'Client must specify Content-Length')
- PRECONDITION_FAILED = (412, 'Precondition Failed',
- 'Precondition in headers is false')
- REQUEST_ENTITY_TOO_LARGE = (413, 'Request Entity Too Large',
- 'Entity is too large')
- REQUEST_URI_TOO_LONG = (414, 'Request-URI Too Long',
- 'URI is too long')
- UNSUPPORTED_MEDIA_TYPE = (415, 'Unsupported Media Type',
- 'Entity body in unsupported format')
- REQUESTED_RANGE_NOT_SATISFIABLE = (416,
- 'Requested Range Not Satisfiable',
- 'Cannot satisfy request range')
- EXPECTATION_FAILED = (417, 'Expectation Failed',
- 'Expect condition could not be satisfied')
- IM_A_TEAPOT = (418, 'I\'m a Teapot',
- 'Server refuses to brew coffee because it is a teapot.')
- MISDIRECTED_REQUEST = (421, 'Misdirected Request',
- 'Server is not able to produce a response')
- UNPROCESSABLE_ENTITY = 422, 'Unprocessable Entity'
- LOCKED = 423, 'Locked'
- FAILED_DEPENDENCY = 424, 'Failed Dependency'
- TOO_EARLY = 425, 'Too Early'
- UPGRADE_REQUIRED = 426, 'Upgrade Required'
- PRECONDITION_REQUIRED = (428, 'Precondition Required',
- 'The origin server requires the request to be conditional')
- TOO_MANY_REQUESTS = (429, 'Too Many Requests',
- 'The user has sent too many requests in '
- 'a given amount of time ("rate limiting")')
- REQUEST_HEADER_FIELDS_TOO_LARGE = (431,
- 'Request Header Fields Too Large',
- 'The server is unwilling to process the request because its header '
- 'fields are too large')
- UNAVAILABLE_FOR_LEGAL_REASONS = (451,
- 'Unavailable For Legal Reasons',
- 'The server is denying access to the '
- 'resource as a consequence of a legal demand')
- # server errors
- INTERNAL_SERVER_ERROR = (500, 'Internal Server Error',
- 'Server got itself in trouble')
- NOT_IMPLEMENTED = (501, 'Not Implemented',
- 'Server does not support this operation')
- BAD_GATEWAY = (502, 'Bad Gateway',
- 'Invalid responses from another server/proxy')
- SERVICE_UNAVAILABLE = (503, 'Service Unavailable',
- 'The server cannot process the request due to a high load')
- GATEWAY_TIMEOUT = (504, 'Gateway Timeout',
- 'The gateway server did not receive a timely response')
- HTTP_VERSION_NOT_SUPPORTED = (505, 'HTTP Version Not Supported',
- 'Cannot fulfill request')
- VARIANT_ALSO_NEGOTIATES = 506, 'Variant Also Negotiates'
- INSUFFICIENT_STORAGE = 507, 'Insufficient Storage'
- LOOP_DETECTED = 508, 'Loop Detected'
- NOT_EXTENDED = 510, 'Not Extended'
- NETWORK_AUTHENTICATION_REQUIRED = (511,
- 'Network Authentication Required',
- 'The client needs to authenticate to gain network access')
- enum._test_simple_enum(CheckedHTTPStatus, HTTPStatus)
- def test_status_lines(self):
- # Test HTTP status lines
- body = "HTTP/1.1 200 Ok\r\n\r\nText"
- sock = FakeSocket(body)
- resp = client.HTTPResponse(sock)
- resp.begin()
- self.assertEqual(resp.read(0), b'') # Issue #20007
- self.assertFalse(resp.isclosed())
- self.assertFalse(resp.closed)
- self.assertEqual(resp.read(), b"Text")
- self.assertTrue(resp.isclosed())
- self.assertFalse(resp.closed)
- resp.close()
- self.assertTrue(resp.closed)
- body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
- sock = FakeSocket(body)
- resp = client.HTTPResponse(sock)
- self.assertRaises(client.BadStatusLine, resp.begin)
- def test_bad_status_repr(self):
- exc = client.BadStatusLine('')
- self.assertEqual(repr(exc), '''BadStatusLine("''")''')
- def test_partial_reads(self):
- # if we have Content-Length, HTTPResponse knows when to close itself,
- # the same behaviour as when we read the whole thing with read()
- body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
- sock = FakeSocket(body)
- resp = client.HTTPResponse(sock)
- resp.begin()
- self.assertEqual(resp.read(2), b'Te')
- self.assertFalse(resp.isclosed())
- self.assertEqual(resp.read(2), b'xt')
- self.assertTrue(resp.isclosed())
- self.assertFalse(resp.closed)
- resp.close()
- self.assertTrue(resp.closed)
- def test_mixed_reads(self):
- # readline() should update the remaining length, so that read() knows
- # how much data is left and does not raise IncompleteRead
- body = "HTTP/1.1 200 Ok\r\nContent-Length: 13\r\n\r\nText\r\nAnother"
- sock = FakeSocket(body)
- resp = client.HTTPResponse(sock)
- resp.begin()
- self.assertEqual(resp.readline(), b'Text\r\n')
- self.assertFalse(resp.isclosed())
- self.assertEqual(resp.read(), b'Another')
- self.assertTrue(resp.isclosed())
- self.assertFalse(resp.closed)
- resp.close()
- self.assertTrue(resp.closed)
- def test_partial_readintos(self):
- # if we have Content-Length, HTTPResponse knows when to close itself,
- # the same behaviour as when we read the whole thing with read()
- body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
- sock = FakeSocket(body)
- resp = client.HTTPResponse(sock)
- resp.begin()
- b = bytearray(2)
- n = resp.readinto(b)
- self.assertEqual(n, 2)
- self.assertEqual(bytes(b), b'Te')
- self.assertFalse(resp.isclosed())
- n = resp.readinto(b)
- self.assertEqual(n, 2)
- self.assertEqual(bytes(b), b'xt')
- self.assertTrue(resp.isclosed())
- self.assertFalse(resp.closed)
- resp.close()
- self.assertTrue(resp.closed)
- def test_partial_reads_past_end(self):
- # if we have Content-Length, clip reads to the end
- body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
- sock = FakeSocket(body)
- resp = client.HTTPResponse(sock)
- resp.begin()
- self.assertEqual(resp.read(10), b'Text')
- self.assertTrue(resp.isclosed())
- self.assertFalse(resp.closed)
- resp.close()
- self.assertTrue(resp.closed)
- def test_partial_readintos_past_end(self):
- # if we have Content-Length, clip readintos to the end
- body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
- sock = FakeSocket(body)
- resp = client.HTTPResponse(sock)
- resp.begin()
- b = bytearray(10)
- n = resp.readinto(b)
- self.assertEqual(n, 4)
- self.assertEqual(bytes(b)[:4], b'Text')
- self.assertTrue(resp.isclosed())
- self.assertFalse(resp.closed)
- resp.close()
- self.assertTrue(resp.closed)
- def test_partial_reads_no_content_length(self):
- # when no length is present, the socket should be gracefully closed when
- # all data was read
- body = "HTTP/1.1 200 Ok\r\n\r\nText"
- sock = FakeSocket(body)
- resp = client.HTTPResponse(sock)
- resp.begin()
- self.assertEqual(resp.read(2), b'Te')
- self.assertFalse(resp.isclosed())
- self.assertEqual(resp.read(2), b'xt')
- self.assertEqual(resp.read(1), b'')
- self.assertTrue(resp.isclosed())
- self.assertFalse(resp.closed)
- resp.close()
- self.assertTrue(resp.closed)
- def test_partial_readintos_no_content_length(self):
- # when no length is present, the socket should be gracefully closed when
- # all data was read
- body = "HTTP/1.1 200 Ok\r\n\r\nText"
- sock = FakeSocket(body)
- resp = client.HTTPResponse(sock)
- resp.begin()
- b = bytearray(2)
- n = resp.readinto(b)
- self.assertEqual(n, 2)
- self.assertEqual(bytes(b), b'Te')
- self.assertFalse(resp.isclosed())
- n = resp.readinto(b)
- self.assertEqual(n, 2)
- self.assertEqual(bytes(b), b'xt')
- n = resp.readinto(b)
- self.assertEqual(n, 0)
- self.assertTrue(resp.isclosed())
- def test_partial_reads_incomplete_body(self):
- # if the server shuts down the connection before the whole
- # content-length is delivered, the socket is gracefully closed
- body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
- sock = FakeSocket(body)
- resp = client.HTTPResponse(sock)
- resp.begin()
- self.assertEqual(resp.read(2), b'Te')
- self.assertFalse(resp.isclosed())
- self.assertEqual(resp.read(2), b'xt')
- self.assertEqual(resp.read(1), b'')
- self.assertTrue(resp.isclosed())
- def test_partial_readintos_incomplete_body(self):
- # if the server shuts down the connection before the whole
- # content-length is delivered, the socket is gracefully closed
- body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
- sock = FakeSocket(body)
- resp = client.HTTPResponse(sock)
- resp.begin()
- b = bytearray(2)
- n = resp.readinto(b)
- self.assertEqual(n, 2)
- self.assertEqual(bytes(b), b'Te')
- self.assertFalse(resp.isclosed())
- n = resp.readinto(b)
- self.assertEqual(n, 2)
- self.assertEqual(bytes(b), b'xt')
- n = resp.readinto(b)
- self.assertEqual(n, 0)
- self.assertTrue(resp.isclosed())
- self.assertFalse(resp.closed)
- resp.close()
- self.assertTrue(resp.closed)
- def test_host_port(self):
- # Check invalid host_port
- for hp in ("www.python.org:abc", "user:password@www.python.org"):
- self.assertRaises(client.InvalidURL, client.HTTPConnection, hp)
- for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
- "fe80::207:e9ff:fe9b", 8000),
- ("www.python.org:80", "www.python.org", 80),
- ("www.python.org:", "www.python.org", 80),
- ("www.python.org", "www.python.org", 80),
- ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80),
- ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)):
- c = client.HTTPConnection(hp)
- self.assertEqual(h, c.host)
- self.assertEqual(p, c.port)
- def test_response_headers(self):
- # test response with multiple message headers with the same field name.
- text = ('HTTP/1.1 200 OK\r\n'
- 'Set-Cookie: Customer="WILE_E_COYOTE"; '
- 'Version="1"; Path="/acme"\r\n'
- 'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
- ' Path="/acme"\r\n'
- '\r\n'
- 'No body\r\n')
- hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
- ', '
- 'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
- s = FakeSocket(text)
- r = client.HTTPResponse(s)
- r.begin()
- cookies = r.getheader("Set-Cookie")
- self.assertEqual(cookies, hdr)
- def test_read_head(self):
- # Test that the library doesn't attempt to read any data
- # from a HEAD request. (Tickles SF bug #622042.)
- sock = FakeSocket(
- 'HTTP/1.1 200 OK\r\n'
- 'Content-Length: 14432\r\n'
- '\r\n',
- NoEOFBytesIO)
- resp = client.HTTPResponse(sock, method="HEAD")
- resp.begin()
- if resp.read():
- self.fail("Did not expect response from HEAD request")
- def test_readinto_head(self):
- # Test that the library doesn't attempt to read any data
- # from a HEAD request. (Tickles SF bug #622042.)
- sock = FakeSocket(
- 'HTTP/1.1 200 OK\r\n'
- 'Content-Length: 14432\r\n'
- '\r\n',
- NoEOFBytesIO)
- resp = client.HTTPResponse(sock, method="HEAD")
- resp.begin()
- b = bytearray(5)
- if resp.readinto(b) != 0:
- self.fail("Did not expect response from HEAD request")
- self.assertEqual(bytes(b), b'\x00'*5)
- def test_too_many_headers(self):
- headers = '\r\n'.join('Header%d: foo' % i
- for i in range(client._MAXHEADERS + 1)) + '\r\n'
- text = ('HTTP/1.1 200 OK\r\n' + headers)
- s = FakeSocket(text)
- r = client.HTTPResponse(s)
- self.assertRaisesRegex(client.HTTPException,
- r"got more than \d+ headers", r.begin)
- def test_send_file(self):
- expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n'
- b'Accept-Encoding: identity\r\n'
- b'Transfer-Encoding: chunked\r\n'
- b'\r\n')
- with open(__file__, 'rb') as body:
- conn = client.HTTPConnection('example.com')
- sock = FakeSocket(body)
- conn.sock = sock
- conn.request('GET', '/foo', body)
- self.assertTrue(sock.data.startswith(expected), '%r != %r' %
- (sock.data[:len(expected)], expected))
- def test_send(self):
- expected = b'this is a test this is only a test'
- conn = client.HTTPConnection('example.com')
- sock = FakeSocket(None)
- conn.sock = sock
- conn.send(expected)
- self.assertEqual(expected, sock.data)
- sock.data = b''
- conn.send(array.array('b', expected))
- self.assertEqual(expected, sock.data)
- sock.data = b''
- conn.send(io.BytesIO(expected))
- self.assertEqual(expected, sock.data)
- def test_send_updating_file(self):
- def data():
- yield 'data'
- yield None
- yield 'data_two'
- class UpdatingFile(io.TextIOBase):
- mode = 'r'
- d = data()
- def read(self, blocksize=-1):
- return next(self.d)
- expected = b'data'
- conn = client.HTTPConnection('example.com')
- sock = FakeSocket("")
- conn.sock = sock
- conn.send(UpdatingFile())
- self.assertEqual(sock.data, expected)
- def test_send_iter(self):
- expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
- b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \
- b'\r\nonetwothree'
- def body():
- yield b"one"
- yield b"two"
- yield b"three"
- conn = client.HTTPConnection('example.com')
- sock = FakeSocket("")
- conn.sock = sock
- conn.request('GET', '/foo', body(), {'Content-Length': '11'})
- self.assertEqual(sock.data, expected)
- def test_blocksize_request(self):
- """Check that request() respects the configured block size."""
- blocksize = 8 # For easy debugging.
- conn = client.HTTPConnection('example.com', blocksize=blocksize)
- sock = FakeSocket(None)
- conn.sock = sock
- expected = b"a" * blocksize + b"b"
- conn.request("PUT", "/", io.BytesIO(expected), {"Content-Length": "9"})
- self.assertEqual(sock.sendall_calls, 3)
- body = sock.data.split(b"\r\n\r\n", 1)[1]
- self.assertEqual(body, expected)
- def test_blocksize_send(self):
- """Check that send() respects the configured block size."""
- blocksize = 8 # For easy debugging.
- conn = client.HTTPConnection('example.com', blocksize=blocksize)
- sock = FakeSocket(None)
- conn.sock = sock
- expected = b"a" * blocksize + b"b"
- conn.send(io.BytesIO(expected))
- self.assertEqual(sock.sendall_calls, 2)
- self.assertEqual(sock.data, expected)
- def test_send_type_error(self):
- # See: Issue #12676
- conn = client.HTTPConnection('example.com')
- conn.sock = FakeSocket('')
- with self.assertRaises(TypeError):
- conn.request('POST', 'test', conn)
- def test_chunked(self):
- expected = chunked_expected
- sock = FakeSocket(chunked_start + last_chunk + chunked_end)
- resp = client.HTTPResponse(sock, method="GET")
- resp.begin()
- self.assertEqual(resp.read(), expected)
- resp.close()
- # Various read sizes
- for n in range(1, 12):
- sock = FakeSocket(chunked_start + last_chunk + chunked_end)
- resp = client.HTTPResponse(sock, method="GET")
- resp.begin()
- self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected)
- resp.close()
- for x in ('', 'foo\r\n'):
- sock = FakeSocket(chunked_start + x)
- resp = client.HTTPResponse(sock, method="GET")
- resp.begin()
- try:
- resp.read()
- except client.IncompleteRead as i:
- self.assertEqual(i.partial, expected)
- expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
- self.assertEqual(repr(i), expected_message)
- self.assertEqual(str(i), expected_message)
- else:
- self.fail('IncompleteRead expected')
- finally:
- resp.close()
- def test_readinto_chunked(self):
- expected = chunked_expected
- nexpected = len(expected)
- b = bytearray(128)
- sock = FakeSocket(chunked_start + last_chunk + chunked_end)
- resp = client.HTTPResponse(sock, method="GET")
- resp.begin()
- n = resp.readinto(b)
- self.assertEqual(b[:nexpected], expected)
- self.assertEqual(n, nexpected)
- resp.close()
- # Various read sizes
- for n in range(1, 12):
- sock = FakeSocket(chunked_start + last_chunk + chunked_end)
- resp = client.HTTPResponse(sock, method="GET")
- resp.begin()
- m = memoryview(b)
- i = resp.readinto(m[0:n])
- i += resp.readinto(m[i:n + i])
- i += resp.readinto(m[i:])
- self.assertEqual(b[:nexpected], expected)
- self.assertEqual(i, nexpected)
- resp.close()
- for x in ('', 'foo\r\n'):
- sock = FakeSocket(chunked_start + x)
- resp = client.HTTPResponse(sock, method="GET")
- resp.begin()
- try:
- n = resp.readinto(b)
- except client.IncompleteRead as i:
- self.assertEqual(i.partial, expected)
- expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
- self.assertEqual(repr(i), expected_message)
- self.assertEqual(str(i), expected_message)
- else:
- self.fail('IncompleteRead expected')
- finally:
- resp.close()
- def test_chunked_head(self):
- chunked_start = (
- 'HTTP/1.1 200 OK\r\n'
- 'Transfer-Encoding: chunked\r\n\r\n'
- 'a\r\n'
- 'hello world\r\n'
- '1\r\n'
- 'd\r\n'
- )
- sock = FakeSocket(chunked_start + last_chunk + chunked_end)
- resp = client.HTTPResponse(sock, method="HEAD")
- resp.begin()
- self.assertEqual(resp.read(), b'')
- self.assertEqual(resp.status, 200)
- self.assertEqual(resp.reason, 'OK')
- self.assertTrue(resp.isclosed())
- self.assertFalse(resp.closed)
- resp.close()
- self.assertTrue(resp.closed)
- def test_readinto_chunked_head(self):
- chunked_start = (
- 'HTTP/1.1 200 OK\r\n'
- 'Transfer-Encoding: chunked\r\n\r\n'
- 'a\r\n'
- 'hello world\r\n'
- '1\r\n'
- 'd\r\n'
- )
- sock = FakeSocket(chunked_start + last_chunk + chunked_end)
- resp = client.HTTPResponse(sock, method="HEAD")
- resp.begin()
- b = bytearray(5)
- n = resp.readinto(b)
- self.assertEqual(n, 0)
- self.assertEqual(bytes(b), b'\x00'*5)
- self.assertEqual(resp.status, 200)
- self.assertEqual(resp.reason, 'OK')
- self.assertTrue(resp.isclosed())
- self.assertFalse(resp.closed)
- resp.close()
- self.assertTrue(resp.closed)
- def test_negative_content_length(self):
- sock = FakeSocket(
- 'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n')
- resp = client.HTTPResponse(sock, method="GET")
- resp.begin()
- self.assertEqual(resp.read(), b'Hello\r\n')
- self.assertTrue(resp.isclosed())
- def test_incomplete_read(self):
- sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
- resp = client.HTTPResponse(sock, method="GET")
- resp.begin()
- try:
- resp.read()
- except client.IncompleteRead as i:
- self.assertEqual(i.partial, b'Hello\r\n')
- self.assertEqual(repr(i),
- "IncompleteRead(7 bytes read, 3 more expected)")
- self.assertEqual(str(i),
- "IncompleteRead(7 bytes read, 3 more expected)")
- self.assertTrue(resp.isclosed())
- else:
- self.fail('IncompleteRead expected')
- def test_epipe(self):
- sock = EPipeSocket(
- "HTTP/1.0 401 Authorization Required\r\n"
- "Content-type: text/html\r\n"
- "WWW-Authenticate: Basic realm=\"example\"\r\n",
- b"Content-Length")
- conn = client.HTTPConnection("example.com")
- conn.sock = sock
- self.assertRaises(OSError,
- lambda: conn.request("PUT", "/url", "body"))
- resp = conn.getresponse()
- self.assertEqual(401, resp.status)
- self.assertEqual("Basic realm=\"example\"",
- resp.getheader("www-authenticate"))
- # Test lines overflowing the max line size (_MAXLINE in http.client)
- def test_overflowing_status_line(self):
- body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
- resp = client.HTTPResponse(FakeSocket(body))
- self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin)
- def test_overflowing_header_line(self):
- body = (
- 'HTTP/1.1 200 OK\r\n'
- 'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
- )
- resp = client.HTTPResponse(FakeSocket(body))
- self.assertRaises(client.LineTooLong, resp.begin)
- def test_overflowing_header_limit_after_100(self):
- body = (
- 'HTTP/1.1 100 OK\r\n'
- 'r\n' * 32768
- )
- resp = client.HTTPResponse(FakeSocket(body))
- with self.assertRaises(client.HTTPException) as cm:
- resp.begin()
- # We must assert more because other reasonable errors that we
- # do not want can also be HTTPException derived.
- self.assertIn('got more than ', str(cm.exception))
- self.assertIn('headers', str(cm.exception))
- def test_overflowing_chunked_line(self):
- body = (
- 'HTTP/1.1 200 OK\r\n'
- 'Transfer-Encoding: chunked\r\n\r\n'
- + '0' * 65536 + 'a\r\n'
- 'hello world\r\n'
- '0\r\n'
- '\r\n'
- )
- resp = client.HTTPResponse(FakeSocket(body))
- resp.begin()
- self.assertRaises(client.LineTooLong, resp.read)
- def test_early_eof(self):
- # Test httpresponse with no \r\n termination,
- body = "HTTP/1.1 200 Ok"
- sock = FakeSocket(body)
- resp = client.HTTPResponse(sock)
- resp.begin()
- self.assertEqual(resp.read(), b'')
- self.assertTrue(resp.isclosed())
- self.assertFalse(resp.closed)
- resp.close()
- self.assertTrue(resp.closed)
- def test_error_leak(self):
- # Test that the socket is not leaked if getresponse() fails
- conn = client.HTTPConnection('example.com')
- response = None
- class Response(client.HTTPResponse):
- def __init__(self, *pos, **kw):
- nonlocal response
- response = self # Avoid garbage collector closing the socket
- client.HTTPResponse.__init__(self, *pos, **kw)
- conn.response_class = Response
- conn.sock = FakeSocket('Invalid status line')
- conn.request('GET', '/')
- self.assertRaises(client.BadStatusLine, conn.getresponse)
- self.assertTrue(response.closed)
- self.assertTrue(conn.sock.file_closed)
- def test_chunked_extension(self):
- extra = '3;foo=bar\r\n' + 'abc\r\n'
- expected = chunked_expected + b'abc'
- sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end)
- resp = client.HTTPResponse(sock, method="GET")
- resp.begin()
- self.assertEqual(resp.read(), expected)
- resp.close()
- def test_chunked_missing_end(self):
- """some servers may serve up a short chunked encoding stream"""
- expected = chunked_expected
- sock = FakeSocket(chunked_start + last_chunk) #no terminating crlf
- resp = client.HTTPResponse(sock, method="GET")
- resp.begin()
- self.assertEqual(resp.read(), expected)
- resp.close()
- def test_chunked_trailers(self):
- """See that trailers are read and ignored"""
- expected = chunked_expected
- sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end)
- resp = client.HTTPResponse(sock, method="GET")
- resp.begin()
- self.assertEqual(resp.read(), expected)
- # we should have reached the end of the file
- self.assertEqual(sock.file.read(), b"") #we read to the end
- resp.close()
- def test_chunked_sync(self):
- """Check that we don't read past the end of the chunked-encoding stream"""
- expected = chunked_expected
- extradata = "extradata"
- sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata)
- resp = client.HTTPResponse(sock, method="GET")
- resp.begin()
- self.assertEqual(resp.read(), expected)
- # the file should now have our extradata ready to be read
- self.assertEqual(sock.file.read(), extradata.encode("ascii")) #we read to the end
- resp.close()
- def test_content_length_sync(self):
- """Check that we don't read past the end of the Content-Length stream"""
- extradata = b"extradata"
- expected = b"Hello123\r\n"
- sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
- resp = client.HTTPResponse(sock, method="GET")
- resp.begin()
- self.assertEqual(resp.read(), expected)
- # the file should now have our extradata ready to be read
- self.assertEqual(sock.file.read(), extradata) #we read to the end
- resp.close()
- def test_readlines_content_length(self):
- extradata = b"extradata"
- expected = b"Hello123\r\n"
- sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
- resp = client.HTTPResponse(sock, method="GET")
- resp.begin()
- self.assertEqual(resp.readlines(2000), [expected])
- # the file should now have our extradata ready to be read
- self.assertEqual(sock.file.read(), extradata) #we read to the end
- resp.close()
- def test_read1_content_length(self):
- extradata = b"extradata"
- expected = b"Hello123\r\n"
- sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
- resp = client.HTTPResponse(sock, method="GET")
- resp.begin()
- self.assertEqual(resp.read1(2000), expected)
- # the file should now have our extradata ready to be read
- self.assertEqual(sock.file.read(), extradata) #we read to the end
- resp.close()
- def test_readline_bound_content_length(self):
- extradata = b"extradata"
- expected = b"Hello123\r\n"
- sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
- resp = client.HTTPResponse(sock, method="GET")
- resp.begin()
- self.assertEqual(resp.readline(10), expected)
- self.assertEqual(resp.readline(10), b"")
- # the file should now have our extradata ready to be read
- self.assertEqual(sock.file.read(), extradata) #we read to the end
- resp.close()
- def test_read1_bound_content_length(self):
- extradata = b"extradata"
- expected = b"Hello123\r\n"
- sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 30\r\n\r\n' + expected*3 + extradata)
- resp = client.HTTPResponse(sock, method="GET")
- resp.begin()
- self.assertEqual(resp.read1(20), expected*2)
- self.assertEqual(resp.read(), expected)
- # the file should now have our extradata ready to be read
- self.assertEqual(sock.file.read(), extradata) #we read to the end
- resp.close()
- def test_response_fileno(self):
- # Make sure fd returned by fileno is valid.
- serv = socket.create_server((HOST, 0))
- self.addCleanup(serv.close)
- result = None
- def run_server():
- [conn, address] = serv.accept()
- with conn, conn.makefile("rb") as reader:
- # Read the request header until a blank line
- while True:
- line = reader.readline()
- if not line.rstrip(b"\r\n"):
- break
- conn.sendall(b"HTTP/1.1 200 Connection established\r\n\r\n")
- nonlocal result
- result = reader.read()
- thread = threading.Thread(target=run_server)
- thread.start()
- self.addCleanup(thread.join, float(1))
- conn = client.HTTPConnection(*serv.getsockname())
- conn.request("CONNECT", "dummy:1234")
- response = conn.getresponse()
- try:
- self.assertEqual(response.status, client.OK)
- s = socket.socket(fileno=response.fileno())
- try:
- s.sendall(b"proxied data\n")
- finally:
- s.detach()
- finally:
- response.close()
- conn.close()
- thread.join()
- self.assertEqual(result, b"proxied data\n")
- def test_putrequest_override_domain_validation(self):
- """
- It should be possible to override the default validation
- behavior in putrequest (bpo-38216).
- """
- class UnsafeHTTPConnection(client.HTTPConnection):
- def _validate_path(self, url):
- pass
- conn = UnsafeHTTPConnection('example.com')
- conn.sock = FakeSocket('')
- conn.putrequest('GET', '/\x00')
- def test_putrequest_override_host_validation(self):
- class UnsafeHTTPConnection(client.HTTPConnection):
- def _validate_host(self, url):
- pass
- conn = UnsafeHTTPConnection('example.com\r\n')
- conn.sock = FakeSocket('')
- # set skip_host so a ValueError is not raised upon adding the
- # invalid URL as the value of the "Host:" header
- conn.putrequest('GET', '/', skip_host=1)
- def test_putrequest_override_encoding(self):
- """
- It should be possible to override the default encoding
- to transmit bytes in another encoding even if invalid
- (bpo-36274).
- """
- class UnsafeHTTPConnection(client.HTTPConnection):
- def _encode_request(self, str_url):
- return str_url.encode('utf-8')
- conn = UnsafeHTTPConnection('example.com')
- conn.sock = FakeSocket('')
- conn.putrequest('GET', '/☃')
- class ExtendedReadTest(TestCase):
- """
- Test peek(), read1(), readline()
- """
- lines = (
- 'HTTP/1.1 200 OK\r\n'
- '\r\n'
- 'hello world!\n'
- 'and now \n'
- 'for something completely different\n'
- 'foo'
- )
- lines_expected = lines[lines.find('hello'):].encode("ascii")
- lines_chunked = (
- 'HTTP/1.1 200 OK\r\n'
- 'Transfer-Encoding: chunked\r\n\r\n'
- 'a\r\n'
- 'hello worl\r\n'
- '3\r\n'
- 'd!\n\r\n'
- '9\r\n'
- 'and now \n\r\n'
- '23\r\n'
- 'for something completely different\n\r\n'
- '3\r\n'
- 'foo\r\n'
- '0\r\n' # terminating chunk
- '\r\n' # end of trailers
- )
- def setUp(self):
- sock = FakeSocket(self.lines)
- resp = client.HTTPResponse(sock, method="GET")
- resp.begin()
- resp.fp = io.BufferedReader(resp.fp)
- self.resp = resp
- def test_peek(self):
- resp = self.resp
- # patch up the buffered peek so that it returns not too much stuff
- oldpeek = resp.fp.peek
- def mypeek(n=-1):
- p = oldpeek(n)
- if n >= 0:
- return p[:n]
- return p[:10]
- resp.fp.peek = mypeek
- all = []
- while True:
- # try a short peek
- p = resp.peek(3)
- if p:
- self.assertGreater(len(p), 0)
- # then unbounded peek
- p2 = resp.peek()
- self.assertGreaterEqual(len(p2), len(p))
- self.assertTrue(p2.startswith(p))
- next = resp.read(len(p2))
- self.assertEqual(next, p2)
- else:
- next = resp.read()
- self.assertFalse(next)
- all.append(next)
- if not next:
- break
- self.assertEqual(b"".join(all), self.lines_expected)
- def test_readline(self):
- resp = self.resp
- self._verify_readline(self.resp.readline, self.lines_expected)
- def _verify_readline(self, readline, expected):
- all = []
- while True:
- # short readlines
- line = readline(5)
- if line and line != b"foo":
- if len(line) < 5:
- self.assertTrue(line.endswith(b"\n"))
- all.append(line)
- if not line:
- break
- self.assertEqual(b"".join(all), expected)
- def test_read1(self):
- resp = self.resp
- def r():
- res = resp.read1(4)
- self.assertLessEqual(len(res), 4)
- return res
- readliner = Readliner(r)
- self._verify_readline(readliner.readline, self.lines_expected)
- def test_read1_unbounded(self):
- resp = self.resp
- all = []
- while True:
- data = resp.read1()
- if not data:
- break
- all.append(data)
- self.assertEqual(b"".join(all), self.lines_expected)
- def test_read1_bounded(self):
- resp = self.resp
- all = []
- while True:
- data = resp.read1(10)
- if not data:
- break
- self.assertLessEqual(len(data), 10)
- all.append(data)
- self.assertEqual(b"".join(all), self.lines_expected)
- def test_read1_0(self):
- self.assertEqual(self.resp.read1(0), b"")
- def test_peek_0(self):
- p = self.resp.peek(0)
- self.assertLessEqual(0, len(p))
- class ExtendedReadTestChunked(ExtendedReadTest):
- """
- Test peek(), read1(), readline() in chunked mode
- """
- lines = (
- 'HTTP/1.1 200 OK\r\n'
- 'Transfer-Encoding: chunked\r\n\r\n'
- 'a\r\n'
- 'hello worl\r\n'
- '3\r\n'
- 'd!\n\r\n'
- '9\r\n'
- 'and now \n\r\n'
- '23\r\n'
- 'for something completely different\n\r\n'
- '3\r\n'
- 'foo\r\n'
- '0\r\n' # terminating chunk
- '\r\n' # end of trailers
- )
- class Readliner:
- """
- a simple readline class that uses an arbitrary read function and buffering
- """
- def __init__(self, readfunc):
- self.readfunc = readfunc
- self.remainder = b""
- def readline(self, limit):
- data = []
- datalen = 0
- read = self.remainder
- try:
- while True:
- idx = read.find(b'\n')
- if idx != -1:
- break
- if datalen + len(read) >= limit:
- idx = limit - datalen - 1
- # read more data
- data.append(read)
- read = self.readfunc()
- if not read:
- idx = 0 #eof condition
- break
- idx += 1
- data.append(read[:idx])
- self.remainder = read[idx:]
- return b"".join(data)
- except:
- self.remainder = b"".join(data)
- raise
- class OfflineTest(TestCase):
- def test_all(self):
- # Documented objects defined in the module should be in __all__
- expected = {"responses"} # Allowlist documented dict() object
- # HTTPMessage, parse_headers(), and the HTTP status code constants are
- # intentionally omitted for simplicity
- denylist = {"HTTPMessage", "parse_headers"}
- for name in dir(client):
- if name.startswith("_") or name in denylist:
- continue
- module_object = getattr(client, name)
- if getattr(module_object, "__module__", None) == "http.client":
- expected.add(name)
- self.assertCountEqual(client.__all__, expected)
- def test_responses(self):
- self.assertEqual(client.responses[client.NOT_FOUND], "Not Found")
- def test_client_constants(self):
- # Make sure we don't break backward compatibility with 3.4
- expected = [
- 'CONTINUE',
- 'SWITCHING_PROTOCOLS',
- 'PROCESSING',
- 'OK',
- 'CREATED',
- 'ACCEPTED',
- 'NON_AUTHORITATIVE_INFORMATION',
- 'NO_CONTENT',
- 'RESET_CONTENT',
- 'PARTIAL_CONTENT',
- 'MULTI_STATUS',
- 'IM_USED',
- 'MULTIPLE_CHOICES',
- 'MOVED_PERMANENTLY',
- 'FOUND',
- 'SEE_OTHER',
- 'NOT_MODIFIED',
- 'USE_PROXY',
- 'TEMPORARY_REDIRECT',
- 'BAD_REQUEST',
- 'UNAUTHORIZED',
- 'PAYMENT_REQUIRED',
- 'FORBIDDEN',
- 'NOT_FOUND',
- 'METHOD_NOT_ALLOWED',
- 'NOT_ACCEPTABLE',
- 'PROXY_AUTHENTICATION_REQUIRED',
- 'REQUEST_TIMEOUT',
- 'CONFLICT',
- 'GONE',
- 'LENGTH_REQUIRED',
- 'PRECONDITION_FAILED',
- 'REQUEST_ENTITY_TOO_LARGE',
- 'REQUEST_URI_TOO_LONG',
- 'UNSUPPORTED_MEDIA_TYPE',
- 'REQUESTED_RANGE_NOT_SATISFIABLE',
- 'EXPECTATION_FAILED',
- 'IM_A_TEAPOT',
- 'MISDIRECTED_REQUEST',
- 'UNPROCESSABLE_ENTITY',
- 'LOCKED',
- 'FAILED_DEPENDENCY',
- 'UPGRADE_REQUIRED',
- 'PRECONDITION_REQUIRED',
- 'TOO_MANY_REQUESTS',
- 'REQUEST_HEADER_FIELDS_TOO_LARGE',
- 'UNAVAILABLE_FOR_LEGAL_REASONS',
- 'INTERNAL_SERVER_ERROR',
- 'NOT_IMPLEMENTED',
- 'BAD_GATEWAY',
- 'SERVICE_UNAVAILABLE',
- 'GATEWAY_TIMEOUT',
- 'HTTP_VERSION_NOT_SUPPORTED',
- 'INSUFFICIENT_STORAGE',
- 'NOT_EXTENDED',
- 'NETWORK_AUTHENTICATION_REQUIRED',
- 'EARLY_HINTS',
- 'TOO_EARLY'
- ]
- for const in expected:
- with self.subTest(constant=const):
- self.assertTrue(hasattr(client, const))
- class SourceAddressTest(TestCase):
- def setUp(self):
- self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.port = socket_helper.bind_port(self.serv)
- self.source_port = socket_helper.find_unused_port()
- self.serv.listen()
- self.conn = None
- def tearDown(self):
- if self.conn:
- self.conn.close()
- self.conn = None
- self.serv.close()
- self.serv = None
- def testHTTPConnectionSourceAddress(self):
- self.conn = client.HTTPConnection(HOST, self.port,
- source_address=('', self.source_port))
- self.conn.connect()
- self.assertEqual(self.conn.sock.getsockname()[1], self.source_port)
- @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
- 'http.client.HTTPSConnection not defined')
- def testHTTPSConnectionSourceAddress(self):
- self.conn = client.HTTPSConnection(HOST, self.port,
- source_address=('', self.source_port))
- # We don't test anything here other than the constructor not barfing as
- # this code doesn't deal with setting up an active running SSL server
- # for an ssl_wrapped connect() to actually return from.
- class TimeoutTest(TestCase):
- PORT = None
- def setUp(self):
- self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- TimeoutTest.PORT = socket_helper.bind_port(self.serv)
- self.serv.listen()
- def tearDown(self):
- self.serv.close()
- self.serv = None
- def testTimeoutAttribute(self):
- # This will prove that the timeout gets through HTTPConnection
- # and into the socket.
- # default -- use global socket timeout
- self.assertIsNone(socket.getdefaulttimeout())
- socket.setdefaulttimeout(30)
- try:
- httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT)
- httpConn.connect()
- finally:
- socket.setdefaulttimeout(None)
- self.assertEqual(httpConn.sock.gettimeout(), 30)
- httpConn.close()
- # no timeout -- do not use global socket default
- self.assertIsNone(socket.getdefaulttimeout())
- socket.setdefaulttimeout(30)
- try:
- httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT,
- timeout=None)
- httpConn.connect()
- finally:
- socket.setdefaulttimeout(None)
- self.assertEqual(httpConn.sock.gettimeout(), None)
- httpConn.close()
- # a value
- httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
- httpConn.connect()
- self.assertEqual(httpConn.sock.gettimeout(), 30)
- httpConn.close()
- class PersistenceTest(TestCase):
- def test_reuse_reconnect(self):
- # Should reuse or reconnect depending on header from server
- tests = (
- ('1.0', '', False),
- ('1.0', 'Connection: keep-alive\r\n', True),
- ('1.1', '', True),
- ('1.1', 'Connection: close\r\n', False),
- ('1.0', 'Connection: keep-ALIVE\r\n', True),
- ('1.1', 'Connection: cloSE\r\n', False),
- )
- for version, header, reuse in tests:
- with self.subTest(version=version, header=header):
- msg = (
- 'HTTP/{} 200 OK\r\n'
- '{}'
- 'Content-Length: 12\r\n'
- '\r\n'
- 'Dummy body\r\n'
- ).format(version, header)
- conn = FakeSocketHTTPConnection(msg)
- self.assertIsNone(conn.sock)
- conn.request('GET', '/open-connection')
- with conn.getresponse() as response:
- self.assertEqual(conn.sock is None, not reuse)
- response.read()
- self.assertEqual(conn.sock is None, not reuse)
- self.assertEqual(conn.connections, 1)
- conn.request('GET', '/subsequent-request')
- self.assertEqual(conn.connections, 1 if reuse else 2)
- def test_disconnected(self):
- def make_reset_reader(text):
- """Return BufferedReader that raises ECONNRESET at EOF"""
- stream = io.BytesIO(text)
- def readinto(buffer):
- size = io.BytesIO.readinto(stream, buffer)
- if size == 0:
- raise ConnectionResetError()
- return size
- stream.readinto = readinto
- return io.BufferedReader(stream)
- tests = (
- (io.BytesIO, client.RemoteDisconnected),
- (make_reset_reader, ConnectionResetError),
- )
- for stream_factory, exception in tests:
- with self.subTest(exception=exception):
- conn = FakeSocketHTTPConnection(b'', stream_factory)
- conn.request('GET', '/eof-response')
- self.assertRaises(exception, conn.getresponse)
- self.assertIsNone(conn.sock)
- # HTTPConnection.connect() should be automatically invoked
- conn.request('GET', '/reconnect')
- self.assertEqual(conn.connections, 2)
- def test_100_close(self):
- conn = FakeSocketHTTPConnection(
- b'HTTP/1.1 100 Continue\r\n'
- b'\r\n'
- # Missing final response
- )
- conn.request('GET', '/', headers={'Expect': '100-continue'})
- self.assertRaises(client.RemoteDisconnected, conn.getresponse)
- self.assertIsNone(conn.sock)
- conn.request('GET', '/reconnect')
- self.assertEqual(conn.connections, 2)
- class HTTPSTest(TestCase):
- def setUp(self):
- if not hasattr(client, 'HTTPSConnection'):
- self.skipTest('ssl support required')
- def make_server(self, certfile):
- from test.ssl_servers import make_https_server
- return make_https_server(self, certfile=certfile)
- def test_attributes(self):
- # simple test to check it's storing the timeout
- h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
- self.assertEqual(h.timeout, 30)
- def test_networked(self):
- # Default settings: requires a valid cert from a trusted CA
- import ssl
- support.requires('network')
- with socket_helper.transient_internet('self-signed.pythontest.net'):
- h = client.HTTPSConnection('self-signed.pythontest.net', 443)
- with self.assertRaises(ssl.SSLError) as exc_info:
- h.request('GET', '/')
- self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
- def test_networked_noverification(self):
- # Switch off cert verification
- import ssl
- support.requires('network')
- with socket_helper.transient_internet('self-signed.pythontest.net'):
- context = ssl._create_unverified_context()
- h = client.HTTPSConnection('self-signed.pythontest.net', 443,
- context=context)
- h.request('GET', '/')
- resp = h.getresponse()
- h.close()
- self.assertIn('nginx', resp.getheader('server'))
- resp.close()
- @support.system_must_validate_cert
- def test_networked_trusted_by_default_cert(self):
- # Default settings: requires a valid cert from a trusted CA
- support.requires('network')
- with socket_helper.transient_internet('www.python.org'):
- h = client.HTTPSConnection('www.python.org', 443)
- h.request('GET', '/')
- resp = h.getresponse()
- content_type = resp.getheader('content-type')
- resp.close()
- h.close()
- self.assertIn('text/html', content_type)
- def test_networked_good_cert(self):
- # We feed the server's cert as a validating cert
- import ssl
- support.requires('network')
- selfsigned_pythontestdotnet = 'self-signed.pythontest.net'
- with socket_helper.transient_internet(selfsigned_pythontestdotnet):
- context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
- self.assertEqual(context.check_hostname, True)
- context.load_verify_locations(CERT_selfsigned_pythontestdotnet)
- try:
- h = client.HTTPSConnection(selfsigned_pythontestdotnet, 443,
- context=context)
- h.request('GET', '/')
- resp = h.getresponse()
- except ssl.SSLError as ssl_err:
- ssl_err_str = str(ssl_err)
- # In the error message of [SSL: CERTIFICATE_VERIFY_FAILED] on
- # modern Linux distros (Debian Buster, etc) default OpenSSL
- # configurations it'll fail saying "key too weak" until we
- # address https://bugs.python.org/issue36816 to use a proper
- # key size on self-signed.pythontest.net.
- if re.search(r'(?i)key.too.weak', ssl_err_str):
- raise unittest.SkipTest(
- f'Got {ssl_err_str} trying to connect '
- f'to {selfsigned_pythontestdotnet}. '
- 'See https://bugs.python.org/issue36816.')
- raise
- server_string = resp.getheader('server')
- resp.close()
- h.close()
- self.assertIn('nginx', server_string)
- def test_networked_bad_cert(self):
- # We feed a "CA" cert that is unrelated to the server's cert
- import ssl
- support.requires('network')
- with socket_helper.transient_internet('self-signed.pythontest.net'):
- context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- context.load_verify_locations(CERT_localhost)
- h = client.HTTPSConnection('self-signed.pythontest.net', 443, context=context)
- with self.assertRaises(ssl.SSLError) as exc_info:
- h.request('GET', '/')
- self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
- def test_local_unknown_cert(self):
- # The custom cert isn't known to the default trust bundle
- import ssl
- server = self.make_server(CERT_localhost)
- h = client.HTTPSConnection('localhost', server.port)
- with self.assertRaises(ssl.SSLError) as exc_info:
- h.request('GET', '/')
- self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
- def test_local_good_hostname(self):
- # The (valid) cert validates the HTTP hostname
- import ssl
- server = self.make_server(CERT_localhost)
- context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- context.load_verify_locations(CERT_localhost)
- h = client.HTTPSConnection('localhost', server.port, context=context)
- self.addCleanup(h.close)
- h.request('GET', '/nonexistent')
- resp = h.getresponse()
- self.addCleanup(resp.close)
- self.assertEqual(resp.status, 404)
- def test_local_bad_hostname(self):
- # The (valid) cert doesn't validate the HTTP hostname
- import ssl
- server = self.make_server(CERT_fakehostname)
- context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- context.load_verify_locations(CERT_fakehostname)
- h = client.HTTPSConnection('localhost', server.port, context=context)
- with self.assertRaises(ssl.CertificateError):
- h.request('GET', '/')
- # Same with explicit check_hostname=True
- with warnings_helper.check_warnings(('', DeprecationWarning)):
- h = client.HTTPSConnection('localhost', server.port,
- context=context, check_hostname=True)
- with self.assertRaises(ssl.CertificateError):
- h.request('GET', '/')
- # With check_hostname=False, the mismatching is ignored
- context.check_hostname = False
- with warnings_helper.check_warnings(('', DeprecationWarning)):
- h = client.HTTPSConnection('localhost', server.port,
- context=context, check_hostname=False)
- h.request('GET', '/nonexistent')
- resp = h.getresponse()
- resp.close()
- h.close()
- self.assertEqual(resp.status, 404)
- # The context's check_hostname setting is used if one isn't passed to
- # HTTPSConnection.
- context.check_hostname = False
- h = client.HTTPSConnection('localhost', server.port, context=context)
- h.request('GET', '/nonexistent')
- resp = h.getresponse()
- self.assertEqual(resp.status, 404)
- resp.close()
- h.close()
- # Passing check_hostname to HTTPSConnection should override the
- # context's setting.
- with warnings_helper.check_warnings(('', DeprecationWarning)):
- h = client.HTTPSConnection('localhost', server.port,
- context=context, check_hostname=True)
- with self.assertRaises(ssl.CertificateError):
- h.request('GET', '/')
- @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
- 'http.client.HTTPSConnection not available')
- def test_host_port(self):
- # Check invalid host_port
- for hp in ("www.python.org:abc", "user:password@www.python.org"):
- self.assertRaises(client.InvalidURL, client.HTTPSConnection, hp)
- for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
- "fe80::207:e9ff:fe9b", 8000),
- ("www.python.org:443", "www.python.org", 443),
- ("www.python.org:", "www.python.org", 443),
- ("www.python.org", "www.python.org", 443),
- ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443),
- ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b",
- 443)):
- c = client.HTTPSConnection(hp)
- self.assertEqual(h, c.host)
- self.assertEqual(p, c.port)
- def test_tls13_pha(self):
- import ssl
- if not ssl.HAS_TLSv1_3:
- self.skipTest('TLS 1.3 support required')
- # just check status of PHA flag
- h = client.HTTPSConnection('localhost', 443)
- self.assertTrue(h._context.post_handshake_auth)
- context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- self.assertFalse(context.post_handshake_auth)
- h = client.HTTPSConnection('localhost', 443, context=context)
- self.assertIs(h._context, context)
- self.assertFalse(h._context.post_handshake_auth)
- with warnings.catch_warnings():
- warnings.filterwarnings('ignore', 'key_file, cert_file and check_hostname are deprecated',
- DeprecationWarning)
- h = client.HTTPSConnection('localhost', 443, context=context,
- cert_file=CERT_localhost)
- self.assertTrue(h._context.post_handshake_auth)
- class RequestBodyTest(TestCase):
- """Test cases where a request includes a message body."""
- def setUp(self):
- self.conn = client.HTTPConnection('example.com')
- self.conn.sock = self.sock = FakeSocket("")
- self.conn.sock = self.sock
- def get_headers_and_fp(self):
- f = io.BytesIO(self.sock.data)
- f.readline() # read the request line
- message = client.parse_headers(f)
- return message, f
- def test_list_body(self):
- # Note that no content-length is automatically calculated for
- # an iterable. The request will fall back to send chunked
- # transfer encoding.
- cases = (
- ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
- ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
- )
- for body, expected in cases:
- with self.subTest(body):
- self.conn = client.HTTPConnection('example.com')
- self.conn.sock = self.sock = FakeSocket('')
- self.conn.request('PUT', '/url', body)
- msg, f = self.get_headers_and_fp()
- self.assertNotIn('Content-Type', msg)
- self.assertNotIn('Content-Length', msg)
- self.assertEqual(msg.get('Transfer-Encoding'), 'chunked')
- self.assertEqual(expected, f.read())
- def test_manual_content_length(self):
- # Set an incorrect content-length so that we can verify that
- # it will not be over-ridden by the library.
- self.conn.request("PUT", "/url", "body",
- {"Content-Length": "42"})
- message, f = self.get_headers_and_fp()
- self.assertEqual("42", message.get("content-length"))
- self.assertEqual(4, len(f.read()))
- def test_ascii_body(self):
- self.conn.request("PUT", "/url", "body")
- message, f = self.get_headers_and_fp()
- self.assertEqual("text/plain", message.get_content_type())
- self.assertIsNone(message.get_charset())
- self.assertEqual("4", message.get("content-length"))
- self.assertEqual(b'body', f.read())
- def test_latin1_body(self):
- self.conn.request("PUT", "/url", "body\xc1")
- message, f = self.get_headers_and_fp()
- self.assertEqual("text/plain", message.get_content_type())
- self.assertIsNone(message.get_charset())
- self.assertEqual("5", message.get("content-length"))
- self.assertEqual(b'body\xc1', f.read())
- def test_bytes_body(self):
- self.conn.request("PUT", "/url", b"body\xc1")
- message, f = self.get_headers_and_fp()
- self.assertEqual("text/plain", message.get_content_type())
- self.assertIsNone(message.get_charset())
- self.assertEqual("5", message.get("content-length"))
- self.assertEqual(b'body\xc1', f.read())
- def test_text_file_body(self):
- self.addCleanup(os_helper.unlink, os_helper.TESTFN)
- with open(os_helper.TESTFN, "w", encoding="utf-8") as f:
- f.write("body")
- with open(os_helper.TESTFN, encoding="utf-8") as f:
- self.conn.request("PUT", "/url", f)
- message, f = self.get_headers_and_fp()
- self.assertEqual("text/plain", message.get_content_type())
- self.assertIsNone(message.get_charset())
- # No content-length will be determined for files; the body
- # will be sent using chunked transfer encoding instead.
- self.assertIsNone(message.get("content-length"))
- self.assertEqual("chunked", message.get("transfer-encoding"))
- self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read())
- def test_binary_file_body(self):
- self.addCleanup(os_helper.unlink, os_helper.TESTFN)
- with open(os_helper.TESTFN, "wb") as f:
- f.write(b"body\xc1")
- with open(os_helper.TESTFN, "rb") as f:
- self.conn.request("PUT", "/url", f)
- message, f = self.get_headers_and_fp()
- self.assertEqual("text/plain", message.get_content_type())
- self.assertIsNone(message.get_charset())
- self.assertEqual("chunked", message.get("Transfer-Encoding"))
- self.assertNotIn("Content-Length", message)
- self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', f.read())
- class HTTPResponseTest(TestCase):
- def setUp(self):
- body = "HTTP/1.1 200 Ok\r\nMy-Header: first-value\r\nMy-Header: \
- second-value\r\n\r\nText"
- sock = FakeSocket(body)
- self.resp = client.HTTPResponse(sock)
- self.resp.begin()
- def test_getting_header(self):
- header = self.resp.getheader('My-Header')
- self.assertEqual(header, 'first-value, second-value')
- header = self.resp.getheader('My-Header', 'some default')
- self.assertEqual(header, 'first-value, second-value')
- def test_getting_nonexistent_header_with_string_default(self):
- header = self.resp.getheader('No-Such-Header', 'default-value')
- self.assertEqual(header, 'default-value')
- def test_getting_nonexistent_header_with_iterable_default(self):
- header = self.resp.getheader('No-Such-Header', ['default', 'values'])
- self.assertEqual(header, 'default, values')
- header = self.resp.getheader('No-Such-Header', ('default', 'values'))
- self.assertEqual(header, 'default, values')
- def test_getting_nonexistent_header_without_default(self):
- header = self.resp.getheader('No-Such-Header')
- self.assertEqual(header, None)
- def test_getting_header_defaultint(self):
- header = self.resp.getheader('No-Such-Header',default=42)
- self.assertEqual(header, 42)
- class TunnelTests(TestCase):
- def setUp(self):
- response_text = (
- 'HTTP/1.0 200 OK\r\n\r\n' # Reply to CONNECT
- 'HTTP/1.1 200 OK\r\n' # Reply to HEAD
- 'Content-Length: 42\r\n\r\n'
- )
- self.host = 'proxy.com'
- self.conn = client.HTTPConnection(self.host)
- self.conn._create_connection = self._create_connection(response_text)
- def tearDown(self):
- self.conn.close()
- def _create_connection(self, response_text):
- def create_connection(address, timeout=None, source_address=None):
- return FakeSocket(response_text, host=address[0], port=address[1])
- return create_connection
- def test_set_tunnel_host_port_headers(self):
- tunnel_host = 'destination.com'
- tunnel_port = 8888
- tunnel_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)'}
- self.conn.set_tunnel(tunnel_host, port=tunnel_port,
- headers=tunnel_headers)
- self.conn.request('HEAD', '/', '')
- self.assertEqual(self.conn.sock.host, self.host)
- self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
- self.assertEqual(self.conn._tunnel_host, tunnel_host)
- self.assertEqual(self.conn._tunnel_port, tunnel_port)
- self.assertEqual(self.conn._tunnel_headers, tunnel_headers)
- def test_disallow_set_tunnel_after_connect(self):
- # Once connected, we shouldn't be able to tunnel anymore
- self.conn.connect()
- self.assertRaises(RuntimeError, self.conn.set_tunnel,
- 'destination.com')
- def test_connect_with_tunnel(self):
- self.conn.set_tunnel('destination.com')
- self.conn.request('HEAD', '/', '')
- self.assertEqual(self.conn.sock.host, self.host)
- self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
- self.assertIn(b'CONNECT destination.com', self.conn.sock.data)
- # issue22095
- self.assertNotIn(b'Host: destination.com:None', self.conn.sock.data)
- self.assertIn(b'Host: destination.com', self.conn.sock.data)
- # This test should be removed when CONNECT gets the HTTP/1.1 blessing
- self.assertNotIn(b'Host: proxy.com', self.conn.sock.data)
- def test_tunnel_connect_single_send_connection_setup(self):
- """Regresstion test for https://bugs.python.org/issue43332."""
- with mock.patch.object(self.conn, 'send') as mock_send:
- self.conn.set_tunnel('destination.com')
- self.conn.connect()
- self.conn.request('GET', '/')
- mock_send.assert_called()
- # Likely 2, but this test only cares about the first.
- self.assertGreater(
- len(mock_send.mock_calls), 1,
- msg=f'unexpected number of send calls: {mock_send.mock_calls}')
- proxy_setup_data_sent = mock_send.mock_calls[0][1][0]
- self.assertIn(b'CONNECT destination.com', proxy_setup_data_sent)
- self.assertTrue(
- proxy_setup_data_sent.endswith(b'\r\n\r\n'),
- msg=f'unexpected proxy data sent {proxy_setup_data_sent!r}')
- def test_connect_put_request(self):
- self.conn.set_tunnel('destination.com')
- self.conn.request('PUT', '/', '')
- self.assertEqual(self.conn.sock.host, self.host)
- self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
- self.assertIn(b'CONNECT destination.com', self.conn.sock.data)
- self.assertIn(b'Host: destination.com', self.conn.sock.data)
- def test_tunnel_debuglog(self):
- expected_header = 'X-Dummy: 1'
- response_text = 'HTTP/1.0 200 OK\r\n{}\r\n\r\n'.format(expected_header)
- self.conn.set_debuglevel(1)
- self.conn._create_connection = self._create_connection(response_text)
- self.conn.set_tunnel('destination.com')
- with support.captured_stdout() as output:
- self.conn.request('PUT', '/', '')
- lines = output.getvalue().splitlines()
- self.assertIn('header: {}'.format(expected_header), lines)
- if __name__ == '__main__':
- unittest.main(verbosity=2)
|