Skip to content

Commit

Permalink
Initial Prototype Complete
Browse files Browse the repository at this point in the history
  • Loading branch information
Stelath committed May 29, 2024
1 parent 082ef39 commit b0f4070
Show file tree
Hide file tree
Showing 5 changed files with 359 additions and 87 deletions.
3 changes: 3 additions & 0 deletions mailfox/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Package entry script: import main() from the sibling mailfox module and run it.
from .mailfox import main

main()
48 changes: 48 additions & 0 deletions mailfox/email_interface/email_gen_ai.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import pandas as pd
from openai import OpenAI
from tqdm.auto import tqdm

class EmailLLM():
    """Wrapper around an OpenAI chat model for labelling and filing emails."""

    # Alternative model name noted by the original author: gpt-4-turbo-preview
    def __init__(self, api_key, model_name="gpt-3.5-turbo-0125"):
        """Create the OpenAI client.

        Args:
            api_key: API key passed to the OpenAI client.
            model_name: Chat model used for every request.
        """
        self.model_name = model_name
        self.openai = OpenAI(api_key=api_key)

    def generate_labels(self, emails):
        """Ask the model for one category that best describes ``emails``.

        Args:
            emails: Iterable of mappings with ``'subject'`` and ``'body'`` keys.

        Returns:
            The model's reply text (the suggested category).
        """
        # List the emails individually so the model can tell them apart, then
        # join them: the message content must be a single string, not a list
        # of strings.
        formatted_emails = "\n\n".join(
            f"Email {i}: {email['subject']}\n {email['body']}"
            for i, email in enumerate(emails)
        )

        response = self.openai.chat.completions.create(
            model=self.model_name,
            messages=[
                {"role": "system", "content": "You are an email assistant that classifies emails into catagories. Given a list of emails respond with a catagory that characterizes the emails the best and nothing else."},
                {"role": "user", "content": formatted_emails},
            ],
        )

        return response.choices[0].message.content

    def predict_folder(self, email, folders):
        """Ask the model which of ``folders`` the email belongs in.

        Args:
            email: Mapping with ``'from'``, ``'to'``, ``'subject'`` and
                ``'body'`` keys.
            folders: Collection of allowed folder names; the reply is wrapped
                in double quotes before the membership check, so entries are
                expected in quoted form.

        Returns:
            The chosen folder name, double-quoted and without surrounding
            whitespace.

        Raises:
            ValueError: If the model replies with nothing or with a folder
                that is not in ``folders``.
        """
        formatted_email = f"{email['from']} -> {email['to']}\n{email['subject']}\n{email['body']}"

        response = self.openai.chat.completions.create(
            model=self.model_name,
            messages=[
                {"role": "system", "content": f"You are an email assistant that classifies emails into one of these categories: {folders}. Only respond with the email catagory and nothing else."},
                {"role": "user", "content": formatted_email},
            ],
        )

        # Strip before quoting so stray whitespace/newlines in the reply do not
        # end up inside the quotes and defeat the membership check.
        folder = (response.choices[0].message.content or "").strip()
        if not folder:
            raise ValueError(f"Model returned an empty folder name; expected one of {folders}")

        if not folder.startswith('"'):
            folder = f'"{folder}"'

        if folder not in folders:
            raise ValueError(f"Folder {folder} not in {folders}")

        return folder
165 changes: 103 additions & 62 deletions mailfox/email_interface/email_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@
import datetime
import multiprocessing
import email
import hashlib
from email.header import decode_header
from tqdm.auto import tqdm
import pandas as pd

import re
from bs4 import BeautifulSoup

from tqdm.auto import tqdm

class EmailHandler():
def __init__(self, username, password):
self.mail = imaplib.IMAP4_SSL("imap.gmail.com")
Expand All @@ -20,9 +25,36 @@ def get_all_mail_uids(self):
result, data = self.mail.uid('search', None, "ALL")
return [d.decode('utf-8') for d in data[0].split()]

def format_folders(self, folders, plain=False):
    """Normalise folder names to quoted or plain form.

    Args:
        folders: Iterable of folder-name strings, quoted or not.
        plain: When False (default) every name is returned wrapped in double
            quotes; when True surrounding double quotes are removed.

    Returns:
        A new list with one normalised name per input name, in order.
    """
    formatted_folders = []

    for folder in folders:
        if plain:
            # Only unwrap names that are actually enclosed in a pair of quotes,
            # so an unbalanced or empty name is passed through untouched.
            is_wrapped = len(folder) >= 2 and folder.startswith('"') and folder.endswith('"')
            formatted_folders.append(folder[1:-1] if is_wrapped else folder)
        else:
            # startswith() is safe on an empty name, unlike folder[0].
            formatted_folders.append(folder if folder.startswith('"') else f'"{folder}"')

    return formatted_folders

def get_all_folders(self):
    """Return the folder names taken from the server's LIST response.

    Each response entry is decoded as UTF-8 and split on the ' "/" '
    separator; the text after the separator is used as the folder name.
    """
    result, data = self.mail.list()
    # Takes the text after the separator and drops its first and last character.
    return [d.decode('utf-8').split(' "/" ')[1][1:-1] for d in data]
    # NOTE(review): unreachable. This second return is the same expression
    # without the [1:-1] slice and looks like leftover from a diff; confirm
    # which of the two variants is intended and delete the other.
    return [d.decode('utf-8').split(' "/" ')[1] for d in data]

def get_subfolders(self, folders):
    """Return every server folder whose name contains one of ``folders``.

    The given names are first converted to their plain (unquoted) form and
    then matched as substrings against each name from get_all_folders().
    """
    plain_names = self.format_folders(folders, plain=True)

    matches = []
    for candidate in self.get_all_folders():
        for name in plain_names:
            if name in candidate:
                matches.append(candidate)
                break

    return matches

def get_folder_uids(self, folder):
result, data = self.mail.select(folder)
Expand All @@ -33,86 +65,87 @@ def get_folder_uids(self, folder):
print(f"Failed to select folder {folder}")
return []

def hash_email(self, email):
    """Return the SHA-256 hex digest of an email's identifying headers.

    The digest is computed over the concatenation of the 'from', 'to',
    'subject' and 'date' values of ``email``, in that order.
    """
    sender, recipient = email['from'], email['to']
    subject, sent_on = email['subject'], email['date']

    digest = hashlib.sha256()
    digest.update((sender + recipient + subject + sent_on).encode())
    return digest.hexdigest()

def get_mail(self, filter='unseen', *, uids=None, folders=["INBOX"], return_dataframe=True):
folders = self.format_folders(folders)
emails = []

if folders != [] and folders is not None:
uids = []
uids = {}
for folder in folders:
result, data = self.mail.select(folder)
if result == 'OK':
if filter == 'unseen':
result, data = self.mail.uid('search', None, "(UNSEEN)")
uids += data[0].split()
uids[folder] = data[0].split()
elif filter == 'seen':
result, data = self.mail.uid('search', None, "(SEEN)")
uids += data[0].split()
uids[folder] = data[0].split()
elif filter == 'all':
result, data = self.mail.uid('search', None, "ALL")
uids += data[0].split()
uids[folder] = data[0].split()
elif filter == 'uids' and uids is not None:
result = 'OK'
uids += [uid.encode('utf-8') for uid in uids]
uids[folder] = [uid.encode('utf-8') for uid in uids]
else :
print("Invalid filter. Please use 'unseen', 'all', or 'uids'.")
return
else:
print(f"Failed to select folder {folder}")
continue
else:
if filter == 'unseen':
result, data = self.mail.uid('search', None, "(UNSEEN)") # search and return uids of unseen emails
uids = data[0].split()
elif filter == 'seen':
result, data = self.mail.uid('search', None, "(SEEN)")
uids = data[0].split()
elif filter == 'all':
result, data = self.mail.uid('search', None, "ALL")
uids = data[0].split()
elif filter == 'uids' and uids is not None:
result = 'OK'
uids = [uid.encode('utf-8') for uid in uids]
else:
print("Invalid filter. Please use 'unseen', 'all', or 'uids'.")
return
else:
print("No folders to read.")
return

if uids != [] and uids is not None:
for num in tqdm(uids, desc="Processing Emails", leave=False):
result, email_data = self.mail.uid('fetch', num, '(BODY.PEEK[])') # fetch the email body (peek = not mark as read)
raw_email = email_data[0][1]
try:
raw_email_string = raw_email.decode('utf-8')
except:
continue
email_message = email.message_from_string(raw_email_string)
for folder, uids in tqdm(uids.items(), desc="Processing Folders", position=0, leave=False):
self.mail.select(folder)
for num in tqdm(uids, desc=f"Getting Emails from {folder}", position=1, leave=False):
result, email_data = self.mail.uid('fetch', num, '(BODY.PEEK[])') # fetch the email body (peek = not mark as read)
raw_email = email_data[0][1]
try:
raw_email_string = raw_email.decode('utf-8')
except:
continue
email_message = email.message_from_string(raw_email_string)

# get the email details
date_tuple = email.utils.parsedate_tz(email_message['Date'])
if date_tuple:
local_date = datetime.datetime.fromtimestamp(email.utils.mktime_tz(date_tuple))
local_message_date = "%s" %(str(local_date.strftime("%a, %d %b %Y %H:%M:%S")))
# get the email details
date_tuple = email.utils.parsedate_tz(email_message['Date'])
if date_tuple:
local_date = datetime.datetime.fromtimestamp(email.utils.mktime_tz(date_tuple))
local_message_date = "%s" %(str(local_date.strftime("%a, %d %b %Y %H:%M:%S")))

try:
email_from = str(decode_header(email_message['From'])[0][0])
email_to = str(decode_header(email_message['To'])[0][0])
subject = str(decode_header(email_message['Subject'])[0][0])
uuid = self.hash_email({'from': email_from, 'to': email_to, 'subject': subject, 'date': local_message_date})
uid = num.decode('utf-8')
except:
continue

try:
email_from = str(decode_header(email_message['From'])[0][0])
email_to = str(decode_header(email_message['To'])[0][0])
subject = str(decode_header(email_message['Subject'])[0][0])
folder = str(decode_header(email_message['Folder'])[0][0])
id = num.decode('utf-8')
except:
continue

body = ""
if email_message.is_multipart():
for part in email_message.get_payload():
if part.get_content_type() == 'text/plain':
body = part.get_payload(decode=True).decode("latin-1")
# elif part.get_content_type() == 'text/html':
# body += part.get_payload(decode=True).decode()
else:
body = email_message.get_payload(decode=True).decode("latin-1")

# return the email details
emails.append({'id': id, 'from': email_from, 'to': email_to, 'subject': subject, 'date': local_message_date, 'body': body})
body = ""
if email_message.is_multipart():
for part in email_message.get_payload():
if part.get_content_type() == 'text/plain':
body = part.get_payload(decode=True).decode("latin-1")
# elif part.get_content_type() == 'text/html':
# body += part.get_payload(decode=True).decode()
else:
body = email_message.get_payload(decode=True).decode("latin-1")

raw_body = body

soup = BeautifulSoup(body, 'lxml')
body = soup.get_text()
body = self._strip_repeated_chars(body)

# return the email details
emails.append({'uid': uid, 'folder': folder, 'uuid': uuid, 'from': email_from, 'to': email_to, 'subject': subject, 'date': local_message_date, 'body': body, 'raw_body': raw_body})
else:
print("No new emails to read.")

Expand All @@ -121,6 +154,11 @@ def get_mail(self, filter='unseen', *, uids=None, folders=["INBOX"], return_data

return emails

def _strip_repeated_chars(self, s, chars = ['\n', '\t']):
for char in chars:
s = re.sub(f'{char}+', char, s)
return s

def create_folder(self, folder):
result = self.mail.create(folder)
if result[0] == 'OK':
Expand All @@ -129,17 +167,20 @@ def create_folder(self, folder):
print(f"Failed to create folder {folder}")

def move_mail(self, uids, folder):
    """Move messages to ``folder`` by copying them and deleting the originals.

    Args:
        uids: Iterable of message UIDs in the currently selected mailbox.
        folder: Destination folder name; it is quoted via format_folders()
            before being sent to the server.

    Each message is copied first; only when the copy succeeds is the original
    flagged as deleted, and only when that succeeds is the mailbox expunged.
    Outcomes are reported with print().
    """
    folder = self.format_folders([folder])[0]

    for uid in uids:
        copy_status = self.mail.uid('COPY', uid, folder)[0]
        if copy_status != 'OK':
            print(f"Failed to move {uid} to {folder}")
            continue

        # Raw string: '\D' is not a valid escape sequence in a normal literal.
        store_status = self.mail.uid('STORE', uid, '+FLAGS', r'(\Deleted)')[0]
        if store_status != 'OK':
            # The copy exists but the original could not be flagged, so do not
            # expunge and do not report the move as done.
            print(f"Copied {uid} to {folder} but failed to remove the original")
            continue

        self.mail.expunge()
        print(f"Moved {uid} to {folder}")

def delete_mail(self, uids, trash_folder="INBOX.Trash"):
    """Delete messages by moving them to the trash folder.

    Args:
        uids: Iterable of message UIDs in the currently selected mailbox.
        trash_folder: Destination folder for deleted mail. Defaults to
            "INBOX.Trash"; NOTE(review): the trash folder name is
            server-specific, so confirm the default for the target server.

    The messages are not flagged and expunged here directly: doing so before
    the move would remove them from the mailbox and make the move impossible.
    """
    self.move_mail(uids, trash_folder)
Loading

0 comments on commit b0f4070

Please sign in to comment.