forked from langchain-ai/chat-langchain
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ingest.py
117 lines (97 loc) · 3.63 KB
/
ingest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""Load html from files, clean up, split, ingest into Weaviate."""
import logging
import os
import re
import weaviate
from bs4 import BeautifulSoup, SoupStrainer
from langchain.document_loaders import SitemapLoader, RecursiveUrlLoader
from langchain.embeddings import OpenAIEmbeddings
from langchain.indexes import SQLRecordManager, index
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.utils.html import PREFIXES_TO_IGNORE_REGEX, SUFFIXES_TO_IGNORE_REGEX
from langchain.vectorstores import Weaviate
from constants import WEAVIATE_DOCS_INDEX_NAME
from parser import langchain_docs_extractor
logger = logging.getLogger(__name__)
WEAVIATE_URL = os.environ["WEAVIATE_URL"]
WEAVIATE_API_KEY = os.environ["WEAVIATE_API_KEY"]
RECORD_MANAGER_DB_URL = os.environ["RECORD_MANAGER_DB_URL"]
def load_langchain_docs():
return SitemapLoader(
"https://python.langchain.com/sitemap.xml",
filter_urls=["https://python.langchain.com/"],
parsing_function=langchain_docs_extractor,
default_parser="lxml",
bs_kwargs={
"parse_only": SoupStrainer(name="article"),
},
).load()
def simple_extractor(html: str) -> str:
soup = BeautifulSoup(html, "lxml")
return re.sub(r"\n\n+", "\n\n", soup.text)
def load_api_docs():
return RecursiveUrlLoader(
url="https://api.python.langchain.com/en/latest/",
max_depth=8,
extractor=simple_extractor,
prevent_outside=True,
use_async=True,
timeout=600,
# Drop trailing / to avoid duplicate pages.
link_regex=(
f"href=[\"']{PREFIXES_TO_IGNORE_REGEX}((?:{SUFFIXES_TO_IGNORE_REGEX}.)*?)"
r"(?:[\#'\"]|\/[\#'\"])"
),
check_response_status=True,
exclude_dirs=("https://api.python.langchain.com/en/latest/_sources",),
).load()
def ingest_docs():
docs_from_documentation = load_langchain_docs()
logger.info(f"Loaded {len(docs_from_documentation)} docs from documentation")
docs_from_api = load_api_docs()
logger.info(f"Loaded {len(docs_from_api)} docs from API")
text_splitter = RecursiveCharacterTextSplitter(chunk_size=4000, chunk_overlap=200)
docs_transformed = text_splitter.split_documents(
docs_from_documentation + docs_from_api
)
# We try to return 'source' and 'title' metadata when querying vector store and
# Weaviate will error at query time if one of the attributes is missing from a
# retrieved document.
for doc in docs_transformed:
if "source" not in doc.metadata:
doc.metadata["source"] = ""
if "title" not in doc.metadata:
doc.metadata["title"] = ""
client = weaviate.Client(
url=WEAVIATE_URL,
auth_client_secret=weaviate.AuthApiKey(api_key=WEAVIATE_API_KEY),
)
embedding = OpenAIEmbeddings(
chunk_size=200,
) # rate limit
vectorstore = Weaviate(
client=client,
index_name=WEAVIATE_DOCS_INDEX_NAME,
text_key="text",
embedding=embedding,
by_text=False,
attributes=["source", "title"],
)
record_manager = SQLRecordManager(
f"weaviate/{WEAVIATE_DOCS_INDEX_NAME}", db_url=RECORD_MANAGER_DB_URL
)
record_manager.create_schema()
indexing_stats = index(
docs_transformed,
record_manager,
vectorstore,
cleanup="full",
source_id_key="source",
)
logger.info("Indexing stats: ", indexing_stats)
logger.info(
"LangChain now has this many vectors: ",
client.query.aggregate(WEAVIATE_DOCS_INDEX_NAME).with_meta_count().do(),
)
if __name__ == "__main__":
ingest_docs()