From 3646ade8dbd8f4bda3fb7ef6523bdfae3155b13a Mon Sep 17 00:00:00 2001 From: Willie Habimana Date: Thu, 2 Jan 2025 13:18:01 -0500 Subject: [PATCH 1/3] initial commit --- .../framework/presence/src/systemWorkspace.ts | 111 ++++++++++++------ .../presence/src/test/presenceManager.spec.ts | 40 ++++--- 2 files changed, 95 insertions(+), 56 deletions(-) diff --git a/packages/framework/presence/src/systemWorkspace.ts b/packages/framework/presence/src/systemWorkspace.ts index 86d975cbecd9..cccca20d8a2d 100644 --- a/packages/framework/presence/src/systemWorkspace.ts +++ b/packages/framework/presence/src/systemWorkspace.ts @@ -17,6 +17,7 @@ import type { } from "./presence.js"; import { SessionClientStatus } from "./presence.js"; import type { PresenceStatesInternal } from "./presenceStates.js"; +import { TimerManager } from "./timerManager.js"; import type { PresenceStates, PresenceStatesSchema } from "./types.js"; /** @@ -37,17 +38,12 @@ class SessionClient implements ISessionClient { */ public order: number = 0; - private connectionStatus: SessionClientStatus; + private connectionStatus: SessionClientStatus = SessionClientStatus.Disconnected; public constructor( public readonly sessionId: ClientSessionId, - private connectionId: ClientConnectionId | undefined = undefined, - ) { - this.connectionStatus = - connectionId === undefined - ? SessionClientStatus.Disconnected - : SessionClientStatus.Connected; - } + public connectionId: ClientConnectionId | undefined = undefined, + ) {} public getConnectionId(): ClientConnectionId { if (this.connectionId === undefined) { @@ -60,8 +56,7 @@ class SessionClient implements ISessionClient { return this.connectionStatus; } - public setConnectionId(connectionId: ClientConnectionId): void { - this.connectionId = connectionId; + public setConnected(): void { this.connectionStatus = SessionClientStatus.Connected; } @@ -103,6 +98,12 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace { */ private readonly attendees = new Map(); + // When local client disconnects, we lose the connectivity status updates for remote attendees in the session. + // Upon reconnect, we mark all other attendees connections as stale and update their status to disconnected after 30 seconds of inactivity. + private readonly staleConnectionClients = new Set(); + + private readonly staleConnectionTimer = new TimerManager(); + public constructor( clientSessionId: ClientSessionId, private readonly datastore: SystemWorkspaceDatastore, @@ -137,34 +138,23 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace { ): void { const postUpdateActions: (() => void)[] = []; const audienceMembers = this.audience.getMembers(); - const connectedAttendees = new Set(); + const joiningAttendees = new Set(); for (const [clientConnectionId, value] of Object.entries( remoteDatastore.clientToSessionId, )) { const clientSessionId = value.value; - const { attendee, isNew } = this.ensureAttendee( + const { attendee, isJoining } = this.ensureAttendee( clientSessionId, clientConnectionId, /* order */ value.rev, + // If the attendee is present in audience OR if the attendee update is from the sending remote client itself, + // then the attendee is considered connected. + /* isConnected */ senderConnectionId === clientConnectionId || + audienceMembers.has(clientConnectionId), ); - - // Check new attendee against audience to see if they're currently connected - const isAttendeeConnected = audienceMembers.has(clientConnectionId); - - if (isAttendeeConnected) { - connectedAttendees.add(attendee); - if (attendee.getConnectionStatus() === SessionClientStatus.Disconnected) { - attendee.setConnectionId(clientConnectionId); - } - if (isNew) { - // If the attendee is both new and in audience (i.e. currently connected), emit an attendeeJoined event. - postUpdateActions.push(() => this.events.emit("attendeeJoined", attendee)); - } - } - - // If the attendee is not in the audience, they are considered disconnected. - if (!connectedAttendees.has(attendee)) { - attendee.setDisconnected(); + // If the attendee is joining the session, add them to the list of joining attendees to be announced later. + if (isJoining) { + joiningAttendees.add(attendee); } const knownSessionId: InternalTypes.ValueRequiredState | undefined = @@ -176,6 +166,10 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace { } } + for (const announcedAttendee of joiningAttendees) { + postUpdateActions.push(() => this.events.emit("attendeeJoined", announcedAttendee)); + } + // TODO: reorganize processUpdate and caller to process actions after all updates are processed. for (const action of postUpdateActions) { action(); @@ -189,8 +183,32 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace { value: this.selfAttendee.sessionId, }; - this.selfAttendee.setConnectionId(clientConnectionId); + // Clear the stale connection timer when the local client reconnects + this.staleConnectionTimer.clearTimeout(); + + // Mark 'Connected' remote attendees connections as stale + for (const staleConnecionClient of this.attendees.values()) { + if (staleConnecionClient.getConnectionStatus() === SessionClientStatus.Connected) { + this.staleConnectionClients.add(staleConnecionClient); + } + } + + // Update the self attendee + this.selfAttendee.connectionId = clientConnectionId; + this.selfAttendee.setConnected(); this.attendees.set(clientConnectionId, this.selfAttendee); + + // Start the stale connection timer + this.staleConnectionTimer.setTimeout(() => { + for (const client of this.staleConnectionClients) { + // Mark the client as disconnected and remove from the stale connection set + if (client.getConnectionStatus() === SessionClientStatus.Connected) { + client.setDisconnected(); + this.events.emit("attendeeDisconnected", client); + } + this.staleConnectionClients.delete(client); + } + }, 30_000); } public removeClientConnectionId(clientConnectionId: ClientConnectionId): void { @@ -199,6 +217,11 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace { return; } + if (attendee === this.selfAttendee) { + // If the local connection is being removed, clear the stale connection timer + this.staleConnectionTimer.clearTimeout(); + } + // If the last known connectionID is different from the connection ID being removed, the attendee has reconnected, // therefore we should not change the attendee connection status or emit a disconnect event. const attendeeReconnected = attendee.getConnectionId() !== clientConnectionId; @@ -239,27 +262,41 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace { clientSessionId: ClientSessionId, clientConnectionId: ClientConnectionId, order: number, - ): { attendee: SessionClient; isNew: boolean } { + isConnected: boolean, + ): { attendee: SessionClient; isJoining: boolean } { let attendee = this.attendees.get(clientSessionId); - let isNew = false; + let isJoining = false; if (attendee === undefined) { // New attendee. Create SessionClient and add session ID based // entry to map. attendee = new SessionClient(clientSessionId, clientConnectionId); this.attendees.set(clientSessionId, attendee); - isNew = true; + if (isConnected) { + attendee.setConnected(); + isJoining = true; + } } else if (order > attendee.order) { // The given association is newer than the one we have. // Update the order and current connection ID. attendee.order = order; - attendee.setConnectionId(clientConnectionId); - isNew = true; + // Known attendee is joining the session if they are currently disconnected + if (attendee.getConnectionStatus() === SessionClientStatus.Disconnected && isConnected) { + attendee.setConnected(); + isJoining = true; + } + attendee.connectionId = clientConnectionId; } + + if (this.staleConnectionClients.has(attendee) && isConnected) { + // If the attendee is connected, remove them from the stale connection set + this.staleConnectionClients.delete(attendee); + } + // Always update entry for the connection ID. (Okay if already set.) this.attendees.set(clientConnectionId, attendee); - return { attendee, isNew }; + return { attendee, isJoining }; } } diff --git a/packages/framework/presence/src/test/presenceManager.spec.ts b/packages/framework/presence/src/test/presenceManager.spec.ts index a77cae46ce4c..8dc30f4ee98c 100644 --- a/packages/framework/presence/src/test/presenceManager.spec.ts +++ b/packages/framework/presence/src/test/presenceManager.spec.ts @@ -80,7 +80,7 @@ describe("Presence", () => { const afterCleanUp: (() => void)[] = []; beforeEach(() => { - presence = prepareConnectedPresence(runtime, "seassionId-2", "client2", clock, logger); + presence = prepareConnectedPresence(runtime, "sessionId-2", "client2", clock, logger); }); afterEach(() => { @@ -203,7 +203,7 @@ describe("Presence", () => { verifyAttendee(joinedAttendees[0], rejoinAttendeeConnectionId, attendeeSessionId); }); - it.skip('second time is announced once via `attendeeJoined` with status "Connected" when prior is still connected', () => { + it('second time is announced once via `attendeeJoined` with status "Connected" when prior is still connected', () => { // Act - simulate join message from client const joinedAttendees = processJoinSignals([rejoinAttendeeSignal]); @@ -217,7 +217,7 @@ describe("Presence", () => { verifyAttendee(joinedAttendees[0], rejoinAttendeeConnectionId, attendeeSessionId); }); - it.skip('first time is announced via `attendeeJoined` with status "Connected" even if unknown to audience', () => { + it('first time is announced via `attendeeJoined` with status "Connected" even if unknown to audience', () => { // Setup - remove connection from audience runtime.removeMember(initialAttendeeConnectionId); @@ -283,7 +283,7 @@ describe("Presence", () => { verifyAttendee(joinedAttendees[0], rejoinAttendeeConnectionId, attendeeSessionId); }); - it.skip("as collateral with old connection info and connected is NOT announced via `attendeeJoined`", () => { + it("as collateral with old connection info and connected is NOT announced via `attendeeJoined`", () => { // Setup - generate signals // Both connection Id's unkonwn to audience @@ -384,7 +384,7 @@ describe("Presence", () => { // To retain symmetry across Joined and Disconnected events, do not announce // attendeeJoined when the attendee is already connected and we only see // a connection id update. This can happen when audience removal is late. - it.skip('is not announced via `attendeeJoined` when already "Connected"', () => { + it('is not announced via `attendeeJoined` when already "Connected"', () => { // Setup afterCleanUp.push( presence.events.on("attendeeJoined", () => { @@ -448,19 +448,21 @@ describe("Presence", () => { // (e.g. being in audience, sending an update, or (re)joining the session) before their connection status set to "Disconnected". // If an attendee with a stale connection becomes active, their "stale" status is removed. describe("and then local client disconnects", () => { - let disconnectedAttendees: ISessionClient[]; + let remoteDisconnectedAttendees: ISessionClient[]; beforeEach(() => { // Setup assert(knownAttendee !== undefined, "No attendee was set in beforeEach"); - disconnectedAttendees = []; + remoteDisconnectedAttendees = []; afterCleanUp.push( presence.events.on("attendeeDisconnected", (attendee) => { - disconnectedAttendees.push(attendee); + if (attendee !== presence.getMyself()) { + remoteDisconnectedAttendees.push(attendee); + } }), ); }); - it.skip("updates status of attendee with stale connection after 30s delay upon local reconnection", () => { + it("updates status of attendee with stale connection after 30s delay upon local reconnection", () => { assert(knownAttendee !== undefined, "No attendee was set in beforeEach"); // Act - disconnect & reconnect local client @@ -484,13 +486,13 @@ describe("Presence", () => { "Attendee with stale connection should be 'Disconnected' 30s after reconnection", ); assert.strictEqual( - disconnectedAttendees.length, + remoteDisconnectedAttendees.length, 1, "Exactly one attendee should be announced as disconnected", ); }); - it.skip("does not update status of attendee with stale connection if local client does not reconnect", () => { + it("does not update status of attendee with stale connection if local client does not reconnect", () => { assert(knownAttendee !== undefined, "No attendee was set in beforeEach"); // Act - disconnect local client and advance timer @@ -505,7 +507,7 @@ describe("Presence", () => { ); }); - it.skip("does not update status of attendee with stale connection if local client reconnection lasts less than 30s", () => { + it("does not update status of attendee with stale connection if local client reconnection lasts less than 30s", () => { assert(knownAttendee !== undefined, "No attendee was set in beforeEach"); // Act - disconnect, reconnect for 15 second, disconnect local client again, then advance timer @@ -524,7 +526,7 @@ describe("Presence", () => { ); }); - it.skip("does not update status of attendee with stale connection if attendee rejoins", () => { + it("does not update status of attendee with stale connection if attendee rejoins", () => { assert(knownAttendee !== undefined, "No attendee was set in beforeEach"); // Setup - fail if attendee joined is announced @@ -552,7 +554,7 @@ describe("Presence", () => { ); }); - it.skip("does not update status of attendee with stale connection if attendee sends datastore update", () => { + it("does not update status of attendee with stale connection if attendee sends datastore update", () => { assert(knownAttendee !== undefined, "No attendee was set in beforeEach"); // Setup - fail if attendee joined is announced @@ -598,7 +600,7 @@ describe("Presence", () => { ); }); - it.skip("announces `attendeeDisconnected` once when remote client disconnects after local client reconnects", () => { + it("announces `attendeeDisconnected` once when remote client disconnects after local client reconnects", () => { assert(knownAttendee !== undefined, "No attendee was set in beforeEach"); // Setup - initial attendee joins before local client disconnects @@ -612,7 +614,7 @@ describe("Presence", () => { runtime.audience.removeMember(initialAttendeeConnectionId); // Remove remote client connection before 30s timeout // Confirm that `attendeeDisconnected` is announced for when active attendee disconnects assert.strictEqual( - disconnectedAttendees.length, + remoteDisconnectedAttendees.length, 1, "Exactly one attendee should be announced as disconnected", ); @@ -625,13 +627,13 @@ describe("Presence", () => { "Attendee should be 'Disconnected'", ); assert.strictEqual( - disconnectedAttendees.length, + remoteDisconnectedAttendees.length, 1, "Exactly one attendee should be announced as disconnected", ); }); - it.skip("updates status of attendee with stale connection only 30s after most recent local reconnection", () => { + it("updates status of attendee with stale connection only 30s after most recent local reconnection", () => { // Setup assert(knownAttendee !== undefined, "No attendee was set in beforeEach"); assert.strictEqual( @@ -667,7 +669,7 @@ describe("Presence", () => { "Attendee with stale connection has wrong status", ); assert.strictEqual( - disconnectedAttendees.length, + remoteDisconnectedAttendees.length, 1, "Exactly one attendee should be announced as disconnected", ); From 1aa65b9205652a932fb463d8b1e8f4f6698992ed Mon Sep 17 00:00:00 2001 From: WillieHabi <143546745+WillieHabi@users.noreply.github.com> Date: Thu, 2 Jan 2025 13:58:05 -0500 Subject: [PATCH 2/3] Update packages/framework/presence/src/systemWorkspace.ts Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- packages/framework/presence/src/systemWorkspace.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/framework/presence/src/systemWorkspace.ts b/packages/framework/presence/src/systemWorkspace.ts index cccca20d8a2d..43b6aba3c302 100644 --- a/packages/framework/presence/src/systemWorkspace.ts +++ b/packages/framework/presence/src/systemWorkspace.ts @@ -187,7 +187,7 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace { this.staleConnectionTimer.clearTimeout(); // Mark 'Connected' remote attendees connections as stale - for (const staleConnecionClient of this.attendees.values()) { + for (const staleConnectionClient of this.attendees.values()) { if (staleConnecionClient.getConnectionStatus() === SessionClientStatus.Connected) { this.staleConnectionClients.add(staleConnecionClient); } From 4313d282ad961f0bdada35262cafad81f62b9ebf Mon Sep 17 00:00:00 2001 From: Willie Habimana Date: Thu, 2 Jan 2025 14:05:37 -0500 Subject: [PATCH 3/3] spelling --- packages/framework/presence/src/systemWorkspace.ts | 6 +++--- .../presence/src/test/presenceDatastoreManager.spec.ts | 8 ++++---- packages/framework/presence/src/test/testUtils.ts | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/packages/framework/presence/src/systemWorkspace.ts b/packages/framework/presence/src/systemWorkspace.ts index cccca20d8a2d..fe99fc355876 100644 --- a/packages/framework/presence/src/systemWorkspace.ts +++ b/packages/framework/presence/src/systemWorkspace.ts @@ -187,9 +187,9 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace { this.staleConnectionTimer.clearTimeout(); // Mark 'Connected' remote attendees connections as stale - for (const staleConnecionClient of this.attendees.values()) { - if (staleConnecionClient.getConnectionStatus() === SessionClientStatus.Connected) { - this.staleConnectionClients.add(staleConnecionClient); + for (const staleConnectionClient of this.attendees.values()) { + if (staleConnectionClient.getConnectionStatus() === SessionClientStatus.Connected) { + this.staleConnectionClients.add(staleConnectionClient); } } diff --git a/packages/framework/presence/src/test/presenceDatastoreManager.spec.ts b/packages/framework/presence/src/test/presenceDatastoreManager.spec.ts index 021085a9d482..46a5adfbb6a1 100644 --- a/packages/framework/presence/src/test/presenceDatastoreManager.spec.ts +++ b/packages/framework/presence/src/test/presenceDatastoreManager.spec.ts @@ -50,14 +50,14 @@ describe("Presence", () => { it("sends join when connected during initialization", () => { // Setup, Act (call to createPresenceManager), & Verify (post createPresenceManager call) - prepareConnectedPresence(runtime, "seassionId-2", "client2", clock, logger); + prepareConnectedPresence(runtime, "sessionId-2", "client2", clock, logger); }); describe("responds to ClientJoin", () => { let presence: ReturnType; beforeEach(() => { - presence = prepareConnectedPresence(runtime, "seassionId-2", "client2", clock, logger); + presence = prepareConnectedPresence(runtime, "sessionId-2", "client2", clock, logger); // Pass a little time (to mimic reality) clock.tick(10); @@ -83,7 +83,7 @@ describe("Presence", () => { "client2": { "rev": 0, "timestamp": initialTime, - "value": "seassionId-2", + "value": "sessionId-2", }, }, }, @@ -153,7 +153,7 @@ describe("Presence", () => { "client2": { "rev": 0, "timestamp": initialTime, - "value": "seassionId-2", + "value": "sessionId-2", }, }, }, diff --git a/packages/framework/presence/src/test/testUtils.ts b/packages/framework/presence/src/test/testUtils.ts index 453304d9ceda..a668b0ef010c 100644 --- a/packages/framework/presence/src/test/testUtils.ts +++ b/packages/framework/presence/src/test/testUtils.ts @@ -39,7 +39,7 @@ export function createInstanceOf(): T { export function generateBasicClientJoin( fixedTime: number, { - clientSessionId = "seassionId-2", + clientSessionId = "sessionId-2", clientConnectionId = "client2", updateProviders = ["client0", "client1", "client3"], connectionOrder = 0,