feat: Finish Superlinked ingestion & server logic
iusztinpaul committed Jul 17, 2024
1 parent e9b32e2 commit 55c8b70
Showing 17 changed files with 261 additions and 498 deletions.
19 changes: 13 additions & 6 deletions 3-feature-pipeline/data_flow/stream_input.py
@@ -1,12 +1,12 @@
import json
from datetime import datetime
+import time
from typing import Generic, Iterable, List, Optional, TypeVar

from bytewax.inputs import FixedPartitionedSource, StatefulSourcePartition
from config import settings
-from utils.logging import get_logger

from mq import RabbitMQConnection
+from utils.logging import get_logger

logger = get_logger(__name__)

@@ -30,12 +30,19 @@ def __init__(self, queue_name: str, resume_state: MessageT | None = None) -> None:
    def next_batch(self, sched: Optional[datetime]) -> Iterable[DataT]:
        try:
            method_frame, header_frame, body = self.channel.basic_get(
-                queue=self.queue_name, auto_ack=False
+                queue=self.queue_name, auto_ack=True
            )
-        except Exception:
-            logger.error(
-                f"Error while fetching message from queue.", queue_name=self.queue_name
-            )
+        except Exception as e:
+            logger.warning(f"Error while fetching message from queue.", queue_name=self.queue_name)
+            time.sleep(10)  # Sleep for 10 seconds before retrying to access the queue.
+
+            self.connection.connect()
+            self.channel = self.connection.get_channel()
+
+            return []

        if method_frame:
            message_id = method_frame.delivery_tag
            self._in_flight_msg_ids.add(message_id)
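Read as a pattern, the new `next_batch` logic is: attempt a non-blocking `basic_get` with `auto_ack=True`, and on any broker error back off, rebuild the connection and channel, and return an empty batch so the stream simply polls again. A minimal standalone sketch of that recovery pattern (hedged: `connection` stands in for the repo's `RabbitMQConnection` helper, whose exact interface is assumed):

```python
import time
from typing import Iterable


class QueuePoller:
    """Sketch of the consume-with-recovery pattern used above."""

    def __init__(self, connection, queue_name: str) -> None:
        self.connection = connection  # assumed: exposes connect() and get_channel()
        self.channel = connection.get_channel()
        self.queue_name = queue_name

    def next_batch(self) -> Iterable[bytes]:
        try:
            # auto_ack=True: the broker marks the message delivered immediately,
            # so no manual ack bookkeeping is needed (the change this commit makes).
            method_frame, _header_frame, body = self.channel.basic_get(
                queue=self.queue_name, auto_ack=True
            )
        except Exception:
            time.sleep(10)  # back off before touching the broker again
            self.connection.connect()  # rebuild the connection ...
            self.channel = self.connection.get_channel()  # ... and a fresh channel
            return []  # empty batch; the caller just polls again later

        return [body] if method_frame else []
```

Returning `[]` instead of raising keeps the Bytewax worker alive across transient broker outages.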
37 changes: 34 additions & 3 deletions 6-bonus-superlinked-rag/README.md
@@ -1,4 +1,12 @@
-# Install Local Setup
+# Dependencies
+
+- Docker
+- Poetry
+- PyEnv
+- Python
+- GNU Make
+
+# Install

## 1. Start the Superlinked server

@@ -8,7 +16,7 @@ brew update
brew install pyenv
```

-Now, let's the Superlinked server by running the following commands:
+Now, let's start the Superlinked server by running the following commands:
```shell
# Create a virtual environment and install all necessary dependencies to deploy the server.
cd 6-bonus-superlinked-rag/server
@@ -21,19 +29,42 @@ cd ..
./tools/deploy.py up
```

-> [NOTE!]
+> [!NOTE]
> After the server has started, you can check that it works and explore its API at http://localhost:8080/docs/
You can test that the Superlinked server started successfully by running the following command from the `root directory` of the `llm-twin-course`:
```shell
make test-superlinked-server
```
You should see that some mock data was sent to the Superlinked server and queried successfully.
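If you'd rather probe the server directly than go through `make`, here is a quick hedged check from Python (assuming only that the server is listening on its default port):

```python
import requests

# The interactive API docs are served at /docs/, per the note above.
response = requests.get("http://localhost:8080/docs/", timeout=5)
print(response.status_code)  # expect 200 once the server is ready
```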

## 2. Start the rest of the infrastructure

From the root of the repository, run the following to start all the components needed to run the Superlinked-powered LLM twin project locally:
```shell
make local-start-superlinked
```

> [!IMPORTANT]
> Before starting, make sure that you have your `.env` file filled with everything required to run the system.
To stop the local infrastructure, run:
```shell
make local-stop-superlinked
```

> [!NOTE]
> After running the ingestion pipeline, you can visualize what's inside the Redis vector DB at http://localhost:8001/redis-stack/browser
To trigger the ingestion, run:
```shell
make local-test-medium
# OR
make local-test-github
```
You can use other Medium or GitHub links to populate the vector DB with more data.

To query the vector DB, run:
```shell
make ... # TO BE ADDED
```
2 changes: 1 addition & 1 deletion 6-bonus-superlinked-rag/data_flow/stream_input.py
@@ -47,7 +47,7 @@ def next_batch(self, sched: Optional[datetime]) -> Iterable[DataT]:
f"Error while fetching message from queue.", queue_name=self.queue_name
)
time.sleep(10) # Sleep for 10 seconds before retrying to access the queue.

self.connection.connect()
self.channel = self.connection.get_channel()

4 changes: 2 additions & 2 deletions 6-bonus-superlinked-rag/data_flow/stream_output.py
@@ -1,5 +1,5 @@
from bytewax.outputs import DynamicSink, StatelessSinkPartition
-from models.clean import CleanedModel
+from models.documents import Document
from superlinked_client import SuperlinkedClient
from tqdm import tqdm
from utils.logging import get_logger
@@ -19,7 +19,7 @@ class SuperlinkedSinkPartition(StatelessSinkPartition):
    def __init__(self, client: SuperlinkedClient):
        self._client = client

-    def write_batch(self, items: list[CleanedModel]) -> None:
+    def write_batch(self, items: list[Document]) -> None:
        for item in tqdm(items, desc="Sending items to Superlinked..."):
            match item.type:
                case "repositories":
28 changes: 11 additions & 17 deletions 6-bonus-superlinked-rag/data_logic/cleaning_data_handlers.py
@@ -1,12 +1,6 @@
from abc import ABC, abstractmethod

-from models.clean import (
-    ArticleCleanedModel,
-    CleanedModel,
-    PostCleanedModel,
-    RepositoryCleanedModel,
-)
-from models.raw import ArticleRawModel, PostsRawModel, RawModel, RepositoryRawModel
+from models.documents import ArticleDocument, Document, PostDocument, RepositoryDocument
+from models.raw import ArticleRawModel, PostsRawModel, RawModel, RepositoryRawModel
from utils.cleaning import clean_text

@@ -18,41 +12,41 @@ class CleaningDataHandler(ABC):
"""

@abstractmethod
def clean(self, data_model: RawModel) -> CleanedModel:
def clean(self, data_model: RawModel) -> Document:
pass


class PostCleaningHandler(CleaningDataHandler):
def clean(self, data_model: PostsRawModel) -> PostCleanedModel:
return PostCleanedModel(
def clean(self, data_model: PostsRawModel) -> PostDocument:
return PostDocument(
id=data_model.id,
platform=data_model.platform,
cleaned_content=clean_text("".join(data_model.content.values())),
content=clean_text("".join(data_model.content.values())),
author_id=data_model.author_id,
type=data_model.type,
)


class ArticleCleaningHandler(CleaningDataHandler):
def clean(self, data_model: ArticleRawModel) -> ArticleCleanedModel:
return ArticleCleanedModel(
def clean(self, data_model: ArticleRawModel) -> ArticleDocument:
return ArticleDocument(
id=data_model.id,
platform=data_model.platform,
link=data_model.link,
cleaned_content=clean_text("".join(data_model.content.values())),
content=clean_text("".join(data_model.content.values())),
author_id=data_model.author_id,
type=data_model.type,
)


class RepositoryCleaningHandler(CleaningDataHandler):
def clean(self, data_model: RepositoryRawModel) -> RepositoryCleanedModel:
return RepositoryCleanedModel(
def clean(self, data_model: RepositoryRawModel) -> RepositoryDocument:
return RepositoryDocument(
id=data_model.id,
platform=data_model.platform,
name=data_model.name,
link=data_model.link,
cleaned_content=clean_text("".join(data_model.content.values())),
content=clean_text("".join(data_model.content.values())),
author_id=data_model.owner_id,
type=data_model.type,
)
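The handlers above pin down the shape of the new `models.documents` classes: every document carries `id`, `platform`, `content`, `author_id`, and `type`; articles add `link`, and repositories add `name` and `link`. A sketch of what those models plausibly look like (assuming pydantic, suggested by the keyword-argument construction style; the inheritance layout is a guess):

```python
from pydantic import BaseModel


class Document(BaseModel):
    id: str
    platform: str
    content: str
    author_id: str
    type: str


class PostDocument(Document):
    pass


class ArticleDocument(Document):
    link: str


class RepositoryDocument(Document):
    name: str
    link: str
```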
6 changes: 3 additions & 3 deletions 6-bonus-superlinked-rag/data_logic/dispatchers.py
@@ -1,4 +1,4 @@
-from models.clean import CleanedModel
+from models.documents import Document
from models.raw import ArticleRawModel, PostsRawModel, RawModel, RepositoryRawModel
from utils.logging import get_logger

@@ -46,7 +46,7 @@ class CleaningDispatcher:
    cleaning_factory = CleaningHandlerFactory()

    @classmethod
-    def dispatch_cleaner(cls, data_model: RawModel) -> CleanedModel:
+    def dispatch_cleaner(cls, data_model: RawModel) -> Document:
        logger.info("Cleaning data.", data_type=data_model.type)

        data_type = data_model.type
@@ -56,7 +56,7 @@ def dispatch_cleaner(cls, data_model: RawModel) -> CleanedModel:
        logger.info(
            "Data cleaned successfully.",
            data_type=data_type,
-            cleaned_content_len=len(clean_model.cleaned_content),
+            content_len=len(clean_model.content),
        )

        return clean_model
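`dispatch_cleaner` resolves the right handler through `cleaning_factory`, keyed on `data_model.type`. The factory itself is collapsed in this diff; a hedged sketch of what it presumably does (the `create_handler` name and the `posts`/`articles` type strings are assumptions; `repositories` appears verbatim in the sink code above):

```python
from data_logic.cleaning_data_handlers import (
    ArticleCleaningHandler,
    CleaningDataHandler,
    PostCleaningHandler,
    RepositoryCleaningHandler,
)


class CleaningHandlerFactory:
    def create_handler(self, data_type: str) -> CleaningDataHandler:
        # Map the raw model's `type` field to its cleaning handler.
        match data_type:
            case "posts":
                return PostCleaningHandler()
            case "articles":
                return ArticleCleaningHandler()
            case "repositories":
                return RepositoryCleaningHandler()
            case _:
                raise ValueError(f"Unsupported data type: {data_type}")
```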
53 changes: 0 additions & 53 deletions 6-bonus-superlinked-rag/dummy_requests.py

This file was deleted.

