#!/usr/bin/env python
# Copyright 2010 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import BaseHTTPServer
import collections
import errno
import logging
import socket
import SocketServer
import ssl
import sys
import time
import urlparse

import certutils
import daemonserver
import httparchive
import platformsettings
import proxyshaper
import sslproxy


def _HandleSSLCertificateError():
  """Swallow SSL errors; re-raise anything else.

  This method is intended to be called from
  BaseHTTPServer.HTTPServer.handle_error().
  """
  exc_type, exc_value, exc_traceback = sys.exc_info()
  if isinstance(exc_value, ssl.SSLError):
    return
  raise
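

# The HTTPS server classes at the bottom of this file wire this helper into
# BaseHTTPServer's error handling like so:
#
#   def handle_error(self, request, client_address):
#     _HandleSSLCertificateError()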


class HttpProxyError(Exception):
  """Module catch-all error."""
  pass


class HttpProxyServerError(HttpProxyError):
  """Raised for errors like 'Address already in use'."""
  pass


class HttpArchiveHandler(BaseHTTPServer.BaseHTTPRequestHandler):
  protocol_version = 'HTTP/1.1'  # override BaseHTTPServer setting

  # Since we do lots of small wfile.write() calls, turn on buffering.
  wbufsize = -1  # override StreamRequestHandler (a base class) setting

  def setup(self):
    """Override StreamRequestHandler method."""
    BaseHTTPServer.BaseHTTPRequestHandler.setup(self)
    if self.server.traffic_shaping_up_bps:
      self.rfile = proxyshaper.RateLimitedFile(
          self.server.get_active_request_count, self.rfile,
          self.server.traffic_shaping_up_bps)
    if self.server.traffic_shaping_down_bps:
      self.wfile = proxyshaper.RateLimitedFile(
          self.server.get_active_request_count, self.wfile,
          self.server.traffic_shaping_down_bps)

  # Make request handler logging match our logging format.
  def log_request(self, code='-', size='-'):
    pass

  def log_error(self, format, *args):  # pylint:disable=redefined-builtin
    logging.error(format, *args)

  def log_message(self, format, *args):  # pylint:disable=redefined-builtin
    logging.info(format, *args)

  def read_request_body(self):
    request_body = None
    length = int(self.headers.get('content-length', 0)) or None
    if length:
      request_body = self.rfile.read(length)
    return request_body

  def get_header_dict(self):
    return dict(self.headers.items())

  def get_archived_http_request(self):
    host = self.headers.get('host')
    if host is None:
      logging.error('Request without host header')
      return None

    parsed = urlparse.urlparse(self.path)
    params = ';%s' % parsed.params if parsed.params else ''
    query = '?%s' % parsed.query if parsed.query else ''
    fragment = '#%s' % parsed.fragment if parsed.fragment else ''
    full_path = '%s%s%s%s' % (parsed.path, params, query, fragment)
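    # For example, a path of '/search;ab?q=1#top' parses into path='/search',
    # params='ab', query='q=1', fragment='top', and full_path reassembles it
    # unchanged.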

    StubRequest = collections.namedtuple('StubRequest', ('host', 'full_path'))
    request, response = StubRequest(host, full_path), None

    self.server.log_url(request, response)

    return httparchive.ArchivedHttpRequest(
        self.command,
        host,
        full_path,
        self.read_request_body(),
        self.get_header_dict(),
        self.server.is_ssl)

  def send_archived_http_response(self, response):
    try:
      # We need to set the server name before we start the response.
      is_chunked = response.is_chunked()
      has_content_length = response.get_header('content-length') is not None
      self.server_version = response.get_header('server', 'WebPageReplay')
      self.sys_version = ''

      if response.version == 10:
        self.protocol_version = 'HTTP/1.0'

      # If we don't have chunked encoding and there is no content length,
      # we need to manually compute the content-length.
      if not is_chunked and not has_content_length:
        content_length = sum(len(c) for c in response.response_data)
        response.headers.append(('content-length', str(content_length)))

      is_replay = not self.server.http_archive_fetch.is_record_mode
      if is_replay and self.server.traffic_shaping_delay_ms:
        logging.debug('Using round trip delay: %sms',
                      self.server.traffic_shaping_delay_ms)
        time.sleep(self.server.traffic_shaping_delay_ms / 1000.0)
      if is_replay and self.server.use_delays:
        logging.debug('Using delays (ms): %s', response.delays)
        time.sleep(response.delays['headers'] / 1000.0)
        delays = response.delays['data']
      else:
        delays = [0] * len(response.response_data)

      self.send_response(response.status, response.reason)
      # TODO(mbelshe): This is lame - each write is a packet!
      for header, value in response.headers:
        if header in ('last-modified', 'expires'):
          self.send_header(header, response.update_date(value))
        elif header not in ('date', 'server'):
          self.send_header(header, value)
      self.end_headers()

      for chunk, delay in zip(response.response_data, delays):
        if delay:
          self.wfile.flush()
          time.sleep(delay / 1000.0)
        if is_chunked:
          # Write chunk length (hex) and data (e.g. "A\r\nTESSELATED\r\n").
          self.wfile.write('%x\r\n%s\r\n' % (len(chunk), chunk))
        else:
          self.wfile.write(chunk)
      if is_chunked:
        self.wfile.write('0\r\n\r\n')  # write final, zero-length chunk.
      self.wfile.flush()

      # TODO(mbelshe): This connection close doesn't seem to work.
      if response.version == 10:
        self.close_connection = 1
    except Exception, e:
      logging.error('Error sending response for %s%s: %s',
                    self.headers['host'], self.path, e)

  def handle_one_request(self):
    """Handle a single HTTP request.

    This method overrides a method from BaseHTTPRequestHandler. When this
    method returns, it must leave self.close_connection in the correct state.
    If this method raises an exception, the state of self.close_connection
    doesn't matter.
    """
    try:
      # Read one byte beyond the 64KB limit so that over-long request lines
      # can be detected (see the 65536 check in
      # do_parse_and_handle_one_request below).
      self.raw_requestline = self.rfile.readline(65537)
      self.do_parse_and_handle_one_request()
    except socket.timeout, e:
      # A read or a write timed out. Discard this connection.
      self.log_error('Request timed out: %r', e)
      self.close_connection = 1
      return
    except ssl.SSLError:
      # There is insufficient information passed up the stack from OpenSSL to
      # determine the true cause of the SSL error. This almost always happens
      # because the client refuses to accept the self-signed certs of
      # WebPageReplay.
      self.close_connection = 1
      return
    except socket.error, e:
      # Connection reset errors happen all the time due to the browser closing
      # without terminating the connection properly. They can be safely
      # ignored.
      if e[0] == errno.ECONNRESET:
        self.close_connection = 1
        return
      raise

  def do_parse_and_handle_one_request(self):
    start_time = time.time()
    self.server.num_active_requests += 1
    request = None
    try:
      if len(self.raw_requestline) > 65536:
        self.requestline = ''
        self.request_version = ''
        self.command = ''
        self.send_error(414)
        self.close_connection = 0
        return
      if not self.raw_requestline:
        # This indicates that the socket has been closed by the client.
        self.close_connection = 1
        return

      # self.parse_request() sets self.close_connection. There is no need to
      # set the property after the method is executed, unless custom behavior
      # is desired.
      if not self.parse_request():
        # An error code has been sent, just exit.
        return

      try:
        response = None
        request = self.get_archived_http_request()

        if request is None:
          self.send_error(500)
          return
        response = self.server.custom_handlers.handle(request)
        if not response:
          response = self.server.http_archive_fetch(request)

        if (response and response.status == 200 and
            self.server.allow_generate_304 and
            request.command in set(['GET', 'HEAD']) and
            (request.headers.get('if-modified-since', None) or
             request.headers.get('if-none-match', None))):
          # The WPR archive never gets modified since it is not being
          # recorded, so a 304 is always a valid answer to a conditional
          # GET or HEAD.
          response = httparchive.create_response(
              status=304, headers=response.headers)

        if response:
          self.send_archived_http_response(response)
        else:
          self.send_error(404)
      finally:
        self.wfile.flush()  # Actually send the response if not already done.
    finally:
      request_time_ms = (time.time() - start_time) * 1000.0
      self.server.total_request_time += request_time_ms
      if request:
        if response:
          logging.debug('Served: %s (%dms)', request, request_time_ms)
        else:
          logging.warning('Failed to find response for: %s (%dms)',
                          request, request_time_ms)
      self.server.num_active_requests -= 1

  def send_error(self, status, body=None):
    """Override the default send_error with a version that doesn't
    unnecessarily close the connection.
    """
    response = httparchive.create_response(status, body=body)
    self.send_archived_http_response(response)


class HttpProxyServer(SocketServer.ThreadingMixIn,
                      BaseHTTPServer.HTTPServer,
                      daemonserver.DaemonServer):
  HANDLER = HttpArchiveHandler

  # Increase the request queue size. The default value, 5, is set in
  # SocketServer.TCPServer (the parent of BaseHTTPServer.HTTPServer).
  # Since we're intercepting many domains through this single server,
  # it is quite possible to get more than 5 concurrent requests.
  request_queue_size = 256

  # The number of simultaneous connections that the HTTP server supports. This
  # is primarily limited by system limits such as RLIMIT_NOFILE.
  connection_limit = 500

  # Allow sockets to be reused. See
  # http://svn.python.org/projects/python/trunk/Lib/SocketServer.py for more
  # details.
  allow_reuse_address = True

  # Don't prevent python from exiting when there is thread activity.
  daemon_threads = True

  def __init__(self, http_archive_fetch, custom_handlers, rules,
               host='localhost', port=80, use_delays=False, is_ssl=False,
               protocol='HTTP', allow_generate_304=False,
               down_bandwidth='0', up_bandwidth='0', delay_ms='0'):
    """Start HTTP server.

    Args:
      http_archive_fetch: a callable that takes an ArchivedHttpRequest and
        returns a response; it also exposes an is_record_mode attribute.
      custom_handlers: an object whose handle(request) method may return a
        response before the archive is consulted.
      rules: a rule_parser Rules.
      host: a host string (name or IP) for the web proxy.
      port: a port number (e.g. 80) for the web proxy.
      use_delays: if True, add response data delays during replay.
      is_ssl: True iff proxy is using SSL.
      protocol: a protocol label used in log messages (e.g. 'HTTP', 'HTTPS').
      allow_generate_304: if True, answer qualifying conditional GET/HEAD
        requests with a generated 304 instead of the archived 200.
      up_bandwidth: upload bandwidth.
      down_bandwidth: download bandwidth.
        Bandwidths measured in [K|M]{bit/s|Byte/s}. '0' means unlimited.
      delay_ms: propagation delay in milliseconds. '0' means no delay.
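
    Example (a sketch; fetch and handlers stand for objects matching the
    descriptions above):
      server = HttpProxyServer(fetch, handlers, rules, host='127.0.0.1',
                               port=8080, down_bandwidth='5Mbit/s',
                               delay_ms='100')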
  271. """
  272. if platformsettings.SupportsFdLimitControl():
  273. # BaseHTTPServer opens a new thread and two fds for each connection.
  274. # Check that the process can open at least 1000 fds.
  275. soft_limit, hard_limit = platformsettings.GetFdLimit()
  276. # Add some wiggle room since there are probably fds not associated with
  277. # connections.
  278. wiggle_room = 100
  279. desired_limit = 2 * HttpProxyServer.connection_limit + wiggle_room
  280. if soft_limit < desired_limit:
  281. assert desired_limit <= hard_limit, (
  282. 'The hard limit for number of open files per process is %s which '
  283. 'is lower than the desired limit of %s.' %
  284. (hard_limit, desired_limit))
  285. platformsettings.AdjustFdLimit(desired_limit, hard_limit)
  286. try:
  287. BaseHTTPServer.HTTPServer.__init__(self, (host, port), self.HANDLER)
  288. except Exception, e:
  289. raise HttpProxyServerError('Could not start HTTPServer on port %d: %s' %
  290. (port, e))
  291. self.http_archive_fetch = http_archive_fetch
  292. self.custom_handlers = custom_handlers
  293. self.use_delays = use_delays
  294. self.is_ssl = is_ssl
  295. self.traffic_shaping_down_bps = proxyshaper.GetBitsPerSecond(down_bandwidth)
  296. self.traffic_shaping_up_bps = proxyshaper.GetBitsPerSecond(up_bandwidth)
  297. self.traffic_shaping_delay_ms = int(delay_ms)
  298. self.num_active_requests = 0
  299. self.num_active_connections = 0
  300. self.total_request_time = 0
  301. self.protocol = protocol
  302. self.allow_generate_304 = allow_generate_304
  303. self.log_url = rules.Find('log_url')
  304. # Note: This message may be scraped. Do not change it.
  305. logging.warning(
  306. '%s server started on %s:%d' % (self.protocol, self.server_address[0],
  307. self.server_address[1]))

  def cleanup(self):
    try:
      self.shutdown()
      self.server_close()
    except KeyboardInterrupt:
      pass
    logging.info('Stopped %s server. Total time processing requests: %dms',
                 self.protocol, self.total_request_time)

  def get_active_request_count(self):
    return self.num_active_requests

  def get_request(self):
    self.num_active_connections += 1
    if self.num_active_connections >= HttpProxyServer.connection_limit:
      logging.error(
          'Number of active connections (%s) surpasses the '
          'supported limit of %s.' %
          (self.num_active_connections, HttpProxyServer.connection_limit))
    return BaseHTTPServer.HTTPServer.get_request(self)

  def close_request(self, request):
    BaseHTTPServer.HTTPServer.close_request(self, request)
    self.num_active_connections -= 1


class HttpsProxyServer(HttpProxyServer):
  """SSL server that generates certs for each host."""

  def __init__(self, http_archive_fetch, custom_handlers, rules,
               https_root_ca_cert_path, **kwargs):
    self.ca_cert_path = https_root_ca_cert_path
    self.HANDLER = sslproxy.wrap_handler(HttpArchiveHandler)
    HttpProxyServer.__init__(self, http_archive_fetch, custom_handlers, rules,
                             is_ssl=True, protocol='HTTPS', **kwargs)
    with open(self.ca_cert_path, 'r') as cert_file:
      self._ca_cert_str = cert_file.read()
    self._host_to_cert_map = {}
    self._server_cert_to_cert_map = {}

  def cleanup(self):
    try:
      self.shutdown()
      self.server_close()
    except KeyboardInterrupt:
      pass

  def get_certificate(self, host):
    if host in self._host_to_cert_map:
      return self._host_to_cert_map[host]

    # Hosts that share an archived server cert also share the generated cert,
    # so check the server-cert cache before generating a new one.
    server_cert = self.http_archive_fetch.http_archive.get_server_cert(host)
    if server_cert in self._server_cert_to_cert_map:
      cert = self._server_cert_to_cert_map[server_cert]
      self._host_to_cert_map[host] = cert
      return cert

    cert = certutils.generate_cert(self._ca_cert_str, server_cert, host)
    self._server_cert_to_cert_map[server_cert] = cert
    self._host_to_cert_map[host] = cert
    return cert

  def handle_error(self, request, client_address):
    _HandleSSLCertificateError()


class SingleCertHttpsProxyServer(HttpProxyServer):
  """SSL server that uses a single certificate for all hosts."""

  def __init__(self, http_archive_fetch, custom_handlers, rules,
               https_root_ca_cert_path, **kwargs):
    HttpProxyServer.__init__(self, http_archive_fetch, custom_handlers, rules,
                             is_ssl=True, protocol='HTTPS', **kwargs)
    self.socket = ssl.wrap_socket(
        self.socket, certfile=https_root_ca_cert_path, server_side=True,
        do_handshake_on_connect=False)
    # Ancestor class, DaemonServer, calls serve_forever() during its __init__.

  def handle_error(self, request, client_address):
    _HandleSSLCertificateError()


class HttpToHttpsProxyServer(HttpProxyServer):
  """Listens for HTTP requests but sends them to the target as HTTPS
  requests."""

  def __init__(self, http_archive_fetch, custom_handlers, rules, **kwargs):
    HttpProxyServer.__init__(self, http_archive_fetch, custom_handlers, rules,
                             is_ssl=True, protocol='HTTP-to-HTTPS', **kwargs)

  def handle_error(self, request, client_address):
    _HandleSSLCertificateError()
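

# A minimal usage sketch (fetch, handlers, and the cert path are hypothetical
# stand-ins; see the HttpProxyServer.__init__ docstring). Note that the
# DaemonServer ancestor starts serving during __init__:
#
#   http_server = HttpProxyServer(fetch, handlers, rules, port=8080)
#   https_server = HttpsProxyServer(fetch, handlers, rules,
#                                   'wpr_ca_cert.pem', port=8443)
#   ...
#   http_server.cleanup()
#   https_server.cleanup()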