Commit a245fba

feat: Document metadata, document IDs, delete from index and tests (AnswerDotAI#36)

* first pass changes to include document metadata

* Added working tests for all metadata and document id functionality
except for add_to_index and delete_from_index

* Added working delete_from_index functionality

* Fixed tests and functionality for add_to_index, and updated the intro notebook

* Updated docs and doc strings

* Updated metadata return logic to account for
possible documents in the collection without metadata

* Added check for document metadata length vs documents

* - Reverted names
- Made document ids independent of collection and saved as its own map file
- Added full document return flag
- Updated tests

* - Fixed tests
- Updated README and basic usage notebook
- Removed return_entire_source_document functionality
because document splitting introduces overlaps

* Updated with ruff import sorting

* Added ruff ignore for test fixture setup

* renamed test file to match other files better
and added TODOs to move tests

* fix: formatted files that failed ruff CI check

* style: formatted files that failed ruff CI check

* chore: remove code duplication and ensure process_corpus can run independently

* chore: typo

---------

Co-authored-by: bclavie <[email protected]>
adharm and bclavie authored Jan 24, 2024
1 parent 65aee5f commit a245fba
Showing 8 changed files with 693 additions and 415 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -124,4 +124,6 @@ archive/

*/.ragatouille

local/
local/

.vscode/
42 changes: 31 additions & 11 deletions README.md
@@ -117,17 +117,29 @@ To create an index, you'll need to load a trained model, this can be one of your
```python
from ragatouille import RAGPretrainedModel
from ragatouille.utils import get_wikipedia_page
from ragatouille.data import CorpusProcessor


RAG = RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")
my_documents = [get_wikipedia_page("Hayao_Miyazaki"), get_wikipedia_page("Studio_Ghibli")]
processor = CorpusProcessor()
my_documents = processor.process_corpus(my_documents)
index_path = RAG.index(index_name="my_index", collection=my_documents)
```
You can also optionally add document IDs or document metadata when creating the index:

```python
document_ids = ["miyazaki", "ghibli"]
document_metadatas = [
{"entity": "person", "source": "wikipedia"},
{"entity": "organisation", "source": "wikipedia"},
]
index_path = RAG.index(
index_name="my_index_with_ids_and_metadata",
collection=my_documents,
document_ids=document_ids,
document_metadatas=document_metadatas,
)
```

Once this is done running, your index will be saved on-disk and ready to be queried! RAGatouille and ColBERT handle everything here:
- Splitting your documents
- Tokenizing your documents
- Identifying the individual terms
- Embedding the documents and generating the bags-of-embeddings
@@ -163,25 +175,33 @@ RAG.search(["What manga did Hayao Miyazaki write?",
```python
# single-query result
[
{"content": "blablabla", "score": 42.424242, "rank": 1},
{"content": "blablabla", "score": 42.424242, "rank": 1, "document_id": "x"},
...,
{"content": "albalbalba", "score": 24.242424, "rank": k},
{"content": "albalbalba", "score": 24.242424, "rank": k, "document_id": "y"},
]
# multi-query result
[
[
{"content": "blablabla", "score": 42.424242, "rank": 1},
{"content": "blablabla", "score": 42.424242, "rank": 1, "document_id": "x"},
...,
{"content": "albalbalba", "score": 24.242424, "rank": k},
{"content": "albalbalba", "score": 24.242424, "rank": k, "document_id": "y"},
],
[
{"content": "blablabla", "score": 42.424242, "rank": 1},
{"content": "blablabla", "score": 42.424242, "rank": 1, "document_id": "x"},
...,
{"content": "albalbalba", "score": 24.242424, "rank": k},
{"content": "albalbalba", "score": 24.242424, "rank": k, "document_id": "y"},
],
],
```

If your index includes document metadata, it'll be returned as a dictionary in the `document_metadata` key of the result dictionary:

```python
[
{"content": "blablabla", "score": 42.424242, "rank": 1, "document_id": "x", "document_metadata": {"A": 1, "B": 2}},
...,
{"content": "albalbalba", "score": 24.242424, "rank": k, "document_id": "y", "document_metadata": {"A": 3, "B": 4}},
]
```
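
This commit also adds `RAG.add_to_index` and `RAG.delete_from_index` (see the `RAGPretrainedModel.py` changes further down). A minimal usage sketch, assuming the `RAG` instance and index created in the examples above; the extra Wikipedia page, IDs, and metadata here are placeholders:

```python
# Placeholder document to append to the existing index
new_documents = [get_wikipedia_page("Toei_Animation")]

RAG.add_to_index(
    new_collection=new_documents,
    new_document_ids=["toei"],
    new_document_metadatas=[{"entity": "organisation", "source": "wikipedia"}],
)

# Remove a document (and all of its chunks) from the index by its ID
RAG.delete_from_index(document_ids=["toei"])
```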

## I'm sold, can I integrate late-interaction RAG into my project?

373 changes: 25 additions & 348 deletions examples/01-basic_indexing_and_search.ipynb

Large diffs are not rendered by default.

133 changes: 121 additions & 12 deletions ragatouille/RAGPretrainedModel.py
@@ -1,5 +1,6 @@
from pathlib import Path
from typing import Any, Callable, Optional, Union
from typing import Any, Callable, List, Optional, TypeVar, Union
from uuid import uuid4

from langchain.retrievers.document_compressors.base import BaseDocumentCompressor
from langchain_core.retrievers import BaseRetriever
@@ -53,19 +54,23 @@ def from_pretrained(
pretrained_model_name_or_path: Union[str, Path],
n_gpu: int = -1,
verbose: int = 1,
index_root: Optional[str] = None,
):
"""Load a ColBERT model from a pre-trained checkpoint.
Parameters:
pretrained_model_name_or_path (str): Local path or huggingface model name.
n_gpu (int): Number of GPUs to use. By default, value is -1, which means use all available GPUs or none if no GPU is available.
verbose (int): The level of ColBERT verbosity requested. By default, 1, which will filter out most internal logs.
index_root (Optional[str]): The root directory where indexes will be stored. If None, will use the default directory, '.ragatouille/'.
Returns:
cls (RAGPretrainedModel): The current instance of RAGPretrainedModel, with the model initialised.
"""
instance = cls()
instance.model = ColBERT(pretrained_model_name_or_path, n_gpu, verbose=verbose)
instance.model = ColBERT(
pretrained_model_name_or_path, n_gpu, index_root=index_root, verbose=verbose
)
return instance

@classmethod
@@ -90,48 +95,109 @@ def from_index(

return instance

def _process_metadata(
self,
document_ids: Optional[Union[TypeVar("T"), List[TypeVar("T")]]],
document_metadatas: Optional[list[dict[Any, Any]]],
collection_len: int,
) -> tuple[list[str], Optional[dict[Any, Any]]]:
if document_ids is None:
document_ids = [str(uuid4()) for i in range(collection_len)]
else:
if len(document_ids) != collection_len:
raise ValueError("document_ids must be the same length as collection")
if len(document_ids) != len(set(document_ids)):
raise ValueError("document_ids must be unique")
if any(not id.strip() for id in document_ids):
raise ValueError("document_ids must not contain empty strings")
if not all(isinstance(id, type(document_ids[0])) for id in document_ids):
raise ValueError("All document_ids must be of the same type")

if document_metadatas is not None:
if len(document_metadatas) != collection_len:
raise ValueError(
"document_metadatas must be the same length as collection"
)
docid_metadata_map = {
x: y for x, y in zip(document_ids, document_metadatas)
}
else:
docid_metadata_map = None

return document_ids, docid_metadata_map

def index(
self,
collection: list[str],
document_ids: Union[TypeVar("T"), List[TypeVar("T")]] = None,
document_metadatas: Optional[list[dict]] = None,
index_name: str = None,
overwrite_index: bool = True,
max_document_length: int = 256,
split_documents: bool = True,
document_splitter_fn: Optional[Callable] = llama_index_sentence_splitter,
preprocessing_fn: Optional[Union[Callable, list[Callable]]] = None,
):
"""Build an index from a collection of documents.
"""Build an index from a list of documents.
Parameters:
collection (list[str]): The collection of documents to index.
document_ids (Optional[list[str]]): An optional list of document ids. Ids will be generated at index time if not supplied.
index_name (str): The name of the index that will be built.
overwrite_index (bool): Whether to overwrite an existing index with the same name.
max_document_length (int): The maximum length of a document. Documents longer than this will be split into chunks.
split_documents (bool): Whether to split documents into chunks.
document_splitter_fn (Optional[Callable]): A function to split documents into chunks. If None and by default, will use the llama_index_sentence_splitter.
preprocessing_fn (Optional[Union[Callable, list[Callable]]]): A function or list of functions to preprocess documents. If None and by default, will not preprocess documents.
Returns:
index (str): The path to the index that was built.
"""

document_ids, docid_metadata_map = self._process_metadata(
document_ids=document_ids,
document_metadatas=document_metadatas,
collection_len=len(collection),
)

if split_documents or preprocessing_fn is not None:
self.corpus_processor = CorpusProcessor(
document_splitter_fn=document_splitter_fn if split_documents else None,
preprocessing_fn=preprocessing_fn,
)
collection = self.corpus_processor.process_corpus(
collection_with_ids = self.corpus_processor.process_corpus(
collection,
document_ids,
chunk_size=max_document_length,
)
else:
collection_with_ids = [
{"document_id": x, "content": y}
for x, y in zip(document_ids, collection)
]

pid_docid_map = {
index: item["document_id"] for index, item in enumerate(collection_with_ids)
}
collection = [x["content"] for x in collection_with_ids]

overwrite = "reuse"
if overwrite_index:
overwrite = True
return self.model.index(
collection,
index_name,
pid_docid_map=pid_docid_map,
docid_metadata_map=docid_metadata_map,
index_name=index_name,
max_document_length=max_document_length,
overwrite=overwrite,
)

def add_to_index(
self,
new_documents: list[str],
new_collection: list[str],
new_document_ids: Union[TypeVar("T"), List[TypeVar("T")]],
new_document_metadatas: Optional[list[dict]] = None,
index_name: Optional[str] = None,
split_documents: bool = True,
document_splitter_fn: Optional[Callable] = llama_index_sentence_splitter,
@@ -140,21 +206,59 @@ def add_to_index(
"""Add documents to an existing index.
Parameters:
new_documents (list[str]): The documents to add to the index.
new_collection (list[str]): The documents to add to the index.
new_document_metadatas (Optional[list[dict]]): An optional list of metadata dicts
index_name (Optional[str]): The name of the index to add documents to. If None and by default, will add documents to the already initialised one.
"""
new_document_ids, new_docid_metadata_map = self._process_metadata(
document_ids=new_document_ids,
document_metadatas=new_document_metadatas,
collection_len=len(new_collection),
)

if split_documents or preprocessing_fn is not None:
self.corpus_processor = CorpusProcessor(
document_splitter_fn=document_splitter_fn if split_documents else None,
preprocessing_fn=preprocessing_fn,
)
new_documents = self.corpus_processor.process_corpus(
new_documents,
new_collection_with_ids = self.corpus_processor.process_corpus(
new_collection,
new_document_ids,
chunk_size=self.model.config.doc_maxlen,
)
else:
new_collection_with_ids = [
{"document_id": x, "content": y}
for x, y in zip(new_document_ids, new_collection)
]

new_collection = [x["content"] for x in new_collection_with_ids]

new_pid_docid_map = {
index: item["document_id"]
for index, item in enumerate(new_collection_with_ids)
}

self.model.add_to_index(
new_documents,
new_collection,
new_pid_docid_map,
new_docid_metadata_map=new_docid_metadata_map,
index_name=index_name,
)

def delete_from_index(
self,
document_ids: Union[TypeVar("T"), List[TypeVar("T")]],
index_name: Optional[str] = None,
):
"""Delete documents from an index by their IDs.
Parameters:
document_ids (Union[TypeVar("T"), List[TypeVar("T")]]): The IDs of the documents to delete.
index_name (Optional[str]): The name of the index to delete documents from. If None and by default, will delete documents from the already initialised one.
"""
self.model.delete_from_index(
document_ids,
index_name=index_name,
)

@@ -177,12 +281,17 @@ def search(
zero_index_ranks (bool): Whether to zero the index ranks of the results. By default, result rank 1 is the highest ranked result
Returns:
results (Union[list[dict], list[list[dict]]]): A list of dict containing individual results for each query. If a list of queries is provided, returns a list of lists of dicts. Each result is a dict with keys `content`, `score` and `rank`.
results (Union[list[dict], list[list[dict]]]): A list of dict containing individual results for each query. If a list of queries is provided, returns a list of lists of dicts. Each result is a dict with keys `content`, `score`, `rank`, and `document_id`. If metadata was indexed for the document, it will be returned under the `document_metadata` key.
Individual results are always in the format:
```python3
{"content": "text of the relevant passage", "score": 0.123456, "rank": 1}
{"content": "text of the relevant passage", "score": 0.123456, "rank": 1, "document_id": "x"}
```
or
```python3
{"content": "text of the relevant passage", "score": 0.123456, "rank": 1, "document_id": "x", "document_metadata": {"metadata_key": "metadata_value", ...}}
```
"""
return self.model.search(
query=query,
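
The `from_pretrained` changes above also introduce an `index_root` option. A minimal sketch of how it could be used (the directory name is an arbitrary example); leaving it as `None` keeps the default `.ragatouille/` location:

```python
from ragatouille import RAGPretrainedModel

# Store indexes under a custom root instead of the default '.ragatouille/'
RAG = RAGPretrainedModel.from_pretrained(
    "colbert-ir/colbertv2.0",
    index_root="./my_indexes",
)
```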
15 changes: 12 additions & 3 deletions ragatouille/data/corpus_processor.py
@@ -1,4 +1,5 @@
from typing import Callable, Optional, Union
from uuid import uuid4

from ragatouille.data.preprocessors import llama_index_sentence_splitter

@@ -15,15 +16,23 @@ def __init__(
def process_corpus(
self,
documents: list[str],
document_ids: Optional[list[str]] = None,
**splitter_kwargs,
) -> list[str]:
# TODO CHECK KWARGS
document_ids = (
[str(uuid4()) for _ in range(len(documents))]
if document_ids is None
else document_ids
)
if self.document_splitter_fn is not None:
documents = self.document_splitter_fn(documents, **splitter_kwargs)
documents = self.document_splitter_fn(
documents, document_ids, **splitter_kwargs
)
if self.preprocessing_fn is not None:
if isinstance(self.preprocessing_fn, list):
for fn in self.preprocessing_fn:
documents = fn(documents)
documents = fn(documents, document_ids)
return documents
return self.preprocessing_fn(documents)
return self.preprocessing_fn(documents, document_ids)
return documents
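
For context, a small sketch of what `process_corpus` could return when called directly with explicit `document_ids` (the texts and IDs here are illustrative); when `document_ids` is omitted, UUIDs are generated instead:

```python
from ragatouille.data import CorpusProcessor

processor = CorpusProcessor()
chunks = processor.process_corpus(
    ["A long article about Hayao Miyazaki...", "A long article about Studio Ghibli..."],
    document_ids=["miyazaki", "ghibli"],
    chunk_size=256,
)
# Each entry is now a dict rather than a bare string, e.g.:
# {"document_id": "miyazaki", "content": "A long article about Hayao Miyazaki..."}
```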
10 changes: 7 additions & 3 deletions ragatouille/data/preprocessors.py
@@ -2,11 +2,15 @@
from llama_index.text_splitter import SentenceSplitter


def llama_index_sentence_splitter(documents: list[str], chunk_size=256):
def llama_index_sentence_splitter(
documents: list[str], document_ids: list[str], chunk_size=256
):
chunk_overlap = min(chunk_size / 4, min(chunk_size / 2, 64))
chunks = []
node_parser = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
docs = [[Document(text=doc)] for doc in documents]
for doc in docs:
chunks += [node.text for node in node_parser(doc)]
for doc_id, doc in zip(document_ids, docs):
chunks += [
{"document_id": doc_id, "content": node.text} for node in node_parser(doc)
]
return chunks
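
And a short sketch of the updated splitter on its own (content abbreviated): every chunk produced from a given source document carries that document's `document_id`:

```python
from ragatouille.data.preprocessors import llama_index_sentence_splitter

chunks = llama_index_sentence_splitter(
    documents=["First long document ..."],
    document_ids=["doc-a"],
    chunk_size=256,
)
# e.g. [{"document_id": "doc-a", "content": "First long ..."},
#       {"document_id": "doc-a", "content": "... document ..."}]
```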