-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
210 lines (179 loc) · 7.48 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import os
import logging
import uuid
from dotenv import load_dotenv
import google.generativeai as genai
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from flask import Flask, request, jsonify
from flask_restful import Api, Resource
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_caching import Cache
from bs4 import BeautifulSoup
from langchain_core.documents import Document
load_dotenv()

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Configuration initialization.
# Fail fast at startup when the Gemini key is absent: genai.configure()
# accepts api_key=None without complaint, which would otherwise defer the
# failure to the first model call at request time.
try:
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        raise ValueError("GEMINI_API_KEY environment variable is not set")
    genai.configure(api_key=gemini_api_key)
    # Comma-separated client API keys. NOTE(review): parsed but never read
    # in this file — presumably consumed by middleware elsewhere; confirm.
    API_KEYS = os.getenv("API_KEYS", "").split(",")
except Exception as e:
    logger.error(f"Configuration error: {e}")
    exit(1)

# Model configuration — every generation parameter is overridable via env vars.
generation_config = {
    "temperature": float(os.getenv("TEMPERATURE", 0.7)),
    "top_p": float(os.getenv("TOP_P", 0.95)),
    "top_k": int(os.getenv("TOP_K", 40)),
    "max_output_tokens": int(os.getenv("MAX_OUTPUT_TOKENS", 8192)),
}
model = genai.GenerativeModel(
    model_name=os.getenv("MODEL_NAME", "gemini-2.0-flash-exp"),
    generation_config=generation_config,
)
def get_limiter_key():
    """Rate-limiter key: the caller's session_id when available, else client IP.

    Keying on the session ID lets distinct users behind a shared NAT be
    throttled independently instead of sharing one address-based bucket.
    """
    try:
        if request.is_json:
            sid = request.get_json().get("session_id")
            if sid:
                return sid
    except Exception:
        # Malformed body or no request context — fall through to IP keying.
        pass
    return get_remote_address()
# Flask application wiring. All of these names are module-level singletons
# referenced by the resources and decorators below.
app = Flask(__name__)
api = Api(app)
# Rate limiting keyed by session_id (falls back to client IP via get_limiter_key).
limiter = Limiter(get_limiter_key, app=app, default_limits=["10 per minute"])
# In-process response cache, 5-minute default TTL (not shared across workers).
cache = Cache(app, config={"CACHE_TYPE": "SimpleCache", "CACHE_DEFAULT_TIMEOUT": 300})
# Global session management (consider persistent storage for production)
# NOTE(review): this dict grows without bound — entries are never evicted.
sessions = {}
def _extract_courses(html, url):
    """Parse course cards out of one page's raw HTML.

    Cards missing a title or description are skipped with a warning rather
    than aborting the whole page via an AttributeError on a None element.
    """
    soup = BeautifulSoup(html, 'html.parser')
    docs = []
    # Custom extraction based on actual website structure — update these
    # selectors if the site layout changes.
    for course in soup.find_all('div', class_='course-card'):
        title_el = course.find('h3')
        desc_el = course.find('div', class_='description')
        if title_el is None or desc_el is None:
            logger.warning("Skipping course card with missing title/description")
            continue
        title = title_el.get_text(strip=True)
        description = desc_el.get_text(strip=True)
        docs.append(Document(
            page_content=f"Course Title: {title}\nDescription: {description}",
            metadata={"source": url, "title": title},
        ))
    return docs


def load_data(url):
    """Load a web page and extract course Documents from it.

    Args:
        url: Page URL to scrape.

    Returns:
        list[Document]: one Document per successfully parsed course card.

    Raises:
        Exception: re-raises any loader failure after logging it; per-page
        parse errors are logged and skipped instead.
    """
    try:
        logger.info(f"Loading data from: {url}")
        raw_docs = WebBaseLoader(url).load()
        processed_docs = []
        for doc in raw_docs:
            try:
                processed_docs.extend(_extract_courses(doc.page_content, url))
            except Exception as e:
                logger.error(f"Error parsing content: {e}")
                continue
        logger.info(f"Extracted {len(processed_docs)} courses from {url}")
        return processed_docs
    except Exception as e:
        logger.error(f"Data loading failed: {e}")
        raise
def split_documents(data):
    """Split documents into overlapping chunks sized for technical content.

    Chunk size/overlap come from CHUNK_SIZE / CHUNK_OVERLAP env vars
    (defaults 768 / 128); splitting prefers paragraph, line, then sentence
    boundaries before falling back to words and characters.
    """
    try:
        # Env parsing stays inside the try so a non-numeric value is
        # logged before being re-raised, same as any splitter failure.
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=int(os.getenv("CHUNK_SIZE", 768)),
            chunk_overlap=int(os.getenv("CHUNK_OVERLAP", 128)),
            separators=["\n\n", "\n", ". ", "! ", "? ", " ", ""],
        )
        return splitter.split_documents(data)
    except Exception as e:
        logger.error(f"Error splitting documents: {e}")
        raise
def create_vector_store(docs):
    """Return a Chroma vector store over *docs*, reusing the on-disk copy.

    An existing ./chroma_db directory is loaded as-is unless
    REFRESH_VECTOR_STORE=true, in which case the store is rebuilt from
    *docs* and persisted back to disk.
    """
    persist_dir = "./chroma_db"
    try:
        embeddings = HuggingFaceEmbeddings(
            model_name=os.getenv("EMBEDDINGS_MODEL", "sentence-transformers/all-mpnet-base-v2")
        )
        refresh = os.getenv("REFRESH_VECTOR_STORE", "false").lower() == "true"
        if os.path.exists(persist_dir) and not refresh:
            logger.info("Loading existing vector store")
            return Chroma(persist_directory=persist_dir, embedding_function=embeddings)
        logger.info("Creating new vector store")
        store = Chroma.from_documents(
            documents=docs,
            embedding=embeddings,
            persist_directory=persist_dir,
        )
        store.persist()
        return store
    except Exception as e:
        logger.error(f"Error creating or loading vector store: {e}")
        raise
# Data pipeline execution: at import time, scrape every configured URL,
# chunk the extracted documents, and build the vector index. Any failure
# here is fatal — the chat endpoint cannot serve without vector_store.
try:
    urls = os.getenv("DATA_URLS", "https://brainlox.com/courses/category/technical").split(',')
    all_docs = [doc for url in urls for doc in load_data(url.strip())]
    split_docs = split_documents(all_docs)
    vector_store = create_vector_store(split_docs)
except Exception as e:
    logger.error(f"Failed to initialize data pipeline: {e}")
    exit(1)
class ChatBot(Resource):
    """Chat endpoint: retrieval-augmented answers with per-session history."""

    @limiter.limit("10/minute")
    def post(self):
        """Handle one chat turn.

        Expects a JSON object: {"message": str, "session_id": str (optional)}.
        Returns the model's response, the (possibly newly created) session
        id, and the distinct sources of the retrieved context. Malformed
        requests get 400; retrieval/generation failures get 500.
        """
        # Request validation. silent=True makes get_json() return None for a
        # missing or unparsable JSON body instead of raising, so bad input
        # yields a 400 rather than an unhandled 500. The isinstance checks
        # guard against valid-but-wrong JSON (e.g. a list body, or a
        # non-string "message") reaching .get()/.strip().
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Invalid message format"}), 400
        message = data.get("message", "")
        if not isinstance(message, str):
            return jsonify({"error": "Invalid message format"}), 400
        user_input = message.strip()
        session_id = data.get("session_id")
        if not user_input or len(user_input) > 1000:
            return jsonify({"error": "Invalid message format"}), 400
        # Session management: unknown or absent ids get a fresh session.
        if not session_id or session_id not in sessions:
            session_id = str(uuid.uuid4())
            sessions[session_id] = {
                "chat": model.start_chat(history=[]),
                "history": []
            }
        session = sessions[session_id]
        try:
            # Context retrieval: top-3 most similar chunks from the index.
            relevant_docs = vector_store.similarity_search(user_input, k=3)
            context = "\n".join([f"Source: {doc.metadata['source']}\n{doc.page_content}"
                                 for doc in relevant_docs])
            # Enhanced prompt engineering: retrieved context + running history.
            prompt = f"""You are a technical course advisor. Use this context:
{context}
Current conversation:
{session['history']}
Question: {user_input}
Helpful Answer:"""
            # Response generation through the session's stateful chat.
            response = session["chat"].send_message(prompt)
            bot_response = response.text
            # Update session history so the next turn sees this exchange.
            session["history"].append((user_input, bot_response))
            return jsonify({
                "response": bot_response,
                "session_id": session_id,
                "sources": list({doc.metadata["source"] for doc in relevant_docs})
            })
        except Exception as e:
            logger.error(f"Chat error: {e}")
            return jsonify({"error": "Processing error"}), 500
class HealthCheck(Resource):
    """Liveness probe that also reports the configured model name."""

    def get(self):
        payload = {"status": "ok", "model": model.model_name}
        return jsonify(payload)
# Route registration.
api.add_resource(ChatBot, "/chat")
api.add_resource(HealthCheck, "/health")

if __name__ == "__main__":
    # DEBUG=true in the environment enables Flask's debugger/reloader.
    debug_mode = os.getenv("DEBUG", "false").lower() == "true"
    app.run(host="0.0.0.0", port=5000, debug=debug_mode)