Source code for okcupyd.session

import logging
import random
import time

import requests

from . import profile
from . import settings
from . import util
from .errors import AuthenticationError


log = logging.getLogger(__name__)


[docs]class Session(object): """A `requests.Session` with convenience methods for interacting with okcupid.com """ default_login_headers = { 'user-agent': ('Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) ' 'AppleWebKit/537.36 (KHTML, like Gecko) ' 'Chrome/37.0.2062.94 ' 'Safari/537.36') } @classmethod
[docs] def login( cls, username=None, password=None, requests_session=None, rate_limit=None ): """Get a session that has authenticated with okcupid.com. If no username and password is supplied, the ones stored in :class:`okcupyd.settings` will be used. :param username: The username to log in with. :type username: str :param password: The password to log in with. :type password: str :param rate_limit: Average time in seconds to wait between requests to OKC. :type rate_limit: float """ requests_session = requests_session or requests.Session() session = cls(requests_session, rate_limit) # settings.USERNAME and settings.PASSWORD should not be made # the defaults to their respective arguments because doing so # would prevent this function from picking up any changes made # to those values after import time. username = username or settings.USERNAME password = password or settings.PASSWORD session.do_login(username, password) return session
def __init__(self, requests_session, rate_limit=None): self._requests_session = requests_session self.log_in_name = None if isinstance(rate_limit, RateLimiter): self.rate_limiter = rate_limit else: self.rate_limiter = RateLimiter(rate_limit) def __getattr__(self, name): return getattr(self._requests_session, name)
[docs] def do_login(self, username, password): credentials = { 'username': username, 'password': password, 'okc_api': 1 } login_response = self.okc_post('login', data=credentials, headers=self.default_login_headers, secure=True) login_json = login_response.json() log_in_name = login_json['screenname'] if log_in_name is None: raise AuthenticationError(u'Could not log in as {0}'.format(username)) if log_in_name.lower() != username.lower(): log.warning(u'Expected to log in as {0} but ' u'got {1}'.format(username, log_in_name)) log.debug(login_response.content.decode('utf8')) self.access_token = login_json.get("oauth_accesstoken") self.log_in_name = log_in_name self.headers.update(self.default_login_headers)
[docs] def build_path(self, path, secure=None): if secure is None: secure = ('secure_login' in self.cookies and int(self.cookies['secure_login']) != 0) return u'{0}://{1}/{2}'.format('https' if secure else 'http', util.DOMAIN, path)
[docs] def get_profile(self, username): """Get the profile associated with the supplied username :param username: The username of the profile to retrieve.""" return profile.Profile(self, username)
[docs] def get_current_user_profile(self): """Get the `okcupyd.profile.Profile` associated with the supplied username. :param username: The username of the profile to retrieve. """ return self.get_profile(self.log_in_name)
[docs]class RateLimiter(object): def __init__(self, rate_limit, wait_std_dev=None): self.rate_limit = rate_limit if rate_limit is not None and wait_std_dev is None: wait_std_dev = float(rate_limit) / 5 self.wait_std_dev = wait_std_dev self.last_request = None
[docs] def wait(self): if self.rate_limit is None: return if self.last_request is not None: wait_time = random.gauss(self.rate_limit, self.wait_std_dev) elapsed = time.time() - self.last_request if elapsed < wait_time: time.sleep(wait_time - elapsed) self.last_request = time.time()
[docs]def build_okc_method(method_name): def okc_method(self, path, secure=None, **kwargs): base_method = getattr(self, method_name) self.rate_limiter.wait() response = base_method(self.build_path(path, secure), **kwargs) response.raise_for_status() return response return okc_method
for method_name in ('get', 'put', 'post', 'delete'): setattr(Session, 'okc_{0}'.format(method_name), build_okc_method(method_name))