''' Email and Attachment class '''
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.utils import formatdate
from copy import deepcopy
from .head import Head
from .body import Body
from .file import file_from_payload, file_from_xml
class Email:
    """Emails as data storage interface

    Head, body and files are fetched lazily through ``directory`` the first
    time the matching property is accessed and are cached afterwards.

    :param directory: directory object the email lives in; it is used for
        every fetch/save/delete operation and provides ``path`` and ``imap``
    :param uid: uid of the message, ``None`` for an email that has not been
        saved yet (``save`` assigns the uid)
    """
    # :TODO:
    # pylint: disable=too-many-public-methods
    # pylint: disable=too-many-instance-attributes

    def __init__(self, directory, uid):
        self.directory = directory
        self.uid = uid
        # caches behind the properties of the same name, None = not loaded
        self._subject = None
        self._head = None
        self._body = None
        self._files = None

    @property
    def path(self):
        """path of the email

        Returns:
            str: directory path/uid
        """
        return self.directory.path + '/' + str(self.uid)

    @property
    def subject(self):
        """subject of the email

        It is a cached value. While it is still ``None`` every access asks
        the directory to fetch the subjects; the directory is expected to
        assign them through the setter.

        Returns:
            str: subject (``None`` if the fetch did not provide one)
        """
        if self._subject is None:
            self.directory.fetch_subjects()
        return self._subject

    @subject.setter
    def subject(self, subject):
        self._subject = subject

    @property
    def name(self):
        """name of the email

        This is the subject with the configured tag prefix and the
        surrounding whitespace removed. A subject that does not start with
        the tag is returned unchanged.

        Returns:
            str: name

        Raises:
            AttributeError: if no tag is configured or the subject is empty
        """
        tag = self.directory.imap.config.tag
        if not tag:
            raise AttributeError('No tag configured')
        subject = self.subject
        if not subject:
            raise AttributeError("Couldn't fetch subject when needed")
        if subject.startswith(tag):
            return subject[len(tag):].strip()
        return subject

    @property
    def head(self):
        """access to the head object, fetch if not already done"""
        if not self._head:
            self._head = self.directory.fetch_head(self)
        if isinstance(self._head, str):
            self._head = Head(self._head)
        return self._head

    @head.setter
    def head(self, head):
        # a str is wrapped in Head, any other type is silently ignored
        if isinstance(head, Head):
            self._head = head
        elif isinstance(head, str):
            self._head = Head(head)

    def new_head(self, subject, from_addr_obj, to_addr_obj):
        """Create a new head for this email (nothing is saved here)

        :param subject: subject as string
        :param from_addr_obj: from-address as *Address* Object
        :param to_addr_obj: to-address as *Address* Object
        :returns: the head of this email
        """
        head = Head()
        head['From'] = str(from_addr_obj)
        head['To'] = str(to_addr_obj)
        head['Subject'] = subject
        head['Date'] = formatdate(localtime=True)
        self.head = head
        return self.head

    @property
    def body(self):
        """access to the body object, fetch if not already done"""
        if not self._body:
            self.body = self.directory.fetch_body(self)
        return self._body

    @body.setter
    def body(self, body):
        # a str is wrapped in Body, any other type is silently ignored
        if isinstance(body, Body):
            self._body = body
        elif isinstance(body, str):
            self._body = Body(body)

    def new_body(self):
        """create new empty body for this email"""
        self.body = Body(None).new()
        return self.body

    @property
    def files(self):
        """access to the file objects, fetch if not already done"""
        if self._files is None:
            self.fetch_payloads()
        return self._files

    def fetch_payloads(self):
        """fetch payloads of this email

        Payloads whose content type starts with ``multipart/alternative;``
        are skipped, every other payload is appended as file. Nothing is
        fetched while the email has no uid. Calling this again on an email
        whose files are already loaded appends the payloads once more.

        Returns:
            list: self.files with appended payloads
        """
        if self._files is None:
            # Initialise the cache here instead of going through the
            # ``files`` property: the property calls this method, so using
            # it would run the fetch twice and duplicate every file.
            self._files = []
        files = self._files
        if self.uid is not None:
            for payload in self.directory.fetch_payloads(self):
                # a missing header is treated like an empty content type
                content_type = payload['Content-Type'] or ''
                if not content_type.startswith('multipart/alternative;'):
                    files.append(file_from_payload(self, payload))
        return files

    @property
    def xml_files(self):
        """
        :returns: sorted list of files that are registered in xml body
        """
        files = [file_from_xml(self, file)
                 for file in self.body.get_by_tag('file')]
        return sorted(files)

    @property
    def plain(self):
        """get the whole email with files as plain text"""
        return self.to_string(html=False)

    @property
    def html(self):
        """get the whole email with files as html"""
        return self.to_string(html=True)

    def to_string(self, html=False):
        """Serialize head, body and files into one message string

        Layout taken from https://stackoverflow.com/a/43157340 ::

            mixed
            |---alternative
            |   |---text
            |   |---related
            |       |---html
            |       |---inline image
            |       |---inline image
            |---attachment
            |---attachment

        The *related* part is only added for ``html=True`` and holds the
        same body string as the text part; no inline images are attached.

        :param html: also add the body as html part
        :returns: the message as string
        """
        # work on a copy, the cached head must not collect the attachments
        msg = deepcopy(self.head)
        msg_alt = MIMEMultipart('alternative')
        # the transfer-encoding header is replaced before the payload is set
        body = MIMEText(None, 'plain', 'utf-8')
        body.replace_header('content-transfer-encoding', 'quoted-printable')
        body.set_payload(str(self.body), 'utf-8')
        msg_alt.attach(body)
        if html:
            msg_rel = MIMEMultipart('related')
            body_html = MIMEText(None, 'html', 'utf-8')
            body_html.replace_header(
                'content-transfer-encoding', 'quoted-printable')
            body_html.set_payload(str(self.body), 'utf-8')
            msg_rel.attach(body_html)
            msg_alt.attach(msg_rel)
        msg.attach(msg_alt)
        for file in self.files:
            msg.attach(file.mime_obj)
        return str(msg)

    def file_by_name(self, name):
        """get file by name

        Args:
            name(str): of the file to get

        Returns:
            File: first file object with that name

        Raises:
            IndexError: if the email has no file with that name
        """
        for file in self.files:
            if file.name == name:
                return file
        raise IndexError('No file named {!r} in {}'.format(name, self))

    def file_by_id(self, id_):
        """get file by id

        Id is saved in the body (xml)

        Args:
            id_(str): id of the file to get

        Returns:
            File: object, ``False`` if the id is not registered in the body

        Raises:
            IndexError: if the id is registered but the file is missing
        """
        for file in self.xml_files:
            if file.id_ == id_:
                return self.file_by_name(file.name)
        return False

    def add_item(self, tag, text=None, attribs=None, parent=None):
        """forwards to body method"""
        return self.body.add_item(
            tag,
            text=text,
            attribs=attribs,
            parent=parent
        )

    # def remove_item(self, id_):
    #     """forwards to body method"""
    #     self.body.remove_item(id_)

    def add_file(self, file_obj):
        """add file to email

        The file is registered in the body and appended to the files unless
        a file with the same name exists already. The email is not saved
        here, call ``save`` to persist the change.

        :param file_obj: Object of class storage.file.File
        :returns: success bool
        """
        files = self.files
        if not any(file.name == file_obj.name for file in files):
            attribs = {
                'name': file_obj.name,
                'size': file_obj.size,
                'mime': file_obj.mime,
            }
            if file_obj.time:
                attribs['time'] = file_obj.time
            child = self.add_item('file', attribs=attribs)
            file_obj.id_ = child.attrib['id']
            files.append(file_obj)
        return file_obj in files

    def remove_file_by_attrib(self, attrib, value):
        """remove file from email and save the email

        eg. remove_file_by_attrib('id', 'ldsKLfds')

        Every body element whose attribute matches is removed together with
        the files that carry the name of such an element.

        :param attrib: attribute to select
        :param value: value of the attribute
        """
        # The value is handed over as XPath variable, so quotes inside of it
        # (eg. a file name like "it's.txt") can not break the expression.
        matches = self.body.xml.xpath(
            '//*[@{}=$value]'.format(attrib), value=str(value))
        removed_names = set()
        for bad in matches:
            removed_names.add(bad.get('name'))
            bad.getparent().remove(bad)
        if removed_names:
            # filter in place instead of removing while iterating the list
            files = self.files
            files[:] = [file for file in files
                        if file.name not in removed_names]
        self.save()

    def remove_file(self, file):
        """removes file object from email

        Args:
            file(File): File object to delete from email
        """
        self.remove_file_by_attrib('name', file.name)

    def save(self):
        """Produce new Email from body, head and files, save it, delete old

        Returns:
            int: the new uid of this email
        """
        self.uid = int(self.directory.save_message(self))
        # drop the cache, the files are fetched again on next access
        self._files = None
        return self.uid

    def delete(self):
        """delete this email

        Returns:
            bool: True if success
        """
        return self.directory.delete_email(self)

    # Identity and ordering are based on the uid only. The uid changes on
    # save(), so the hash of an email is not stable across saves.
    def __hash__(self):
        return hash(self.uid)

    def __eq__(self, other):
        if not hasattr(other, 'uid'):
            return NotImplemented
        return self.uid == other.uid

    def __ne__(self, other):
        return not self == other

    def __lt__(self, other):
        if not hasattr(other, 'uid'):
            return NotImplemented
        return self.uid < other.uid

    def __repr__(self):
        return str(self)

    def __str__(self):
        return '{}: {}'.format(
            self.__class__.__name__,
            self.uid,
        )