Source code for okcupyd.filter

import inspect
import itertools

import six

from . import util
from . import magicnumbers


[docs]def all_not_none_decider(function, incoming, accepted_keys): return all(incoming.get(key) is not None for key in accepted_keys)
[docs]def always_decider(*args, **kwargs): return True
[docs]class Filters(object): """Registrar for functions that construct filters for submission in requests to okcupid.com """ def __init__(self, strict=True): self.builders = [] self.keys = set() self._key_to_type = {} self._key_to_values = {} self._key_to_string = {} self._strict = strict @util.cached_property def filter_meta(filters_instance): class FilterMeta(util.decorate_all(staticmethod)): acceptable_values = None keys = () types = None descriptions = None output_key = None def __init__(cls, name, bases, attributes_dict): super(FilterMeta, cls).__init__(name, bases, attributes_dict) cls._set_defaults() filters_instance.register_builder(cls) def decide(cls, kwargs): return all_not_none_decider(cls.transform, kwargs, cls.keys) def transform_from_kwargs(cls, kwargs): return cls.transform(*[kwargs.get(key) for key in cls.keys]) def _set_defaults(cls): if isinstance(cls.keys, six.string_types): cls.keys = [cls.keys] if not hasattr(cls, 'transform'): cls.transform = staticmethod(lambda x: x) function_arguments = inspect.getargspec(cls.transform).args if cls.keys: assert len(cls.keys) == len(function_arguments) else: cls.keys = function_arguments if not cls.output_key: cls.output_key = cls.keys[0] return FilterMeta @util.cached_property def filter_class(self): return six.with_metaclass(self.filter_meta)
[docs] def build_documentation_lines(self): """Build a parameter documentation string that can appended to the docstring of a function that uses this :class:`~.Filters` instance to build filters. """ return [ line_string for key in sorted(self.keys) for line_string in self.build_paramter_string(key) ]
[docs] def build_paramter_string(self, key): description_string = u'' if key in self._key_to_string: description_string = u' {0}'.format(self._key_to_string[key]) if key in self._key_to_values: description_string += u' expected values: {0}'.format( u', '.join([repr(value) for value in self._key_to_values[key]]) ) parameter_string_lines = [u':param {0}:{1}'.format( key, description_string )] if key in self._key_to_type: the_type = self._key_to_type[key] parameter_string_lines.append(u':type {0}: {1}'.format( key, the_type.__name__ if isinstance(the_type, type) else the_type )) return parameter_string_lines
[docs] def add_to_docstring_of(self, target): target.__doc__ = '\n '.join( itertools.chain( (target.__doc__,), self.build_documentation_lines() ) )
all_not_none_decider = staticmethod(all_not_none_decider) @staticmethod
[docs] def any_decider(function, incoming, accepted_keys): return bool(set(incoming).intersection(accepted_keys))
@staticmethod
[docs] def all_decider(function, incoming, accepted_keys): return set(accepted_keys).issubset(set(incoming))
@staticmethod
[docs] def any_not_none_decider(function, incoming, accepted_keys): return any(incoming.get(key) is not None for key in accepted_keys)
def __repr__(self): return "{0}({1})".format(type(self).__name__, repr(self.builder_to_keys.keys()))
[docs] def filters(self, **kwargs): builders = [ builder for builder in self.builders if self._handle_decide(builder, kwargs) ] return [ builder.transform( *[kwargs.get(key) for key in builder.keys] ) for builder in builders ]
def _handle_decide(self, builder, kwargs): if len(inspect.getargspec(builder.decide).args) == 2: return builder.decide(kwargs) else: return builder.decide(builder.transform, kwargs, builder.keys) def _validate_incoming(self, kwargs): if self._strict and not self.keys.issuperset(kwargs.keys()): raise TypeError("build() got unexpected keyword arguments: " "{0}".format(', '.join( repr(k) for k in kwargs.keys() if k not in self.keys )))
[docs] def build(self, **kwargs): self._validate_incoming(kwargs) return { builder.output_key: builder.transform_from_kwargs(kwargs) for builder in self.builders if builder.decide(kwargs) }
[docs] def legacy_build(self, **kwargs): self._validate_incoming(kwargs) return { u'filter{0}'.format(filter_number): filter_string for filter_number, filter_string in enumerate(self.filters(**kwargs), 1) }
[docs] def register_builder(self, filter_object): self.builders.append(filter_object) self.keys.update(filter_object.keys) self._update_docs_dict( self._key_to_type, filter_object.types, filter_object.keys ) self._update_docs_dict( self._key_to_string, filter_object.descriptions, filter_object.keys ) self._update_docs_dict( self._key_to_values, filter_object.acceptable_values, filter_object.keys )
@util.curry def register_filter_builder(self, function, **kwargs): """Register a filter function with this :class:`~.Filters` instance. This function is curried with :class:`~okcupyd.util.currying.curry` -- that is, it can be invoked partially before it is fully evaluated. This allows us to pass kwargs to this function when it is used as a decorator: .. code-block:: python @register_filter_builder(keys=('real_name',), decider=Filters.any_decider) def my_filter_function(argument): return '4,{0}'.format(argument) :param function: The filter function to register. :param keys: Keys that should be used as the argument names for `function`, if none are provided, the filter functions argument names will be used instead. :param decider: a function of signature `(function, incoming_keys, accepted_keys)` that returns True if the filter function should be called and False otherwise. Defaults to :meth:`~.all_not_none_decider` :param acceptable_values: A list of acceptable values for the parameter of the filter function (or a list of lists if the filter function takes multiple parameters) :param types: The type of the parameter accepted by the incoming filter function (or a list of types if the function takes multiple parameters) :param descriptions: A description for the incoming filter function's argument (or a list of descriptions if the filter function takes multiple arguments) :param output_key: The key to use to output the provided value. Will default to the only value in keys if keys has length 1. """ kwargs['transform'] = function if kwargs.get('decider'): kwargs['decide'] = kwargs.get('decider') return type('filter', (self.filter_class,), kwargs) def _update_docs_dict(self, docs_dict, incoming, keys): if incoming: if not isinstance(incoming, dict): if len(keys) > 1: assert len(keys) == len(incoming), ( "Got {0}, for keys: {1}".format(incoming, keys) ) incoming = zip(keys, incoming) else: incoming = {key: incoming for key in keys} docs_dict.update(incoming)
[docs]def gentation_filter(gentation): return u'0,{0}'.format( magicnumbers.gentation_to_number[gentation.strip().lower()] )
[docs]def age_filter(age_min=18, age_max=99): if age_min == None: age_min = 18 return u'2,{0},{1}'.format(age_min, age_max)
[docs]def location_filter(radius): return u'3,{0}'.format(radius)