Skip to content

Commit

Permalink
feat: datasets streaming export
Browse files Browse the repository at this point in the history
  • Loading branch information
dartt0n committed Jul 21, 2024
1 parent 2a1f4af commit c3c06d7
Show file tree
Hide file tree
Showing 12 changed files with 268 additions and 11 deletions.
14 changes: 11 additions & 3 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
from dotenv import load_dotenv

from src.adapter.external.auth.telegram import TelegramOauthAdapter
from src.adapter.internal.database.memorydb.service import MemoryDBAdapter

# from src.adapter.internal.database.memorydb.service import MemoryDBAdapter
from src.adapter.internal.database.mongodb.service import MongoDBAdapter
from src.app.http.server import build_server
from src.service.allocation import AllocationService
from src.service.answer import AnswerService
Expand All @@ -18,8 +20,8 @@
async def app():
load_dotenv()

# repo = await MongoDBAdapter.create("mongodb://localhost:27017/randorm")
repo = MemoryDBAdapter()
repo = await MongoDBAdapter.create("mongodb://localhost:27017/randorm")
# repo = MemoryDBAdapter()

secret_token = os.getenv("SECRET_TOKEN")
if secret_token is None:
Expand All @@ -29,7 +31,12 @@ async def app():
if jwt_secret is None:
raise RuntimeError("JWT_SECRET is not set")

service_secret_key = os.getenv("SERVICE_SECRET_KEY")
if service_secret_key is None:
raise RuntimeError("SERVICE_SECRET_KEY is not set")

user_service = UserService(repo)

allocation_service = AllocationService(
allocation_repo=repo,
form_field_repo=repo,
Expand Down Expand Up @@ -63,6 +70,7 @@ async def app():
oauth_adapter = TelegramOauthAdapter(secret_token, jwt_secret, user_service)

return build_server(
service_secret_key,
user_service,
answer_service,
allocation_service,
Expand Down
5 changes: 4 additions & 1 deletion src/adapter/external/graphql/operation/form_field.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ class ChoiceOptionInput: ...
def choice_option_to_domain_list(
options: Iterable[ChoiceOptionInput],
) -> list[domain.ChoiceOption]: # type: ignore
return [domain.ChoiceOption.model_validate(option) for option in options]
return [
domain.ChoiceOption.model_validate(option, from_attributes=True)
for option in options
]


@sb.experimental.pydantic.input(model=proto.UpdateChoiceOption, all_fields=True)
Expand Down
29 changes: 28 additions & 1 deletion src/adapter/external/graphql/tool/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@
if TYPE_CHECKING:
from src.adapter.external.graphql.tool.context import Context
from src.adapter.external.graphql.type.allocation import AllocationType
from src.adapter.external.graphql.type.form_field import FormFieldType
from src.adapter.external.graphql.type.form_field import (
ChoiceOptionType,
FormFieldType,
)
from src.adapter.external.graphql.type.participant import ParticipantType
from src.adapter.external.graphql.type.room import RoomType
from src.adapter.external.graphql.type.user import UserType


LazyFormFieldType = Annotated[
"FormFieldType", # type: ignore
sb.lazy(module_path="src.adapter.external.graphql.type.form_field"),
Expand Down Expand Up @@ -42,6 +46,11 @@
sb.lazy(module_path="src.adapter.external.graphql.type.participant"),
]

LazyChoiceOptionType = Annotated[
"ChoiceOptionType", # type: ignore
sb.lazy(module_path="src.adapter.external.graphql.type.form_field"),
]


class WithFormFields(Protocol):
form_fields_ids: list[scalar.ObjectID]
Expand Down Expand Up @@ -173,3 +182,21 @@ async def load_target(
info: sb.Info[LazyContext, WithTarget],
) -> LazyUserType:
return await info.context.user.loader.load(root.target_id)


class WithOptions(Protocol):
form_field_id: scalar.ObjectID
option_indexes: list[scalar.ObjectID]


async def load_options(
root: WithOptions,
info: sb.Info[LazyContext, WithOptions],
) -> list[LazyChoiceOptionType]:
form_field = await info.context.form_field.loader.load(root.form_field_id)

selected = []
for option_idx in root.option_indexes:
selected.append(form_field.options[option_idx])

return selected
2 changes: 1 addition & 1 deletion src/adapter/external/graphql/type/form_field.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class ChoiceAnswerType(BaseAnswerType):
kind: FormFieldKindType = sb.field(default=FormFieldKindType.CHOICE) # type: ignore

option_indexes: list[int]
options: sb.Private[list[ChoiceOptionType]]
options: list[ChoiceOptionType] = sb.field(resolver=resolver.load_options)


AnswerType = sb.union("AnswerType", types=(TextAnswerType, ChoiceAnswerType))
Expand Down
29 changes: 29 additions & 0 deletions src/adapter/internal/database/memorydb/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,19 @@ async def read_many_answers(
) -> list[domain.Answer | None]:
return [self._answer_collection.get(answer.id, None) for answer in answers]

async def read_all_answers(self) -> list[domain.Answer]:
try:
answers = self._answer_collection.values()
return [domain.AnswerResolver.validate_python(answer) for answer in answers]
except (ValidationError, AttributeError) as e:
raise exception.ReflectAnswerException(
f"failed to reflect answer type with error: {e}"
) from e
except Exception as e:
raise exception.ReadAnswerException(
f"failed to read all answers with error: {e}"
) from e

async def create_user(
self,
user: proto.CreateUser,
Expand Down Expand Up @@ -989,6 +1002,22 @@ async def read_many_participants(
for participant in participants
]

async def read_all_participants(self) -> list[domain.Participant]:
try:
participants = self._participant_collection.values()
return [
domain.ParticipantResolver.validate_python(participant)
for participant in participants
]
except (ValidationError, AttributeError) as e:
raise exception.ReflectParticipantException(
f"failed to reflect participant type with error: {e}"
) from e
except Exception as e:
raise exception.ReadParticipantException(
f"failed to read all participants with error: {e}"
) from e

async def create_preference(
self,
preference: proto.CreatePreference,
Expand Down
50 changes: 46 additions & 4 deletions src/adapter/internal/database/mongodb/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ class MongoDBAdapter(
):
_client: AsyncIOMotorClient

def __init__(self):
raise ValueError(
"MongoDBAdapter should be initialized with MongoDBAdapter.create method"
)
def __init__(self): ...

@classmethod
async def create(cls, dsn: str, **client_args: Any):
Expand Down Expand Up @@ -778,6 +775,28 @@ async def read_many_answers(
f"failed to read answers with ids {ids} with error: {e}"
) from e

async def read_all_answers(self) -> list[domain.Answer]:
try:
log.debug("reading all answers")
documents = await models.AnswerDocument.find_many(
{}, with_children=True
).to_list()
log.info(f"read {len(documents)} answers")
return [
domain.AnswerResolver.validate_python(document)
for document in documents
]
except (ValidationError, AttributeError) as e:
log.error("failed to reflect answer type with error: {}", e)
raise exception.ReflectAnswerException(
f"failed to reflect answer type with error: {e}"
) from e
except Exception as e:
log.error("failed to read all answers with error: {}", e)
raise exception.ReadAnswerException(
f"failed to read all answers with error: {e}"
) from e

async def create_user(
self,
user: proto.CreateUser,
Expand Down Expand Up @@ -1416,6 +1435,29 @@ async def read_many_participants(
f"failed to read participants with ids {ids} with error: {e}"
) from e

async def read_all_participants(self) -> list[domain.Participant]:
try:
log.debug("reading all participants")
documents = await models.ParticipantDocument.find_many(
{}, with_children=True
).to_list()

log.info(f"read {len(documents)} participants")
return [
domain.ParticipantResolver.validate_python(document)
for document in documents
]
except (ValidationError, AttributeError) as e:
log.error("failed to reflect participant type with error: {}", e)
raise exception.ReflectParticipantException(
f"failed to reflect participant type with error: {e}"
) from e
except Exception as e:
log.error("failed to read all participants with error: {}", e)
raise exception.ReadParticipantException(
f"failed to read all participants with error: {e}"
) from e

async def create_preference(
self, preference: proto.CreatePreference
) -> domain.Preference:
Expand Down
113 changes: 113 additions & 0 deletions src/app/http/routes/dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import ujson
from aiohttp import web

from src.protocol.internal.database.user import ReadUser
from src.service.answer import AnswerService
from src.service.participant import ParticipantService
from src.service.user import UserService
from src.utils.logger.logger import Logger

log = Logger("dataset-router")


class DatasetRouter:
_user_form_redirect_url: str

def __init__(
self,
secret_key: str,
answer_service: AnswerService,
participant_service: ParticipantService,
user_service: UserService,
):
self._secret_key = secret_key
self._answer_service = answer_service
self._participant_service = participant_service
self._user_service = user_service

def regiter_routers(self, app: web.Application):
app.add_routes(
[
web.get(
"/private/dataset/answers",
self.answer_handler,
name="dataset_answers_router",
),
web.get(
"/private/dataset/participants",
self.participants_handler,
name="dataset_participants_router",
),
]
)

async def answer_handler(self, request: web.Request) -> web.Response:
secret_key = request.headers.get("X-Secret-Key")
if secret_key is None or secret_key != self._secret_key:
log.error("invalid secret key")
return web.Response(status=403)

try:
answers = await self._answer_service.read_all()

response = web.StreamResponse(
status=200,
reason="OK",
headers={"Content-Type": "application/json"},
)
await response.prepare(request)

await response.write(b"[")
for i, answer in enumerate(answers):
data = answer.model_dump(mode="json")

# TODO: recommendational models do not support multiple option_indexes
data["option_indexes"] = data["option_indexes"][0]

await response.write(ujson.dumps(data, ensure_ascii=False).encode())

if i != len(answers) - 1:
await response.write(b",")

await response.write(b"]")
await response.write_eof()
return response # type: ignore
except Exception as e:
return web.Response(status=500, text=str(e))

async def participants_handler(self, request: web.Request) -> web.Response:
secret_key = request.headers.get("X-Secret-Key")
if secret_key is None or secret_key != self._secret_key:
log.error("invalid secret key")
return web.Response(status=403)

try:
participants = await self._participant_service.read_all()
user_read_request = [
ReadUser(_id=participant.user_id) for participant in participants
]
users = await self._user_service.read_many(user_read_request)

response = web.StreamResponse(
status=200,
reason="OK",
headers={"Content-Type": "application/json"},
)
await response.prepare(request)

await response.write(b"[")

for user_idx, participant in enumerate(participants):
data = participant.model_dump(mode="json")
data["gender"] = users[user_idx].profile.gender

await response.write(ujson.dumps(data, ensure_ascii=False).encode())

if user_idx != len(participants) - 1:
await response.write(b",")

await response.write(b"]")
await response.write_eof()
return response # type: ignore
except Exception as e:
return web.Response(status=500, text=str(e))
10 changes: 9 additions & 1 deletion src/app/http/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from src.adapter.external.graphql.schema import SCHEMA
from src.adapter.external.graphql.view import RandormGraphQLView
from src.app.http.routes import oauth
from src.app.http.routes import dataset, oauth
from src.protocol.external.auth.oauth import OauthProtocol
from src.service.allocation import AllocationService
from src.service.answer import AnswerService
Expand All @@ -14,6 +14,7 @@


def build_server(
service_secret_key: str,
user_service: UserService,
answer_service: AnswerService,
allocation_service: AllocationService,
Expand All @@ -32,6 +33,13 @@ def build_server(
service=user_service,
).regiter_routers(app)

dataset.DatasetRouter(
secret_key=service_secret_key,
answer_service=answer_service,
user_service=user_service,
participant_service=participant_service,
).regiter_routers(app)

app.router.add_route(
"*",
"/graphql",
Expand Down
3 changes: 3 additions & 0 deletions src/protocol/internal/database/form_field.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,6 @@ async def delete_answer(self, answer: DeleteAnswer) -> Answer: ...
async def read_many_answers(
self, answers: list[ReadAnswer]
) -> list[Answer | None]: ...

@abstractmethod
async def read_all_answers(self) -> list[Answer]: ...
3 changes: 3 additions & 0 deletions src/protocol/internal/database/participant.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,6 @@ async def delete_participant(
async def read_many_participants(
self, participants: list[ReadParticipant]
) -> list[domain.Participant | None]: ...

@abstractmethod
async def read_all_participants(self) -> list[domain.Participant]: ...
Loading

0 comments on commit c3c06d7

Please sign in to comment.