Skip to content

Commit

Permalink
feat: Finish Superlinked implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
iusztinpaul committed Jul 13, 2024
1 parent e129b47 commit 1a60f06
Show file tree
Hide file tree
Showing 20 changed files with 566 additions and 297 deletions.
48 changes: 48 additions & 0 deletions .docker/Dockerfile.bytewax.superlinked
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Image that runs the Bytewax pipeline entrypoint script (Superlinked variant).
# Use an official Python runtime as a parent image
FROM python:3.11-slim-bullseye

# WORKSPACE_ROOT          - directory inside the image that holds the pipeline code
# PYTHONDONTWRITEBYTECODE - do not write .pyc files
# PYTHONUNBUFFERED        - unbuffered stdout/stderr so logs show up immediately
# POETRY_HOME             - install location used by the Poetry installer
# POETRY_NO_INTERACTION   - never prompt during the build
ENV WORKSPACE_ROOT=/usr/src/app \
    PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    POETRY_HOME="/opt/poetry" \
    POETRY_NO_INTERACTION=1

RUN mkdir -p $WORKSPACE_ROOT

# Install system dependencies and Poetry in a single layer.
# - The installer is downloaded to a file first: with `curl ... | python -` a failed
#   download is masked, because the pipeline's exit status is that of `python`.
# - curl is only needed to fetch the installer, so it is removed in the same layer
#   that installed it; removing it in a later RUN would not shrink the image.
# - The apt package lists are deleted as well to keep the layer small.
RUN apt-get update -y \
    && apt-get install -y --no-install-recommends \
        build-essential \
        gcc \
        python3-dev \
        curl \
    && curl -fsSL https://install.python-poetry.org -o /tmp/install-poetry.py \
    && python /tmp/install-poetry.py \
    && rm -f /tmp/install-poetry.py \
    && apt-get remove -y curl \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

# Add Poetry to PATH
ENV PATH="$POETRY_HOME/bin:$PATH"

# Copy the pyproject.toml and poetry.lock files from the root directory.
# NOTE: WORKDIR is not set yet at this point, so the files land in the base image's
# current working directory and `poetry install` below runs from there.
COPY ./pyproject.toml ./poetry.lock ./

# Install dependencies into the system interpreter (no virtualenv inside the container)
RUN poetry config virtualenvs.create false && poetry install

# Set the working directory
WORKDIR $WORKSPACE_ROOT

# Copy the 3-feature-pipeline and any other necessary directories
COPY ./3-feature-pipeline .
COPY ./core ./core

# Set the PYTHONPATH environment variable
ENV PYTHONPATH=/usr/src/app

# Make sure the entrypoint is executable regardless of the file mode in the build context
RUN chmod +x /usr/src/app/scripts/bytewax_entrypoint.sh

# Command to run the Bytewax pipeline script
CMD ["/usr/src/app/scripts/bytewax_entrypoint.sh"]
8 changes: 4 additions & 4 deletions .docker/Dockerfile.crawlers
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ FROM public.ecr.aws/lambda/python:3.11 as build

# Install chrome driver and browser
RUN yum install -y unzip && \
curl -Lo "/tmp/chromedriver.zip" "https://chromedriver.storage.googleapis.com/114.0.5735.90/chromedriver_linux64.zip" && \
curl -Lo "/tmp/chrome-linux.zip" "https://www.googleapis.com/download/storage/v1/b/chromium-browser-snapshots/o/Linux_x64%2F1135561%2Fchrome-linux.zip?alt=media" && \
curl -Lo "/tmp/chromedriver.zip" "https://storage.googleapis.com/chrome-for-testing-public/126.0.6478.126/linux64/chromedriver-linux64.zip" && \
curl -Lo "/tmp/chrome-linux.zip" "https://storage.googleapis.com/chrome-for-testing-public/126.0.6478.126/linux64/chrome-linux64.zip" && \
unzip /tmp/chromedriver.zip -d /opt/ && \
unzip /tmp/chrome-linux.zip -d /opt/

Expand Down Expand Up @@ -41,8 +41,8 @@ RUN yum install -y \
libpq-dev


COPY --from=build /opt/chrome-linux /opt/chrome
COPY --from=build /opt/chromedriver /opt/
COPY --from=build /opt/chrome-linux64 /opt/chrome
COPY --from=build /opt/chromedriver-linux64 /opt/

COPY ./pyproject.toml ./poetry.lock ./

Expand Down
3 changes: 1 addition & 2 deletions 1-data-crawling/crawlers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
from abc import ABC, abstractmethod
from tempfile import mkdtemp

from db.documents import BaseDocument
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

from db.documents import BaseDocument


class BaseCrawler(ABC):
model: type[BaseDocument]
Expand Down
3 changes: 2 additions & 1 deletion 2-data-ingestion/cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from bson import json_util
from mq import publish_to_rabbitmq

from config import settings
from db import MongoDatabaseConnector

# Configure logging
Expand Down Expand Up @@ -33,7 +34,7 @@ def stream_process():
logging.info(f"Change detected and serialized: {data}")

# Send data to rabbitmq
publish_to_rabbitmq(queue_name="default", data=data)
publish_to_rabbitmq(queue_name=settings.RABBITMQ_QUEUE_NAME, data=data)
logging.info("Data published to RabbitMQ.")

except Exception as e:
Expand Down
1 change: 1 addition & 0 deletions 2-data-ingestion/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class Settings(BaseSettings):
RABBITMQ_PORT: int = 5673 # Port mapped in Docker Compose
RABBITMQ_DEFAULT_USERNAME: str = "guest" # Default username
RABBITMQ_DEFAULT_PASSWORD: str = "guest" # Default password
RABBITMQ_QUEUE_NAME: str = "default"


settings = Settings()
1 change: 1 addition & 0 deletions 3-feature-pipeline/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class Settings(BaseSettings):
RABBITMQ_DEFAULT_PASSWORD: str = "guest"
RABBITMQ_HOST: str = "localhost"
RABBITMQ_PORT: int = 5673
RABBITMQ_QUEUE_NAME: str = "default"

# QdrantDB config
QDRANT_DATABASE_HOST: str = "localhost"
Expand Down
17 changes: 13 additions & 4 deletions 3-feature-pipeline/data_flow/stream_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
from typing import Generic, Iterable, List, Optional, TypeVar

from bytewax.inputs import FixedPartitionedSource, StatefulSourcePartition
from config import settings
from utils.logging import get_logger

from mq import RabbitMQConnection

logger = get_logger(__name__)

DataT = TypeVar("DataT")
MessageT = TypeVar("MessageT")

Expand All @@ -24,9 +28,14 @@ def __init__(self, queue_name: str, resume_state: MessageT | None = None) -> Non
self.channel = self.connection.get_channel()

def next_batch(self, sched: Optional[datetime]) -> Iterable[DataT]:
method_frame, header_frame, body = self.channel.basic_get(
queue=self.queue_name, auto_ack=False
)
try:
method_frame, header_frame, body = self.channel.basic_get(
queue=self.queue_name, auto_ack=False
)
except Exception as e:
logger.warning(f"Error while fetching message from queue.", queue_name=self.queue_name)
return []

if method_frame:
message_id = method_frame.delivery_tag
self._in_flight_msg_ids.add(message_id)
Expand Down Expand Up @@ -55,4 +64,4 @@ def list_parts(self) -> List[str]:
def build_part(
self, now: datetime, for_part: str, resume_state: MessageT | None = None
) -> StatefulSourcePartition[DataT, MessageT]:
return RabbitMQPartition(queue_name="test_queue")
return RabbitMQPartition(queue_name=settings.RABBITMQ_QUEUE_NAME)
149 changes: 0 additions & 149 deletions 6-superlinked-rag/_main.py

This file was deleted.

10 changes: 2 additions & 8 deletions 6-superlinked-rag/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,14 @@ class Settings(BaseSettings):
COMET_PROJECT: str | None = None

# Embeddings config
EMBEDDING_MODEL_ID: str = "sentence-transformers/all-MiniLM-L6-v2"
EMBEDDING_MODEL_MAX_INPUT_LENGTH: int = 256
EMBEDDING_SIZE: int = 384
EMBEDDING_MODEL_DEVICE: str = "cpu"

# OpenAI
OPENAI_MODEL_ID: str = "gpt-4-1106-preview"
OPENAI_API_KEY: str | None = None
EMBEDDING_MODEL_ID: str = "sentence-transformers/all-mpnet-base-v2"

# MQ config
RABBITMQ_DEFAULT_USERNAME: str = "guest"
RABBITMQ_DEFAULT_PASSWORD: str = "guest"
RABBITMQ_HOST: str = "localhost"
RABBITMQ_PORT: int = 5673
RABBITMQ_QUEUE_NAME: str = "default"


settings = Settings()
Loading

0 comments on commit 1a60f06

Please sign in to comment.