Skip to content

Commit

Permalink
Feature/polling clustering and UI (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
Stelath authored Dec 30, 2024
1 parent 0157ece commit 33e28b2
Show file tree
Hide file tree
Showing 11 changed files with 460 additions and 261 deletions.
22 changes: 19 additions & 3 deletions mailfox/cli/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
from pathlib import Path
import shutil
from ..core.config_manager import read_config
from ..vector import VectorDatabase
from ..core.auth import read_credentials
from ..vector import VectorDatabase, EmbeddingFunctions

database_app = typer.Typer(help="Manage email database")

Expand All @@ -28,13 +29,28 @@ def create_database(
)
return

# Create parent directory if it doesn't exist
# Create parent directory if it doesn't exist
db_path.parent.mkdir(parents=True, exist_ok=True)

openai_api_key = None
if config["default_embedding_function"] == EmbeddingFunctions.OPENAI:
# Try to get API key from credentials
try:
_, _, api_key = read_credentials()
openai_api_key = api_key
except Exception as e:
print(f"Error reading credentials: {e}")
raise typer.Exit(1)

if not openai_api_key:
typer.echo("OpenAI API key is required for OpenAI embeddings. Set it using 'mailfox credentials set'")
raise typer.Exit(1)

# Initialize empty database
vector_db = VectorDatabase(
db_path=str(db_path),
embedding_function=config["default_embedding_function"]
embedding_function=config["default_embedding_function"],
openai_api_key=openai_api_key
)

typer.echo(f"✨ Created new email database at {db_path}")
Expand Down
92 changes: 45 additions & 47 deletions mailfox/cli/run.py
Original file line number Diff line number Diff line change
@@ -1,62 +1,44 @@
import typer
import os
from typing import Callable, Optional
from typing import Callable, Optional, Set
import pandas as pd
from ..core.email_processor import process_new_mail, initialize_clustering
from ..core.auth import read_credentials
from ..core.config_manager import read_config
from ..core.database_manager import get_vector_db, initialize_database
from ..email_interface import EmailHandler
from ..vector import VectorDatabase

def process_folder_update(
folder: str,
emails: pd.DataFrame,
email_handler: EmailHandler,
vector_db: VectorDatabase,
recache: bool = False
recache: bool = False,
fetched_uids: Optional[Set[int]] = None,
current_uids: Optional[Set[int]] = None
) -> None:
"""Process updates in a monitored folder."""
try:
# Store any fetched UIDs
if fetched_uids:
vector_db.add_seen_uids(fetched_uids, folder)
elif current_uids: # For new folders
vector_db.add_seen_uids(current_uids, folder)

if not emails.empty:
action = "Recaching" if recache else "Processing"
typer.echo(f"{action} {len(emails)} emails in {folder}")
vector_db.store_emails(emails.to_dict(orient="records"))
if not recache:
process_new_mail(folder, email_handler, vector_db)
if recache:
typer.echo(f"Recaching {len(emails)} emails in {folder}")
vector_db.store_emails(emails.to_dict(orient="records"))
else:
typer.echo(f"Processing {len(emails)} new emails in {folder}")
vector_db.store_emails(emails.to_dict(orient="records"))
except Exception as e:
typer.secho(
f"Error processing folder {folder} update: {str(e)}",
err=True,
fg=typer.colors.RED
)

def get_vector_db(api_key: Optional[str]) -> Optional[VectorDatabase]:
    """Return the configured vector database, or ``None`` if it is unusable.

    The database path (``email_db_path``) and embedding function
    (``default_embedding_function``) are read from the configuration, and
    ``api_key`` is forwarded as ``openai_api_key``.

    ``None`` is returned, after printing a red message with ``err=True``,
    when the path does not exist, when the database holds no emails, or
    when any exception is raised along the way.
    """
    try:
        settings = read_config()
        db_location = os.path.expanduser(settings["email_db_path"])

        # The emptiness probe uses an instance built from the path alone; the
        # fully configured instance is only created once the probe succeeds.
        has_emails = (
            os.path.exists(db_location)
            and not VectorDatabase(db_location).is_emails_empty()
        )
        if not has_emails:
            typer.secho(
                "No email database found. Please initialize the database first.",
                err=True,
                fg=typer.colors.RED,
            )
            return None

        return VectorDatabase(
            db_path=db_location,
            embedding_function=settings["default_embedding_function"],
            openai_api_key=api_key,
        )
    except Exception as e:
        typer.secho(
            f"Error initializing vector database: {str(e)}",
            err=True,
            fg=typer.colors.RED,
        )
        return None

def run_application() -> None:
"""Run the main MailFox application."""
email_handler = None
Expand All @@ -79,23 +61,39 @@ def run_application() -> None:
else:
typer.echo("Using existing clustering model.")

# Start monitoring
folders_to_monitor = config.get("flagged_folders", ["INBOX"])
# Get all folders including subfolders
classification_folders = config['flagged_folders']
all_folders = email_handler.get_subfolders(classification_folders)
typer.echo(f"Monitoring folders: {', '.join(all_folders)}")

# Initialize database if empty
initialize_database(email_handler, vector_db, all_folders)

# Initialize folder UIDs from seen UIDs in database
folder_uids = vector_db.get_seen_uids()

# Start monitoring for new emails and UID validity changes
check_interval = config.get("check_interval", 300)
enable_uid_validity = config.get("enable_uid_validity", True)
recache_limit = config.get("recache_limit", 100)

typer.echo(f"Starting email monitoring (checking every {check_interval} seconds)")
try:
email_handler.poll_folders(
folders=folders_to_monitor,
callback=lambda folder, emails, recache=False: process_folder_update(
folder, emails, email_handler, vector_db, recache
),
check_interval=check_interval,
enable_uid_validity=enable_uid_validity,
recache_limit=recache_limit
)
while not email_handler.stop_event.is_set():
# First check inbox
process_new_mail("INBOX", email_handler, vector_db)

# Then poll folders
email_handler.poll_folders(
folders=all_folders,
folder_uids=folder_uids,
callback=lambda folder, emails, recache=False, fetched_uids=None, current_uids=None: process_folder_update(
folder, emails, vector_db, recache, fetched_uids, current_uids
),
enable_uid_validity=enable_uid_validity
)

# Wait before next iteration
email_handler.stop_event.wait(check_interval)
except KeyboardInterrupt:
typer.echo("\nShutting down gracefully...")
finally:
Expand Down
19 changes: 4 additions & 15 deletions mailfox/cli/wizard.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
from typing import Optional, List
from ..core.auth import save_credentials
from ..core.config_manager import save_config
from ..core.database_manager import get_vector_db, initialize_database
from ..email_interface import EmailHandler
from ..vector import VectorDatabase, EmbeddingFunctions
from ..vector import EmbeddingFunctions
import os

def run_setup_wizard() -> None:
Expand Down Expand Up @@ -98,24 +99,12 @@ def _handle_initial_download(

try:
email_handler = EmailHandler(username, password)
vector_db = VectorDatabase(
os.path.expanduser(config["email_db_path"]),
embedding_function=config["default_embedding_function"],
openai_api_key=api_key
)
vector_db = get_vector_db(api_key)

folders = config["flagged_folders"]
typer.echo(f"Downloading emails from folders: {', '.join(folders)}")

for folder in folders:
emails = email_handler.get_mail(
filter="all",
folders=[folder],
return_dataframe=True
)
if not emails.empty:
typer.echo(f"Storing {len(emails)} emails from {folder}")
vector_db.store_emails(emails.to_dict(orient="records"))
initialize_database(email_handler, vector_db, folders)

except Exception as e:
typer.secho(
Expand Down
59 changes: 59 additions & 0 deletions mailfox/core/database_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from typing import Dict, Set, List
import os
import typer
from ..vector import VectorDatabase
from ..email_interface import EmailHandler
from .config_manager import read_config

def initialize_database(email_handler: EmailHandler, vector_db: VectorDatabase, folders: List[str]) -> None:
    """Fill an empty vector database with the emails found in ``folders``.

    When the database already contains emails a notice is printed and
    nothing else happens. Otherwise every folder is fetched one at a time
    with ``filter="all"``, any emails returned are stored, and the UIDs
    reported by that fetch are recorded as seen.
    """
    if vector_db.is_emails_empty():
        typer.echo("Email database is empty. Downloading all emails...")
    else:
        typer.echo("Database already contains emails. Skipping initialization.")
        return

    for folder in folders:
        fetched, uids_by_folder = email_handler.get_mail(
            filter="all",
            folders=[folder],
            return_dataframe=True,
            return_uids=True,
        )

        if not fetched.empty:
            typer.echo(f"Storing {len(fetched)} emails from {folder}")
            records = fetched.to_dict(orient="records")
            vector_db.store_emails(records)

        # Record every UID reported for this fetch, keyed by folder name.
        # NOTE(review): nesting was reconstructed from a flattened diff view;
        # confirm this loop belongs at folder-loop level rather than inside
        # the non-empty check above.
        for folder_name, uids in uids_by_folder.items():
            vector_db.add_seen_uids(uids, folder_name)

def get_vector_db(api_key: str = None) -> VectorDatabase:
    """Open the configured vector database, creating its directory if needed.

    The location (``email_db_path``) and embedding function
    (``default_embedding_function``) come from the configuration, and
    ``api_key`` is forwarded as ``openai_api_key``. A yellow notice is
    printed when the database holds no emails; the database is still
    returned in that case.

    Raises:
        Exception: anything raised while reading the configuration or
            building the database is reported in red with ``err=True``
            and then re-raised unchanged.
    """
    try:
        settings = read_config()
        db_dir = os.path.expanduser(settings["email_db_path"])

        # The database path is treated as a directory and created up front.
        os.makedirs(db_dir, exist_ok=True)

        database = VectorDatabase(
            db_path=db_dir,
            embedding_function=settings["default_embedding_function"],
            openai_api_key=api_key,
        )

        if database.is_emails_empty():
            typer.secho(
                "Email database is empty. It will be initialized when emails are processed.",
                fg=typer.colors.YELLOW,
            )
    except Exception as e:
        typer.secho(
            f"Error initializing vector database: {str(e)}",
            err=True,
            fg=typer.colors.RED,
        )
        raise
    else:
        return database
26 changes: 13 additions & 13 deletions mailfox/core/email_processor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from collections import Counter
import typer
from ..vector import KMeansCluster
from ..vector.classifiers.linear_svm import LinearSVMClassifier
from ..core.config_manager import read_config
import os
import numpy as np
Expand All @@ -16,7 +16,7 @@ def initialize_clustering(vector_db, n_clusters=10):
folders = [metadata['folder'] for metadata in docs['metadatas']]

# Create and fit new clustering model
clustering = KMeansCluster(pca_components=None)
clustering = LinearSVMClassifier()
clustering.fit(all_embeddings, folders=folders)

# Save the model
Expand All @@ -27,7 +27,7 @@ def get_clustering_model():
"""Load the existing clustering model."""
config = read_config()
clustering_path = os.path.expanduser(config['clustering_path'])
clustering = KMeansCluster()
clustering = LinearSVMClassifier()
clustering.load_model(clustering_path)
return clustering

Expand All @@ -50,18 +50,18 @@ def classify_emails(new_emails, vector_db, email_handler):

for idx, mail in new_emails.iterrows():
try:
paragraphs = mail['paragraphs']
email_embeddings = vector_db.embed_paragraphs(paragraphs)
predicted_classes = clustering.find_closest_class(email_embeddings, clustering.kmeans.labels_)
# Look up embeddings for this email's uuid in vector db
email_ids = [f"{mail['uuid']}_{i}" for i in range(len(mail['paragraphs']))]
email_docs = vector_db.emails_collection.get(
ids=email_ids,
include=['embeddings']
)
embeddings = email_docs['embeddings']

if isinstance(predicted_classes[0], list):
class_counts = Counter([cls for classes in predicted_classes for cls in classes])
else:
class_counts = Counter(predicted_classes)

predicted_folder = class_counts.most_common(1)[0][0]
# Use the LinearSVMClassifier to predict the folder directly
predicted_folder = clustering.classify_email(embeddings)

if predicted_folder:
if predicted_folder and predicted_folder != "UNKNOWN":
email_handler.move_mail([mail['uid']], predicted_folder)
print(f"Moved email {mail['uuid']} to folder: {predicted_folder}")
else:
Expand Down
Loading

0 comments on commit 33e28b2

Please sign in to comment.