Source code for okcupyd.messaging

import logging

from lxml import html
from requests import exceptions
import simplejson

from . import errors
from . import helpers
from . import util
from .xpath import XPathBuilder, xpb


log = logging.getLogger(__name__)
thread_element_xpath = XPathBuilder(relative=False).li.with_classes('thread',
                                                                    'message')


[docs]def ThreadFetcher(session, mailbox_number): return util.FetchMarshall( ThreadHTMLFetcher(session, mailbox_number), util.SimpleProcessor( session, lambda elem: MessageThread(session, elem), thread_element_xpath ) )
[docs]class ThreadHTMLFetcher(object): def __init__(self, session, mailbox_number): self._session = session self._mailbox_number = mailbox_number def _query_params(self, start_at): return { 'low': start_at, 'folder': self._mailbox_number, 'infiniscroll': 1 }
[docs] def fetch(self, start_at): response = self._session.okc_get('messages', params=self._query_params(start_at)) return response.content.strip()
def __repr__(self): return '{0}(mailbox_number={1})'.format(type(self).__name__, self._mailbox_number)
[docs]class MessageFetcher(object): def __init__(self, session, message_thread, read_messages=False): self._session = session self._read_messages = read_messages self._message_thread = message_thread @property def params(self): return { 'readmsg': str(self._read_messages).lower(), 'threadid': self._message_thread.id, 'folder': 1 } @util.cached_property def messages_tree(self): messages_response = self._session.okc_get('messages', params=self.params) return html.fromstring(messages_response.content.decode('utf8'))
[docs] def refresh(self): util.cached_property.bust_caches(self) return self.messages_tree
[docs] def fetch(self): for message_element in self.message_elements: if message_element.attrib['id'] == 'compose': continue yield Message(message_element, self._message_thread)
_message_elements_xpb = xpb.li.with_classes('to_me', 'from_me').or_ @util.cached_property def message_elements(self): return self._message_elements_xpb.apply_(self.messages_tree)
_base_timestamp_xpb = ( xpb.span.with_class('timestamp').span.with_class('fancydate') ) _timestamp_xpb = _base_timestamp_xpb.text_ _em_timestamp_xpb = _base_timestamp_xpb.em.text_
[docs]class Message(object): """Represent a message sent on okcupid.com""" def __init__(self, message_element, message_thread): self._message_element = message_element self._message_thread = message_thread @property def id(self): """ :returns: The id assigned to this message by okcupid.com. """ return int(self._message_element.attrib['id'].split('_')[-1]) @util.cached_property def sender(self): """ :returns: A :class:`~okcupyd.profile.Profile` instance belonging to the sender of this message. """ return (self._message_thread.user_profile if 'from_me' in self._message_element.attrib['class'] else self._message_thread.correspondent_profile) @util.cached_property def recipient(self): """ :returns: A :class:`~okcupyd.profile.Profile` instance belonging to the recipient of this message. """ return (self._message_thread.correspondent_profile if 'from_me' in self._message_element.attrib['class'] else self._message_thread.user_profile) _content_xpb = xpb.div.with_class('message_body') @util.cached_property def content(self): """ :returns: The text body of the message. """ # The code that follows is obviously pretty disgusting. # It seems like it might be impossible to completely replicate # the text of the original message if it has trailing whitespace message = self._content_xpb.one_(self._message_element) first_line = message.text if message.text[:2] == ' ': first_line = message.text[2:] else: log.debug("message did not have expected leading whitespace") subsequent_lines = ''.join([ html.tostring(child, encoding='unicode').replace('<br>', '\n') for child in message.iterchildren() ]) message_text = first_line + subsequent_lines if len(message_text) > 0 and message_text[-1] == ' ': message_text = message_text[:-1] else: log.debug("message did not have expected leading whitespace") return message_text @util.cached_property def time_sent(self): try: timestamp_text = _timestamp_xpb.one_(self._message_element) except IndexError: timestamp_text = _em_timestamp_xpb.one_(self._message_element) return helpers.parse_date_updated(timestamp_text) def __repr__(self): return '<{0}: {1} sent {2} "{3}{4}">'.format( type(self).__name__, self.sender.username, self.recipient.username, self.content[:10], '...' if len(self.content) > 10 else '' )
[docs]class MessageThread(object): """Represent a message thread between two users.""" @classmethod
[docs] def delete_threads(cls, session, thread_ids_or_threads, authcode=None): """ :param session: A logged in :class:`~okcupyd.session.Session`. :param thread_ids_or_threads: A list whose members are either :class:`~.MessageThread` instances or okc_ids of message threads. :param authcode: Authcode to use for this request. If none is provided A request to the logged in user's messages page will be made to retrieve one. """ thread_ids = [thread.id if isinstance(thread, cls) else thread for thread in thread_ids_or_threads] if not authcode: authcode = helpers.get_authcode(html.fromstring( session.okc_get('messages').content )) data = {'access_token': authcode, 'threadids': simplejson.dumps(thread_ids)} return session.okc_delete('apitun/messages/threads', params=data, data=data)
def __init__(self, session, thread_element): self._session = session self._thread_element = thread_element self.reply = self.correspondent_profile.message(thread_id=self.id) self._message_fetcher = MessageFetcher(self._session, self) #: A :class:`~okcupyd.util.fetchable.Fetchable` of :class:`~.Message` #: objects. self.messages = util.Fetchable(self._message_fetcher) @util.cached_property def id(self): """ :returns: The id assigned to this message by okcupid.com. """ return self._thread_element.attrib['data-threadid'] @util.cached_property def correspondent_id(self): """ :returns: The id assigned to the correspondent of this message. """ try: return int(self._thread_element.attrib['data-personid']) except (ValueError, KeyError): try: return int(self.correspondent_profile.id) except: pass _correspondent_xpb = xpb.div.with_class('inner').a.with_class('open').\ span.with_class('subject').text_ @util.cached_property def correspondent(self): """ :returns: The username of the user with whom the logged in user is conversing in this :class:`~.MessageThread`. """ try: return self._correspondent_xpb.one_(self._thread_element).strip() except IndexError: raise errors.NoCorrespondentError() @util.cached_property def read(self): """ :returns: Whether or not the user has read all the messages in this :class:`~.MessageThread`. """ return not 'unreadMessage' in self._thread_element.attrib['class'] @util.cached_property def date(self): return self.datetime.date() @util.cached_property def datetime(self): return helpers.parse_date_updated( _timestamp_xpb.one_(self._thread_element) ) @property def with_deleted_user(self): try: self.correspondent_profile.id except exceptions.HTTPError: return True else: return False @property def initiator(self): """ :returns: A :class:`~okcupyd.profile.Profile` instance belonging to the initiator of this :class:`~.MessageThread`. """ try: return self.messages[0].sender except IndexError: pass @property def respondent(self): """ :returns: A :class:`~okcupyd.profile.Profile` instance belonging to the respondent of this :class:`~.MessageThread`. """ try: return self.messages[0].recipient except IndexError: pass @util.cached_property def correspondent_profile(self): """ :returns: The :class:`~okcupyd.profile.Profile` of the user with whom the logged in user is conversing in this :class:`~.MessageThread`. """ return self._session.get_profile(self.correspondent) @util.cached_property def user_profile(self): """ :returns: A :class:`~okcupyd.profile.Profile` belonging to the logged in user. """ return self._session.get_current_user_profile() @property def message_count(self): return len(self.messages) @property def has_messages(self): return bool(self.messages) @property def got_response(self): """ :returns: Whether or not the :class:`~.MessageThread`. has received a response. """ return any(message.sender != self.initiator for message in self.messages)
[docs] def delete(self): """Delete this thread for the logged in user.""" return self.delete_threads(self._session, [self.id])
def __hash__(self): return hash(self.id) def __eq__(self, other): return self.id == other.id def __repr__(self): return '<{0}({1}, {2})>'.format( type(self).__name__, self.user_profile.username, self.correspondent_profile.username )