| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372 |
- import errno
- import unittest
- from test import support
- from test.support import os_helper
- from test.support import socket_helper
- from test.support import ResourceDenied
- from test.test_urllib2 import sanepathname2url
- import os
- import socket
- import urllib.error
- import urllib.request
- import sys
- support.requires("network")
- def _retry_thrice(func, exc, *args, **kwargs):
- for i in range(3):
- try:
- return func(*args, **kwargs)
- except exc as e:
- last_exc = e
- continue
- raise last_exc
- def _wrap_with_retry_thrice(func, exc):
- def wrapped(*args, **kwargs):
- return _retry_thrice(func, exc, *args, **kwargs)
- return wrapped
# Connections to remote hosts fail intermittently.  To make the tests less
# flaky, urlopen() is wrapped so that a URLError leads to another attempt.
_urlopen_with_retry = _wrap_with_retry_thrice(
    urllib.request.urlopen,
    urllib.error.URLError,
)
class TransientResource:
    """Context manager that converts selected exceptions to ResourceDenied.

    An exception leaving the ``with`` block is converted when its type is
    *exc* or a subclass of *exc*, and the exception instance has every
    attribute named in the keyword arguments with an equal value.  Any
    other exception propagates unchanged.

    Instances keep no per-use state, so one instance may be reused.
    """

    def __init__(self, exc, **kwargs):
        """Remember the exception class and the attribute values to match.

        exc    -- exception class (or tuple of classes) to look for.
        kwargs -- attribute name/value pairs the exception must carry,
                  e.g. ``errno=errno.ETIMEDOUT``.  With no pairs, every
                  exception of a matching type is converted.
        """
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        # The raised type must derive from the configured class.  The
        # arguments used to be the other way round, which ignored subclasses
        # such as TimeoutError for an OSError-based manager and instead
        # matched unrelated base classes such as Exception.
        if type_ is None or not issubclass(type_, self.exc):
            return

        # A private sentinel lets "attribute missing" count as a mismatch
        # without ever comparing equal to a requested value.
        missing = object()
        if all(getattr(value, attr, missing) == wanted
               for attr, wanted in self.attrs.items()):
            raise ResourceDenied("an optional resource is not available")
# Shared context managers: each one raises ResourceDenied when a particular
# kind of internet-connection trouble surfaces as an exception.
# XXX deprecate these and use transient_internet() instead
time_out = TransientResource(
    OSError, errno=errno.ETIMEDOUT)
socket_peer_reset = TransientResource(
    OSError, errno=errno.ECONNRESET)
ioerror_peer_reset = TransientResource(
    OSError, errno=errno.ECONNRESET)
class AuthTests(unittest.TestCase):
    """Placeholder for tests of the HTTP authentication handlers."""

    # No test is active in this class: python.org has no page that could be
    # used to exercise HTTP authentication.  The disabled test is kept below
    # for reference.  It is written against the Python 2 ``urllib2`` API
    # (note the ``except X, exc`` syntax) and has to be ported before it can
    # be re-enabled.
    #
    # def test_basic_auth(self):
    #     import http.client
    #
    #     test_url = "http://www.python.org/test/test_urllib2/basic_auth"
    #     test_hostport = "www.python.org"
    #     test_realm = 'Test Realm'
    #     test_user = 'test.test_urllib2net'
    #     test_password = 'blah'
    #
    #     # failure
    #     try:
    #         _urlopen_with_retry(test_url)
    #     except urllib2.HTTPError, exc:
    #         self.assertEqual(exc.code, 401)
    #     else:
    #         self.fail("urlopen() should have failed with 401")
    #
    #     # success
    #     auth_handler = urllib2.HTTPBasicAuthHandler()
    #     auth_handler.add_password(test_realm, test_hostport,
    #                               test_user, test_password)
    #     opener = urllib2.build_opener(auth_handler)
    #     f = opener.open('http://localhost/')
    #     response = _urlopen_with_retry("http://www.python.org/")
    #
    #     # The 'userinfo' URL component is deprecated by RFC 3986 for security
    #     # reasons, let's not implement it!  (it's already implemented for proxy
    #     # specification strings (that is, URLs or authorities specifying a
    #     # proxy), so we must keep that)
    #     self.assertRaises(http.client.InvalidURL,
    #                       urllib2.urlopen, "http://evil:thing@example.com")
- class CloseSocketTest(unittest.TestCase):
- def test_close(self):
- # clear _opener global variable
- self.addCleanup(urllib.request.urlcleanup)
- # calling .close() on urllib2's response objects should close the
- # underlying socket
- url = support.TEST_HTTP_URL
- with socket_helper.transient_internet(url):
- response = _urlopen_with_retry(url)
- sock = response.fp
- self.assertFalse(sock.closed)
- response.close()
- self.assertTrue(sock.closed)
- class OtherNetworkTests(unittest.TestCase):
- def setUp(self):
- if 0: # for debugging
- import logging
- logger = logging.getLogger("test_urllib2net")
- logger.addHandler(logging.StreamHandler())
- # XXX The rest of these tests aren't very good -- they don't check much.
- # They do sometimes catch some major disasters, though.
- def test_ftp(self):
- urls = [
- 'ftp://www.pythontest.net/README',
- ('ftp://www.pythontest.net/non-existent-file',
- None, urllib.error.URLError),
- ]
- self._test_urls(urls, self._extra_handlers())
- def test_file(self):
- TESTFN = os_helper.TESTFN
- f = open(TESTFN, 'w')
- try:
- f.write('hi there\n')
- f.close()
- urls = [
- 'file:' + sanepathname2url(os.path.abspath(TESTFN)),
- ('file:///nonsensename/etc/passwd', None,
- urllib.error.URLError),
- ]
- self._test_urls(urls, self._extra_handlers(), retry=True)
- finally:
- os.remove(TESTFN)
- self.assertRaises(ValueError, urllib.request.urlopen,'./relative_path/to/file')
- # XXX Following test depends on machine configurations that are internal
- # to CNRI. Need to set up a public server with the right authentication
- # configuration for test purposes.
- ## def test_cnri(self):
- ## if socket.gethostname() == 'bitdiddle':
- ## localhost = 'bitdiddle.cnri.reston.va.us'
- ## elif socket.gethostname() == 'bitdiddle.concentric.net':
- ## localhost = 'localhost'
- ## else:
- ## localhost = None
- ## if localhost is not None:
- ## urls = [
- ## 'file://%s/etc/passwd' % localhost,
- ## 'http://%s/simple/' % localhost,
- ## 'http://%s/digest/' % localhost,
- ## 'http://%s/not/found.h' % localhost,
- ## ]
- ## bauth = HTTPBasicAuthHandler()
- ## bauth.add_password('basic_test_realm', localhost, 'jhylton',
- ## 'password')
- ## dauth = HTTPDigestAuthHandler()
- ## dauth.add_password('digest_test_realm', localhost, 'jhylton',
- ## 'password')
- ## self._test_urls(urls, self._extra_handlers()+[bauth, dauth])
- def test_urlwithfrag(self):
- urlwith_frag = "http://www.pythontest.net/index.html#frag"
- with socket_helper.transient_internet(urlwith_frag):
- req = urllib.request.Request(urlwith_frag)
- res = urllib.request.urlopen(req)
- self.assertEqual(res.geturl(),
- "http://www.pythontest.net/index.html#frag")
- def test_redirect_url_withfrag(self):
- redirect_url_with_frag = "http://www.pythontest.net/redir/with_frag/"
- with socket_helper.transient_internet(redirect_url_with_frag):
- req = urllib.request.Request(redirect_url_with_frag)
- res = urllib.request.urlopen(req)
- self.assertEqual(res.geturl(),
- "http://www.pythontest.net/elsewhere/#frag")
- def test_custom_headers(self):
- url = support.TEST_HTTP_URL
- with socket_helper.transient_internet(url):
- opener = urllib.request.build_opener()
- request = urllib.request.Request(url)
- self.assertFalse(request.header_items())
- opener.open(request)
- self.assertTrue(request.header_items())
- self.assertTrue(request.has_header('User-agent'))
- request.add_header('User-Agent','Test-Agent')
- opener.open(request)
- self.assertEqual(request.get_header('User-agent'),'Test-Agent')
- @unittest.skip('XXX: http://www.imdb.com is gone')
- def test_sites_no_connection_close(self):
- # Some sites do not send Connection: close header.
- # Verify that those work properly. (#issue12576)
- URL = 'http://www.imdb.com' # mangles Connection:close
- with socket_helper.transient_internet(URL):
- try:
- with urllib.request.urlopen(URL) as res:
- pass
- except ValueError:
- self.fail("urlopen failed for site not sending \
- Connection:close")
- else:
- self.assertTrue(res)
- req = urllib.request.urlopen(URL)
- res = req.read()
- self.assertTrue(res)
- def _test_urls(self, urls, handlers, retry=True):
- import time
- import logging
- debug = logging.getLogger("test_urllib2").debug
- urlopen = urllib.request.build_opener(*handlers).open
- if retry:
- urlopen = _wrap_with_retry_thrice(urlopen, urllib.error.URLError)
- for url in urls:
- with self.subTest(url=url):
- if isinstance(url, tuple):
- url, req, expected_err = url
- else:
- req = expected_err = None
- with socket_helper.transient_internet(url):
- try:
- f = urlopen(url, req, support.INTERNET_TIMEOUT)
- # urllib.error.URLError is a subclass of OSError
- except OSError as err:
- if expected_err:
- msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
- (expected_err, url, req, type(err), err))
- self.assertIsInstance(err, expected_err, msg)
- else:
- raise
- else:
- try:
- with time_out, \
- socket_peer_reset, \
- ioerror_peer_reset:
- buf = f.read()
- debug("read %d bytes" % len(buf))
- except TimeoutError:
- print("<timeout: %s>" % url, file=sys.stderr)
- f.close()
- time.sleep(0.1)
- def _extra_handlers(self):
- handlers = []
- cfh = urllib.request.CacheFTPHandler()
- self.addCleanup(cfh.clear_cache)
- cfh.setTimeout(1)
- handlers.append(cfh)
- return handlers
- class TimeoutTest(unittest.TestCase):
- def setUp(self):
- # clear _opener global variable
- self.addCleanup(urllib.request.urlcleanup)
- def test_http_basic(self):
- self.assertIsNone(socket.getdefaulttimeout())
- url = support.TEST_HTTP_URL
- with socket_helper.transient_internet(url, timeout=None):
- u = _urlopen_with_retry(url)
- self.addCleanup(u.close)
- self.assertIsNone(u.fp.raw._sock.gettimeout())
- def test_http_default_timeout(self):
- self.assertIsNone(socket.getdefaulttimeout())
- url = support.TEST_HTTP_URL
- with socket_helper.transient_internet(url):
- socket.setdefaulttimeout(60)
- try:
- u = _urlopen_with_retry(url)
- self.addCleanup(u.close)
- finally:
- socket.setdefaulttimeout(None)
- self.assertEqual(u.fp.raw._sock.gettimeout(), 60)
- def test_http_no_timeout(self):
- self.assertIsNone(socket.getdefaulttimeout())
- url = support.TEST_HTTP_URL
- with socket_helper.transient_internet(url):
- socket.setdefaulttimeout(60)
- try:
- u = _urlopen_with_retry(url, timeout=None)
- self.addCleanup(u.close)
- finally:
- socket.setdefaulttimeout(None)
- self.assertIsNone(u.fp.raw._sock.gettimeout())
- def test_http_timeout(self):
- url = support.TEST_HTTP_URL
- with socket_helper.transient_internet(url):
- u = _urlopen_with_retry(url, timeout=120)
- self.addCleanup(u.close)
- self.assertEqual(u.fp.raw._sock.gettimeout(), 120)
- FTP_HOST = 'ftp://www.pythontest.net/'
- def test_ftp_basic(self):
- self.assertIsNone(socket.getdefaulttimeout())
- with socket_helper.transient_internet(self.FTP_HOST, timeout=None):
- u = _urlopen_with_retry(self.FTP_HOST)
- self.addCleanup(u.close)
- self.assertIsNone(u.fp.fp.raw._sock.gettimeout())
- def test_ftp_default_timeout(self):
- self.assertIsNone(socket.getdefaulttimeout())
- with socket_helper.transient_internet(self.FTP_HOST):
- socket.setdefaulttimeout(60)
- try:
- u = _urlopen_with_retry(self.FTP_HOST)
- self.addCleanup(u.close)
- finally:
- socket.setdefaulttimeout(None)
- self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)
- def test_ftp_no_timeout(self):
- self.assertIsNone(socket.getdefaulttimeout())
- with socket_helper.transient_internet(self.FTP_HOST):
- socket.setdefaulttimeout(60)
- try:
- u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
- self.addCleanup(u.close)
- finally:
- socket.setdefaulttimeout(None)
- self.assertIsNone(u.fp.fp.raw._sock.gettimeout())
- def test_ftp_timeout(self):
- with socket_helper.transient_internet(self.FTP_HOST):
- u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
- self.addCleanup(u.close)
- self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)
- if __name__ == "__main__":
- unittest.main()
|