| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
- #!/usr/bin/env python
- # Copyright 2012 Google Inc. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import mock
- import unittest
- import datetime
- import dnsproxy
- import httparchive
- import httpclient
- import platformsettings
- import script_injector
- import test_utils
- class RealHttpFetchTest(unittest.TestCase):
- # Initialize test data
- CONTENT_TYPE = 'content-type: image/x-icon'
- COOKIE_1 = ('Set-Cookie: GMAIL_IMP=EXPIRED; '
- 'Expires=Thu, 12-Jul-2012 22:41:22 GMT; '
- 'Path=/mail; Secure')
- COOKIE_2 = ('Set-Cookie: GMAIL_STAT_205a=EXPIRED; '
- 'Expires=Thu, 12-Jul-2012 22:42:24 GMT; '
- 'Path=/mail; Secure')
- FIRST_LINE = 'fake-header: first line'
- SECOND_LINE = ' second line'
- THIRD_LINE = '\tthird line'
- BAD_HEADER = 'this is a bad header'
- def test__GetHeaderNameValueBasic(self):
- """Test _GetHeaderNameValue with normal header."""
- real_http_fetch = httpclient.RealHttpFetch
- name_value = real_http_fetch._GetHeaderNameValue(self.CONTENT_TYPE)
- self.assertEqual(name_value, ('content-type', 'image/x-icon'))
- def test__GetHeaderNameValueLowercasesName(self):
- """_GetHeaderNameValue lowercases header name."""
- real_http_fetch = httpclient.RealHttpFetch
- header = 'X-Google-Gfe-Backend-Request-Info: eid=1KMAUMeiK4eMiAL52YyMBg'
- expected = ('x-google-gfe-backend-request-info',
- 'eid=1KMAUMeiK4eMiAL52YyMBg')
- name_value = real_http_fetch._GetHeaderNameValue(header)
- self.assertEqual(name_value, expected)
- def test__GetHeaderNameValueBadLineGivesNone(self):
- """_GetHeaderNameValue returns None for a header in wrong format."""
- real_http_fetch = httpclient.RealHttpFetch
- name_value = real_http_fetch._GetHeaderNameValue(self.BAD_HEADER)
- self.assertIsNone(name_value)
- def test__ToTuplesBasic(self):
- """Test _ToTuples with normal input."""
- real_http_fetch = httpclient.RealHttpFetch
- headers = [self.CONTENT_TYPE, self.COOKIE_1, self.FIRST_LINE]
- result = real_http_fetch._ToTuples(headers)
- expected = [('content-type', 'image/x-icon'),
- ('set-cookie', self.COOKIE_1[12:]),
- ('fake-header', 'first line')]
- self.assertEqual(result, expected)
- def test__ToTuplesMultipleHeadersWithSameName(self):
- """Test mulitple headers with the same name."""
- real_http_fetch = httpclient.RealHttpFetch
- headers = [self.CONTENT_TYPE, self.COOKIE_1, self.COOKIE_2, self.FIRST_LINE]
- result = real_http_fetch._ToTuples(headers)
- expected = [('content-type', 'image/x-icon'),
- ('set-cookie', self.COOKIE_1[12:]),
- ('set-cookie', self.COOKIE_2[12:]),
- ('fake-header', 'first line')]
- self.assertEqual(result, expected)
- def test__ToTuplesAppendsContinuationLine(self):
- """Test continuation line is handled."""
- real_http_fetch = httpclient.RealHttpFetch
- headers = [self.CONTENT_TYPE, self.COOKIE_1, self.FIRST_LINE,
- self.SECOND_LINE, self.THIRD_LINE]
- result = real_http_fetch._ToTuples(headers)
- expected = [('content-type', 'image/x-icon'),
- ('set-cookie', self.COOKIE_1[12:]),
- ('fake-header', 'first line\n second line\n third line')]
- self.assertEqual(result, expected)
- def test__ToTuplesIgnoresBadHeader(self):
- """Test bad header is ignored."""
- real_http_fetch = httpclient.RealHttpFetch
- bad_headers = [self.CONTENT_TYPE, self.BAD_HEADER, self.COOKIE_1]
- expected = [('content-type', 'image/x-icon'),
- ('set-cookie', self.COOKIE_1[12:])]
- result = real_http_fetch._ToTuples(bad_headers)
- self.assertEqual(result, expected)
- def test__ToTuplesIgnoresMisplacedContinuationLine(self):
- """Test misplaced continuation line is ignored."""
- real_http_fetch = httpclient.RealHttpFetch
- misplaced_headers = [self.THIRD_LINE, self.CONTENT_TYPE,
- self.COOKIE_1, self.FIRST_LINE, self.SECOND_LINE]
- result = real_http_fetch._ToTuples(misplaced_headers)
- expected = [('content-type', 'image/x-icon'),
- ('set-cookie', self.COOKIE_1[12:]),
- ('fake-header', 'first line\n second line')]
- self.assertEqual(result, expected)
- class RealHttpFetchGetConnectionTest(unittest.TestCase):
- """Test that a connection is made with request IP/port or proxy IP/port."""
- def setUp(self):
- def real_dns_lookup(host):
- return {
- 'example.com': '127.127.127.127',
- 'proxy.com': '2.2.2.2',
- }[host]
- self.fetch = httpclient.RealHttpFetch(real_dns_lookup)
- self.https_proxy = None
- self.http_proxy = None
- def get_proxy(is_ssl):
- return self.https_proxy if is_ssl else self.http_proxy
- self.fetch._get_system_proxy = get_proxy
- def set_http_proxy(self, host, port):
- self.http_proxy = platformsettings.SystemProxy(host, port)
- def set_https_proxy(self, host, port):
- self.https_proxy = platformsettings.SystemProxy(host, port)
- def test_get_connection_without_proxy_connects_to_host_ip(self):
- """HTTP connection with no proxy connects to host IP."""
- self.set_http_proxy(host=None, port=None)
- connection = self.fetch._get_connection('example.com', None, is_ssl=False)
- self.assertEqual('127.127.127.127', connection.host)
- self.assertEqual(80, connection.port) # default HTTP port
- def test_get_connection_without_proxy_uses_nondefault_request_port(self):
- """HTTP connection with no proxy connects with request port."""
- self.set_https_proxy(host=None, port=None)
- connection = self.fetch._get_connection('example.com', 8888, is_ssl=False)
- self.assertEqual('127.127.127.127', connection.host)
- self.assertEqual(8888, connection.port) # request HTTP port
- def test_get_connection_with_proxy_uses_proxy_port(self):
- """HTTP connection with proxy connects used proxy port."""
- self.set_http_proxy(host='proxy.com', port=None)
- connection = self.fetch._get_connection('example.com', 8888, is_ssl=False)
- self.assertEqual('2.2.2.2', connection.host) # proxy IP
- self.assertEqual(80, connection.port) # proxy port (default HTTP)
- def test_ssl_get_connection_without_proxy_connects_to_host_ip(self):
- """HTTPS (SSL) connection with no proxy connects to host IP."""
- self.set_https_proxy(host=None, port=None)
- connection = self.fetch._get_connection('example.com', None, is_ssl=True)
- self.assertEqual('127.127.127.127', connection.host)
- self.assertEqual(443, connection.port) # default SSL port
- def test_ssl_get_connection_with_proxy_connects_to_proxy_ip(self):
- """HTTPS (SSL) connection with proxy connects to proxy IP."""
- self.set_https_proxy(host='proxy.com', port=8443)
- connection = self.fetch._get_connection('example.com', None, is_ssl=True)
- self.assertEqual('2.2.2.2', connection.host) # proxy IP
- self.assertEqual(8443, connection.port) # SSL proxy port
- def test_ssl_get_connection_with_proxy_tunnels_to_host(self):
- """HTTPS (SSL) connection with proxy tunnels to target host."""
- self.set_https_proxy(host='proxy.com', port=8443)
- connection = self.fetch._get_connection('example.com', None, is_ssl=True)
- self.assertEqual('example.com', connection._tunnel_host) # host name
- self.assertEqual(None, connection._tunnel_port) # host port
- class ActualNetworkFetchTest(test_utils.RealNetworkFetchTest):
- def testFetchNonSSLRequest(self):
- real_dns_lookup = dnsproxy.RealDnsLookup(
- name_servers=[platformsettings.get_original_primary_nameserver()])
- fetch = httpclient.RealHttpFetch(real_dns_lookup)
- request = httparchive.ArchivedHttpRequest(
- command='GET', host='google.com', full_path='/search?q=dogs',
- request_body=None, headers={}, is_ssl=False)
- response = fetch(request)
- self.assertIsNotNone(response)
- def testFetchSSLRequest(self):
- real_dns_lookup = dnsproxy.RealDnsLookup(
- name_servers=[platformsettings.get_original_primary_nameserver()])
- fetch = httpclient.RealHttpFetch(real_dns_lookup)
- request = httparchive.ArchivedHttpRequest(
- command='GET', host='google.com', full_path='/search?q=dogs',
- request_body=None, headers={}, is_ssl=True)
- response = fetch(request)
- self.assertIsNotNone(response)
- class HttpArchiveFetchTest(unittest.TestCase):
- TEST_REQUEST_TIME = datetime.datetime(2016, 11, 17, 1, 2, 3, 456)
- def createTestResponse(self):
- return httparchive.ArchivedHttpResponse(
- 11, 200, 'OK', [('content-type', 'text/html')],
- ['<body>test</body>'],
- request_time=HttpArchiveFetchTest.TEST_REQUEST_TIME)
- def checkTestResponse(self, actual_response, archive, request):
- self.assertEqual(actual_response, archive[request])
- self.assertEqual(['<body>test</body>'], actual_response.response_data)
- self.assertEqual(HttpArchiveFetchTest.TEST_REQUEST_TIME,
- actual_response.request_time)
- @staticmethod
- def dummy_injector(_):
- return '<body>test</body>'
- class RecordHttpArchiveFetchTest(HttpArchiveFetchTest):
- @mock.patch('httpclient.RealHttpFetch')
- def testFetch(self, real_http_fetch):
- http_fetch_instance = real_http_fetch.return_value
- response = self.createTestResponse()
- http_fetch_instance.return_value = response
- archive = httparchive.HttpArchive()
- fetch = httpclient.RecordHttpArchiveFetch(archive, self.dummy_injector)
- request = httparchive.ArchivedHttpRequest(
- 'GET', 'www.test.com', '/', None, {})
- self.checkTestResponse(fetch(request), archive, request)
- class ReplayHttpArchiveFetchTest(HttpArchiveFetchTest):
- def testFetch(self):
- request = httparchive.ArchivedHttpRequest(
- 'GET', 'www.test.com', '/', None, {})
- response = self.createTestResponse()
- archive = httparchive.HttpArchive()
- archive[request] = response
- fetch = httpclient.ReplayHttpArchiveFetch(
- archive, None, self.dummy_injector)
- self.checkTestResponse(fetch(request), archive, request)
- @mock.patch('script_injector.util.resource_string')
- @mock.patch('script_injector.util.resource_exists')
- @mock.patch('script_injector.os.path.exists')
- def testInjectedDate(self, os_path, util_exists, util_resource_string):
- os_path.return_value = False
- util_exists.return_value = True
- util_resource_string.return_value = \
- ["""var time_seed={}""".format(script_injector.TIME_SEED_MARKER)]
- request = httparchive.ArchivedHttpRequest(
- 'GET', 'www.test.com', '/', None, {})
- response = self.createTestResponse()
- archive = httparchive.HttpArchive()
- archive[request] = response
- fetch = httpclient.ReplayHttpArchiveFetch(
- archive, None, script_injector.GetScriptInjector("time_script.js"))
- self.assertEqual(
- ['<script>var time_seed=1479344523000</script><body>test</body>'],
- fetch(request).response_data)
- if __name__ == '__main__':
- unittest.main()
|