# test_urllib2net.py
  import errno
  import os
  import socket
  import sys
  import unittest
  import urllib.error
  import urllib.request

  from test import support
  from test.support import ResourceDenied
  from test.support import os_helper
  from test.support import socket_helper
  from test.test_urllib2 import sanepathname2url
  # Every test in this module opens URLs on real hosts, so the "network" test
  # resource is required up front (see test.support.requires for what happens
  # when the resource is not enabled).
  support.requires("network")
  14. def _retry_thrice(func, exc, *args, **kwargs):
  15. for i in range(3):
  16. try:
  17. return func(*args, **kwargs)
  18. except exc as e:
  19. last_exc = e
  20. continue
  21. raise last_exc
  def _wrap_with_retry_thrice(func, exc):
      """Return a callable that runs *func* through ``_retry_thrice``.

      The returned callable accepts any positional and keyword arguments and
      forwards them to *func*; calls that raise *exc* are retried by
      ``_retry_thrice``.
      """
      def wrapped(*call_args, **call_kwargs):
          # Delegate so that the retry policy lives in exactly one place.
          outcome = _retry_thrice(func, exc, *call_args, **call_kwargs)
          return outcome

      return wrapped
  # Connections to remote hosts are flaky.  To make the tests more robust,
  # urlopen() is wrapped so that a call failing with URLError is retried.
  _urlopen_with_retry = _wrap_with_retry_thrice(
      func=urllib.request.urlopen,
      exc=urllib.error.URLError,
  )
  class TransientResource:
      """Context manager that converts matching exceptions into ResourceDenied.

      An exception raised while the context manager is in effect "matches"
      when its type is *exc* or a subclass of it, and the exception instance
      carries every attribute given as a keyword argument with an equal value.
      """

      def __init__(self, exc, **kwargs):
          """Remember the exception class and the attribute values to match.

          exc    -- exception class (or tuple of classes) to look for.
          kwargs -- attribute name/value pairs the exception instance must have.
          """
          self.exc = exc
          self.attrs = kwargs

      def __enter__(self):
          return self

      def __exit__(self, type_=None, value=None, traceback=None):
          """If type_ is a subclass of self.exc and value has attributes matching
          self.attrs, raise ResourceDenied.  Otherwise let the exception
          propagate (if any)."""
          # The raised type must derive from the configured one.  (The check
          # used to be written the other way round, so e.g. a
          # ConnectionResetError never matched an OSError-based instance.)
          if type_ is None or not issubclass(type_, self.exc):
              return
          missing = object()  # sentinel: attribute absent on the exception
          for attr, expected in self.attrs.items():
              if getattr(value, attr, missing) != expected:
                  # Attribute absent or different: not our case, so return a
                  # false value and let the original exception propagate.
                  return
          raise ResourceDenied("an optional resource is not available")
  # Shared context managers: each one raises ResourceDenied when a particular
  # symptom of a broken internet connection shows up as an exception.
  # XXX deprecate these and use transient_internet() instead
  time_out = TransientResource(exc=OSError, errno=errno.ETIMEDOUT)
  socket_peer_reset = TransientResource(exc=OSError, errno=errno.ECONNRESET)
  # Configured identically to socket_peer_reset; kept as a separate name.
  ioerror_peer_reset = TransientResource(exc=OSError, errno=errno.ECONNRESET)
  class AuthTests(unittest.TestCase):
      """Tests urllib2 authentication features."""

      ## Disabled at the moment since there is no page under python.org which
      ## could be used to test HTTP authentication.
      ##
      ## NOTE: the disabled code below still uses the old ``urllib2`` module
      ## name and the ``except Class, name:`` syntax, so it would have to be
      ## ported before it could be re-enabled.
      #
      #    def test_basic_auth(self):
      #        import http.client
      #
      #        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
      #        test_hostport = "www.python.org"
      #        test_realm = 'Test Realm'
      #        test_user = 'test.test_urllib2net'
      #        test_password = 'blah'
      #
      #        # failure
      #        try:
      #            _urlopen_with_retry(test_url)
      #        except urllib2.HTTPError, exc:
      #            self.assertEqual(exc.code, 401)
      #        else:
      #            self.fail("urlopen() should have failed with 401")
      #
      #        # success
      #        auth_handler = urllib2.HTTPBasicAuthHandler()
      #        auth_handler.add_password(test_realm, test_hostport,
      #                                  test_user, test_password)
      #        opener = urllib2.build_opener(auth_handler)
      #        f = opener.open('http://localhost/')
      #        response = _urlopen_with_retry("http://www.python.org/")
      #
      #        # The 'userinfo' URL component is deprecated by RFC 3986 for security
      #        # reasons, let's not implement it!  (it's already implemented for proxy
      #        # specification strings (that is, URLs or authorities specifying a
      #        # proxy), so we must keep that)
      #        self.assertRaises(http.client.InvalidURL,
      #                          urllib2.urlopen, "http://evil:thing@example.com")
  class CloseSocketTest(unittest.TestCase):
      """Check that closing a response also closes what it reads from."""

      def test_close(self):
          # urlopen() may install the module-global _opener; reset it when the
          # test is over.
          self.addCleanup(urllib.request.urlcleanup)

          # Calling .close() on a response object returned by urlopen() should
          # close the underlying socket as well.
          target = support.TEST_HTTP_URL
          with socket_helper.transient_internet(target):
              resp = _urlopen_with_retry(target)
              underlying = resp.fp
              self.assertFalse(underlying.closed)
              resp.close()
              self.assertTrue(underlying.closed)
  class OtherNetworkTests(unittest.TestCase):
      """Smoke tests that open ftp:, file: and http: URLs."""

      # Logger that _test_urls() reports on.  The debugging hook in setUp()
      # must attach its handler to this same logger, otherwise the messages
      # never reach it.
      _LOGGER_NAME = "test_urllib2"

      def setUp(self):
          if 0:  # for debugging
              import logging
              logger = logging.getLogger(self._LOGGER_NAME)
              # Without an explicit level the debug() records are dropped
              # before any handler sees them.
              logger.setLevel(logging.DEBUG)
              logger.addHandler(logging.StreamHandler())

      # XXX The rest of these tests aren't very good -- they don't check much.
      # They do sometimes catch some major disasters, though.

      def test_ftp(self):
          urls = [
              'ftp://www.pythontest.net/README',
              ('ftp://www.pythontest.net/non-existent-file',
               None, urllib.error.URLError),
          ]
          self._test_urls(urls, self._extra_handlers())

      def test_file(self):
          TESTFN = os_helper.TESTFN
          # Register the removal first so the file is cleaned up even if
          # writing it fails half-way; unlink() tolerates a missing file.
          self.addCleanup(os_helper.unlink, TESTFN)
          with open(TESTFN, 'w', encoding='utf-8') as f:
              f.write('hi there\n')

          urls = [
              'file:' + sanepathname2url(os.path.abspath(TESTFN)),
              ('file:///nonsensename/etc/passwd', None,
               urllib.error.URLError),
          ]
          self._test_urls(urls, self._extra_handlers(), retry=True)

          # A bare relative path is not a URL and must be rejected.
          self.assertRaises(ValueError, urllib.request.urlopen,
                            './relative_path/to/file')

      # XXX Following test depends on machine configurations that are internal
      # to CNRI.  Need to set up a public server with the right authentication
      # configuration for test purposes.

      ##     def test_cnri(self):
      ##         if socket.gethostname() == 'bitdiddle':
      ##             localhost = 'bitdiddle.cnri.reston.va.us'
      ##         elif socket.gethostname() == 'bitdiddle.concentric.net':
      ##             localhost = 'localhost'
      ##         else:
      ##             localhost = None
      ##         if localhost is not None:
      ##             urls = [
      ##                 'file://%s/etc/passwd' % localhost,
      ##                 'http://%s/simple/' % localhost,
      ##                 'http://%s/digest/' % localhost,
      ##                 'http://%s/not/found.h' % localhost,
      ##                 ]

      ##             bauth = HTTPBasicAuthHandler()
      ##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
      ##                                'password')
      ##             dauth = HTTPDigestAuthHandler()
      ##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
      ##                                'password')

      ##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])

      def test_urlwithfrag(self):
          urlwith_frag = "http://www.pythontest.net/index.html#frag"
          with socket_helper.transient_internet(urlwith_frag):
              req = urllib.request.Request(urlwith_frag)
              # Use the response as a context manager so it is always closed.
              with urllib.request.urlopen(req) as res:
                  self.assertEqual(res.geturl(),
                                   "http://www.pythontest.net/index.html#frag")

      def test_redirect_url_withfrag(self):
          redirect_url_with_frag = "http://www.pythontest.net/redir/with_frag/"
          with socket_helper.transient_internet(redirect_url_with_frag):
              req = urllib.request.Request(redirect_url_with_frag)
              with urllib.request.urlopen(req) as res:
                  self.assertEqual(res.geturl(),
                                   "http://www.pythontest.net/elsewhere/#frag")

      def test_custom_headers(self):
          url = support.TEST_HTTP_URL
          with socket_helper.transient_internet(url):
              opener = urllib.request.build_opener()
              request = urllib.request.Request(url)
              self.assertFalse(request.header_items())
              # Only the effect on the request's headers matters here, so the
              # responses are closed straight away.
              opener.open(request).close()
              self.assertTrue(request.header_items())
              self.assertTrue(request.has_header('User-agent'))
              request.add_header('User-Agent', 'Test-Agent')
              opener.open(request).close()
              self.assertEqual(request.get_header('User-agent'), 'Test-Agent')

      @unittest.skip('XXX: http://www.imdb.com is gone')
      def test_sites_no_connection_close(self):
          # Some sites do not send Connection: close header.
          # Verify that those work properly. (#issue12576)

          URL = 'http://www.imdb.com'  # mangles Connection:close

          with socket_helper.transient_internet(URL):
              try:
                  with urllib.request.urlopen(URL) as res:
                      pass
              except ValueError:
                  # Adjacent literals instead of a backslash continuation, so
                  # the message carries no stray run of indentation spaces.
                  self.fail("urlopen failed for site not sending "
                            "Connection:close")
              else:
                  self.assertTrue(res)

              with urllib.request.urlopen(URL) as req:
                  res = req.read()
              self.assertTrue(res)

      def _test_urls(self, urls, handlers, retry=True):
          """Open every entry of *urls* and read the response completely.

          urls     -- iterable whose items are either a URL string or a
                      ``(url, data, expected_error)`` tuple.
          handlers -- extra handlers passed to urllib.request.build_opener().
          retry    -- when true, opening is retried on URLError.

          An OSError raised while opening must be an instance of the entry's
          expected error; when no error is expected it is re-raised.
          """
          import time
          import logging
          debug = logging.getLogger(self._LOGGER_NAME).debug

          urlopen = urllib.request.build_opener(*handlers).open
          if retry:
              urlopen = _wrap_with_retry_thrice(urlopen, urllib.error.URLError)

          for url in urls:
              with self.subTest(url=url):
                  if isinstance(url, tuple):
                      url, req, expected_err = url
                  else:
                      req = expected_err = None

                  with socket_helper.transient_internet(url):
                      try:
                          f = urlopen(url, req, support.INTERNET_TIMEOUT)
                      # urllib.error.URLError is a subclass of OSError
                      except OSError as err:
                          if expected_err:
                              msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
                                     (expected_err, url, req, type(err), err))
                              self.assertIsInstance(err, expected_err, msg)
                          else:
                              raise
                      else:
                          # NOTE(review): an entry that expects an error but
                          # opens successfully is not reported as a failure.
                          try:
                              with time_out, \
                                   socket_peer_reset, \
                                   ioerror_peer_reset:
                                  buf = f.read()
                                  debug("read %d bytes", len(buf))
                          except TimeoutError:
                              print("<timeout: %s>" % url, file=sys.stderr)
                          finally:
                              # Close the response even when the read raised
                              # (e.g. ResourceDenied from the managers above).
                              f.close()
              time.sleep(0.1)

      def _extra_handlers(self):
          """Return a list holding a CacheFTPHandler with a one-second timeout.

          The handler's cache is cleared when the test finishes.
          """
          handlers = []

          cfh = urllib.request.CacheFTPHandler()
          self.addCleanup(cfh.clear_cache)
          cfh.setTimeout(1)
          handlers.append(cfh)

          return handlers
  class TimeoutTest(unittest.TestCase):
      """Check which timeout ends up on the socket behind an opened URL."""

      FTP_HOST = 'ftp://www.pythontest.net/'

      def setUp(self):
          # clear _opener global variable
          self.addCleanup(urllib.request.urlcleanup)

      # -- private helpers ---------------------------------------------------

      def _open_url(self, url, **open_kwargs):
          """Open *url* with retries and schedule the response for closing."""
          response = _urlopen_with_retry(url, **open_kwargs)
          self.addCleanup(response.close)
          return response

      def _open_url_with_default_timeout(self, url, seconds, **open_kwargs):
          """Open *url* while the global socket default timeout is *seconds*.

          The default timeout is reset to None afterwards, whether or not
          opening succeeded.
          """
          socket.setdefaulttimeout(seconds)
          try:
              return self._open_url(url, **open_kwargs)
          finally:
              socket.setdefaulttimeout(None)

      @staticmethod
      def _http_sock_timeout(response):
          """Return the timeout of the socket behind an HTTP response."""
          return response.fp.raw._sock.gettimeout()

      @staticmethod
      def _ftp_sock_timeout(response):
          """Return the timeout of the socket behind an FTP response."""
          return response.fp.fp.raw._sock.gettimeout()

      # -- HTTP --------------------------------------------------------------

      def test_http_basic(self):
          self.assertIsNone(socket.getdefaulttimeout())
          http_url = support.TEST_HTTP_URL
          with socket_helper.transient_internet(http_url, timeout=None):
              response = self._open_url(http_url)
              self.assertIsNone(self._http_sock_timeout(response))

      def test_http_default_timeout(self):
          self.assertIsNone(socket.getdefaulttimeout())
          http_url = support.TEST_HTTP_URL
          with socket_helper.transient_internet(http_url):
              response = self._open_url_with_default_timeout(http_url, 60)
              self.assertEqual(self._http_sock_timeout(response), 60)

      def test_http_no_timeout(self):
          self.assertIsNone(socket.getdefaulttimeout())
          http_url = support.TEST_HTTP_URL
          with socket_helper.transient_internet(http_url):
              response = self._open_url_with_default_timeout(
                  http_url, 60, timeout=None)
              self.assertIsNone(self._http_sock_timeout(response))

      def test_http_timeout(self):
          http_url = support.TEST_HTTP_URL
          with socket_helper.transient_internet(http_url):
              response = self._open_url(http_url, timeout=120)
              self.assertEqual(self._http_sock_timeout(response), 120)

      # -- FTP ---------------------------------------------------------------

      def test_ftp_basic(self):
          self.assertIsNone(socket.getdefaulttimeout())
          with socket_helper.transient_internet(self.FTP_HOST, timeout=None):
              response = self._open_url(self.FTP_HOST)
              self.assertIsNone(self._ftp_sock_timeout(response))

      def test_ftp_default_timeout(self):
          self.assertIsNone(socket.getdefaulttimeout())
          with socket_helper.transient_internet(self.FTP_HOST):
              response = self._open_url_with_default_timeout(self.FTP_HOST, 60)
              self.assertEqual(self._ftp_sock_timeout(response), 60)

      def test_ftp_no_timeout(self):
          self.assertIsNone(socket.getdefaulttimeout())
          with socket_helper.transient_internet(self.FTP_HOST):
              response = self._open_url_with_default_timeout(
                  self.FTP_HOST, 60, timeout=None)
              self.assertIsNone(self._ftp_sock_timeout(response))

      def test_ftp_timeout(self):
          with socket_helper.transient_internet(self.FTP_HOST):
              response = self._open_url(self.FTP_HOST, timeout=60)
              self.assertEqual(self._ftp_sock_timeout(response), 60)
  # Run the tests when this module is executed directly as a script.
  if __name__ == "__main__":
      unittest.main()