| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- # Copyright 2015 Google Inc. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
r"""Rules parser.

The input is a JSON list of single-item objects, each mapping a rule class
name to a dict of constructor arguments:

  [{"comment": ignored_value},
   {"rule_class_name1": {"arg1": value, "arg2": value, ...}},
   {"rule_class_name2": {"arg1": value, "arg2": value, ...}},
   ...]

Entries whose name is "comment" are skipped.  A class name without a module
prefix is looked up in the "rules" module.

E.g.:

  [{"comment": "this text is ignored"},
   {"SendStatus": {"url": "example\\.com/ss.*", "status": 204}},
   {"ModifyUrl": {"url": "(example\\.com)(/.*)", "new_url": "{1}"}}
  ]
"""
- import json
- import re
class Error(Exception):
  """Base exception raised by this module for malformed rule files."""
class Rules(object):
  """A parsed sequence of Rule objects."""

  def __init__(self, file_obj=None, allowed_imports=None):
    """Initializes from the given file object.

    Args:
      file_obj: A file object holding the JSON rules, or None for an empty
          rule set.
      allowed_imports: A set of strings, defaults to {'rules'}.
          Use {'*'} to allow any import path.
    """
    if allowed_imports is None:
      allowed_imports = {'rules'}
    # Nothing is parsed when no file is given; the rule set is simply empty.
    self._rules = [] if file_obj is None else _Load(file_obj, allowed_imports)

  def Contains(self, rule_type_name):
    """Returns true if any rule matches the given type name.

    Args:
      rule_type_name: a string.
    Returns:
      True if any rule matches, else False.
    """
    # Test the IsType() result itself, not the truthiness of the rule object:
    # a matching rule that defines __len__/__bool__ as falsy must still count.
    return any(rule.IsType(rule_type_name) for rule in self._rules)

  def Find(self, rule_type_name):
    """Returns a _Rule object containing all rules with the given type name.

    Args:
      rule_type_name: a string.
    Returns:
      A callable object that expects two arguments:
        request: the httparchive ArchivedHttpRequest
        response: the httparchive ArchivedHttpResponse
      and returns the rule return_value of the first rule that returns
      should_stop == True, or the last rule's return_value if all rules return
      should_stop == False.
    """
    matches = [rule for rule in self._rules if rule.IsType(rule_type_name)]
    return _Rule(matches)

  def __str__(self):
    return _ToString(self._rules)

  def __repr__(self):
    return str(self)
class _Rule(object):
  """Chains Rule objects, invoking each in order until one asks to stop."""

  def __init__(self, rules):
    self._rules = rules

  def __call__(self, request, response):
    """Applies the wrapped rules in order, stopping at the first should_stop.

    Each rule's ApplyRule receives the previous rule's return value (None for
    the first rule) together with the request and response.

    Args:
      request: the httparchive ArchivedHttpRequest.
      response: the httparchive ArchivedHttpResponse, which may be None.
    Returns:
      The return_value of the first rule whose should_stop is true, or the
      last rule's return_value if no rule asks to stop.  None when there are
      no rules.
    """
    result = None
    for current in self._rules:
      stop, result = current.ApplyRule(result, request, response)
      if stop:
        return result
    return result

  def __repr__(self):
    return str(self)

  def __str__(self):
    return _ToString(self._rules)
def _ToString(rules):
  """Renders the rules one per line, wrapped in a '[' line and a ']' line."""
  rendered = ['%s' % rule for rule in rules]
  return '[\n' + '\n'.join(rendered) + '\n]'
def _Load(file_obj, allowed_imports):
  """Parses and evaluates all rules in the given file.

  The file must hold a JSON list in which every entry is a dict with exactly
  one item, mapping a rule class name to a dict of constructor keyword
  arguments.  Entries named "comment" are skipped.  A name without a module
  prefix is looked up in the `rules` module.

  Args:
    file_obj: a file object containing the JSON rules.
    allowed_imports: a collection of module names that rule classes may be
        loaded from, e.g.: {'rules'}.  Include '*' to allow any module.
  Returns:
    a list of rule instances, in file order.
  Raises:
    Error: if the JSON structure is not as described above, the module is not
        in allowed_imports, or the class lacks IsType or ApplyRule.
    Exceptions raised by json.load, the module import, the class lookup and
    the rule constructor are not caught here and propagate unchanged.
  """
  try:
    string_types = basestring  # Python 2: covers both str and unicode.
  except NameError:
    string_types = str  # Python 3.

  rules = []
  entries = json.load(file_obj)
  if not isinstance(entries, list):
    raise Error('Expecting a list, not %s' % (type(entries),))
  for i, entry in enumerate(entries):
    if not isinstance(entry, dict):
      raise Error('%s: Expecting a dict, not %s' % (i, type(entry)))
    if len(entry) != 1:
      raise Error('%s: Expecting 1 item, not %d' % (i, len(entry)))
    # items() works on both Python 2 and 3, unlike iteritems().
    name, args = next(iter(entry.items()))
    if not isinstance(name, string_types):
      raise Error('%s: Expecting a string TYPE, not %s' % (i, type(name)))
    if not re.match(r'(\w+\.)*\w+$', name):
      raise Error('%s: Expecting a classname TYPE, not %s' % (i, name))
    if name == 'comment':
      continue
    if not isinstance(args, dict):
      raise Error('%s: Expecting a dict ARGS, not %s' % (i, type(args)))
    fullname = str(name)
    if '.' not in fullname:
      fullname = 'rules.%s' % fullname

    modulename, classname = fullname.rsplit('.', 1)
    if '*' not in allowed_imports and modulename not in allowed_imports:
      raise Error(
          '%s: Package %r is not in allowed_imports' % (i, modulename))

    # A non-empty fromlist makes __import__ return the leaf module rather
    # than the top-level package.
    module = __import__(modulename, fromlist=[classname])
    clazz = getattr(module, classname)

    # A list (not a set) keeps the error message order deterministic.
    missing = [s for s in ('IsType', 'ApplyRule') if not hasattr(clazz, s)]
    if missing:
      raise Error(
          '%s: %s lacks %s' % (i, clazz.__name__, ' and '.join(missing)))

    rule = clazz(**args)

    rules.append(rule)
  return rules
|