Skip to content

Commit

Permalink
Implement a real sync store (#34)
Browse files Browse the repository at this point in the history
* Refresh the contributing guide

* Sync store groundwork

* Introduce working sync store + unit tests

* Run tests on all branches on push

* Formatting

* Properly close the client and sync store on shutdown

Also fixes CI never finishing

* Add context manager for sync store

This allows start() to always call SyncStore.close(), even if the termination is abrupt.

* Log when the sync store is managed via the context manager

* typing.Self isn't in <3.11

* Log which rooms are added to the sync payload for replay

* perform an upsert instead of INSERT/REPLACE for set_next_batch

* Update the sync store test

* Prevent logging from shoving megabytes into the logs in debug mode

* Properly close the db

* Prevent handle_sync spewing the entire sync into logs

* Why did start()'s initial sync have a timeout?

* Fix timeline/state insertion not inserting the raw event

* Fix get_next_batch & prevent handle_sync still spamming logs

* Add state and timeline events incrementally even on join

* Fix KeyError

* Fix mismatched logging arguments

* Use BLOBs for event storage rather than text

Also uses orjson for super fast JSON parsing (which should help in large rooms where states are huge, like Matrix HQ)

* Don't incrementally add state events on first join

* Fix sync replay holding invalid formats

* Don't fail with an invalid event with no source

* Reformat

* Fix source fallback falling back to the dataclass

* Automatically checkpoint every 25 changes

* Fix autosaving after every sync after 25 total changes

* Reformat

* Reject bad events in insert_{state,timeline}_event

* reformatting
  • Loading branch information
nexy7574 authored Nov 23, 2024
1 parent 66cabb2 commit 875faae
Show file tree
Hide file tree
Showing 9 changed files with 688 additions and 59 deletions.
1 change: 1 addition & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ however that is preferable to errors.
* Fixed some faulty sync responses triggering commands twice
* Fixed a bug in the default help command that would display hidden commands regardless.
* Removed fallback replies in messages (see: [MSC2781](https://github.com/matrix-org/matrix-spec-proposals/pull/2781))
* Added a real sync store (**huge** optimisation, especially for larger accounts)
* Removed the legacy function-based argument parsers in favour of the class-based system

## v1.1.1 (2024-06-26)
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ blurhash-python~=1.2
pillow>=9.3.0
beautifulsoup4~=4.12
pydantic~=2.9
aiosqlite~=0.20
orjson~=3.10
143 changes: 84 additions & 59 deletions src/niobot/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
MessageException,
NioBotException,
)
from .utils import MXID_REGEX, Mentions, Typing, deprecated, force_await, run_blocking
from .utils import MXID_REGEX, Mentions, SyncStore, Typing, _DudSyncStore, deprecated, force_await, run_blocking
from .utils.help_command import DefaultHelpCommand

try:
Expand Down Expand Up @@ -135,6 +135,7 @@ def __init__(
default_parse_mentions: bool = True,
force_initial_sync: bool = False,
use_fallback_replies: bool = False,
onsite_state_resolution: bool = False,
):
if user_id == owner_id and ignore_self is True:
warnings.warn(
Expand Down Expand Up @@ -278,6 +279,10 @@ def __init__(
self._event_id_cache = collections.deque(maxlen=1000)
self._message_process_lock = asyncio.Lock()

self.sync_store: typing.Union[SyncStore, _DudSyncStore] = _DudSyncStore()
if self.store_path:
self.sync_store = SyncStore(self, self.store_path + "/sync.db", resolve_state=onsite_state_resolution)

@property
def supported_server_versions(self) -> typing.List[typing.Tuple[int, int, int]]:
"""
Expand Down Expand Up @@ -328,6 +333,8 @@ async def sync(self, *args, **kwargs) -> U[nio.SyncResponse, nio.SyncError]:
sync = await super().sync(*args, **kwargs)
if isinstance(sync, nio.SyncResponse):
self._populate_dm_rooms(sync)
if self.sync_store:
await self.sync_store.handle_sync(sync)
return sync

def _populate_dm_rooms(self, sync: nio.SyncResponse):
Expand Down Expand Up @@ -1298,6 +1305,11 @@ async def redact_reaction(self, room: U[nio.MatrixRoom, str], reaction: U[nio.Ro
raise MessageException("Failed to delete reaction.", response)
return response

async def close(self):
    """
    Close the sync store (when one is active) and then the underlying client.

    The parent ``close()`` is awaited in a ``finally`` clause so that it still runs if closing
    the sync store raises; any such exception propagates to the caller afterwards.
    """
    try:
        if self.sync_store:
            await self.sync_store.close()
    finally:
        # Always release the parent client's resources, even when the store failed to close.
        await super().close()

async def start(
self,
password: typing.Optional[str] = None,
Expand All @@ -1311,67 +1323,80 @@ async def start(
self.log.info("Starting automatic key import")
await self.import_keys(*map(str, self.__key_import))

if password or sso_token:
self.log.info("Logging in with a password or SSO token")
login_response = await self.login(password=password, token=sso_token, device_name=self.device_id)
if isinstance(login_response, nio.LoginError):
raise LoginException("Failed to log in.", login_response)

self.log.info("Logged in as %s", login_response.user_id)
self.log.debug("Logged in: {0.access_token}, {0.user_id}".format(login_response))
self.start_time = time.time()
elif access_token:
self.log.info("Logging in with existing access token.")
if self.store_path:
try:
self.load_store()
except FileNotFoundError:
self.log.warning("Failed to load store.")
except nio.LocalProtocolError as e:
self.log.warning("No store? %r", e, exc_info=e)
self.access_token = access_token
self.start_time = time.time()
else:
raise LoginException("You must specify either a password/SSO token or an access token.")
async with self.sync_store:
if password or sso_token:
self.log.info("Logging in with a password or SSO token")
login_response = await self.login(password=password, token=sso_token, device_name=self.device_id)
if isinstance(login_response, nio.LoginError):
raise LoginException("Failed to log in.", login_response)

self.log.info("Logged in as %s", login_response.user_id)
self.log.debug("Logged in: {0.access_token}, {0.user_id}".format(login_response))
self.start_time = time.time()
elif access_token:
self.log.info("Logging in with existing access token.")
if self.store_path:
try:
self.load_store()
except FileNotFoundError:
self.log.warning("Failed to load store.")
except nio.LocalProtocolError as e:
self.log.warning("No store? %r", e, exc_info=e)
self.access_token = access_token
self.start_time = time.time()
else:
raise LoginException("You must specify either a password/SSO token or an access token.")

if self.should_upload_keys:
self.log.info("Uploading encryption keys...")
response = await self.keys_upload()
if isinstance(response, nio.KeysUploadError):
self.log.critical("Failed to upload encryption keys. Encryption may not work. Error: %r", response)
if self.should_upload_keys:
self.log.info("Uploading encryption keys...")
response = await self.keys_upload()
if isinstance(response, nio.KeysUploadError):
self.log.critical("Failed to upload encryption keys. Encryption may not work. Error: %r", response)
else:
self.log.info("Uploaded encryption keys.")
self.log.info("Fetching server details...")
response = await self.send("GET", "/_matrix/client/versions")
if response.status != 200:
self.log.warning("Failed to fetch server details. Status: %d", response.status)
else:
self.log.info("Uploaded encryption keys.")
self.log.info("Fetching server details...")
response = await self.send("GET", "/_matrix/client/versions")
if response.status != 200:
self.log.warning("Failed to fetch server details. Status: %d", response.status)
else:
self.server_info = await response.json()
self.log.debug("Server details: %r", self.server_info)
self.log.info("Performing first sync...")
self.server_info = await response.json()
self.log.debug("Server details: %r", self.server_info)

def presence_getter(stage: int) -> Optional[str]:
if self._startup_presence is False:
return
elif self._startup_presence is None:
return ("unavailable", "online")[stage]
return self._startup_presence

result = await self.sync(timeout=30000, full_state=self._sync_full_state, set_presence=presence_getter(0))
if not isinstance(result, nio.SyncResponse):
raise NioBotException("Failed to perform first sync.", result)
self.is_ready.set()
self.dispatch("ready", result)
self.log.info("Starting sync loop")
try:
await self.sync_forever(
timeout=30000,
full_state=self._sync_full_state,
set_presence=presence_getter(1),
)
finally:
self.log.info("Closing http session.")
await self.close()
if self.sync_store:
self.log.info("Resuming from sync store...")
try:
payload = await self.sync_store.generate_sync()
assert isinstance(payload, nio.SyncResponse), "Sync store did not return a SyncResponse."
self.log.info("Replaying sync...")
await self._handle_sync(payload)
self.log.info("Successfully resumed from store.")
except Exception as e:
self.log.error("Failed to replay sync: %r. Will not resume.", e, exc_info=e)

self.log.info("Performing first sync...")

def presence_getter(stage: int) -> Optional[str]:
if self._startup_presence is False:
return
elif self._startup_presence is None:
return ("unavailable", "online")[stage]
return self._startup_presence

result = await self.sync(timeout=0, full_state=self._sync_full_state, set_presence=presence_getter(0))
if not isinstance(result, nio.SyncResponse):
raise NioBotException("Failed to perform first sync.", result)
self.is_ready.set()
self.dispatch("ready", result)
self.log.info("Starting sync loop")
try:
await self.sync_forever(
timeout=30000,
full_state=self._sync_full_state,
set_presence=presence_getter(1),
)
finally:
self.log.info("Closing http session.")
await self.close()

def run(
self,
Expand Down
1 change: 1 addition & 0 deletions src/niobot/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@
from .mentions import Mentions as Mentions
from .parsers import *
from .string_view import *
from .sync_store import *
from .typing import *
from .unblocking import *
Loading

0 comments on commit 875faae

Please sign in to comment.