| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484 |
- #!/usr/bin/env python
- # Copyright 2011 Google Inc. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import calendar
- import email.utils
- import httparchive
- import unittest
def create_request(headers):
    """Build an archived ``GET`` request for ``www.test.com/``.

    Args:
        headers: request headers handed straight to
            ``httparchive.ArchivedHttpRequest``.

    Returns:
        The new ``httparchive.ArchivedHttpRequest``.
    """
    command, host, full_path = 'GET', 'www.test.com', '/'
    return httparchive.ArchivedHttpRequest(
        command, host, full_path, None, headers)
def create_response(headers):
    """Build an archived ``200``/``'OK'`` response carrying *headers*.

    The leading ``11`` and trailing ``''`` are fixed arguments; presumably
    the HTTP version and an empty body -- confirm against
    ``httparchive.ArchivedHttpResponse``.

    Args:
        headers: response headers handed straight to
            ``httparchive.ArchivedHttpResponse``.

    Returns:
        The new ``httparchive.ArchivedHttpResponse``.
    """
    status, reason = 200, 'OK'
    return httparchive.ArchivedHttpResponse(11, status, reason, headers, '')
class HttpArchiveTest(unittest.TestCase):
    """Tests for ``httparchive.HttpArchive`` and its request/response types.

    Covers header trimming, request matching, closest-request lookup and
    the handling of conditional request headers (``if-modified-since``,
    ``if-unmodified-since``, ``if-match``, ``if-none-match``) by
    ``HttpArchive.get``.
    """

    REQUEST_HEADERS = {}
    REQUEST = create_request(REQUEST_HEADERS)

    # Used for if-(un)modified-since checks
    DATE_PAST = 'Wed, 13 Jul 2011 03:58:08 GMT'
    DATE_PRESENT = 'Wed, 20 Jul 2011 04:58:08 GMT'
    DATE_FUTURE = 'Wed, 27 Jul 2011 05:58:08 GMT'
    DATE_INVALID = 'This is an invalid date!!'

    # etag values
    ETAG_VALID = 'etag'
    ETAG_INVALID = 'This is an invalid etag value!!'

    RESPONSE_HEADERS = [('last-modified', DATE_PRESENT), ('etag', ETAG_VALID)]
    RESPONSE = create_response(RESPONSE_HEADERS)

    def setUp(self):
        """Create an archive holding RESPONSE for both a GET and a POST."""
        self.archive = httparchive.HttpArchive()
        self.archive[self.REQUEST] = self.RESPONSE
        # Also add an identical POST request for testing
        request = httparchive.ArchivedHttpRequest(
            'POST', 'www.test.com', '/', None, self.REQUEST_HEADERS)
        self.archive[request] = self.RESPONSE

    def tearDown(self):
        """Nothing to clean up; kept as an explicit no-op."""
        pass

    def _get(self, request_headers, command='GET'):
        """Look up ``www.test.com/`` in ``self.archive``.

        Args:
            request_headers: headers to attach to the lookup request.
            command: HTTP method of the lookup request.

        Returns:
            Whatever ``self.archive.get`` returns for that request.
        """
        request = httparchive.ArchivedHttpRequest(
            command, 'www.test.com', '/', None, request_headers)
        return self.archive.get(request)

    def test_init(self):
        """A freshly constructed archive is empty."""
        archive = httparchive.HttpArchive()
        self.assertEqual(len(archive), 0)

    def test_request__TrimHeaders(self):
        """Check which request headers ``_TrimHeaders`` keeps or rewrites."""
        trim = httparchive.ArchivedHttpRequest._TrimHeaders

        header1 = {'accept-encoding': 'gzip,deflate'}
        self.assertEqual(trim(header1), list(header1.items()))

        header2 = {'referer': 'www.google.com'}
        self.assertEqual(trim(header2), [])

        header3 = {'referer': 'www.google.com', 'cookie': 'cookie_monster!',
                   'hello': 'world'}
        self.assertEqual(trim(header3), [('hello', 'world')])

        # Tests that spaces and trailing comma get stripped.
        header4 = {'accept-encoding': 'gzip, deflate,, '}
        self.assertEqual(trim(header4),
                         [('accept-encoding', 'gzip,deflate')])

        # Tests that 'lzma' gets stripped.
        header5 = {'accept-encoding': 'gzip, deflate, lzma'}
        self.assertEqual(trim(header5),
                         [('accept-encoding', 'gzip,deflate')])

        # Tests that x-client-data gets stripped.
        header6 = {'x-client-data': 'testdata'}
        self.assertEqual(trim(header6), [])

    def test_matches(self):
        """Check ``matches`` with and without query strings and wildcards."""
        headers = {}
        request1 = httparchive.ArchivedHttpRequest(
            'GET', 'www.test.com', '/index.html?hello=world', None, headers)
        request2 = httparchive.ArchivedHttpRequest(
            'GET', 'www.test.com', '/index.html?foo=bar', None, headers)

        # assertTrue/assertFalse replace the deprecated ``assert_`` alias,
        # which no longer exists in Python 3.12+.
        self.assertFalse(request1.matches(
            request2.command, request2.host, request2.full_path,
            use_query=True))
        self.assertTrue(request1.matches(
            request2.command, request2.host, request2.full_path,
            use_query=False))
        self.assertTrue(request1.matches(
            request2.command, request2.host, None, use_query=True))
        self.assertTrue(request1.matches(
            request2.command, None, request2.full_path, use_query=False))

        empty_request = httparchive.ArchivedHttpRequest(
            None, None, None, None, headers)
        self.assertFalse(empty_request.matches(
            request2.command, request2.host, None, use_query=True))
        self.assertFalse(empty_request.matches(
            request2.command, None, request2.full_path, use_query=False))

    def setup_find_closest_request(self):
        """Build the fixture shared by the ``find_closest_request`` tests.

        Returns:
            A tuple ``(archive, request1, request2, request3, request4)``
            where only request2 and request3 have been stored in the
            archive.
        """
        headers = {}
        request1 = httparchive.ArchivedHttpRequest(
            'GET', 'www.test.com', '/a?hello=world', None, headers)
        request2 = httparchive.ArchivedHttpRequest(
            'GET', 'www.test.com', '/a?foo=bar', None, headers)
        request3 = httparchive.ArchivedHttpRequest(
            'GET', 'www.test.com', '/b?hello=world', None, headers)
        request4 = httparchive.ArchivedHttpRequest(
            'GET', 'www.test.com', '/c?hello=world', None, headers)

        archive = httparchive.HttpArchive()
        # Add requests 2 and 3 and find closest match with request1
        archive[request2] = self.RESPONSE
        archive[request3] = self.RESPONSE

        return archive, request1, request2, request3, request4

    def test_find_closest_request(self):
        """Closest-match lookup with and without strict path matching."""
        archive, request1, request2, request3, request4 = (
            self.setup_find_closest_request())

        # Always favor requests with same paths, even if use_path=False.
        self.assertEqual(
            request2, archive.find_closest_request(request1, use_path=False))
        # If we match strictly on path, request2 is the only match
        self.assertEqual(
            request2, archive.find_closest_request(request1, use_path=True))
        # request4 can be matched with request3, if use_path=False
        self.assertEqual(
            request3, archive.find_closest_request(request4, use_path=False))
        # ...but None, if use_path=True
        self.assertEqual(
            None, archive.find_closest_request(request4, use_path=True))

    def test_find_closest_request_delete_simple(self):
        """Deleting an unrelated entry leaves the closest match unchanged."""
        archive, request1, request2, request3, _ = (
            self.setup_find_closest_request())

        del archive[request3]
        self.assertEqual(
            request2, archive.find_closest_request(request1, use_path=False))
        self.assertEqual(
            request2, archive.find_closest_request(request1, use_path=True))

    def test_find_closest_request_delete_complex(self):
        """Deleting the same-path entry falls back to request3 or None."""
        archive, request1, request2, request3, _ = (
            self.setup_find_closest_request())

        del archive[request2]
        self.assertEqual(
            request3, archive.find_closest_request(request1, use_path=False))
        self.assertEqual(
            None, archive.find_closest_request(request1, use_path=True))

    def test_find_closest_request_timestamp(self):
        """A differing timestamp parameter must not outweigh other params."""
        headers = {}
        request1 = httparchive.ArchivedHttpRequest(
            'GET', 'www.test.com',
            '/index.html?time=100000000&important=true',
            None, headers)
        request2 = httparchive.ArchivedHttpRequest(
            'GET', 'www.test.com',
            '/index.html?time=99999999&important=true',
            None, headers)
        request3 = httparchive.ArchivedHttpRequest(
            'GET', 'www.test.com',
            '/index.html?time=10000000&important=false',
            None, headers)
        archive = httparchive.HttpArchive()
        # Add requests 2 and 3 and find closest match with request1
        archive[request2] = self.RESPONSE
        archive[request3] = self.RESPONSE

        # Although request3 is lexicographically closer, request2 is
        # semantically more similar.
        self.assertEqual(
            request2, archive.find_closest_request(request1, use_path=True))

    def test_get_cmp_seq(self):
        """``_GetCmpSeq`` output is independent of pair ordering."""
        # The order of key-value pairs in query and header respectively
        # should not matter.
        headers = {'k2': 'v2', 'k1': 'v1'}
        request = httparchive.ArchivedHttpRequest(
            'GET', 'www.test.com', '/a?c=d&a=b;e=f', None, headers)
        self.assertEqual([('a', 'b'), ('c', 'd'), ('e', 'f'),
                          ('k1', 'v1'), ('k2', 'v2')],
                         request._GetCmpSeq('c=d&a=b;e=f'))

    def test_get_simple(self):
        """``get`` returns the stored response, or the default on a miss."""
        request = self.REQUEST
        response = self.RESPONSE
        archive = self.archive

        self.assertEqual(archive.get(request), response)

        false_request_headers = {'foo': 'bar'}
        false_request = create_request(false_request_headers)
        self.assertEqual(archive.get(false_request, default=None), None)

    def test_get_modified_headers(self):
        """``if-modified-since`` handling in ``get``."""
        response = self.RESPONSE
        not_modified_response = httparchive.create_response(304)

        # Fail check and return response again
        self.assertEqual(
            self._get({'if-modified-since': self.DATE_PAST}), response)

        # Succeed check and return 304 Not Modified
        self.assertEqual(
            self._get({'if-modified-since': self.DATE_FUTURE}),
            not_modified_response)

        # Succeed check and return 304 Not Modified
        self.assertEqual(
            self._get({'if-modified-since': self.DATE_PRESENT}),
            not_modified_response)

        # Invalid date, fail check and return response again
        self.assertEqual(
            self._get({'if-modified-since': self.DATE_INVALID}), response)

        # fail check since the request is not a GET or HEAD request (as per
        # RFC)
        self.assertEqual(
            self._get({'if-modified-since': self.DATE_FUTURE},
                      command='POST'),
            response)

    def test_get_unmodified_headers(self):
        """``if-unmodified-since`` handling in ``get``."""
        response = self.RESPONSE
        not_modified_response = httparchive.create_response(304)

        # Succeed check
        self.assertEqual(
            self._get({'if-unmodified-since': self.DATE_PAST}),
            not_modified_response)

        # Fail check
        self.assertEqual(
            self._get({'if-unmodified-since': self.DATE_FUTURE}), response)

        # Succeed check
        self.assertEqual(
            self._get({'if-unmodified-since': self.DATE_PRESENT}),
            not_modified_response)

        # Fail check
        self.assertEqual(
            self._get({'if-unmodified-since': self.DATE_INVALID}), response)

        # Fail check since the request is not a GET or HEAD request (as per
        # RFC)
        # NOTE(review): this case sends 'if-modified-since', not
        # 'if-unmodified-since'; looks like a copy-paste slip -- confirm
        # against httparchive before changing the header name.
        self.assertEqual(
            self._get({'if-modified-since': self.DATE_PAST}, command='POST'),
            response)

    def test_get_etags(self):
        """``if-match`` / ``if-none-match`` handling in ``get``."""
        response = self.RESPONSE
        not_modified_response = httparchive.create_response(304)
        precondition_failed_response = httparchive.create_response(412)

        # if-match headers
        self.assertEqual(self._get({'if-match': self.ETAG_VALID}), response)
        self.assertEqual(
            self._get({'if-match': self.ETAG_INVALID}),
            precondition_failed_response)

        # if-none-match headers
        self.assertEqual(
            self._get({'if-none-match': self.ETAG_VALID}),
            not_modified_response)
        self.assertEqual(
            self._get({'if-none-match': self.ETAG_INVALID}), response)

    def test_get_multiple_match_headers(self):
        """``if-match`` combined with ``if-modified-since``."""
        response = self.RESPONSE
        not_modified_response = httparchive.create_response(304)
        precondition_failed_response = httparchive.create_response(412)

        # if-match headers
        # If the request would, without the If-Match header field,
        # result in anything other than a 2xx or 412 status,
        # then the If-Match header MUST be ignored.

        self.assertEqual(
            self._get({
                'if-match': self.ETAG_VALID,
                'if-modified-since': self.DATE_PAST,
            }),
            response)

        # Invalid etag, precondition failed
        self.assertEqual(
            self._get({
                'if-match': self.ETAG_INVALID,
                'if-modified-since': self.DATE_PAST,
            }),
            precondition_failed_response)

        # 304 response; ignore if-match header
        self.assertEqual(
            self._get({
                'if-match': self.ETAG_VALID,
                'if-modified-since': self.DATE_FUTURE,
            }),
            not_modified_response)

        # 304 response; ignore if-match header
        self.assertEqual(
            self._get({
                'if-match': self.ETAG_INVALID,
                'if-modified-since': self.DATE_PRESENT,
            }),
            not_modified_response)

        # Invalid etag, precondition failed
        self.assertEqual(
            self._get({
                'if-match': self.ETAG_INVALID,
                'if-modified-since': self.DATE_INVALID,
            }),
            precondition_failed_response)

    def test_get_multiple_none_match_headers(self):
        """``if-none-match`` combined with ``if-modified-since``."""
        response = self.RESPONSE
        not_modified_response = httparchive.create_response(304)

        # if-none-match headers
        # If the request would, without the If-None-Match header field,
        # result in anything other than a 2xx or 304 status,
        # then the If-None-Match header MUST be ignored.

        self.assertEqual(
            self._get({
                'if-none-match': self.ETAG_VALID,
                'if-modified-since': self.DATE_PAST,
            }),
            response)

        self.assertEqual(
            self._get({
                'if-none-match': self.ETAG_INVALID,
                'if-modified-since': self.DATE_PAST,
            }),
            response)

        # etag match and date check both succeed: expect 304 Not Modified
        self.assertEqual(
            self._get({
                'if-none-match': self.ETAG_VALID,
                'if-modified-since': self.DATE_FUTURE,
            }),
            not_modified_response)

        self.assertEqual(
            self._get({
                'if-none-match': self.ETAG_INVALID,
                'if-modified-since': self.DATE_PRESENT,
            }),
            not_modified_response)

        self.assertEqual(
            self._get({
                'if-none-match': self.ETAG_INVALID,
                'if-modified-since': self.DATE_INVALID,
            }),
            response)

    def test_response__TrimHeaders(self):
        """Response ``_TrimHeaders`` drops content-security-policy only."""
        trim = httparchive.ArchivedHttpResponse._TrimHeaders

        header1 = [('access-control-allow-origin', '*'),
                   ('content-type', 'image/jpeg'),
                   ('content-length', 2878)]
        self.assertEqual(trim(header1), header1)

        header2 = [('content-type', 'text/javascript; charset=utf-8'),
                   ('connection', 'keep-alive'),
                   ('cache-control', 'private, must-revalidate, max-age=0'),
                   ('content-encoding', 'gzip')]
        self.assertEqual(trim(header2), header2)

        # The continuation lines below sit at column 0 on purpose: any
        # leading whitespace would become part of the header value.
        header3 = [('content-security-policy', """\
default-src 'self' http://*.cnn.com:* https://*.cnn.com:* \
*.cnn.net:* *.turner.com:* *.ugdturner.com:* *.vgtf.net:*; \
script-src 'unsafe-inline' 'unsafe-eval' 'self' *; \
style-src 'unsafe-inline' 'self' *; frame-src 'self' *; \
object-src 'self' *; img-src 'self' * data:; media-src 'self' *; \
font-src 'self' *; connect-src 'self' *"""),
                   ('access-control-allow-origin', '*'),
                   ('content-type', 'text/html; charset=utf-8'),
                   ('content-encoding', 'gzip')]
        self.assertEqual(trim(header3), [
            ('access-control-allow-origin', '*'),
            ('content-type', 'text/html; charset=utf-8'),
            ('content-encoding', 'gzip'),
        ])

        header4 = [('content-security-policy', """\
default-src * data: blob:;script-src *.facebook.com *.fbcdn.net \
*.facebook.net *.google-analytics.com *.virtualearth.net *.google.com \
127.0.0.1:* *.spotilocal.com:* 'unsafe-inline' 'unsafe-eval' \
fbstatic-a.akamaihd.net fbcdn-static-b-a.akamaihd.net *.atlassolutions.com \
blob: chrome-extension://lifbcibllhkdhoafpjfnlhfpfgnpldfl \
*.liverail.com;style-src * 'unsafe-inline' data:;connect-src *.facebook.com \
*.fbcdn.net *.facebook.net *.spotilocal.com:* *.akamaihd.net \
wss://*.facebook.com:* https://fb.scanandcleanlocal.com:* \
*.atlassolutions.com attachment.fbsbx.com ws://localhost:* \
blob: 127.0.0.1:* *.liverail.com""")]
        self.assertEqual(trim(header4), [])
class ArchivedHttpResponse(unittest.TestCase):
    """Tests for ``update_date`` on an archived response.

    The fixture response carries a ``date`` header of PAST_DATE_A and
    every call passes ``now=NOW_SECONDS`` (NOW_DATE_A as epoch seconds).
    """

    PAST_DATE_A = 'Tue, 13 Jul 2010 03:47:07 GMT'
    PAST_DATE_B = 'Tue, 13 Jul 2010 02:47:07 GMT'  # PAST_DATE_A -1 hour
    PAST_DATE_C = 'Tue, 13 Jul 2010 04:47:07 GMT'  # PAST_DATE_A +1 hour
    NOW_DATE_A = 'Wed, 20 Jul 2011 04:58:08 GMT'
    NOW_DATE_B = 'Wed, 20 Jul 2011 03:58:08 GMT'  # NOW_DATE_A -1 hour
    NOW_DATE_C = 'Wed, 20 Jul 2011 05:58:08 GMT'  # NOW_DATE_A +1 hour
    NOW_SECONDS = calendar.timegm(email.utils.parsedate(NOW_DATE_A))

    def setUp(self):
        """Create a response whose only header is ``date: PAST_DATE_A``."""
        date_header = ('date', self.PAST_DATE_A)
        self.response = create_response([date_header])

    def _update(self, date):
        """Call ``update_date`` on the fixture with the fixed ``now``."""
        return self.response.update_date(date, now=self.NOW_SECONDS)

    def test_update_date_same_date(self):
        """The header date itself maps to NOW_DATE_A."""
        updated = self._update(self.PAST_DATE_A)
        self.assertEqual(updated, self.NOW_DATE_A)

    def test_update_date_before_date(self):
        """One hour before the header date maps to one hour before now."""
        updated = self._update(self.PAST_DATE_B)
        self.assertEqual(updated, self.NOW_DATE_B)

    def test_update_date_after_date(self):
        """One hour after the header date maps to one hour after now."""
        updated = self._update(self.PAST_DATE_C)
        self.assertEqual(updated, self.NOW_DATE_C)

    def test_update_date_bad_date_param(self):
        """An unparseable date argument is returned unchanged."""
        garbage = 'garbage date'
        self.assertEqual(self._update(garbage), 'garbage date')

    def test_update_date_bad_date_header(self):
        """With an unparseable ``date`` header the argument is returned."""
        self.response.set_header('date', 'garbage date')
        updated = self._update(self.PAST_DATE_B)
        self.assertEqual(updated, self.PAST_DATE_B)
# Run every test case defined in this module when executed as a script.
if __name__ == '__main__':
    unittest.main()
|