"""Directory class
This represents a physical directory at storage level (imap)
"""
from .email.email import Email
from .email.address import Address
from ..tools.compare import list_compare
[docs]class Directory: # :TODO: # pylint: disable=too-many-public-methods
"""Directory class"""
def __init__(self, storage, folder):
self.storage = storage
self.imap = storage.imap
if ' ' in folder:
folder = folder.replace(' ', '_')
self.folder = folder
self.path = folder
self._emails = None
self._uids = None
@property
def parent(self):
"""parent directory
Returns:
Directory:
"""
splitted = self.path.split('.')
if len(splitted) > 1:
path = '.'.join(splitted[:-1])
directory = self.storage.directory_by_path(path)
else:
directory = None
return directory
@property
def childs(self):
"""childs of this directory
Returns:
list: of directories [Directory, ...]
"""
dirs = []
for directory in self.storage.directories:
path = directory.path
is_subdir = len(self.path.split('.'))+1 == len(path.split('.'))
if path.startswith(self.path) and is_subdir:
dirs.append(self.storage.directory_by_path(path))
return dirs
@property
def breadcrumbs(self):
"""path of this directory in breadcrumb format
Returns:
list: [self.parent.parent.parent, self.parent.parent, self.parent]
"""
breadcrumbs = []
directory_iter = self
while directory_iter.parent:
directory_iter = directory_iter.parent
breadcrumbs.append(directory_iter)
return sorted(breadcrumbs)
@property
def uids(self):
"""keep this uptodate because self.emails compares to it"""
if self._uids is None or True: # ever refreshing
self.imap.create_folder(self.path)
self._uids = self.imap.search()
return self._uids
@property
def emails(self):
"""cannot be dynamic because of self.fetch methods"""
if self._emails is None:
self._emails = [Email(self, uid) for uid in self.uids]
else:
email_uids = [email.uid for email in self._emails if email.uid]
rem, add = list_compare(email_uids, self.uids)
for uid in rem:
self._emails.remove(Email(self, uid))
for uid in add:
self._emails.append(Email(self, uid))
return self._emails
@property
def files(self):
"""List files of all emails inside this directory
Returns:
list: of files
"""
files = []
for email in self.emails:
for file in email.files:
files.append(file)
return files
@property
def app_name(self): # :TODO:
"""the name of the application of this directory
Returns:
str:
"""
splitted = self.path.split('.')
return splitted[1] if len(splitted) > 1 else splitted[0]
@property
def item_name(self): # :TODO:
"""this is the name of the item
Returns:
str
"""
return self.path.split('.')[-1]
@property
def url(self):
"""makes the Django life easier
Returns:
str: path as url
"""
directory = self.imap.config.directory
if self.path.startswith(directory):
url = self.path[len(directory)+1:]
else:
url = self.path
return url.replace('.', '/')
[docs] def refresh(self):
"""refresh the directories cached properties"""
self._uids = None
[docs] def email_by_uid(self, uid):
"""get email by uid
Args:
uid(int): uid you want to have the email object
Returns:
Email: if email with uid exists in this directory or None
"""
uid = int(uid)
for email in self.emails:
if email.uid == uid:
return email
return None
[docs] def add_file_email(self, file):
"""Create new Email with one file
Args:
file(File): object to append as file to the directory
Returns:
Email: new created file Email
"""
if file.name in [file.name for file in self.files]:
return False
email = self.new_email(file.name)
email.add_file(file)
email.save()
self.refresh()
return self.email_by_uid(email.uid)
[docs] def file_by_name(self, name):
"""get file by name
Args:
name(str): name of the file to get
Returns:
File: first file with name
"""
for file in self.files:
if file.name == name:
return file
return False
[docs] def new_email(self, item_name, from_addr=None, from_displ=None):
"""needs to be runned if its a ne Email with no uid"""
config = self.storage.imap.config
from_addr_obj = Address(
addr_spec=from_addr or config.imap.user,
display_name=from_displ or config.imap.user
)
to_addr_obj = Address(
addr_spec=config.imap.user,
display_name=config.imap.user
)
email = Email(self, None)
email.head = email.new_head(
'{} {}'.format(self.imap.config.tag, item_name),
from_addr_obj,
to_addr_obj
)
email.body = email.new_body()
# email.save()
return email
[docs] def delete_email(self, email_uid_or_obj):
"""delete email within this directory
Args:
email_uid_or_obj(Email, int, str): email to delete
Returns:
bool: True if success
"""
if isinstance(email_uid_or_obj, Email):
uid = email_uid_or_obj.uid
else:
uid = email_uid_or_obj
result = self.imap.delete_messages([uid]) # immer true :-(
return result
[docs] def delete(self):
"""Delete this storage
Warning:
Also deletes its subdirectories recursively
Returns:
bool: True if success
"""
return self.storage.delete_directory(self.path)
# ### Fetch methods ###
[docs] def fetch_subjects(self, email=None):
"""fetch subjects
Args:
email(Email, optional): fetch only from this email
Returns:
dict: {uid: subject}
"""
if email:
uids = [email.uid]
else:
uids = self.uids
subjects = self.storage.get_subjects(folder=self.path)
for subject, uids in subjects.items():
for uid in uids:
self.email_by_uid(uid).subject = subject
return subjects
[docs] def fetch_head(self, email):
"""fetch email head
Args:
email(Email): fetch head of this email
Returns:
str: head of email
"""
return self.storage.get_heads(email.uid)[email.uid]
[docs] def fetch_body(self, email):
"""fetch email body
Args:
email(Email): fetch body of this email
Returns:
str: body of email
"""
return self.storage.get_bodies(email.uid)[email.uid]
[docs] def fetch_payloads(self, email):
"""
:returns: payloads as string
"""
return self.storage.get_file_payloads(email.uid)[email.uid]
[docs] def save_message(self, msg_obj):
"""save msg_obj to imap directory
:returns: new uid on success or False
"""
old_uid = msg_obj.uid
result = self.imap.append(self.folder, str(msg_obj.plain))
if old_uid:
self.imap.delete_messages(old_uid)
self.refresh()
return int(result.decode('utf-8').split(']')[0].split()[-1])
def __hash__(self):
return hash(self.path)
def __lt__(self, other):
return self.path < other.path
def __eq__(self, other):
return self.path == other.path
def __ne__(self, other):
return not self == other
def __repr__(self):
return '{}: {}'.format(self.__class__.__name__, self.path)
def __str__(self):
return self.path