Skip to content

Commit

Permalink
Use separate eventstream per namespace
Browse files Browse the repository at this point in the history
Signed-off-by: Nicko Guyer <[email protected]>
  • Loading branch information
nguyer committed Nov 3, 2023
1 parent a189de9 commit 60ebd20
Show file tree
Hide file tree
Showing 17 changed files with 363 additions and 306 deletions.
6 changes: 3 additions & 3 deletions .env
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PORT=3000
ETHCONNECT_URL=http://127.0.0.1:5102
ETHCONNECT_TOPIC=token
FACTORY_CONTRACT_ADDRESS=
AUTO_INIT=true
ETHCONNECT_TOPIC=tokens_local
FACTORY_CONTRACT_ADDRESS="0xd85b3fba5552c48389607954e042e7313a9aec6e"
AUTO_INIT=false
USE_LEGACY_ERC20_SAMPLE=false
USE_LEGACY_ERC721_SAMPLE=false
33 changes: 33 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Run Tests",
"runtimeExecutable": "npm",
"args": ["run", "test"],
"request": "launch",
"type": "node",
"outputCapture": "std"
},
{
"name": "Run E2E Tests",
"runtimeExecutable": "npm",
"args": ["run", "test:e2e"],
"request": "launch",
"type": "node",
"outputCapture": "std"
},
{
"type": "node",
"request": "launch",
"name": "Launch Program",
"skipFiles": ["<node_internals>/**"],
"program": "${file}",
"preLaunchTask": "tsc: build - tsconfig.json",
"outFiles": ["${workspaceFolder}/dist/**/*.js"]
}
]
}
4 changes: 1 addition & 3 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,5 @@
},
"eslint.validate": ["javascript"],
"solidity.defaultCompiler": "remote",
"cSpell.words": [
"fftm"
]
"cSpell.words": ["eventstream", "fftm"]
}
8 changes: 5 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
FROM node:16-alpine3.15 as build
FROM node:20-alpine3.17 as build
USER node
WORKDIR /home/node
ADD --chown=node:node package*.json ./
RUN npm install
ADD --chown=node:node . .
RUN npm run build

FROM node:16-alpine3.15 as solidity-build
FROM node:20-alpine3.17 as solidity-build
RUN apk add python3 alpine-sdk
USER node
WORKDIR /home/node
Expand All @@ -15,7 +15,7 @@ RUN npm install
ADD --chown=node:node ./samples/solidity .
RUN npx hardhat compile

FROM node:16-alpine3.15
FROM node:20-alpine3.17
RUN apk add curl jq
RUN mkdir -p /app/contracts/source \
&& chgrp -R 0 /app/ \
Expand All @@ -31,6 +31,8 @@ COPY --from=solidity-build --chown=1001:0 /home/node/contracts /home/node/packag
RUN npm install --production
WORKDIR /app/contracts
COPY --from=solidity-build --chown=1001:0 /home/node/artifacts/contracts/TokenFactory.sol/TokenFactory.json ./
# We also need to keep copying it to the old location to maintain compatibility with the FireFly CLI
COPY --from=solidity-build --chown=1001:0 /home/node/artifacts/contracts/TokenFactory.sol/TokenFactory.json /home/node/contracts/
WORKDIR /app
COPY --from=build --chown=1001:0 /home/node/dist ./dist
COPY --from=build --chown=1001:0 /home/node/package.json /home/node/package-lock.json ./
Expand Down
18 changes: 14 additions & 4 deletions src/event-stream/event-stream.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { AxiosRequestConfig } from 'axios';
import { lastValueFrom } from 'rxjs';
import WebSocket from 'ws';
import { FFRequestIDHeader } from '../request-context/constants';
import { Context } from '../request-context/request-context.decorator';
import { Context, newContext } from '../request-context/request-context.decorator';
import { IAbiMethod } from '../tokens/tokens.interfaces';
import { getHttpRequestOptions, getWebsocketOptions } from '../utils';
import {
Expand All @@ -46,6 +46,7 @@ export class EventStreamSocket {
constructor(
private url: string,
private topic: string,
private namespace: string,
private username: string,
private password: string,
private handleEvents: (events: EventBatch) => void,
Expand All @@ -67,7 +68,7 @@ export class EventStreamSocket {
} else {
this.logger.log('Event stream websocket connected');
}
this.produce({ type: 'listen', topic: this.topic });
this.produce({ type: 'listen', topic: `${this.topic}/${this.namespace}` });
this.produce({ type: 'listenreplies' });
this.ping();
})
Expand Down Expand Up @@ -109,7 +110,11 @@ export class EventStreamSocket {
}

ack(batchNumber: number | undefined) {
this.produce({ type: 'ack', topic: this.topic, batchNumber });
this.produce({ type: 'ack', topic: `${this.topic}/${this.namespace}`, batchNumber });
}

nack(batchNumber: number | undefined) {
this.produce({ type: 'nack', topic: `${this.topic}/${this.namespace}`, batchNumber });
}

close() {
Expand Down Expand Up @@ -331,15 +336,20 @@ export class EventStreamService {
return true;
}

connect(
async connect(
url: string,
topic: string,
namespace: string,
handleEvents: (events: EventBatch) => void,
handleReceipt: (receipt: EventStreamReply) => void,
) {
const name = `${topic}/${namespace}`;
await this.createOrUpdateStream(newContext(), name, topic);

return new EventStreamSocket(
url,
topic,
namespace,
this.username,
this.password,
handleEvents,
Expand Down
133 changes: 82 additions & 51 deletions src/eventstream-proxy/eventstream-proxy.base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ import { Logger } from '@nestjs/common';
import { MessageBody, SubscribeMessage } from '@nestjs/websockets';
import { v4 as uuidv4 } from 'uuid';
import { Context, newContext } from '../request-context/request-context.decorator';
import { EventBatch, EventStreamReply } from '../event-stream/event-stream.interfaces';
import { EventBatch, EventStream, EventStreamReply } from '../event-stream/event-stream.interfaces';
import { EventStreamService, EventStreamSocket } from '../event-stream/event-stream.service';
import {
WebSocketActionBase,
WebSocketEventsBase,
WebSocketEx,
WebSocketMessage,
WebSocketStart,
} from '../websocket-events/websocket-events.base';
import {
AckMessageData,
Expand All @@ -40,20 +42,20 @@ import {
* @WebSocketGateway({ path: '/api/stream' })
*/
export abstract class EventStreamProxyBase extends WebSocketEventsBase {
socket?: EventStreamSocket;
namespaceClients: Map<string, Set<WebSocketEx>> = new Map();
namespaceEventStreamSocket: Map<string, EventStreamSocket> = new Map();
url?: string;
topic?: string;

private connectListeners: ConnectionListener[] = [];
private eventListeners: EventListener[] = [];
private awaitingAck: WebSocketMessageWithId[] = [];
private currentClient: WebSocketEx | undefined;
private subscriptionNames = new Map<string, string>();
private queue = Promise.resolve();

constructor(
protected readonly logger: Logger,
protected eventstream: EventStreamService,
protected eventStreamService: EventStreamService,
requireAuth = false,
) {
super(logger, requireAuth);
Expand All @@ -66,55 +68,75 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {

handleConnection(client: WebSocketEx) {
super.handleConnection(client);
if (this.server.clients.size === 1) {
this.logger.log(`Initializing event stream proxy`);
Promise.all(this.connectListeners.map(l => l.onConnect()))
.then(() => {
this.setCurrentClient(client);
this.startListening();
})
.catch(err => {
this.logger.error(`Error initializing event stream proxy: ${err}`);
});
}
client.on('message', (message: string) => {
const action = JSON.parse(message) as WebSocketActionBase;
switch (action.type) {
case 'start':
const startAction = action as WebSocketStart;
this.startListening(client, startAction.namespace);
break;
}
});
}

private queueTask(task: () => void) {
this.queue = this.queue.finally(task);
}

private startListening() {
private async startListening(client: WebSocketEx, namespace: string) {
if (this.url === undefined || this.topic === undefined) {
return;
}
this.socket = this.eventstream.connect(
this.url,
this.topic,
events => {
this.queueTask(() => this.processEvents(events));
},
receipt => {
this.broadcast('receipt', <EventStreamReply>receipt);
},
);
try {
if (!this.namespaceEventStreamSocket.has(namespace)) {
const eventStreamSocket = await this.eventStreamService.connect(
this.url,
this.topic,
namespace,
events => {
this.queueTask(() => this.processEvents(events, namespace));
},
receipt => {
this.broadcast('receipt', <EventStreamReply>receipt);
},
);
this.namespaceEventStreamSocket.set(namespace, eventStreamSocket);
}
let clientSet = this.namespaceClients.get(namespace);
if (!clientSet) {
clientSet = new Set<WebSocketEx>();
}
clientSet.add(client);
this.namespaceClients.set(namespace, clientSet);
} catch (e) {
this.logger.error(`Error connecting to event stream websocket: ${e.message}`);
}
}

handleDisconnect(client: WebSocketEx) {
super.handleDisconnect(client);
if (this.server.clients.size === 0) {
this.stopListening();
} else if (client.id === this.currentClient?.id) {
for (const newClient of this.server.clients) {
this.setCurrentClient(newClient as WebSocketEx);
break;
}
}
}

private stopListening() {
this.socket?.close();
this.socket = undefined;
this.currentClient = undefined;
// Iterate over all the namespaces this client was subscribed to
this.namespaceClients.forEach((clientSet, namespace) => {
clientSet.delete(client);

// Nack any messages that are inflight for that namespace
const nackedMessageIds: Set<string> = new Set();
this.awaitingAck
.filter(msg => msg.namespace === namespace)
.map(msg => {
this.namespaceEventStreamSocket.get(namespace)?.nack(msg.batchNumber);
nackedMessageIds.add(msg.id);
});
this.awaitingAck = this.awaitingAck.filter(msg => nackedMessageIds.has(msg.id));

// If all clients for this namespace have disconnected, also close the connection to EVMConnect
if (clientSet.size == 0) {
this.namespaceEventStreamSocket.get(namespace)?.close();
this.namespaceEventStreamSocket.delete(namespace);
this.namespaceClients.delete(namespace);
}
});
}

addConnectionListener(listener: ConnectionListener) {
Expand All @@ -125,7 +147,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
this.eventListeners.push(listener);
}

private async processEvents(batch: EventBatch) {
private async processEvents(batch: EventBatch, namespace: string) {
const messages: WebSocketMessage[] = [];
const eventHandlers: Promise<WebSocketMessage | undefined>[] = [];
for (const event of batch.events) {
Expand Down Expand Up @@ -156,6 +178,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
}
}
const message: WebSocketMessageWithId = {
namespace: namespace,
id: uuidv4(),
event: 'batch',
data: <WebSocketMessageBatchData>{
Expand All @@ -164,7 +187,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
batchNumber: batch.batchNumber,
};
this.awaitingAck.push(message);
this.currentClient?.send(JSON.stringify(message));
this.send(namespace, JSON.stringify(message));
}

private async getSubscriptionName(ctx: Context, subId: string) {
Expand All @@ -174,7 +197,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
}

try {
const sub = await this.eventstream.getSubscription(ctx, subId);
const sub = await this.eventStreamService.getSubscription(ctx, subId);
if (sub !== undefined) {
this.subscriptionNames.set(subId, sub.name);
return sub.name;
Expand All @@ -185,13 +208,6 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
return undefined;
}

private setCurrentClient(client: WebSocketEx) {
this.currentClient = client;
for (const message of this.awaitingAck) {
this.currentClient.send(JSON.stringify(message));
}
}

@SubscribeMessage('ack')
handleAck(@MessageBody() data: AckMessageData) {
if (data.id === undefined) {
Expand All @@ -201,7 +217,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {

const inflight = this.awaitingAck.find(msg => msg.id === data.id);
this.logger.log(`Received ack ${data.id} inflight=${!!inflight}`);
if (this.socket !== undefined && inflight !== undefined) {
if (this.namespaceEventStreamSocket !== undefined && inflight !== undefined) {
this.awaitingAck = this.awaitingAck.filter(msg => msg.id !== data.id);
if (
// If nothing is left awaiting an ack - then we clearly need to ack
Expand All @@ -212,7 +228,22 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
!this.awaitingAck.find(msg => msg.batchNumber === inflight.batchNumber))
) {
this.logger.log(`In-flight batch complete (batchNumber=${inflight.batchNumber})`);
this.socket.ack(inflight.batchNumber);
this.namespaceEventStreamSocket.get(inflight.namespace)?.ack(inflight.batchNumber);
}
}
}

send(namespace, payload: string) {
const clients = this.namespaceClients.get(namespace);
if (clients) {
// Randomly select a connected client for this namespace to distribute load
const selected = Math.floor(Math.random() * clients.size);
let i = 0;
for (let client of clients.keys()) {
if (i++ == selected) {
client.send(payload);
return;
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/eventstream-proxy/eventstream-proxy.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export interface EventListener {
}

export interface WebSocketMessageWithId extends WebSocketMessage {
namespace: string;
id: string;
batchNumber: number | undefined;
}
Expand Down
4 changes: 0 additions & 4 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,6 @@ async function bootstrap() {
.configure(ethConnectUrl, fftmUrl, username, password, passthroughHeaders, blockchainRetryCfg);
app.get(AbiMapperService).configure(legacyERC20, legacyERC721);

if (autoInit.toLowerCase() !== 'false') {
await app.get(TokensService).init(newContext());
}

const port = config.get<number>('PORT', 3000);
console.log(`Listening on port ${port}`);
await app.listen(port);
Expand Down
Loading

0 comments on commit 60ebd20

Please sign in to comment.