Skip to content

Commit

Permalink
handleEvent to async to deal with increased firehose volume (bluesky-…
Browse files Browse the repository at this point in the history
…social#81)

* speed up subscription by calling handleEvent async

Moving error handling to src/subscription, this will allow handleEvent to occur in parallel

* Error handling in handleEvent

Verifying getOpsByType within the function now that handleEvent is managed async

* simplify exception handling

* dependency updates

* remove semis

---------

Co-authored-by: Hailey <[email protected]>
Co-authored-by: Daniel Holmgren <[email protected]>
  • Loading branch information
3 people authored Oct 31, 2024
1 parent 5f2d936 commit dcaa673
Show file tree
Hide file tree
Showing 3 changed files with 759 additions and 397 deletions.
8 changes: 7 additions & 1 deletion src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@ import { FirehoseSubscriptionBase, getOpsByType } from './util/subscription'
export class FirehoseSubscription extends FirehoseSubscriptionBase {
async handleEvent(evt: RepoEvent) {
if (!isCommit(evt)) return
const ops = await getOpsByType(evt)

const ops = await getOpsByType(evt).catch(e => {
console.error('repo subscription could not handle message', e)
return undefined
})

if (!ops) return

// This logs the text of every post off the firehose.
// Just for fun :)
Expand Down
6 changes: 1 addition & 5 deletions src/util/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,7 @@ export abstract class FirehoseSubscriptionBase {
async run(subscriptionReconnectDelay: number) {
try {
for await (const evt of this.sub) {
try {
await this.handleEvent(evt)
} catch (err) {
console.error('repo subscription could not handle message', err)
}
this.handleEvent(evt)
// update stored cursor every 20 events or so
if (isCommit(evt) && evt.seq % 20 === 0) {
await this.updateCursor(evt.seq)
Expand Down
Loading

0 comments on commit dcaa673

Please sign in to comment.