# mock_socket.py (3.7 KB)

"""Mock socket module used by the smtpd and smtplib tests.
"""
  3. # imported for _GLOBAL_DEFAULT_TIMEOUT
  4. import socket as socket_module
  5. # Mock socket module
  6. _defaulttimeout = None
  7. _reply_data = None
  8. # This is used to queue up data to be read through socket.makefile, typically
  9. # *before* the socket object is even created. It is intended to handle a single
  10. # line which the socket will feed on recv() or makefile().
  11. def reply_with(line):
  12. global _reply_data
  13. _reply_data = line
  14. class MockFile:
  15. """Mock file object returned by MockSocket.makefile().
  16. """
  17. def __init__(self, lines):
  18. self.lines = lines
  19. def readline(self, limit=-1):
  20. result = self.lines.pop(0) + b'\r\n'
  21. if limit >= 0:
  22. # Re-insert the line, removing the \r\n we added.
  23. self.lines.insert(0, result[limit:-2])
  24. result = result[:limit]
  25. return result
  26. def close(self):
  27. pass
  28. class MockSocket:
  29. """Mock socket object used by smtpd and smtplib tests.
  30. """
  31. def __init__(self, family=None):
  32. global _reply_data
  33. self.family = family
  34. self.output = []
  35. self.lines = []
  36. if _reply_data:
  37. self.lines.append(_reply_data)
  38. _reply_data = None
  39. self.conn = None
  40. self.timeout = None
  41. def queue_recv(self, line):
  42. self.lines.append(line)
  43. def recv(self, bufsize, flags=None):
  44. data = self.lines.pop(0) + b'\r\n'
  45. return data
  46. def fileno(self):
  47. return 0
  48. def settimeout(self, timeout):
  49. if timeout is None:
  50. self.timeout = _defaulttimeout
  51. else:
  52. self.timeout = timeout
  53. def gettimeout(self):
  54. return self.timeout
  55. def setsockopt(self, level, optname, value):
  56. pass
  57. def getsockopt(self, level, optname, buflen=None):
  58. return 0
  59. def bind(self, address):
  60. pass
  61. def accept(self):
  62. self.conn = MockSocket()
  63. return self.conn, 'c'
  64. def getsockname(self):
  65. return ('0.0.0.0', 0)
  66. def setblocking(self, flag):
  67. pass
  68. def listen(self, backlog):
  69. pass
  70. def makefile(self, mode='r', bufsize=-1):
  71. handle = MockFile(self.lines)
  72. return handle
  73. def sendall(self, data, flags=None):
  74. self.last = data
  75. self.output.append(data)
  76. return len(data)
  77. def send(self, data, flags=None):
  78. self.last = data
  79. self.output.append(data)
  80. return len(data)
  81. def getpeername(self):
  82. return ('peer-address', 'peer-port')
  83. def close(self):
  84. pass
  85. def connect(self, host):
  86. pass
  87. def socket(family=None, type=None, proto=None):
  88. return MockSocket(family)
  89. def create_connection(address, timeout=socket_module._GLOBAL_DEFAULT_TIMEOUT,
  90. source_address=None):
  91. try:
  92. int_port = int(address[1])
  93. except ValueError:
  94. raise error
  95. ms = MockSocket()
  96. if timeout is socket_module._GLOBAL_DEFAULT_TIMEOUT:
  97. timeout = getdefaulttimeout()
  98. ms.settimeout(timeout)
  99. return ms
  100. def setdefaulttimeout(timeout):
  101. global _defaulttimeout
  102. _defaulttimeout = timeout
  103. def getdefaulttimeout():
  104. return _defaulttimeout
  105. def getfqdn():
  106. return ""
  107. def gethostname():
  108. pass
  109. def gethostbyname(name):
  110. return ""
  111. def getaddrinfo(*args, **kw):
  112. return socket_module.getaddrinfo(*args, **kw)
  113. gaierror = socket_module.gaierror
  114. error = socket_module.error
  115. # Constants
  116. _GLOBAL_DEFAULT_TIMEOUT = socket_module._GLOBAL_DEFAULT_TIMEOUT
  117. AF_INET = socket_module.AF_INET
  118. AF_INET6 = socket_module.AF_INET6
  119. SOCK_STREAM = socket_module.SOCK_STREAM
  120. SOL_SOCKET = None
  121. SO_REUSEADDR = None
  122. if hasattr(socket_module, 'AF_UNIX'):
  123. AF_UNIX = socket_module.AF_UNIX