Skip to content

Commit

Permalink
Add when session expiry timer started in pending state (microsoft#19369)
Browse files Browse the repository at this point in the history
Stash when session expiry timer started in a newly created container to
use it as the original t=0 for resolved containers using stashed info.

---------

Co-authored-by: Mark Fields <[email protected]>
  • Loading branch information
dannimad and markfields authored Feb 2, 2024
1 parent c7d18ac commit 5fc085e
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 21 deletions.
6 changes: 6 additions & 0 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,10 @@ export interface IPendingRuntimeState {
* Pending idCompressor state
*/
pendingIdCompressorState?: SerializedIdCompressorWithOngoingSession;
/**
* Time at which session expiry timer started.
*/
sessionExpiryTimerStarted?: number | undefined;
}

const maxConsecutiveReconnectsKey = "Fluid.ContainerRuntime.MaxConsecutiveReconnects";
Expand Down Expand Up @@ -1419,6 +1423,7 @@ export class ContainerRuntime
getLastSummaryTimestampMs: () => this.messageAtLastSummary?.timestamp,
readAndParseBlob: async <T>(id: string) => readAndParse<T>(this.storage, id),
submitMessage: (message: ContainerRuntimeGCMessage) => this.submit(message),
sessionExpiryTimerStarted: pendingRuntimeState?.sessionExpiryTimerStarted,
});

const loadedFromSequenceNumber = this.deltaManager.initialSequenceNumber;
Expand Down Expand Up @@ -3961,6 +3966,7 @@ export class ContainerRuntime
pending,
pendingAttachmentBlobs,
pendingIdCompressorState,
sessionExpiryTimerStarted: this.garbageCollector.sessionExpiryTimerStarted,
};
event.end({
attachmentBlobsSize: Object.keys(pendingAttachmentBlobs ?? {}).length,
Expand Down
16 changes: 15 additions & 1 deletion packages/runtime/container-runtime/src/gc/garbageCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ export class GarbageCollector implements IGarbageCollector {
return this.configs.shouldRunGC;
}

public readonly sessionExpiryTimerStarted: number | undefined;
// Keeps track of the GC state from the last run.
private gcDataFromLastRun: IGarbageCollectionData | undefined;
// Keeps a list of references (edges in the GC graph) between GC runs. Each entry has a node id and a list of
Expand Down Expand Up @@ -175,6 +176,7 @@ export class GarbageCollector implements IGarbageCollector {

const baseSnapshot = createParams.baseSnapshot;
const readAndParseBlob = createParams.readAndParseBlob;
const pendingSessionExpiryTimerStarted = createParams.sessionExpiryTimerStarted;

this.mc = createChildMonitoringContext({
logger: createParams.baseLogger,
Expand All @@ -192,14 +194,26 @@ export class GarbageCollector implements IGarbageCollector {
const overrideSessionExpiryTimeoutMs = this.mc.config.getNumber(
"Fluid.GarbageCollection.TestOverride.SessionExpiryMs",
);
const timeoutMs = overrideSessionExpiryTimeoutMs ?? this.configs.sessionExpiryTimeoutMs;
let timeoutMs = this.configs.sessionExpiryTimeoutMs;

if (pendingSessionExpiryTimerStarted) {
// NOTE: This assumes the client clock hasn't been tampered with since the original session
const timeLapsedSincePendingTimer = Date.now() - pendingSessionExpiryTimerStarted;
timeoutMs -= timeLapsedSincePendingTimer;
}
timeoutMs = overrideSessionExpiryTimeoutMs ?? timeoutMs;
if (timeoutMs <= 0) {
this.runtime.closeFn(
new ClientSessionExpiredError(`Client session expired.`, timeoutMs),
);
}
this.sessionExpiryTimer = new Timer(timeoutMs, () => {
this.runtime.closeFn(
new ClientSessionExpiredError(`Client session expired.`, timeoutMs),
);
});
this.sessionExpiryTimer.start();
this.sessionExpiryTimerStarted = Date.now();
}

this.summaryStateTracker = new GCSummaryStateTracker(
Expand Down
7 changes: 7 additions & 0 deletions packages/runtime/container-runtime/src/gc/gcDefinitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,12 @@ export interface IGarbageCollectionRuntime {

/** Defines the contract for the garbage collector. */
export interface IGarbageCollector {
/**
* Tells the time at which session expiry timer started in a previous container.
* This is only set when loading from a stashed container and will be equal to the
* original container's local client time when it was loaded (and started the session expiry timer).
*/
readonly sessionExpiryTimerStarted: number | undefined;
/** Tells whether GC should run or not. */
readonly shouldRunGC: boolean;
/** Tells whether the GC state in summary needs to be reset in the next summary. */
Expand Down Expand Up @@ -394,6 +400,7 @@ export interface IGarbageCollectorCreateParams {
readonly getLastSummaryTimestampMs: () => number | undefined;
readonly readAndParseBlob: ReadAndParseBlob;
readonly submitMessage: (message: ContainerRuntimeGCMessage) => void;
readonly sessionExpiryTimerStarted?: number | undefined;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ describe("Garbage Collection Tests", () => {
getNodePackagePath: async (nodeId: string) => testPkgPath,
getLastSummaryTimestampMs: () => Date.now(),
submitMessage: (message: ContainerRuntimeGCMessage) => {},
sessionExpiryTimerStarted: createParams.sessionExpiryTimerStarted,
}) as GcWithPrivates;
}
let gc: GcWithPrivates | undefined;
Expand All @@ -207,29 +208,62 @@ describe("Garbage Collection Tests", () => {
clock.restore();
});

it("Session expiry closes container", () => {
let closeCalled = false;
function closeCalledAfterExactTicks(ticks: number) {
clock.tick(ticks - 1);
if (closeCalled) {
return false;
describe("Session expiry", () => {
it("Session expiry closes container", () => {
let closeCalled = false;
function closeCalledAfterExactTicks(ticks: number) {
clock.tick(ticks - 1);
if (closeCalled) {
return false;
}
clock.tick(1);
return closeCalled;
}

gc = createGarbageCollector(
{},
undefined /* gcBlobsMap */,
undefined /* gcMetadata */,
() => {
closeCalled = true;
},
);
assert(
closeCalledAfterExactTicks(defaultSessionExpiryDurationMs),
"Close should have been called at exactly defaultSessionExpiryDurationMs",
);
});

it("Session expiry is adjusted by sessionExpiryTimerStarted", () => {
let closeCalled = false;
const sessionExpiryTimerStarted = defaultSessionExpiryDurationMs - 1; // arbitrary number
clock.tick(sessionExpiryTimerStarted + defaultSessionExpiryDurationMs - 1);
gc = createGarbageCollector(
{ sessionExpiryTimerStarted },
undefined /* gcBlobsMap */,
undefined /* gcMetadata */,
() => {
closeCalled = true;
},
);
assert(closeCalled === false, "Close should not have been called");
clock.tick(1);
return closeCalled;
}
assert(closeCalled, "Close should have been called");
});

gc = createGarbageCollector(
{},
undefined /* gcBlobsMap */,
undefined /* gcMetadata */,
() => {
closeCalled = true;
},
);
assert(
closeCalledAfterExactTicks(defaultSessionExpiryDurationMs),
"Close should have been called at exactly defaultSessionExpiryDurationMs",
);
it("it throws when already expired", () => {
clock.tick(defaultSessionExpiryDurationMs + 1);
assert.throws(() =>
createGarbageCollector(
{ sessionExpiryTimerStarted: 1 },
undefined /* gcBlobsMap */,
undefined /* gcMetadata */,
() => {
throw new Error("Session expired");
},
),
);
});
});

describe("addedOutboundReference", () => {
Expand Down
15 changes: 15 additions & 0 deletions packages/test/test-end-to-end-tests/src/test/stashedOps.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,21 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => {
);
});

it("fails when session time expires using stashed time", async function () {
const pendingOps = await getPendingOps(provider, false, async (c, d) => {
const map = await d.getSharedObject<SharedMap>(mapId);
[...Array(lots).keys()].map((i) => map.set(i.toString(), i));
});
const pendingState = JSON.parse(pendingOps);
assert.ok(pendingState.pendingRuntimeState.sessionExpiryTimerStarted);
pendingState.pendingRuntimeState.sessionExpiryTimerStarted = 1;
const pendingOps2 = JSON.stringify(pendingState);
await assert.rejects(
async () => loader.resolve({ url }, pendingOps2),
/Client session expired./,
);
});

it("can make changes offline and stash them", async function () {
const pendingOps = await getPendingOps(provider, false, async (c, d) => {
const map = await d.getSharedObject<SharedMap>(mapId);
Expand Down

0 comments on commit 5fc085e

Please sign in to comment.