forked from decodingml/llm-twin-course
-
Notifications
You must be signed in to change notification settings - Fork 0
/
documents.py
127 lines (93 loc) · 3.43 KB
/
documents.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import uuid
from typing import List, Optional
import core.logger_utils as logger_utils
from pydantic import UUID4, BaseModel, ConfigDict, Field
from pymongo import errors
from core.db.errors import ImproperlyConfigured
from core.db.mongo import connection
# Handle to the "scrabble" database, obtained from the connection object
# imported from core.db.mongo. The database name is hard-coded here and the
# lookup runs at import time.
_database = connection.get_database("scrabble")
# Module-level logger named after this module.
logger = logger_utils.get_logger(__name__)
class BaseDocument(BaseModel):
    """Base pydantic model for documents persisted in a MongoDB collection.

    Subclasses must declare an inner ``Settings`` class exposing a ``name``
    attribute with the collection name (see ``_get_collection_name``).
    The model's ``id`` (a UUID) is stored in MongoDB as the string ``_id``.
    """

    id: UUID4 = Field(default_factory=uuid.uuid4)

    model_config = ConfigDict(from_attributes=True, populate_by_name=True)

    @classmethod
    def from_mongo(cls, data: dict):
        """Convert "_id" (str object) into "id" (UUID object).

        Args:
            data: Raw document as returned by the driver.

        Returns:
            An instance of ``cls`` built from ``data``. A falsy ``data``
            (``None`` or an empty dict) is returned unchanged.

        The input dict is not modified. When ``data`` has no "_id" key the
        payload is passed through as-is, so an existing "id" key (or the
        field's default factory) supplies the identifier.
        """
        if not data:
            return data

        # Copy first so the caller's dict keeps its "_id" key.
        payload = dict(data)
        if "_id" in payload:
            payload["id"] = payload.pop("_id")

        return cls(**payload)

    def to_mongo(self, **kwargs) -> dict:
        """Convert "id" (UUID object) into "_id" (str object).

        Args:
            **kwargs: Forwarded to ``model_dump``. ``exclude_unset`` defaults
                to ``False`` and ``by_alias`` defaults to ``True`` unless
                given explicitly.

        Returns:
            The dumped model as a dict, with "id" replaced by a string "_id".
        """
        exclude_unset = kwargs.pop("exclude_unset", False)
        by_alias = kwargs.pop("by_alias", True)

        parsed = self.model_dump(
            exclude_unset=exclude_unset, by_alias=by_alias, **kwargs
        )

        # If the dump omitted "id" (e.g. through exclude/exclude_unset), no
        # "_id" is written here and an already-present "_id" is kept.
        if "_id" not in parsed and "id" in parsed:
            parsed["_id"] = str(parsed.pop("id"))

        return parsed

    def save(self, **kwargs):
        """Insert this document into its collection.

        Args:
            **kwargs: Forwarded to ``to_mongo``.

        Returns:
            The ``inserted_id`` reported by the driver, or ``None`` when the
            insert fails with ``pymongo.errors.WriteError`` (the failure is
            logged).

        Raises:
            ImproperlyConfigured: If the class has no ``Settings.name``.
        """
        collection = _database[self._get_collection_name()]
        payload = self.to_mongo(**kwargs)

        try:
            result = collection.insert_one(payload)
        except errors.WriteError:
            logger.exception("Failed to insert document.")
            return None

        return result.inserted_id

    @classmethod
    def get_or_create(cls, **filter_options) -> Optional[str]:
        """Return the id of the document matching the filter, creating it if absent.

        Args:
            **filter_options: Used both as the ``find_one`` filter and, when
                nothing matches, as the field values of the new document.

        Returns:
            The id of the existing document as a string, or the value
            returned by ``save`` for the newly created one. ``None`` when the
            driver raises ``pymongo.errors.OperationFailure`` (logged).

        Raises:
            ImproperlyConfigured: If the class has no ``Settings.name``.
        """
        collection = _database[cls._get_collection_name()]

        try:
            # The lookup and the insert are two separate driver calls.
            existing = collection.find_one(filter_options)
            if existing:
                return str(cls.from_mongo(existing).id)

            return cls(**filter_options).save()
        except errors.OperationFailure:
            logger.exception("Failed to retrieve or create document.")
            return None

    @classmethod
    def bulk_insert(cls, documents: List, **kwargs) -> Optional[List[str]]:
        """Insert several documents into this class's collection in one call.

        Args:
            documents: Document instances exposing ``to_mongo``.
            **kwargs: Forwarded to each document's ``to_mongo``.

        Returns:
            The ``inserted_ids`` reported by the driver, an empty list when
            ``documents`` is empty, or ``None`` when the insert fails
            (the failure is logged).

        Raises:
            ImproperlyConfigured: If the class has no ``Settings.name``.
        """
        collection = _database[cls._get_collection_name()]

        # insert_many rejects an empty batch, so there is nothing to send.
        if not documents:
            return []

        payloads = [doc.to_mongo(**kwargs) for doc in documents]

        try:
            result = collection.insert_many(payloads)
        except (errors.WriteError, errors.BulkWriteError):
            # insert_many signals write failures with BulkWriteError, which
            # is not a subclass of WriteError and must be listed explicitly.
            logger.exception("Failed to insert documents.")
            return None

        return result.inserted_ids

    @classmethod
    def _get_collection_name(cls):
        """Return ``cls.Settings.name``.

        Raises:
            ImproperlyConfigured: If the class lacks a ``Settings`` attribute
                or ``Settings`` lacks ``name``.
        """
        if not hasattr(cls, "Settings") or not hasattr(cls.Settings, "name"):
            raise ImproperlyConfigured(
                "Document should define an Settings configuration class with the name of the collection."
            )

        return cls.Settings.name
class UserDocument(BaseDocument):
    """Document persisted in the "users" collection."""

    # Both fields are required strings.
    first_name: str
    last_name: str

    class Settings:
        # Collection name read by BaseDocument._get_collection_name().
        name = "users"
class RepositoryDocument(BaseDocument):
    """Document persisted in the "repositories" collection."""

    name: str
    link: str
    content: dict
    # Required string; the alias is identical to the field name.
    # NOTE(review): presumably the id of the owning user - confirm with callers.
    owner_id: str = Field(alias="owner_id")

    class Settings:
        # Collection name read by BaseDocument._get_collection_name().
        name = "repositories"
class PostDocument(BaseDocument):
    """Document persisted in the "posts" collection."""

    platform: str
    content: dict
    # Required string; the alias is identical to the field name.
    # NOTE(review): presumably the id of the authoring user - confirm with callers.
    author_id: str = Field(alias="author_id")

    class Settings:
        # Collection name read by BaseDocument._get_collection_name().
        name = "posts"
class ArticleDocument(BaseDocument):
    """Document persisted in the "articles" collection."""

    platform: str
    link: str
    content: dict
    # Required string; the alias is identical to the field name.
    # NOTE(review): presumably the id of the authoring user - confirm with callers.
    author_id: str = Field(alias="author_id")

    class Settings:
        # Collection name read by BaseDocument._get_collection_name().
        name = "articles"