Source code for imap_storage.storage.email.email

''' Email and Attachment class '''
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.utils import formatdate
from copy import deepcopy

from .head import Head
from .body import Body
from .file import file_from_payload, file_from_xml


[docs]class Email: # :TODO: # pylint: disable=too-many-public-methods # pylint: disable=too-many-instance-attributes """Emails as data storage interface :param imap: Imap connection :param uid: new object if None - make sure to run *new* method """ def __init__(self, directory, uid): self.directory = directory self.uid = uid self._subject = None self._head = None self._body = None self._files = None @property def path(self): """path of the email Returns: str: directory path/uid """ return self.directory.path + '/' + str(self.uid) @property def subject(self): """subject of the email It is a cached value and fetches one time when accessing the property Returns: str: subject """ if self._subject is None: self.directory.fetch_subjects() return self._subject @property def name(self): """name of the email Returns: str: name """ tag = self.directory.imap.config.tag if not tag: raise AttributeError name = self.subject if not self.subject: raise AttributeError("Couldn't fetch subject when needed") if self.subject.startswith(tag): tag = self.directory.imap.config.tag name = self.subject[len(tag):].strip() return name @subject.setter def subject(self, subject): self._subject = subject @property def head(self): """access to the head object, fetch if not already done""" if not self._head: self._head = self.directory.fetch_head(self) if isinstance(self._head, str): self._head = Head(self._head) return self._head @head.setter def head(self, head): if isinstance(head, Head): self._head = head elif isinstance(head, str): self._head = Head(head)
[docs] def new_head(self, subject, from_addr_obj, to_addr_obj): """Create new message and save it :param subject: subject as string :param from_addr_obj: from-address as *Address* Object :param to_addr_obj: to-address as *Address* Object :returns: self """ head = Head() head['From'] = str(from_addr_obj) head['To'] = str(to_addr_obj) head['Subject'] = subject head['Date'] = formatdate(localtime=True) self.head = head return self.head
@property def body(self): """access to the body object, fetch if not already done""" if not self._body: self.body = self.directory.fetch_body(self) return self._body @body.setter def body(self, body): if isinstance(body, Body): self._body = body elif isinstance(body, str): self._body = Body(body)
[docs] def new_body(self): """create new empty body for this email""" self.body = Body(None).new() return self.body
@property def files(self): """access to the file objects, fetch if not already done""" if self._files is None: self._files = [] self.fetch_payloads() return self._files
[docs] def fetch_payloads(self): """fetch payloads of this email Returns: list: self.files with appended payloads """ if self.uid is not None: payloads = self.directory.fetch_payloads(self) for payload in payloads: if not payload['Content-Type'].startswith( 'multipart/alternative;'): self.files.append(file_from_payload(self, payload)) return self.files
@property def xml_files(self): """ :returns: list of files that are registered in xml body """ files = [file_from_xml(self, file) for file in self.body.get_by_tag('file')] return sorted(files) @property def plain(self): """get the whole email with files as plain text""" return self.to_string(html=False) @property def html(self): """get the whole email with files as html""" return self.to_string(html=True)
[docs] def to_string(self, html=False): """ https://stackoverflow.com/a/43157340 mixed |---alternative | |---text | |---related | |---html | |---inline image | |---inline image |---attachment |---attachment """ msg = deepcopy(self.head) msg_alt = MIMEMultipart('alternative') body = MIMEText(None, 'plain', 'utf-8') body.replace_header('content-transfer-encoding', 'quoted-printable') body.set_payload(str(self.body), 'utf-8') msg_alt.attach(body) if html: msg_rel = MIMEMultipart('related') body_html = MIMEText(None, 'html', 'utf-8') body_html.replace_header( 'content-transfer-encoding', 'quoted-printable') body_html.set_payload(str(self.body), 'utf-8') msg_rel.attach(body_html) msg_alt.attach(msg_rel) msg.attach(msg_alt) files = self.files for file in files: msg.attach(file.mime_obj) return str(msg)
[docs] def file_by_name(self, name): """get file by name Args: name(str): of the file to get Returns: File: object """ return [file for file in self.files if file.name == name][0]
[docs] def file_by_id(self, id_): """get file by id Id is saved in the body (xml) Args: id_(str): id of the file to get Returns: File: object """ for file in self.xml_files: if file.id_ == id_: return self.file_by_name(file.name) return False
[docs] def add_item(self, tag, text=None, attribs=None, parent=None): """forwards to body method""" return self.body.add_item( tag, text=text, attribs=attribs, parent=parent )
# def remove_item(self, id_): # """forwards to body method""" # self.body.remove_item(id_)
[docs] def add_file(self, file_obj): """add file to email :param file_obj: Object of class storage.file.File :returns: success bool """ if file_obj.name not in [file.name for file in self.files]: # genexp attribs = { 'name': file_obj.name, 'size': file_obj.size, 'mime': file_obj.mime, } if file_obj.time: attribs['time'] = file_obj.time child = self.add_item('file', attribs=attribs) file_obj.id_ = child.attrib['id'] self.files.append(file_obj) #self.save() return file_obj in self.files
[docs] def remove_file_by_attrib(self, attrib, value): """remove file from email eg. remove_file_by_attrib('id', 'ldsKLfds') :param attrib: attribute to select :param value: value of the attribute """ for bad in self.body.xml.xpath( "//*[@{}=\'{}\']".format(attrib, value)): name = bad.get('name') bad.getparent().remove(bad) for file in self.files: if file.name == name: self.files.remove(file) self.save()
[docs] def remove_file(self, file): """removes file object from email Args: file(File): File object to delete from email """ self.remove_file_by_attrib('name', file.name)
[docs] def save(self): """Produce new Email from body, head and files, save it, delete old""" self.uid = int(self.directory.save_message(self)) self._files = None return self.uid
[docs] def delete(self): """delete this email Returns: bool: True if success """ return self.directory.delete_email(self)
def __hash__(self): return hash((self.uid)) def __eq__(self, other): return self.uid == other.uid def __ne__(self, other): return not self == other def __lt__(self, other): return self.uid < other.uid def __repr__(self): return str(self) def __str__(self): return '{}: {}'.format( self.__class__.__name__, self.uid, )