Skip to content

Commit deb1444

Browse files
committed
Increase expires_at only when subscribing again
1 parent 24fcc0e commit deb1444

File tree

3 files changed

+23
-23
lines changed

3 files changed

+23
-23
lines changed

crates/core/src/sync/streaming_sync.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -594,10 +594,6 @@ impl StreamingSyncIteration {
594594
local.active = true;
595595
local.is_default = subscription.is_default;
596596
has_local = true;
597-
598-
if let Some(ttl) = local.ttl {
599-
local.expires_at = Some(now.0 + ttl);
600-
}
601597
}
602598

603599
if !has_local && subscription.is_default {

crates/core/src/sync/subscriptions.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,17 @@ pub fn apply_subscriptions(
8484
match subscription {
8585
SubscriptionChangeRequest::Subscribe(subscription) => {
8686
let stmt = db
87-
.prepare_v2("INSERT INTO ps_stream_subscriptions (stream_name, local_priority, local_params, ttl) VALUES (?, ?2, ?, ?4) ON CONFLICT DO UPDATE SET local_priority = min(coalesce(?2, local_priority), local_priority), ttl = ?4")
87+
.prepare_v2(
88+
"
89+
INSERT INTO ps_stream_subscriptions (stream_name, local_priority, local_params, ttl, expires_at)
90+
VALUES (?, ?2, ?, ?4, unixepoch() + ?4)
91+
ON CONFLICT DO UPDATE SET
92+
local_priority = min(coalesce(?2, local_priority),
93+
local_priority),
94+
ttl = ?4,
95+
expires_at = unixepoch() + ?4
96+
",
97+
)
8898
.into_db_result(db)?;
8999

90100
stmt.bind_text(1, &subscription.stream, sqlite::Destructor::STATIC)?;

dart/test/sync_stream_test.dart

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -270,9 +270,18 @@ void main() {
270270
});
271271

272272
syncTest('ttl', (controller) {
273-
db.execute(
274-
'INSERT INTO ps_stream_subscriptions (stream_name, ttl) VALUES (?, ?);',
275-
['my_stream', 3600]);
273+
control(
274+
'subscriptions',
275+
json.encode({
276+
'subscribe': {
277+
'stream': 'my_stream',
278+
'ttl': 3600,
279+
}
280+
}),
281+
);
282+
283+
final [row] = db.select('SELECT * FROM ps_stream_subscriptions');
284+
expect(row, containsPair('expires_at', 1740826800));
276285

277286
var startInstructions = control('start', null);
278287
expect(
@@ -300,21 +309,6 @@ void main() {
300309
),
301310
),
302311
);
303-
304-
// Send a checkpoint containing the stream, increasing the TTL.
305-
control(
306-
'line_text',
307-
json.encode(
308-
checkpoint(
309-
lastOpId: 1,
310-
buckets: [],
311-
streams: [('my_stream', false)],
312-
),
313-
),
314-
);
315-
316-
final [row] = db.select('SELECT * FROM ps_stream_subscriptions');
317-
expect(row, containsPair('expires_at', 1740826800));
318312
control('stop', null);
319313

320314
// Elapse beyond end of TTL

0 commit comments

Comments
 (0)