test_wsgiref.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844
  1. from unittest import mock
  2. from test import support
  3. from test.support import socket_helper
  4. from test.test_httpservers import NoLogRequestHandler
  5. from unittest import TestCase
  6. from wsgiref.util import setup_testing_defaults
  7. from wsgiref.headers import Headers
  8. from wsgiref.handlers import BaseHandler, BaseCGIHandler, SimpleHandler
  9. from wsgiref import util
  10. from wsgiref.validate import validator
  11. from wsgiref.simple_server import WSGIServer, WSGIRequestHandler
  12. from wsgiref.simple_server import make_server
  13. from http.client import HTTPConnection
  14. from io import StringIO, BytesIO, BufferedReader
  15. from socketserver import BaseServer
  16. from platform import python_implementation
  17. import os
  18. import re
  19. import signal
  20. import sys
  21. import threading
  22. import unittest
  23. class MockServer(WSGIServer):
  24. """Non-socket HTTP server"""
  25. def __init__(self, server_address, RequestHandlerClass):
  26. BaseServer.__init__(self, server_address, RequestHandlerClass)
  27. self.server_bind()
  28. def server_bind(self):
  29. host, port = self.server_address
  30. self.server_name = host
  31. self.server_port = port
  32. self.setup_environ()
  33. class MockHandler(WSGIRequestHandler):
  34. """Non-socket HTTP handler"""
  35. def setup(self):
  36. self.connection = self.request
  37. self.rfile, self.wfile = self.connection
  38. def finish(self):
  39. pass
  40. def hello_app(environ,start_response):
  41. start_response("200 OK", [
  42. ('Content-Type','text/plain'),
  43. ('Date','Mon, 05 Jun 2006 18:49:54 GMT')
  44. ])
  45. return [b"Hello, world!"]
  46. def header_app(environ, start_response):
  47. start_response("200 OK", [
  48. ('Content-Type', 'text/plain'),
  49. ('Date', 'Mon, 05 Jun 2006 18:49:54 GMT')
  50. ])
  51. return [';'.join([
  52. environ['HTTP_X_TEST_HEADER'], environ['QUERY_STRING'],
  53. environ['PATH_INFO']
  54. ]).encode('iso-8859-1')]
  55. def run_amock(app=hello_app, data=b"GET / HTTP/1.0\n\n"):
  56. server = make_server("", 80, app, MockServer, MockHandler)
  57. inp = BufferedReader(BytesIO(data))
  58. out = BytesIO()
  59. olderr = sys.stderr
  60. err = sys.stderr = StringIO()
  61. try:
  62. server.finish_request((inp, out), ("127.0.0.1",8888))
  63. finally:
  64. sys.stderr = olderr
  65. return out.getvalue(), err.getvalue()
  66. def compare_generic_iter(make_it, match):
  67. """Utility to compare a generic iterator with an iterable
  68. This tests the iterator using iter()/next().
  69. 'make_it' must be a function returning a fresh
  70. iterator to be tested (since this may test the iterator twice)."""
  71. it = make_it()
  72. if not iter(it) is it:
  73. raise AssertionError
  74. for item in match:
  75. if not next(it) == item:
  76. raise AssertionError
  77. try:
  78. next(it)
  79. except StopIteration:
  80. pass
  81. else:
  82. raise AssertionError("Too many items from .__next__()", it)
  83. class IntegrationTests(TestCase):
  84. def check_hello(self, out, has_length=True):
  85. pyver = (python_implementation() + "/" +
  86. sys.version.split()[0])
  87. self.assertEqual(out,
  88. ("HTTP/1.0 200 OK\r\n"
  89. "Server: WSGIServer/0.2 " + pyver +"\r\n"
  90. "Content-Type: text/plain\r\n"
  91. "Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n" +
  92. (has_length and "Content-Length: 13\r\n" or "") +
  93. "\r\n"
  94. "Hello, world!").encode("iso-8859-1")
  95. )
  96. def test_plain_hello(self):
  97. out, err = run_amock()
  98. self.check_hello(out)
  99. def test_environ(self):
  100. request = (
  101. b"GET /p%61th/?query=test HTTP/1.0\n"
  102. b"X-Test-Header: Python test \n"
  103. b"X-Test-Header: Python test 2\n"
  104. b"Content-Length: 0\n\n"
  105. )
  106. out, err = run_amock(header_app, request)
  107. self.assertEqual(
  108. out.splitlines()[-1],
  109. b"Python test,Python test 2;query=test;/path/"
  110. )
  111. def test_request_length(self):
  112. out, err = run_amock(data=b"GET " + (b"x" * 65537) + b" HTTP/1.0\n\n")
  113. self.assertEqual(out.splitlines()[0],
  114. b"HTTP/1.0 414 Request-URI Too Long")
  115. def test_validated_hello(self):
  116. out, err = run_amock(validator(hello_app))
  117. # the middleware doesn't support len(), so content-length isn't there
  118. self.check_hello(out, has_length=False)
  119. def test_simple_validation_error(self):
  120. def bad_app(environ,start_response):
  121. start_response("200 OK", ('Content-Type','text/plain'))
  122. return ["Hello, world!"]
  123. out, err = run_amock(validator(bad_app))
  124. self.assertTrue(out.endswith(
  125. b"A server error occurred. Please contact the administrator."
  126. ))
  127. self.assertEqual(
  128. err.splitlines()[-2],
  129. "AssertionError: Headers (('Content-Type', 'text/plain')) must"
  130. " be of type list: <class 'tuple'>"
  131. )
  132. def test_status_validation_errors(self):
  133. def create_bad_app(status):
  134. def bad_app(environ, start_response):
  135. start_response(status, [("Content-Type", "text/plain; charset=utf-8")])
  136. return [b"Hello, world!"]
  137. return bad_app
  138. tests = [
  139. ('200', 'AssertionError: Status must be at least 4 characters'),
  140. ('20X OK', 'AssertionError: Status message must begin w/3-digit code'),
  141. ('200OK', 'AssertionError: Status message must have a space after code'),
  142. ]
  143. for status, exc_message in tests:
  144. with self.subTest(status=status):
  145. out, err = run_amock(create_bad_app(status))
  146. self.assertTrue(out.endswith(
  147. b"A server error occurred. Please contact the administrator."
  148. ))
  149. self.assertEqual(err.splitlines()[-2], exc_message)
  150. def test_wsgi_input(self):
  151. def bad_app(e,s):
  152. e["wsgi.input"].read()
  153. s("200 OK", [("Content-Type", "text/plain; charset=utf-8")])
  154. return [b"data"]
  155. out, err = run_amock(validator(bad_app))
  156. self.assertTrue(out.endswith(
  157. b"A server error occurred. Please contact the administrator."
  158. ))
  159. self.assertEqual(
  160. err.splitlines()[-2], "AssertionError"
  161. )
  162. def test_bytes_validation(self):
  163. def app(e, s):
  164. s("200 OK", [
  165. ("Content-Type", "text/plain; charset=utf-8"),
  166. ("Date", "Wed, 24 Dec 2008 13:29:32 GMT"),
  167. ])
  168. return [b"data"]
  169. out, err = run_amock(validator(app))
  170. self.assertTrue(err.endswith('"GET / HTTP/1.0" 200 4\n'))
  171. ver = sys.version.split()[0].encode('ascii')
  172. py = python_implementation().encode('ascii')
  173. pyver = py + b"/" + ver
  174. self.assertEqual(
  175. b"HTTP/1.0 200 OK\r\n"
  176. b"Server: WSGIServer/0.2 "+ pyver + b"\r\n"
  177. b"Content-Type: text/plain; charset=utf-8\r\n"
  178. b"Date: Wed, 24 Dec 2008 13:29:32 GMT\r\n"
  179. b"\r\n"
  180. b"data",
  181. out)
  182. def test_cp1252_url(self):
  183. def app(e, s):
  184. s("200 OK", [
  185. ("Content-Type", "text/plain"),
  186. ("Date", "Wed, 24 Dec 2008 13:29:32 GMT"),
  187. ])
  188. # PEP3333 says environ variables are decoded as latin1.
  189. # Encode as latin1 to get original bytes
  190. return [e["PATH_INFO"].encode("latin1")]
  191. out, err = run_amock(
  192. validator(app), data=b"GET /\x80%80 HTTP/1.0")
  193. self.assertEqual(
  194. [
  195. b"HTTP/1.0 200 OK",
  196. mock.ANY,
  197. b"Content-Type: text/plain",
  198. b"Date: Wed, 24 Dec 2008 13:29:32 GMT",
  199. b"",
  200. b"/\x80\x80",
  201. ],
  202. out.splitlines())
  203. def test_interrupted_write(self):
  204. # BaseHandler._write() and _flush() have to write all data, even if
  205. # it takes multiple send() calls. Test this by interrupting a send()
  206. # call with a Unix signal.
  207. pthread_kill = support.get_attribute(signal, "pthread_kill")
  208. def app(environ, start_response):
  209. start_response("200 OK", [])
  210. return [b'\0' * support.SOCK_MAX_SIZE]
  211. class WsgiHandler(NoLogRequestHandler, WSGIRequestHandler):
  212. pass
  213. server = make_server(socket_helper.HOST, 0, app, handler_class=WsgiHandler)
  214. self.addCleanup(server.server_close)
  215. interrupted = threading.Event()
  216. def signal_handler(signum, frame):
  217. interrupted.set()
  218. original = signal.signal(signal.SIGUSR1, signal_handler)
  219. self.addCleanup(signal.signal, signal.SIGUSR1, original)
  220. received = None
  221. main_thread = threading.get_ident()
  222. def run_client():
  223. http = HTTPConnection(*server.server_address)
  224. http.request("GET", "/")
  225. with http.getresponse() as response:
  226. response.read(100)
  227. # The main thread should now be blocking in a send() system
  228. # call. But in theory, it could get interrupted by other
  229. # signals, and then retried. So keep sending the signal in a
  230. # loop, in case an earlier signal happens to be delivered at
  231. # an inconvenient moment.
  232. while True:
  233. pthread_kill(main_thread, signal.SIGUSR1)
  234. if interrupted.wait(timeout=float(1)):
  235. break
  236. nonlocal received
  237. received = len(response.read())
  238. http.close()
  239. background = threading.Thread(target=run_client)
  240. background.start()
  241. server.handle_request()
  242. background.join()
  243. self.assertEqual(received, support.SOCK_MAX_SIZE - 100)
  244. class UtilityTests(TestCase):
  245. def checkShift(self,sn_in,pi_in,part,sn_out,pi_out):
  246. env = {'SCRIPT_NAME':sn_in,'PATH_INFO':pi_in}
  247. util.setup_testing_defaults(env)
  248. self.assertEqual(util.shift_path_info(env),part)
  249. self.assertEqual(env['PATH_INFO'],pi_out)
  250. self.assertEqual(env['SCRIPT_NAME'],sn_out)
  251. return env
  252. def checkDefault(self, key, value, alt=None):
  253. # Check defaulting when empty
  254. env = {}
  255. util.setup_testing_defaults(env)
  256. if isinstance(value, StringIO):
  257. self.assertIsInstance(env[key], StringIO)
  258. elif isinstance(value,BytesIO):
  259. self.assertIsInstance(env[key],BytesIO)
  260. else:
  261. self.assertEqual(env[key], value)
  262. # Check existing value
  263. env = {key:alt}
  264. util.setup_testing_defaults(env)
  265. self.assertIs(env[key], alt)
  266. def checkCrossDefault(self,key,value,**kw):
  267. util.setup_testing_defaults(kw)
  268. self.assertEqual(kw[key],value)
  269. def checkAppURI(self,uri,**kw):
  270. util.setup_testing_defaults(kw)
  271. self.assertEqual(util.application_uri(kw),uri)
  272. def checkReqURI(self,uri,query=1,**kw):
  273. util.setup_testing_defaults(kw)
  274. self.assertEqual(util.request_uri(kw,query),uri)
  275. def checkFW(self,text,size,match):
  276. def make_it(text=text,size=size):
  277. return util.FileWrapper(StringIO(text),size)
  278. compare_generic_iter(make_it,match)
  279. it = make_it()
  280. self.assertFalse(it.filelike.closed)
  281. for item in it:
  282. pass
  283. self.assertFalse(it.filelike.closed)
  284. it.close()
  285. self.assertTrue(it.filelike.closed)
  286. def testSimpleShifts(self):
  287. self.checkShift('','/', '', '/', '')
  288. self.checkShift('','/x', 'x', '/x', '')
  289. self.checkShift('/','', None, '/', '')
  290. self.checkShift('/a','/x/y', 'x', '/a/x', '/y')
  291. self.checkShift('/a','/x/', 'x', '/a/x', '/')
  292. def testNormalizedShifts(self):
  293. self.checkShift('/a/b', '/../y', '..', '/a', '/y')
  294. self.checkShift('', '/../y', '..', '', '/y')
  295. self.checkShift('/a/b', '//y', 'y', '/a/b/y', '')
  296. self.checkShift('/a/b', '//y/', 'y', '/a/b/y', '/')
  297. self.checkShift('/a/b', '/./y', 'y', '/a/b/y', '')
  298. self.checkShift('/a/b', '/./y/', 'y', '/a/b/y', '/')
  299. self.checkShift('/a/b', '///./..//y/.//', '..', '/a', '/y/')
  300. self.checkShift('/a/b', '///', '', '/a/b/', '')
  301. self.checkShift('/a/b', '/.//', '', '/a/b/', '')
  302. self.checkShift('/a/b', '/x//', 'x', '/a/b/x', '/')
  303. self.checkShift('/a/b', '/.', None, '/a/b', '')
  304. def testDefaults(self):
  305. for key, value in [
  306. ('SERVER_NAME','127.0.0.1'),
  307. ('SERVER_PORT', '80'),
  308. ('SERVER_PROTOCOL','HTTP/1.0'),
  309. ('HTTP_HOST','127.0.0.1'),
  310. ('REQUEST_METHOD','GET'),
  311. ('SCRIPT_NAME',''),
  312. ('PATH_INFO','/'),
  313. ('wsgi.version', (1,0)),
  314. ('wsgi.run_once', 0),
  315. ('wsgi.multithread', 0),
  316. ('wsgi.multiprocess', 0),
  317. ('wsgi.input', BytesIO()),
  318. ('wsgi.errors', StringIO()),
  319. ('wsgi.url_scheme','http'),
  320. ]:
  321. self.checkDefault(key,value)
  322. def testCrossDefaults(self):
  323. self.checkCrossDefault('HTTP_HOST',"foo.bar",SERVER_NAME="foo.bar")
  324. self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="on")
  325. self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="1")
  326. self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="yes")
  327. self.checkCrossDefault('wsgi.url_scheme',"http",HTTPS="foo")
  328. self.checkCrossDefault('SERVER_PORT',"80",HTTPS="foo")
  329. self.checkCrossDefault('SERVER_PORT',"443",HTTPS="on")
  330. def testGuessScheme(self):
  331. self.assertEqual(util.guess_scheme({}), "http")
  332. self.assertEqual(util.guess_scheme({'HTTPS':"foo"}), "http")
  333. self.assertEqual(util.guess_scheme({'HTTPS':"on"}), "https")
  334. self.assertEqual(util.guess_scheme({'HTTPS':"yes"}), "https")
  335. self.assertEqual(util.guess_scheme({'HTTPS':"1"}), "https")
  336. def testAppURIs(self):
  337. self.checkAppURI("http://127.0.0.1/")
  338. self.checkAppURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
  339. self.checkAppURI("http://127.0.0.1/sp%E4m", SCRIPT_NAME="/sp\xe4m")
  340. self.checkAppURI("http://spam.example.com:2071/",
  341. HTTP_HOST="spam.example.com:2071", SERVER_PORT="2071")
  342. self.checkAppURI("http://spam.example.com/",
  343. SERVER_NAME="spam.example.com")
  344. self.checkAppURI("http://127.0.0.1/",
  345. HTTP_HOST="127.0.0.1", SERVER_NAME="spam.example.com")
  346. self.checkAppURI("https://127.0.0.1/", HTTPS="on")
  347. self.checkAppURI("http://127.0.0.1:8000/", SERVER_PORT="8000",
  348. HTTP_HOST=None)
  349. def testReqURIs(self):
  350. self.checkReqURI("http://127.0.0.1/")
  351. self.checkReqURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
  352. self.checkReqURI("http://127.0.0.1/sp%E4m", SCRIPT_NAME="/sp\xe4m")
  353. self.checkReqURI("http://127.0.0.1/spammity/spam",
  354. SCRIPT_NAME="/spammity", PATH_INFO="/spam")
  355. self.checkReqURI("http://127.0.0.1/spammity/sp%E4m",
  356. SCRIPT_NAME="/spammity", PATH_INFO="/sp\xe4m")
  357. self.checkReqURI("http://127.0.0.1/spammity/spam;ham",
  358. SCRIPT_NAME="/spammity", PATH_INFO="/spam;ham")
  359. self.checkReqURI("http://127.0.0.1/spammity/spam;cookie=1234,5678",
  360. SCRIPT_NAME="/spammity", PATH_INFO="/spam;cookie=1234,5678")
  361. self.checkReqURI("http://127.0.0.1/spammity/spam?say=ni",
  362. SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")
  363. self.checkReqURI("http://127.0.0.1/spammity/spam?s%E4y=ni",
  364. SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="s%E4y=ni")
  365. self.checkReqURI("http://127.0.0.1/spammity/spam", 0,
  366. SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")
  367. def testFileWrapper(self):
  368. self.checkFW("xyz"*50, 120, ["xyz"*40,"xyz"*10])
  369. def testHopByHop(self):
  370. for hop in (
  371. "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization "
  372. "TE Trailers Transfer-Encoding Upgrade"
  373. ).split():
  374. for alt in hop, hop.title(), hop.upper(), hop.lower():
  375. self.assertTrue(util.is_hop_by_hop(alt))
  376. # Not comprehensive, just a few random header names
  377. for hop in (
  378. "Accept Cache-Control Date Pragma Trailer Via Warning"
  379. ).split():
  380. for alt in hop, hop.title(), hop.upper(), hop.lower():
  381. self.assertFalse(util.is_hop_by_hop(alt))
  382. class HeaderTests(TestCase):
  383. def testMappingInterface(self):
  384. test = [('x','y')]
  385. self.assertEqual(len(Headers()), 0)
  386. self.assertEqual(len(Headers([])),0)
  387. self.assertEqual(len(Headers(test[:])),1)
  388. self.assertEqual(Headers(test[:]).keys(), ['x'])
  389. self.assertEqual(Headers(test[:]).values(), ['y'])
  390. self.assertEqual(Headers(test[:]).items(), test)
  391. self.assertIsNot(Headers(test).items(), test) # must be copy!
  392. h = Headers()
  393. del h['foo'] # should not raise an error
  394. h['Foo'] = 'bar'
  395. for m in h.__contains__, h.get, h.get_all, h.__getitem__:
  396. self.assertTrue(m('foo'))
  397. self.assertTrue(m('Foo'))
  398. self.assertTrue(m('FOO'))
  399. self.assertFalse(m('bar'))
  400. self.assertEqual(h['foo'],'bar')
  401. h['foo'] = 'baz'
  402. self.assertEqual(h['FOO'],'baz')
  403. self.assertEqual(h.get_all('foo'),['baz'])
  404. self.assertEqual(h.get("foo","whee"), "baz")
  405. self.assertEqual(h.get("zoo","whee"), "whee")
  406. self.assertEqual(h.setdefault("foo","whee"), "baz")
  407. self.assertEqual(h.setdefault("zoo","whee"), "whee")
  408. self.assertEqual(h["foo"],"baz")
  409. self.assertEqual(h["zoo"],"whee")
  410. def testRequireList(self):
  411. self.assertRaises(TypeError, Headers, "foo")
  412. def testExtras(self):
  413. h = Headers()
  414. self.assertEqual(str(h),'\r\n')
  415. h.add_header('foo','bar',baz="spam")
  416. self.assertEqual(h['foo'], 'bar; baz="spam"')
  417. self.assertEqual(str(h),'foo: bar; baz="spam"\r\n\r\n')
  418. h.add_header('Foo','bar',cheese=None)
  419. self.assertEqual(h.get_all('foo'),
  420. ['bar; baz="spam"', 'bar; cheese'])
  421. self.assertEqual(str(h),
  422. 'foo: bar; baz="spam"\r\n'
  423. 'Foo: bar; cheese\r\n'
  424. '\r\n'
  425. )
  426. class ErrorHandler(BaseCGIHandler):
  427. """Simple handler subclass for testing BaseHandler"""
  428. # BaseHandler records the OS environment at import time, but envvars
  429. # might have been changed later by other tests, which trips up
  430. # HandlerTests.testEnviron().
  431. os_environ = dict(os.environ.items())
  432. def __init__(self,**kw):
  433. setup_testing_defaults(kw)
  434. BaseCGIHandler.__init__(
  435. self, BytesIO(), BytesIO(), StringIO(), kw,
  436. multithread=True, multiprocess=True
  437. )
  438. class TestHandler(ErrorHandler):
  439. """Simple handler subclass for testing BaseHandler, w/error passthru"""
  440. def handle_error(self):
  441. raise # for testing, we want to see what's happening
  442. class HandlerTests(TestCase):
  443. # testEnviron() can produce long error message
  444. maxDiff = 80 * 50
  445. def testEnviron(self):
  446. os_environ = {
  447. # very basic environment
  448. 'HOME': '/my/home',
  449. 'PATH': '/my/path',
  450. 'LANG': 'fr_FR.UTF-8',
  451. # set some WSGI variables
  452. 'SCRIPT_NAME': 'test_script_name',
  453. 'SERVER_NAME': 'test_server_name',
  454. }
  455. with support.swap_attr(TestHandler, 'os_environ', os_environ):
  456. # override X and HOME variables
  457. handler = TestHandler(X="Y", HOME="/override/home")
  458. handler.setup_environ()
  459. # Check that wsgi_xxx attributes are copied to wsgi.xxx variables
  460. # of handler.environ
  461. for attr in ('version', 'multithread', 'multiprocess', 'run_once',
  462. 'file_wrapper'):
  463. self.assertEqual(getattr(handler, 'wsgi_' + attr),
  464. handler.environ['wsgi.' + attr])
  465. # Test handler.environ as a dict
  466. expected = {}
  467. setup_testing_defaults(expected)
  468. # Handler inherits os_environ variables which are not overridden
  469. # by SimpleHandler.add_cgi_vars() (SimpleHandler.base_env)
  470. for key, value in os_environ.items():
  471. if key not in expected:
  472. expected[key] = value
  473. expected.update({
  474. # X doesn't exist in os_environ
  475. "X": "Y",
  476. # HOME is overridden by TestHandler
  477. 'HOME': "/override/home",
  478. # overridden by setup_testing_defaults()
  479. "SCRIPT_NAME": "",
  480. "SERVER_NAME": "127.0.0.1",
  481. # set by BaseHandler.setup_environ()
  482. 'wsgi.input': handler.get_stdin(),
  483. 'wsgi.errors': handler.get_stderr(),
  484. 'wsgi.version': (1, 0),
  485. 'wsgi.run_once': False,
  486. 'wsgi.url_scheme': 'http',
  487. 'wsgi.multithread': True,
  488. 'wsgi.multiprocess': True,
  489. 'wsgi.file_wrapper': util.FileWrapper,
  490. })
  491. self.assertDictEqual(handler.environ, expected)
  492. def testCGIEnviron(self):
  493. h = BaseCGIHandler(None,None,None,{})
  494. h.setup_environ()
  495. for key in 'wsgi.url_scheme', 'wsgi.input', 'wsgi.errors':
  496. self.assertIn(key, h.environ)
  497. def testScheme(self):
  498. h=TestHandler(HTTPS="on"); h.setup_environ()
  499. self.assertEqual(h.environ['wsgi.url_scheme'],'https')
  500. h=TestHandler(); h.setup_environ()
  501. self.assertEqual(h.environ['wsgi.url_scheme'],'http')
  502. def testAbstractMethods(self):
  503. h = BaseHandler()
  504. for name in [
  505. '_flush','get_stdin','get_stderr','add_cgi_vars'
  506. ]:
  507. self.assertRaises(NotImplementedError, getattr(h,name))
  508. self.assertRaises(NotImplementedError, h._write, "test")
  509. def testContentLength(self):
  510. # Demo one reason iteration is better than write()... ;)
  511. def trivial_app1(e,s):
  512. s('200 OK',[])
  513. return [e['wsgi.url_scheme'].encode('iso-8859-1')]
  514. def trivial_app2(e,s):
  515. s('200 OK',[])(e['wsgi.url_scheme'].encode('iso-8859-1'))
  516. return []
  517. def trivial_app3(e,s):
  518. s('200 OK',[])
  519. return ['\u0442\u0435\u0441\u0442'.encode("utf-8")]
  520. def trivial_app4(e,s):
  521. # Simulate a response to a HEAD request
  522. s('200 OK',[('Content-Length', '12345')])
  523. return []
  524. h = TestHandler()
  525. h.run(trivial_app1)
  526. self.assertEqual(h.stdout.getvalue(),
  527. ("Status: 200 OK\r\n"
  528. "Content-Length: 4\r\n"
  529. "\r\n"
  530. "http").encode("iso-8859-1"))
  531. h = TestHandler()
  532. h.run(trivial_app2)
  533. self.assertEqual(h.stdout.getvalue(),
  534. ("Status: 200 OK\r\n"
  535. "\r\n"
  536. "http").encode("iso-8859-1"))
  537. h = TestHandler()
  538. h.run(trivial_app3)
  539. self.assertEqual(h.stdout.getvalue(),
  540. b'Status: 200 OK\r\n'
  541. b'Content-Length: 8\r\n'
  542. b'\r\n'
  543. b'\xd1\x82\xd0\xb5\xd1\x81\xd1\x82')
  544. h = TestHandler()
  545. h.run(trivial_app4)
  546. self.assertEqual(h.stdout.getvalue(),
  547. b'Status: 200 OK\r\n'
  548. b'Content-Length: 12345\r\n'
  549. b'\r\n')
  550. def testBasicErrorOutput(self):
  551. def non_error_app(e,s):
  552. s('200 OK',[])
  553. return []
  554. def error_app(e,s):
  555. raise AssertionError("This should be caught by handler")
  556. h = ErrorHandler()
  557. h.run(non_error_app)
  558. self.assertEqual(h.stdout.getvalue(),
  559. ("Status: 200 OK\r\n"
  560. "Content-Length: 0\r\n"
  561. "\r\n").encode("iso-8859-1"))
  562. self.assertEqual(h.stderr.getvalue(),"")
  563. h = ErrorHandler()
  564. h.run(error_app)
  565. self.assertEqual(h.stdout.getvalue(),
  566. ("Status: %s\r\n"
  567. "Content-Type: text/plain\r\n"
  568. "Content-Length: %d\r\n"
  569. "\r\n" % (h.error_status,len(h.error_body))).encode('iso-8859-1')
  570. + h.error_body)
  571. self.assertIn("AssertionError", h.stderr.getvalue())
  572. def testErrorAfterOutput(self):
  573. MSG = b"Some output has been sent"
  574. def error_app(e,s):
  575. s("200 OK",[])(MSG)
  576. raise AssertionError("This should be caught by handler")
  577. h = ErrorHandler()
  578. h.run(error_app)
  579. self.assertEqual(h.stdout.getvalue(),
  580. ("Status: 200 OK\r\n"
  581. "\r\n".encode("iso-8859-1")+MSG))
  582. self.assertIn("AssertionError", h.stderr.getvalue())
  583. def testHeaderFormats(self):
  584. def non_error_app(e,s):
  585. s('200 OK',[])
  586. return []
  587. stdpat = (
  588. r"HTTP/%s 200 OK\r\n"
  589. r"Date: \w{3}, [ 0123]\d \w{3} \d{4} \d\d:\d\d:\d\d GMT\r\n"
  590. r"%s" r"Content-Length: 0\r\n" r"\r\n"
  591. )
  592. shortpat = (
  593. "Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n"
  594. ).encode("iso-8859-1")
  595. for ssw in "FooBar/1.0", None:
  596. sw = ssw and "Server: %s\r\n" % ssw or ""
  597. for version in "1.0", "1.1":
  598. for proto in "HTTP/0.9", "HTTP/1.0", "HTTP/1.1":
  599. h = TestHandler(SERVER_PROTOCOL=proto)
  600. h.origin_server = False
  601. h.http_version = version
  602. h.server_software = ssw
  603. h.run(non_error_app)
  604. self.assertEqual(shortpat,h.stdout.getvalue())
  605. h = TestHandler(SERVER_PROTOCOL=proto)
  606. h.origin_server = True
  607. h.http_version = version
  608. h.server_software = ssw
  609. h.run(non_error_app)
  610. if proto=="HTTP/0.9":
  611. self.assertEqual(h.stdout.getvalue(),b"")
  612. else:
  613. self.assertTrue(
  614. re.match((stdpat%(version,sw)).encode("iso-8859-1"),
  615. h.stdout.getvalue()),
  616. ((stdpat%(version,sw)).encode("iso-8859-1"),
  617. h.stdout.getvalue())
  618. )
  619. def testBytesData(self):
  620. def app(e, s):
  621. s("200 OK", [
  622. ("Content-Type", "text/plain; charset=utf-8"),
  623. ])
  624. return [b"data"]
  625. h = TestHandler()
  626. h.run(app)
  627. self.assertEqual(b"Status: 200 OK\r\n"
  628. b"Content-Type: text/plain; charset=utf-8\r\n"
  629. b"Content-Length: 4\r\n"
  630. b"\r\n"
  631. b"data",
  632. h.stdout.getvalue())
  633. def testCloseOnError(self):
  634. side_effects = {'close_called': False}
  635. MSG = b"Some output has been sent"
  636. def error_app(e,s):
  637. s("200 OK",[])(MSG)
  638. class CrashyIterable(object):
  639. def __iter__(self):
  640. while True:
  641. yield b'blah'
  642. raise AssertionError("This should be caught by handler")
  643. def close(self):
  644. side_effects['close_called'] = True
  645. return CrashyIterable()
  646. h = ErrorHandler()
  647. h.run(error_app)
  648. self.assertEqual(side_effects['close_called'], True)
  649. def testPartialWrite(self):
  650. written = bytearray()
  651. class PartialWriter:
  652. def write(self, b):
  653. partial = b[:7]
  654. written.extend(partial)
  655. return len(partial)
  656. def flush(self):
  657. pass
  658. environ = {"SERVER_PROTOCOL": "HTTP/1.0"}
  659. h = SimpleHandler(BytesIO(), PartialWriter(), sys.stderr, environ)
  660. msg = "should not do partial writes"
  661. with self.assertWarnsRegex(DeprecationWarning, msg):
  662. h.run(hello_app)
  663. self.assertEqual(b"HTTP/1.0 200 OK\r\n"
  664. b"Content-Type: text/plain\r\n"
  665. b"Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n"
  666. b"Content-Length: 13\r\n"
  667. b"\r\n"
  668. b"Hello, world!",
  669. written)
  670. def testClientConnectionTerminations(self):
  671. environ = {"SERVER_PROTOCOL": "HTTP/1.0"}
  672. for exception in (
  673. ConnectionAbortedError,
  674. BrokenPipeError,
  675. ConnectionResetError,
  676. ):
  677. with self.subTest(exception=exception):
  678. class AbortingWriter:
  679. def write(self, b):
  680. raise exception
  681. stderr = StringIO()
  682. h = SimpleHandler(BytesIO(), AbortingWriter(), stderr, environ)
  683. h.run(hello_app)
  684. self.assertFalse(stderr.getvalue())
  685. def testDontResetInternalStateOnException(self):
  686. class CustomException(ValueError):
  687. pass
  688. # We are raising CustomException here to trigger an exception
  689. # during the execution of SimpleHandler.finish_response(), so
  690. # we can easily test that the internal state of the handler is
  691. # preserved in case of an exception.
  692. class AbortingWriter:
  693. def write(self, b):
  694. raise CustomException
  695. stderr = StringIO()
  696. environ = {"SERVER_PROTOCOL": "HTTP/1.0"}
  697. h = SimpleHandler(BytesIO(), AbortingWriter(), stderr, environ)
  698. h.run(hello_app)
  699. self.assertIn("CustomException", stderr.getvalue())
  700. # Test that the internal state of the handler is preserved.
  701. self.assertIsNotNone(h.result)
  702. self.assertIsNotNone(h.headers)
  703. self.assertIsNotNone(h.status)
  704. self.assertIsNotNone(h.environ)
  705. if __name__ == "__main__":
  706. unittest.main()