nzbToMedia/libs/common/rebulk/rules.py
2018-12-16 13:50:27 -05:00

374 lines
11 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Abstract rule class definition and rule engine implementation
"""
from abc import ABCMeta, abstractmethod
import inspect
from itertools import groupby
from logging import getLogger
import six
from .utils import is_iterable
from .toposort import toposort
from . import debug
log = getLogger(__name__).log
@six.add_metaclass(ABCMeta)
class Consequence(object):
"""
Definition of a consequence to apply.
"""
@abstractmethod
def then(self, matches, when_response, context): # pragma: no cover
"""
Action implementation.
:param matches:
:type matches: rebulk.match.Matches
:param context:
:type context:
:param when_response: return object from when call.
:type when_response: object
:return: True if the action was runned, False if it wasn't.
:rtype: bool
"""
pass
@six.add_metaclass(ABCMeta)
class Condition(object):
"""
Definition of a condition to check.
"""
@abstractmethod
def when(self, matches, context): # pragma: no cover
"""
Condition implementation.
:param matches:
:type matches: rebulk.match.Matches
:param context:
:type context:
:return: truthy if rule should be triggered and execute then action, falsy if it should not.
:rtype: object
"""
pass
@six.add_metaclass(ABCMeta)
class CustomRule(Condition, Consequence):
"""
Definition of a rule to apply
"""
# pylint: disable=no-self-use, unused-argument, abstract-method
priority = 0
name = None
dependency = None
properties = {}
def __init__(self, log_level=None):
self.defined_at = debug.defined_at()
if log_level is None and not hasattr(self, 'log_level'):
self.log_level = debug.LOG_LEVEL
def enabled(self, context):
"""
Disable rule.
:param context:
:type context:
:return: True if rule is enabled, False if disabled
:rtype: bool
"""
return True
def __lt__(self, other):
return self.priority > other.priority
def __repr__(self):
defined = ""
if self.defined_at:
defined = "@%s" % (self.defined_at,)
return "<%s%s>" % (self.name if self.name else self.__class__.__name__, defined)
def __eq__(self, other):
return self.__class__ == other.__class__
def __hash__(self):
return hash(self.__class__)
class Rule(CustomRule):
"""
Definition of a rule to apply
"""
# pylint:disable=abstract-method
consequence = None
def then(self, matches, when_response, context):
assert self.consequence
if is_iterable(self.consequence):
if not is_iterable(when_response):
when_response = [when_response]
iterator = iter(when_response)
for cons in self.consequence: #pylint: disable=not-an-iterable
if inspect.isclass(cons):
cons = cons()
cons.then(matches, next(iterator), context)
else:
cons = self.consequence
if inspect.isclass(cons):
cons = cons() # pylint:disable=not-callable
cons.then(matches, when_response, context)
class RemoveMatch(Consequence): # pylint: disable=abstract-method
"""
Remove matches returned by then
"""
def then(self, matches, when_response, context):
if is_iterable(when_response):
ret = []
when_response = list(when_response)
for match in when_response:
if match in matches:
matches.remove(match)
ret.append(match)
return ret
if when_response in matches:
matches.remove(when_response)
return when_response
class AppendMatch(Consequence): # pylint: disable=abstract-method
"""
Append matches returned by then
"""
def __init__(self, match_name=None):
self.match_name = match_name
def then(self, matches, when_response, context):
if is_iterable(when_response):
ret = []
when_response = list(when_response)
for match in when_response:
if match not in matches:
if self.match_name:
match.name = self.match_name
matches.append(match)
ret.append(match)
return ret
if self.match_name:
when_response.name = self.match_name
if when_response not in matches:
matches.append(when_response)
return when_response
class RenameMatch(Consequence): # pylint: disable=abstract-method
"""
Rename matches returned by then
"""
def __init__(self, match_name):
self.match_name = match_name
self.remove = RemoveMatch()
self.append = AppendMatch()
def then(self, matches, when_response, context):
removed = self.remove.then(matches, when_response, context)
if is_iterable(removed):
removed = list(removed)
for match in removed:
match.name = self.match_name
elif removed:
removed.name = self.match_name
if removed:
self.append.then(matches, removed, context)
class AppendTags(Consequence): # pylint: disable=abstract-method
"""
Add tags to returned matches
"""
def __init__(self, tags):
self.tags = tags
self.remove = RemoveMatch()
self.append = AppendMatch()
def then(self, matches, when_response, context):
removed = self.remove.then(matches, when_response, context)
if is_iterable(removed):
removed = list(removed)
for match in removed:
match.tags.extend(self.tags)
elif removed:
removed.tags.extend(self.tags) # pylint: disable=no-member
if removed:
self.append.then(matches, removed, context)
class RemoveTags(Consequence): # pylint: disable=abstract-method
"""
Remove tags from returned matches
"""
def __init__(self, tags):
self.tags = tags
self.remove = RemoveMatch()
self.append = AppendMatch()
def then(self, matches, when_response, context):
removed = self.remove.then(matches, when_response, context)
if is_iterable(removed):
removed = list(removed)
for match in removed:
for tag in self.tags:
if tag in match.tags:
match.tags.remove(tag)
elif removed:
for tag in self.tags:
if tag in removed.tags: # pylint: disable=no-member
removed.tags.remove(tag) # pylint: disable=no-member
if removed:
self.append.then(matches, removed, context)
class Rules(list):
"""
list of rules ready to execute.
"""
def __init__(self, *rules):
super(Rules, self).__init__()
self.load(*rules)
def load(self, *rules):
"""
Load rules from a Rule module, class or instance
:param rules:
:type rules:
:return:
:rtype:
"""
for rule in rules:
if inspect.ismodule(rule):
self.load_module(rule)
elif inspect.isclass(rule):
self.load_class(rule)
else:
self.append(rule)
def load_module(self, module):
"""
Load a rules module
:param module:
:type module:
:return:
:rtype:
"""
# pylint: disable=unused-variable
for name, obj in inspect.getmembers(module,
lambda member: hasattr(member, '__module__')
and member.__module__ == module.__name__
and inspect.isclass):
self.load_class(obj)
def load_class(self, class_):
"""
Load a Rule class.
:param class_:
:type class_:
:return:
:rtype:
"""
self.append(class_())
def execute_all_rules(self, matches, context):
"""
Execute all rules from this rules list. All when condition with same priority will be performed before
calling then actions.
:param matches:
:type matches:
:param context:
:type context:
:return:
:rtype:
"""
ret = []
for priority, priority_rules in groupby(sorted(self), lambda rule: rule.priority):
sorted_rules = toposort_rules(list(priority_rules)) # Group by dependency graph toposort
for rules_group in sorted_rules:
rules_group = list(sorted(rules_group, key=self.index)) # Sort rules group based on initial ordering.
group_log_level = None
for rule in rules_group:
if group_log_level is None or group_log_level < rule.log_level:
group_log_level = rule.log_level
log(group_log_level, "%s independent rule(s) at priority %s.", len(rules_group), priority)
for rule in rules_group:
when_response = execute_rule(rule, matches, context)
if when_response is not None:
ret.append((rule, when_response))
return ret
def execute_rule(rule, matches, context):
"""
Execute the given rule.
:param rule:
:type rule:
:param matches:
:type matches:
:param context:
:type context:
:return:
:rtype:
"""
if rule.enabled(context):
log(rule.log_level, "Checking rule condition: %s", rule)
when_response = rule.when(matches, context)
if when_response:
log(rule.log_level, "Rule was triggered: %s", when_response)
log(rule.log_level, "Running rule consequence: %s %s", rule, when_response)
rule.then(matches, when_response, context)
return when_response
else:
log(rule.log_level, "Rule is disabled: %s", rule)
def toposort_rules(rules):
"""
Sort given rules using toposort with dependency parameter.
:param rules:
:type rules:
:return:
:rtype:
"""
graph = {}
class_dict = {}
for rule in rules:
if rule.__class__ in class_dict:
raise ValueError("Duplicate class rules are not allowed: %s" % rule.__class__)
class_dict[rule.__class__] = rule
for rule in rules:
if not is_iterable(rule.dependency) and rule.dependency:
rule_dependencies = [rule.dependency]
else:
rule_dependencies = rule.dependency
dependencies = set()
if rule_dependencies:
for dependency in rule_dependencies:
if inspect.isclass(dependency):
dependency = class_dict.get(dependency)
if dependency:
dependencies.add(dependency)
graph[rule] = dependencies
return toposort(graph)