Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/db/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"version": "0.1.7",
"dependencies": {
"@standard-schema/spec": "^1.0.0",
"@tanstack/db-ivm": "workspace:*"
"@tanstack/db-ivm": "workspace:*",
"nanoevents": "^9.0.0"
},
"devDependencies": {
"@vitest/coverage-istanbul": "^3.0.9",
Expand Down
19 changes: 19 additions & 0 deletions packages/db/src/collection.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { createNanoEvents } from "nanoevents"
import { withArrayChangeTracking, withChangeTracking } from "./proxy"
import { SortedMap } from "./SortedMap"
import {
Expand Down Expand Up @@ -37,6 +38,7 @@ import {
UpdateKeyNotFoundError,
} from "./errors"
import { createFilteredCallback, currentStateAsChanges } from "./change-events"
import type { Emitter } from "nanoevents"
import type { Transaction } from "./transactions"
import type { StandardSchemaV1 } from "@standard-schema/spec"
import type { SingleRowRefProxy } from "./query/builder/ref-proxy"
Expand Down Expand Up @@ -70,6 +72,10 @@ interface PendingSyncedTransaction<T extends object = Record<string, unknown>> {
deletedKeys: Set<string | number>
}

interface CollectionEvents {
subscriberCountChanged: (count: number) => void
}

/**
* Enhanced Collection interface that includes both data type T and utilities TUtils
* @template T - The type of items in the collection
Expand All @@ -85,6 +91,8 @@ export interface Collection<
TInsertInput extends object = T,
> extends CollectionImpl<T, TKey, TUtils, TSchema, TInsertInput> {
readonly utils: TUtils
readonly subscriberCount: number
readonly events: Emitter<CollectionEvents>
}

/**
Expand Down Expand Up @@ -308,6 +316,7 @@ export class CollectionImpl<
// Event system
private changeListeners = new Set<ChangeListener<T, TKey>>()
private changeKeyListeners = new Map<TKey, Set<ChangeListener<T, TKey>>>()
public events = createNanoEvents<CollectionEvents>()

// Utilities namespace
// This is populated by createCollection
Expand Down Expand Up @@ -412,6 +421,13 @@ export class CollectionImpl<
return this._status
}

/**
* Gets the current number of active subscribers
*/
public get subscriberCount(): number {
return this.activeSubscribersCount
}

/**
* Validates that the collection is in a usable state for data operations
* @private
Expand Down Expand Up @@ -770,6 +786,7 @@ export class CollectionImpl<
private addSubscriber(): void {
this.activeSubscribersCount++
this.cancelGCTimer()
this.events.emit(`subscriberCountChanged`, this.activeSubscribersCount)

// Start sync if collection was cleaned up
if (this._status === `cleaned-up` || this._status === `idle`) {
Expand All @@ -789,6 +806,8 @@ export class CollectionImpl<
} else if (this.activeSubscribersCount < 0) {
throw new NegativeActiveSubscribersError()
}

this.events.emit(`subscriberCountChanged`, this.activeSubscribersCount)
}

/**
Expand Down
42 changes: 39 additions & 3 deletions packages/query-db-collection/src/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,10 @@ export function queryCollectionOptions<
TQueryKey
>(queryClient, observerOptions)

const actualUnsubscribeFn = localObserver.subscribe((result) => {
let isSubscribed = false
let actualUnsubscribeFn: (() => void) | null = null

const handleQueryResult = (result: any) => {
if (result.isSuccess) {
const newItemsArray = result.data

Expand Down Expand Up @@ -581,10 +584,43 @@ export function queryCollectionOptions<
// Mark collection as ready even on error to avoid blocking apps
markReady()
}
})
}

const subscribeToQuery = () => {
if (!isSubscribed) {
actualUnsubscribeFn = localObserver.subscribe(handleQueryResult)
isSubscribed = true
}
}

const unsubscribeFromQuery = () => {
if (isSubscribed && actualUnsubscribeFn) {
actualUnsubscribeFn()
actualUnsubscribeFn = null
isSubscribed = false
}
}

// If startSync=true or there are subscribers to the collection, subscribe to the query straight away
if (config.startSync || collection.subscriberCount > 0) {
subscribeToQuery()
}

// Set up event listener for subscriber changes
const unsubscribeFromCollectionEvents = collection.events.on(
`subscriberCountChanged`,
(count) => {
if (count > 0) {
subscribeToQuery()
} else if (count === 0) {
unsubscribeFromQuery()
}
}
)

return async () => {
actualUnsubscribeFn()
unsubscribeFromCollectionEvents()
unsubscribeFromQuery()
await queryClient.cancelQueries({ queryKey })
queryClient.removeQueries({ queryKey })
}
Expand Down
Loading