test_urllibnet.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. import unittest
  2. from test import support
  3. from test.support import os_helper
  4. from test.support import socket_helper
  5. import contextlib
  6. import socket
  7. import urllib.parse
  8. import urllib.request
  9. import os
  10. import email.message
  11. import time
  12. support.requires('network')
  13. class URLTimeoutTest(unittest.TestCase):
  14. # XXX this test doesn't seem to test anything useful.
  15. def setUp(self):
  16. socket.setdefaulttimeout(support.INTERNET_TIMEOUT)
  17. def tearDown(self):
  18. socket.setdefaulttimeout(None)
  19. def testURLread(self):
  20. # clear _opener global variable
  21. self.addCleanup(urllib.request.urlcleanup)
  22. domain = urllib.parse.urlparse(support.TEST_HTTP_URL).netloc
  23. with socket_helper.transient_internet(domain):
  24. f = urllib.request.urlopen(support.TEST_HTTP_URL)
  25. f.read()
  26. class urlopenNetworkTests(unittest.TestCase):
  27. """Tests urllib.request.urlopen using the network.
  28. These tests are not exhaustive. Assuming that testing using files does a
  29. good job overall of some of the basic interface features. There are no
  30. tests exercising the optional 'data' and 'proxies' arguments. No tests
  31. for transparent redirection have been written.
  32. setUp is not used for always constructing a connection to
  33. http://www.pythontest.net/ since there a few tests that don't use that address
  34. and making a connection is expensive enough to warrant minimizing unneeded
  35. connections.
  36. """
  37. url = 'http://www.pythontest.net/'
  38. def setUp(self):
  39. # clear _opener global variable
  40. self.addCleanup(urllib.request.urlcleanup)
  41. @contextlib.contextmanager
  42. def urlopen(self, *args, **kwargs):
  43. resource = args[0]
  44. with socket_helper.transient_internet(resource):
  45. r = urllib.request.urlopen(*args, **kwargs)
  46. try:
  47. yield r
  48. finally:
  49. r.close()
  50. def test_basic(self):
  51. # Simple test expected to pass.
  52. with self.urlopen(self.url) as open_url:
  53. for attr in ("read", "readline", "readlines", "fileno", "close",
  54. "info", "geturl"):
  55. self.assertTrue(hasattr(open_url, attr), "object returned from "
  56. "urlopen lacks the %s attribute" % attr)
  57. self.assertTrue(open_url.read(), "calling 'read' failed")
  58. def test_readlines(self):
  59. # Test both readline and readlines.
  60. with self.urlopen(self.url) as open_url:
  61. self.assertIsInstance(open_url.readline(), bytes,
  62. "readline did not return a string")
  63. self.assertIsInstance(open_url.readlines(), list,
  64. "readlines did not return a list")
  65. def test_info(self):
  66. # Test 'info'.
  67. with self.urlopen(self.url) as open_url:
  68. info_obj = open_url.info()
  69. self.assertIsInstance(info_obj, email.message.Message,
  70. "object returned by 'info' is not an "
  71. "instance of email.message.Message")
  72. self.assertEqual(info_obj.get_content_subtype(), "html")
  73. def test_geturl(self):
  74. # Make sure same URL as opened is returned by geturl.
  75. with self.urlopen(self.url) as open_url:
  76. gotten_url = open_url.geturl()
  77. self.assertEqual(gotten_url, self.url)
  78. def test_getcode(self):
  79. # test getcode() with the fancy opener to get 404 error codes
  80. URL = self.url + "XXXinvalidXXX"
  81. with socket_helper.transient_internet(URL):
  82. with self.assertWarns(DeprecationWarning):
  83. open_url = urllib.request.FancyURLopener().open(URL)
  84. try:
  85. code = open_url.getcode()
  86. finally:
  87. open_url.close()
  88. self.assertEqual(code, 404)
  89. def test_bad_address(self):
  90. # Make sure proper exception is raised when connecting to a bogus
  91. # address.
  92. # Given that both VeriSign and various ISPs have in
  93. # the past or are presently hijacking various invalid
  94. # domain name requests in an attempt to boost traffic
  95. # to their own sites, finding a domain name to use
  96. # for this test is difficult. RFC2606 leads one to
  97. # believe that '.invalid' should work, but experience
  98. # seemed to indicate otherwise. Single character
  99. # TLDs are likely to remain invalid, so this seems to
  100. # be the best choice. The trailing '.' prevents a
  101. # related problem: The normal DNS resolver appends
  102. # the domain names from the search path if there is
  103. # no '.' the end and, and if one of those domains
  104. # implements a '*' rule a result is returned.
  105. # However, none of this will prevent the test from
  106. # failing if the ISP hijacks all invalid domain
  107. # requests. The real solution would be to be able to
  108. # parameterize the framework with a mock resolver.
  109. bogus_domain = "sadflkjsasf.i.nvali.d."
  110. try:
  111. socket.gethostbyname(bogus_domain)
  112. except OSError:
  113. # socket.gaierror is too narrow, since getaddrinfo() may also
  114. # fail with EAI_SYSTEM and ETIMEDOUT (seen on Ubuntu 13.04),
  115. # i.e. Python's TimeoutError.
  116. pass
  117. else:
  118. # This happens with some overzealous DNS providers such as OpenDNS
  119. self.skipTest("%r should not resolve for test to work" % bogus_domain)
  120. failure_explanation = ('opening an invalid URL did not raise OSError; '
  121. 'can be caused by a broken DNS server '
  122. '(e.g. returns 404 or hijacks page)')
  123. with self.assertRaises(OSError, msg=failure_explanation):
  124. urllib.request.urlopen("http://{}/".format(bogus_domain))
  125. class urlretrieveNetworkTests(unittest.TestCase):
  126. """Tests urllib.request.urlretrieve using the network."""
  127. def setUp(self):
  128. # remove temporary files created by urlretrieve()
  129. self.addCleanup(urllib.request.urlcleanup)
  130. @contextlib.contextmanager
  131. def urlretrieve(self, *args, **kwargs):
  132. resource = args[0]
  133. with socket_helper.transient_internet(resource):
  134. file_location, info = urllib.request.urlretrieve(*args, **kwargs)
  135. try:
  136. yield file_location, info
  137. finally:
  138. os_helper.unlink(file_location)
  139. def test_basic(self):
  140. # Test basic functionality.
  141. with self.urlretrieve(self.logo) as (file_location, info):
  142. self.assertTrue(os.path.exists(file_location), "file location returned by"
  143. " urlretrieve is not a valid path")
  144. with open(file_location, 'rb') as f:
  145. self.assertTrue(f.read(), "reading from the file location returned"
  146. " by urlretrieve failed")
  147. def test_specified_path(self):
  148. # Make sure that specifying the location of the file to write to works.
  149. with self.urlretrieve(self.logo,
  150. os_helper.TESTFN) as (file_location, info):
  151. self.assertEqual(file_location, os_helper.TESTFN)
  152. self.assertTrue(os.path.exists(file_location))
  153. with open(file_location, 'rb') as f:
  154. self.assertTrue(f.read(), "reading from temporary file failed")
  155. def test_header(self):
  156. # Make sure header returned as 2nd value from urlretrieve is good.
  157. with self.urlretrieve(self.logo) as (file_location, info):
  158. self.assertIsInstance(info, email.message.Message,
  159. "info is not an instance of email.message.Message")
  160. logo = "http://www.pythontest.net/"
  161. def test_data_header(self):
  162. with self.urlretrieve(self.logo) as (file_location, fileheaders):
  163. datevalue = fileheaders.get('Date')
  164. dateformat = '%a, %d %b %Y %H:%M:%S GMT'
  165. try:
  166. time.strptime(datevalue, dateformat)
  167. except ValueError:
  168. self.fail('Date value not in %r format' % dateformat)
  169. def test_reporthook(self):
  170. records = []
  171. def recording_reporthook(blocks, block_size, total_size):
  172. records.append((blocks, block_size, total_size))
  173. with self.urlretrieve(self.logo, reporthook=recording_reporthook) as (
  174. file_location, fileheaders):
  175. expected_size = int(fileheaders['Content-Length'])
  176. records_repr = repr(records) # For use in error messages.
  177. self.assertGreater(len(records), 1, msg="There should always be two "
  178. "calls; the first one before the transfer starts.")
  179. self.assertEqual(records[0][0], 0)
  180. self.assertGreater(records[0][1], 0,
  181. msg="block size can't be 0 in %s" % records_repr)
  182. self.assertEqual(records[0][2], expected_size)
  183. self.assertEqual(records[-1][2], expected_size)
  184. block_sizes = {block_size for _, block_size, _ in records}
  185. self.assertEqual({records[0][1]}, block_sizes,
  186. msg="block sizes in %s must be equal" % records_repr)
  187. self.assertGreaterEqual(records[-1][0]*records[0][1], expected_size,
  188. msg="number of blocks * block size must be"
  189. " >= total size in %s" % records_repr)
  190. if __name__ == "__main__":
  191. unittest.main()