Source code for okcupyd.filter

import inspect

from . import util
from . import magicnumbers


[docs]class Filters(object): """Registrar for functions that construct filters for submission in requests to okcupid.com """ def __init__(self): self.builder_to_keys = {} self.builder_to_decider = {} self.keys = set() self._key_to_type = {} self._key_to_values = {} self._key_to_string = {}
[docs] def build_documentation_lines(self): """Build a parameter documentation string that can appended to the docstring of a function that uses this :class:`~.Filters` instance to build filters. """ return [ line_string for key in sorted(self.keys) for line_string in self.build_paramter_string(key) ]
[docs] def build_paramter_string(self, key): description_string = u'' if key in self._key_to_string: description_string = u' {0}'.format(self._key_to_string[key]) if key in self._key_to_values: description_string += u' expected values: {0}'.format( u', '.join([repr(value) for value in self._key_to_values[key]]) ) parameter_string_lines = [u':param {0}:{1}'.format( key, description_string )] if key in self._key_to_type: the_type = self._key_to_type[key] parameter_string_lines.append(u':type {0}: {1}'.format( key, the_type.__name__ if isinstance(the_type, type) else the_type )) return parameter_string_lines
@staticmethod
[docs] def any_decider(function, incoming, accepted_keys): return bool(set(incoming).intersection(accepted_keys))
@staticmethod
[docs] def all_decider(function, incoming, accepted_keys): return set(accepted_keys).issubset(set(incoming))
@staticmethod
[docs] def all_not_none_decider(function, incoming, accepted_keys): return all(incoming.get(key) is not None for key in accepted_keys)
@staticmethod
[docs] def any_not_none_decider(function, incoming, accepted_keys): return any(incoming.get(key) is not None for key in accepted_keys)
def __repr__(self): return "{0}({1})".format(type(self).__name__, repr(self.builder_to_keys.keys()))
[docs] def filters(self, **kwargs): builders = [builder for builder, decider in self.builder_to_decider.items() if decider(builder, kwargs, self.builder_to_keys[builder])] return [builder(*[kwargs.get(key) for key in self.builder_to_keys[builder]]) for builder in builders]
[docs] def build(self, **kwargs): if not self.keys.issuperset(kwargs.keys()): raise TypeError("build() got unexpected keyword arguments: " "{0}".format(', '.join( repr(k) for k in kwargs.keys() if k not in self.keys ))) return { u'filter{0}'.format(filter_number): filter_string for filter_number, filter_string in enumerate(self.filters(**kwargs), 1) }
@util.curry
[docs] def register_filter_builder(self, function, keys=(), decider=None, acceptable_values=None, types=None, descriptions=None): """Register a filter function with this :class:`~.Filters` instance. This function is curried with :class:`~okcupyd.util.currying.curry` -- that is, it can be invoked partially before it is fully evaluated. This allows us to pass kwargs to this function when it is used as a decorator: .. code-block:: python @register_filter_builder(keys=('real_name',), decider=Filters.any_decider) def my_filter_function(argument): return '4,{0}'.format(argument) :param function: The filter function to register. :param keys: Keys that should be used as the argument names for `function`, if none are provided, the filter functions argument names will be used instead. :param decider: a function of signature `(function, incoming_keys, accepted_keys)` that returns True if the filter function should be called and False otherwise. Defaults to :meth:`~.all_not_none_decider` :param acceptable_values: A list of acceptable values for the parameter of the filter function (or a list of lists if the filter function takes multiple parameters) :param types: The type of the parameter accepted by the incoming filter function (or a list of types if the function takes multiple parameters) :param descriptions: A description for the incoming filter function's argument (or a list of descriptions if the filter function takes multiple arguments) """ decider = decider or self.all_not_none_decider function_arguments = inspect.getargspec(function).args if keys: assert len(keys) == len(function_arguments) else: keys = function_arguments self.builder_to_keys[function] = keys self.builder_to_decider[function] = decider self.keys.update(keys) self._update_docs_dict(self._key_to_type, types, keys) self._update_docs_dict(self._key_to_string, descriptions, keys) self._update_docs_dict(self._key_to_values, acceptable_values, keys) return function
def _update_docs_dict(self, docs_dict, incoming, keys): if incoming: if not isinstance(incoming, dict): if len(keys) > 1: assert len(keys) == len(incoming), ( "Got {0}, for keys: {1}".format(incoming, keys) ) incoming = zip(keys, incoming) else: incoming = {key: incoming for key in keys} docs_dict.update(incoming)
[docs]def gentation_filter(gentation): return u'0,{0}'.format( magicnumbers.gentation_to_number[gentation.strip().lower()] )
[docs]def age_filter(age_min=18, age_max=99): if age_min == None: age_min = 18 return u'2,{0},{1}'.format(age_min, age_max)
[docs]def location_filter(radius): return u'3,{0}'.format(radius)