httparchive_test.py 18 KB


  1. #!/usr/bin/env python
  2. # Copyright 2011 Google Inc. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import calendar
  16. import email.utils
  17. import httparchive
  18. import unittest
  19. def create_request(headers):
  20. return httparchive.ArchivedHttpRequest(
  21. 'GET', 'www.test.com', '/', None, headers)
  22. def create_response(headers):
  23. return httparchive.ArchivedHttpResponse(
  24. 11, 200, 'OK', headers, '')
  25. class HttpArchiveTest(unittest.TestCase):
  26. REQUEST_HEADERS = {}
  27. REQUEST = create_request(REQUEST_HEADERS)
  28. # Used for if-(un)modified-since checks
  29. DATE_PAST = 'Wed, 13 Jul 2011 03:58:08 GMT'
  30. DATE_PRESENT = 'Wed, 20 Jul 2011 04:58:08 GMT'
  31. DATE_FUTURE = 'Wed, 27 Jul 2011 05:58:08 GMT'
  32. DATE_INVALID = 'This is an invalid date!!'
  33. # etag values
  34. ETAG_VALID = 'etag'
  35. ETAG_INVALID = 'This is an invalid etag value!!'
  36. RESPONSE_HEADERS = [('last-modified', DATE_PRESENT), ('etag', ETAG_VALID)]
  37. RESPONSE = create_response(RESPONSE_HEADERS)
  38. def setUp(self):
  39. self.archive = httparchive.HttpArchive()
  40. self.archive[self.REQUEST] = self.RESPONSE
  41. # Also add an identical POST request for testing
  42. request = httparchive.ArchivedHttpRequest(
  43. 'POST', 'www.test.com', '/', None, self.REQUEST_HEADERS)
  44. self.archive[request] = self.RESPONSE
  45. def tearDown(self):
  46. pass
  47. def test_init(self):
  48. archive = httparchive.HttpArchive()
  49. self.assertEqual(len(archive), 0)
  50. def test_request__TrimHeaders(self):
  51. request = httparchive.ArchivedHttpRequest
  52. header1 = {'accept-encoding': 'gzip,deflate'}
  53. self.assertEqual(request._TrimHeaders(header1),
  54. [(k, v) for k, v in header1.items()])
  55. header2 = {'referer': 'www.google.com'}
  56. self.assertEqual(request._TrimHeaders(header2), [])
  57. header3 = {'referer': 'www.google.com', 'cookie': 'cookie_monster!',
  58. 'hello': 'world'}
  59. self.assertEqual(request._TrimHeaders(header3), [('hello', 'world')])
  60. # Tests that spaces and trailing comma get stripped.
  61. header4 = {'accept-encoding': 'gzip, deflate,, '}
  62. self.assertEqual(request._TrimHeaders(header4),
  63. [('accept-encoding', 'gzip,deflate')])
  64. # Tests that 'lzma' gets stripped.
  65. header5 = {'accept-encoding': 'gzip, deflate, lzma'}
  66. self.assertEqual(request._TrimHeaders(header5),
  67. [('accept-encoding', 'gzip,deflate')])
  68. # Tests that x-client-data gets stripped.
  69. header6 = {'x-client-data': 'testdata'}
  70. self.assertEqual(request._TrimHeaders(header6), [])
  71. def test_matches(self):
  72. headers = {}
  73. request1 = httparchive.ArchivedHttpRequest(
  74. 'GET', 'www.test.com', '/index.html?hello=world', None, headers)
  75. request2 = httparchive.ArchivedHttpRequest(
  76. 'GET', 'www.test.com', '/index.html?foo=bar', None, headers)
  77. self.assert_(not request1.matches(
  78. request2.command, request2.host, request2.full_path, use_query=True))
  79. self.assert_(request1.matches(
  80. request2.command, request2.host, request2.full_path, use_query=False))
  81. self.assert_(request1.matches(
  82. request2.command, request2.host, None, use_query=True))
  83. self.assert_(request1.matches(
  84. request2.command, None, request2.full_path, use_query=False))
  85. empty_request = httparchive.ArchivedHttpRequest(
  86. None, None, None, None, headers)
  87. self.assert_(not empty_request.matches(
  88. request2.command, request2.host, None, use_query=True))
  89. self.assert_(not empty_request.matches(
  90. request2.command, None, request2.full_path, use_query=False))
  91. def setup_find_closest_request(self):
  92. headers = {}
  93. request1 = httparchive.ArchivedHttpRequest(
  94. 'GET', 'www.test.com', '/a?hello=world', None, headers)
  95. request2 = httparchive.ArchivedHttpRequest(
  96. 'GET', 'www.test.com', '/a?foo=bar', None, headers)
  97. request3 = httparchive.ArchivedHttpRequest(
  98. 'GET', 'www.test.com', '/b?hello=world', None, headers)
  99. request4 = httparchive.ArchivedHttpRequest(
  100. 'GET', 'www.test.com', '/c?hello=world', None, headers)
  101. archive = httparchive.HttpArchive()
  102. # Add requests 2 and 3 and find closest match with request1
  103. archive[request2] = self.RESPONSE
  104. archive[request3] = self.RESPONSE
  105. return archive, request1, request2, request3, request4
  106. def test_find_closest_request(self):
  107. archive, request1, request2, request3, request4 = (
  108. self.setup_find_closest_request())
  109. # Always favor requests with same paths, even if use_path=False.
  110. self.assertEqual(
  111. request2, archive.find_closest_request(request1, use_path=False))
  112. # If we match strictly on path, request2 is the only match
  113. self.assertEqual(
  114. request2, archive.find_closest_request(request1, use_path=True))
  115. # request4 can be matched with request3, if use_path=False
  116. self.assertEqual(
  117. request3, archive.find_closest_request(request4, use_path=False))
  118. # ...but None, if use_path=True
  119. self.assertEqual(
  120. None, archive.find_closest_request(request4, use_path=True))
  121. def test_find_closest_request_delete_simple(self):
  122. archive, request1, request2, request3, request4 = (
  123. self.setup_find_closest_request())
  124. del archive[request3]
  125. self.assertEqual(
  126. request2, archive.find_closest_request(request1, use_path=False))
  127. self.assertEqual(
  128. request2, archive.find_closest_request(request1, use_path=True))
  129. def test_find_closest_request_delete_complex(self):
  130. archive, request1, request2, request3, request4 = (
  131. self.setup_find_closest_request())
  132. del archive[request2]
  133. self.assertEqual(
  134. request3, archive.find_closest_request(request1, use_path=False))
  135. self.assertEqual(
  136. None, archive.find_closest_request(request1, use_path=True))
  137. def test_find_closest_request_timestamp(self):
  138. headers = {}
  139. request1 = httparchive.ArchivedHttpRequest(
  140. 'GET', 'www.test.com', '/index.html?time=100000000&important=true',
  141. None, headers)
  142. request2 = httparchive.ArchivedHttpRequest(
  143. 'GET', 'www.test.com', '/index.html?time=99999999&important=true',
  144. None, headers)
  145. request3 = httparchive.ArchivedHttpRequest(
  146. 'GET', 'www.test.com', '/index.html?time=10000000&important=false',
  147. None, headers)
  148. archive = httparchive.HttpArchive()
  149. # Add requests 2 and 3 and find closest match with request1
  150. archive[request2] = self.RESPONSE
  151. archive[request3] = self.RESPONSE
  152. # Although request3 is lexicographically closer, request2 is semantically
  153. # more similar.
  154. self.assertEqual(
  155. request2, archive.find_closest_request(request1, use_path=True))
  156. def test_get_cmp_seq(self):
  157. # The order of key-value pairs in query and header respectively should not
  158. # matter.
  159. headers = {'k2': 'v2', 'k1': 'v1'}
  160. request = httparchive.ArchivedHttpRequest(
  161. 'GET', 'www.test.com', '/a?c=d&a=b;e=f', None, headers)
  162. self.assertEqual([('a', 'b'), ('c', 'd'), ('e', 'f'),
  163. ('k1', 'v1'), ('k2', 'v2')],
  164. request._GetCmpSeq('c=d&a=b;e=f'))
  165. def test_get_simple(self):
  166. request = self.REQUEST
  167. response = self.RESPONSE
  168. archive = self.archive
  169. self.assertEqual(archive.get(request), response)
  170. false_request_headers = {'foo': 'bar'}
  171. false_request = create_request(false_request_headers)
  172. self.assertEqual(archive.get(false_request, default=None), None)
  173. def test_get_modified_headers(self):
  174. request = self.REQUEST
  175. response = self.RESPONSE
  176. archive = self.archive
  177. not_modified_response = httparchive.create_response(304)
  178. # Fail check and return response again
  179. request_headers = {'if-modified-since': self.DATE_PAST}
  180. request = create_request(request_headers)
  181. self.assertEqual(archive.get(request), response)
  182. # Succeed check and return 304 Not Modified
  183. request_headers = {'if-modified-since': self.DATE_FUTURE}
  184. request = create_request(request_headers)
  185. self.assertEqual(archive.get(request), not_modified_response)
  186. # Succeed check and return 304 Not Modified
  187. request_headers = {'if-modified-since': self.DATE_PRESENT}
  188. request = create_request(request_headers)
  189. self.assertEqual(archive.get(request), not_modified_response)
  190. # Invalid date, fail check and return response again
  191. request_headers = {'if-modified-since': self.DATE_INVALID}
  192. request = create_request(request_headers)
  193. self.assertEqual(archive.get(request), response)
  194. # fail check since the request is not a GET or HEAD request (as per RFC)
  195. request_headers = {'if-modified-since': self.DATE_FUTURE}
  196. request = httparchive.ArchivedHttpRequest(
  197. 'POST', 'www.test.com', '/', None, request_headers)
  198. self.assertEqual(archive.get(request), response)
  199. def test_get_unmodified_headers(self):
  200. request = self.REQUEST
  201. response = self.RESPONSE
  202. archive = self.archive
  203. not_modified_response = httparchive.create_response(304)
  204. # Succeed check
  205. request_headers = {'if-unmodified-since': self.DATE_PAST}
  206. request = create_request(request_headers)
  207. self.assertEqual(archive.get(request), not_modified_response)
  208. # Fail check
  209. request_headers = {'if-unmodified-since': self.DATE_FUTURE}
  210. request = create_request(request_headers)
  211. self.assertEqual(archive.get(request), response)
  212. # Succeed check
  213. request_headers = {'if-unmodified-since': self.DATE_PRESENT}
  214. request = create_request(request_headers)
  215. self.assertEqual(archive.get(request), not_modified_response)
  216. # Fail check
  217. request_headers = {'if-unmodified-since': self.DATE_INVALID}
  218. request = create_request(request_headers)
  219. self.assertEqual(archive.get(request), response)
  220. # Fail check since the request is not a GET or HEAD request (as per RFC)
  221. request_headers = {'if-modified-since': self.DATE_PAST}
  222. request = httparchive.ArchivedHttpRequest(
  223. 'POST', 'www.test.com', '/', None, request_headers)
  224. self.assertEqual(archive.get(request), response)
  225. def test_get_etags(self):
  226. request = self.REQUEST
  227. response = self.RESPONSE
  228. archive = self.archive
  229. not_modified_response = httparchive.create_response(304)
  230. precondition_failed_response = httparchive.create_response(412)
  231. # if-match headers
  232. request_headers = {'if-match': self.ETAG_VALID}
  233. request = create_request(request_headers)
  234. self.assertEqual(archive.get(request), response)
  235. request_headers = {'if-match': self.ETAG_INVALID}
  236. request = create_request(request_headers)
  237. self.assertEqual(archive.get(request), precondition_failed_response)
  238. # if-none-match headers
  239. request_headers = {'if-none-match': self.ETAG_VALID}
  240. request = create_request(request_headers)
  241. self.assertEqual(archive.get(request), not_modified_response)
  242. request_headers = {'if-none-match': self.ETAG_INVALID}
  243. request = create_request(request_headers)
  244. self.assertEqual(archive.get(request), response)
  245. def test_get_multiple_match_headers(self):
  246. request = self.REQUEST
  247. response = self.RESPONSE
  248. archive = self.archive
  249. not_modified_response = httparchive.create_response(304)
  250. precondition_failed_response = httparchive.create_response(412)
  251. # if-match headers
  252. # If the request would, without the If-Match header field,
  253. # result in anything other than a 2xx or 412 status,
  254. # then the If-Match header MUST be ignored.
  255. request_headers = {
  256. 'if-match': self.ETAG_VALID,
  257. 'if-modified-since': self.DATE_PAST,
  258. }
  259. request = create_request(request_headers)
  260. self.assertEqual(archive.get(request), response)
  261. # Invalid etag, precondition failed
  262. request_headers = {
  263. 'if-match': self.ETAG_INVALID,
  264. 'if-modified-since': self.DATE_PAST,
  265. }
  266. request = create_request(request_headers)
  267. self.assertEqual(archive.get(request), precondition_failed_response)
  268. # 304 response; ignore if-match header
  269. request_headers = {
  270. 'if-match': self.ETAG_VALID,
  271. 'if-modified-since': self.DATE_FUTURE,
  272. }
  273. request = create_request(request_headers)
  274. self.assertEqual(archive.get(request), not_modified_response)
  275. # 304 response; ignore if-match header
  276. request_headers = {
  277. 'if-match': self.ETAG_INVALID,
  278. 'if-modified-since': self.DATE_PRESENT,
  279. }
  280. request = create_request(request_headers)
  281. self.assertEqual(archive.get(request), not_modified_response)
  282. # Invalid etag, precondition failed
  283. request_headers = {
  284. 'if-match': self.ETAG_INVALID,
  285. 'if-modified-since': self.DATE_INVALID,
  286. }
  287. request = create_request(request_headers)
  288. self.assertEqual(archive.get(request), precondition_failed_response)
  289. def test_get_multiple_none_match_headers(self):
  290. request = self.REQUEST
  291. response = self.RESPONSE
  292. archive = self.archive
  293. not_modified_response = httparchive.create_response(304)
  294. precondition_failed_response = httparchive.create_response(412)
  295. # if-none-match headers
  296. # If the request would, without the If-None-Match header field,
  297. # result in anything other than a 2xx or 304 status,
  298. # then the If-None-Match header MUST be ignored.
  299. request_headers = {
  300. 'if-none-match': self.ETAG_VALID,
  301. 'if-modified-since': self.DATE_PAST,
  302. }
  303. request = create_request(request_headers)
  304. self.assertEqual(archive.get(request), response)
  305. request_headers = {
  306. 'if-none-match': self.ETAG_INVALID,
  307. 'if-modified-since': self.DATE_PAST,
  308. }
  309. request = create_request(request_headers)
  310. self.assertEqual(archive.get(request), response)
  311. # etag match, precondition failed
  312. request_headers = {
  313. 'if-none-match': self.ETAG_VALID,
  314. 'if-modified-since': self.DATE_FUTURE,
  315. }
  316. request = create_request(request_headers)
  317. self.assertEqual(archive.get(request), not_modified_response)
  318. request_headers = {
  319. 'if-none-match': self.ETAG_INVALID,
  320. 'if-modified-since': self.DATE_PRESENT,
  321. }
  322. request = create_request(request_headers)
  323. self.assertEqual(archive.get(request), not_modified_response)
  324. request_headers = {
  325. 'if-none-match': self.ETAG_INVALID,
  326. 'if-modified-since': self.DATE_INVALID,
  327. }
  328. request = create_request(request_headers)
  329. self.assertEqual(archive.get(request), response)
  330. def test_response__TrimHeaders(self):
  331. response = httparchive.ArchivedHttpResponse
  332. header1 = [('access-control-allow-origin', '*'),
  333. ('content-type', 'image/jpeg'),
  334. ('content-length', 2878)]
  335. self.assertEqual(response._TrimHeaders(header1), header1)
  336. header2 = [('content-type', 'text/javascript; charset=utf-8'),
  337. ('connection', 'keep-alive'),
  338. ('cache-control', 'private, must-revalidate, max-age=0'),
  339. ('content-encoding', 'gzip')]
  340. self.assertEqual(response._TrimHeaders(header2), header2)
  341. header3 = [('content-security-policy', """\
  342. default-src 'self' http://*.cnn.com:* https://*.cnn.com:* \
  343. *.cnn.net:* *.turner.com:* *.ugdturner.com:* *.vgtf.net:*; \
  344. script-src 'unsafe-inline' 'unsafe-eval' 'self' *; \
  345. style-src 'unsafe-inline' 'self' *; frame-src 'self' *; \
  346. object-src 'self' *; img-src 'self' * data:; media-src 'self' *; \
  347. font-src 'self' *; connect-src 'self' *"""),
  348. ('access-control-allow-origin', '*'),
  349. ('content-type', 'text/html; charset=utf-8'),
  350. ('content-encoding', 'gzip')]
  351. self.assertEqual(response._TrimHeaders(header3), [
  352. ('access-control-allow-origin', '*'),
  353. ('content-type', 'text/html; charset=utf-8'),
  354. ('content-encoding', 'gzip')
  355. ])
  356. header4 = [('content-security-policy', """\
  357. default-src * data: blob:;script-src *.facebook.com *.fbcdn.net \
  358. *.facebook.net *.google-analytics.com *.virtualearth.net *.google.com \
  359. 127.0.0.1:* *.spotilocal.com:* 'unsafe-inline' 'unsafe-eval' \
  360. fbstatic-a.akamaihd.net fbcdn-static-b-a.akamaihd.net *.atlassolutions.com \
  361. blob: chrome-extension://lifbcibllhkdhoafpjfnlhfpfgnpldfl \
  362. *.liverail.com;style-src * 'unsafe-inline' data:;connect-src *.facebook.com \
  363. *.fbcdn.net *.facebook.net *.spotilocal.com:* *.akamaihd.net \
  364. wss://*.facebook.com:* https://fb.scanandcleanlocal.com:* \
  365. *.atlassolutions.com attachment.fbsbx.com ws://localhost:* \
  366. blob: 127.0.0.1:* *.liverail.com""")]
  367. self.assertEqual(response._TrimHeaders(header4), [])
  368. class ArchivedHttpResponse(unittest.TestCase):
  369. PAST_DATE_A = 'Tue, 13 Jul 2010 03:47:07 GMT'
  370. PAST_DATE_B = 'Tue, 13 Jul 2010 02:47:07 GMT' # PAST_DATE_A -1 hour
  371. PAST_DATE_C = 'Tue, 13 Jul 2010 04:47:07 GMT' # PAST_DATE_A +1 hour
  372. NOW_DATE_A = 'Wed, 20 Jul 2011 04:58:08 GMT'
  373. NOW_DATE_B = 'Wed, 20 Jul 2011 03:58:08 GMT' # NOW_DATE_A -1 hour
  374. NOW_DATE_C = 'Wed, 20 Jul 2011 05:58:08 GMT' # NOW_DATE_A +1 hour
  375. NOW_SECONDS = calendar.timegm(email.utils.parsedate(NOW_DATE_A))
  376. def setUp(self):
  377. self.response = create_response([('date', self.PAST_DATE_A)])
  378. def test_update_date_same_date(self):
  379. self.assertEqual(
  380. self.response.update_date(self.PAST_DATE_A, now=self.NOW_SECONDS),
  381. self.NOW_DATE_A)
  382. def test_update_date_before_date(self):
  383. self.assertEqual(
  384. self.response.update_date(self.PAST_DATE_B, now=self.NOW_SECONDS),
  385. self.NOW_DATE_B)
  386. def test_update_date_after_date(self):
  387. self.assertEqual(
  388. self.response.update_date(self.PAST_DATE_C, now=self.NOW_SECONDS),
  389. self.NOW_DATE_C)
  390. def test_update_date_bad_date_param(self):
  391. self.assertEqual(
  392. self.response.update_date('garbage date', now=self.NOW_SECONDS),
  393. 'garbage date')
  394. def test_update_date_bad_date_header(self):
  395. self.response.set_header('date', 'garbage date')
  396. self.assertEqual(
  397. self.response.update_date(self.PAST_DATE_B, now=self.NOW_SECONDS),
  398. self.PAST_DATE_B)
  399. if __name__ == '__main__':
  400. unittest.main()