diff --git a/.env b/.env index 2bbb638..5e4d496 100644 --- a/.env +++ b/.env @@ -1,7 +1,7 @@ PORT=3000 ETHCONNECT_URL=http://127.0.0.1:5102 -ETHCONNECT_TOPIC=token -FACTORY_CONTRACT_ADDRESS= -AUTO_INIT=true +ETHCONNECT_TOPIC=tokens_local +FACTORY_CONTRACT_ADDRESS="0xd85b3fba5552c48389607954e042e7313a9aec6e" +AUTO_INIT=false USE_LEGACY_ERC20_SAMPLE=false USE_LEGACY_ERC721_SAMPLE=false diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..1377495 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,33 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Run Tests", + "runtimeExecutable": "npm", + "args": ["run", "test"], + "request": "launch", + "type": "node", + "outputCapture": "std" + }, + { + "name": "Run E2E Tests", + "runtimeExecutable": "npm", + "args": ["run", "test:e2e"], + "request": "launch", + "type": "node", + "outputCapture": "std" + }, + { + "type": "node", + "request": "launch", + "name": "Launch Program", + "skipFiles": ["/**"], + "program": "${file}", + "preLaunchTask": "tsc: build - tsconfig.json", + "outFiles": ["${workspaceFolder}/dist/**/*.js"] + } + ] +} diff --git a/.vscode/settings.json b/.vscode/settings.json index 108b279..4fb545a 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -5,7 +5,5 @@ }, "eslint.validate": ["javascript"], "solidity.defaultCompiler": "remote", - "cSpell.words": [ - "fftm" - ] + "cSpell.words": ["eventstream", "fftm"] } diff --git a/Dockerfile b/Dockerfile index 2129032..126786d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM node:16-alpine3.15 as build +FROM node:20-alpine3.17 as build USER node WORKDIR /home/node ADD --chown=node:node package*.json ./ @@ -6,7 +6,7 @@ RUN npm install ADD --chown=node:node . . RUN npm run build -FROM node:16-alpine3.15 as solidity-build +FROM node:20-alpine3.17 as solidity-build RUN apk add python3 alpine-sdk USER node WORKDIR /home/node @@ -15,7 +15,7 @@ RUN npm install ADD --chown=node:node ./samples/solidity . RUN npx hardhat compile -FROM node:16-alpine3.15 +FROM node:20-alpine3.17 RUN apk add curl jq RUN mkdir -p /app/contracts/source \ && chgrp -R 0 /app/ \ @@ -31,6 +31,8 @@ COPY --from=solidity-build --chown=1001:0 /home/node/contracts /home/node/packag RUN npm install --production WORKDIR /app/contracts COPY --from=solidity-build --chown=1001:0 /home/node/artifacts/contracts/TokenFactory.sol/TokenFactory.json ./ +# We also need to keep copying it to the old location to maintain compatibility with the FireFly CLI +COPY --from=solidity-build --chown=1001:0 /home/node/artifacts/contracts/TokenFactory.sol/TokenFactory.json /home/node/contracts/ WORKDIR /app COPY --from=build --chown=1001:0 /home/node/dist ./dist COPY --from=build --chown=1001:0 /home/node/package.json /home/node/package-lock.json ./ diff --git a/src/event-stream/event-stream.service.ts b/src/event-stream/event-stream.service.ts index 45a8cc1..0917e02 100644 --- a/src/event-stream/event-stream.service.ts +++ b/src/event-stream/event-stream.service.ts @@ -20,7 +20,7 @@ import { AxiosRequestConfig } from 'axios'; import { lastValueFrom } from 'rxjs'; import WebSocket from 'ws'; import { FFRequestIDHeader } from '../request-context/constants'; -import { Context } from '../request-context/request-context.decorator'; +import { Context, newContext } from '../request-context/request-context.decorator'; import { IAbiMethod } from '../tokens/tokens.interfaces'; import { getHttpRequestOptions, getWebsocketOptions } from '../utils'; import { @@ -46,6 +46,7 @@ export class EventStreamSocket { constructor( private url: string, private topic: string, + private namespace: string, private username: string, private password: string, private handleEvents: (events: EventBatch) => void, @@ -67,7 +68,7 @@ export class EventStreamSocket { } else { this.logger.log('Event stream websocket connected'); } - this.produce({ type: 'listen', topic: this.topic }); + this.produce({ type: 'listen', topic: `${this.topic}/${this.namespace}` }); this.produce({ type: 'listenreplies' }); this.ping(); }) @@ -109,7 +110,11 @@ export class EventStreamSocket { } ack(batchNumber: number | undefined) { - this.produce({ type: 'ack', topic: this.topic, batchNumber }); + this.produce({ type: 'ack', topic: `${this.topic}/${this.namespace}`, batchNumber }); + } + + nack(batchNumber: number | undefined) { + this.produce({ type: 'nack', topic: `${this.topic}/${this.namespace}`, batchNumber }); } close() { @@ -331,15 +336,20 @@ export class EventStreamService { return true; } - connect( + async connect( url: string, topic: string, + namespace: string, handleEvents: (events: EventBatch) => void, handleReceipt: (receipt: EventStreamReply) => void, ) { + const name = `${topic}/${namespace}`; + await this.createOrUpdateStream(newContext(), name, topic); + return new EventStreamSocket( url, topic, + namespace, this.username, this.password, handleEvents, diff --git a/src/eventstream-proxy/eventstream-proxy.base.ts b/src/eventstream-proxy/eventstream-proxy.base.ts index e708fbd..2d3aeec 100644 --- a/src/eventstream-proxy/eventstream-proxy.base.ts +++ b/src/eventstream-proxy/eventstream-proxy.base.ts @@ -18,12 +18,14 @@ import { Logger } from '@nestjs/common'; import { MessageBody, SubscribeMessage } from '@nestjs/websockets'; import { v4 as uuidv4 } from 'uuid'; import { Context, newContext } from '../request-context/request-context.decorator'; -import { EventBatch, EventStreamReply } from '../event-stream/event-stream.interfaces'; +import { EventBatch, EventStream, EventStreamReply } from '../event-stream/event-stream.interfaces'; import { EventStreamService, EventStreamSocket } from '../event-stream/event-stream.service'; import { + WebSocketActionBase, WebSocketEventsBase, WebSocketEx, WebSocketMessage, + WebSocketStart, } from '../websocket-events/websocket-events.base'; import { AckMessageData, @@ -40,20 +42,20 @@ import { * @WebSocketGateway({ path: '/api/stream' }) */ export abstract class EventStreamProxyBase extends WebSocketEventsBase { - socket?: EventStreamSocket; + namespaceClients: Map> = new Map(); + namespaceEventStreamSocket: Map = new Map(); url?: string; topic?: string; private connectListeners: ConnectionListener[] = []; private eventListeners: EventListener[] = []; private awaitingAck: WebSocketMessageWithId[] = []; - private currentClient: WebSocketEx | undefined; private subscriptionNames = new Map(); private queue = Promise.resolve(); constructor( protected readonly logger: Logger, - protected eventstream: EventStreamService, + protected eventStreamService: EventStreamService, requireAuth = false, ) { super(logger, requireAuth); @@ -66,55 +68,75 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { handleConnection(client: WebSocketEx) { super.handleConnection(client); - if (this.server.clients.size === 1) { - this.logger.log(`Initializing event stream proxy`); - Promise.all(this.connectListeners.map(l => l.onConnect())) - .then(() => { - this.setCurrentClient(client); - this.startListening(); - }) - .catch(err => { - this.logger.error(`Error initializing event stream proxy: ${err}`); - }); - } + client.on('message', (message: string) => { + const action = JSON.parse(message) as WebSocketActionBase; + switch (action.type) { + case 'start': + const startAction = action as WebSocketStart; + this.startListening(client, startAction.namespace); + break; + } + }); } private queueTask(task: () => void) { this.queue = this.queue.finally(task); } - private startListening() { + private async startListening(client: WebSocketEx, namespace: string) { if (this.url === undefined || this.topic === undefined) { return; } - this.socket = this.eventstream.connect( - this.url, - this.topic, - events => { - this.queueTask(() => this.processEvents(events)); - }, - receipt => { - this.broadcast('receipt', receipt); - }, - ); + try { + if (!this.namespaceEventStreamSocket.has(namespace)) { + const eventStreamSocket = await this.eventStreamService.connect( + this.url, + this.topic, + namespace, + events => { + this.queueTask(() => this.processEvents(events, namespace)); + }, + receipt => { + this.broadcast('receipt', receipt); + }, + ); + this.namespaceEventStreamSocket.set(namespace, eventStreamSocket); + } + let clientSet = this.namespaceClients.get(namespace); + if (!clientSet) { + clientSet = new Set(); + } + clientSet.add(client); + this.namespaceClients.set(namespace, clientSet); + } catch (e) { + this.logger.error(`Error connecting to event stream websocket: ${e.message}`); + } } handleDisconnect(client: WebSocketEx) { super.handleDisconnect(client); - if (this.server.clients.size === 0) { - this.stopListening(); - } else if (client.id === this.currentClient?.id) { - for (const newClient of this.server.clients) { - this.setCurrentClient(newClient as WebSocketEx); - break; - } - } - } - private stopListening() { - this.socket?.close(); - this.socket = undefined; - this.currentClient = undefined; + // Iterate over all the namespaces this client was subscribed to + this.namespaceClients.forEach((clientSet, namespace) => { + clientSet.delete(client); + + // Nack any messages that are inflight for that namespace + const nackedMessageIds: Set = new Set(); + this.awaitingAck + .filter(msg => msg.namespace === namespace) + .map(msg => { + this.namespaceEventStreamSocket.get(namespace)?.nack(msg.batchNumber); + nackedMessageIds.add(msg.id); + }); + this.awaitingAck = this.awaitingAck.filter(msg => nackedMessageIds.has(msg.id)); + + // If all clients for this namespace have disconnected, also close the connection to EVMConnect + if (clientSet.size == 0) { + this.namespaceEventStreamSocket.get(namespace)?.close(); + this.namespaceEventStreamSocket.delete(namespace); + this.namespaceClients.delete(namespace); + } + }); } addConnectionListener(listener: ConnectionListener) { @@ -125,7 +147,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { this.eventListeners.push(listener); } - private async processEvents(batch: EventBatch) { + private async processEvents(batch: EventBatch, namespace: string) { const messages: WebSocketMessage[] = []; const eventHandlers: Promise[] = []; for (const event of batch.events) { @@ -156,6 +178,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { } } const message: WebSocketMessageWithId = { + namespace: namespace, id: uuidv4(), event: 'batch', data: { @@ -164,7 +187,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { batchNumber: batch.batchNumber, }; this.awaitingAck.push(message); - this.currentClient?.send(JSON.stringify(message)); + this.send(namespace, JSON.stringify(message)); } private async getSubscriptionName(ctx: Context, subId: string) { @@ -174,7 +197,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { } try { - const sub = await this.eventstream.getSubscription(ctx, subId); + const sub = await this.eventStreamService.getSubscription(ctx, subId); if (sub !== undefined) { this.subscriptionNames.set(subId, sub.name); return sub.name; @@ -185,13 +208,6 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { return undefined; } - private setCurrentClient(client: WebSocketEx) { - this.currentClient = client; - for (const message of this.awaitingAck) { - this.currentClient.send(JSON.stringify(message)); - } - } - @SubscribeMessage('ack') handleAck(@MessageBody() data: AckMessageData) { if (data.id === undefined) { @@ -201,7 +217,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { const inflight = this.awaitingAck.find(msg => msg.id === data.id); this.logger.log(`Received ack ${data.id} inflight=${!!inflight}`); - if (this.socket !== undefined && inflight !== undefined) { + if (this.namespaceEventStreamSocket !== undefined && inflight !== undefined) { this.awaitingAck = this.awaitingAck.filter(msg => msg.id !== data.id); if ( // If nothing is left awaiting an ack - then we clearly need to ack @@ -212,7 +228,22 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { !this.awaitingAck.find(msg => msg.batchNumber === inflight.batchNumber)) ) { this.logger.log(`In-flight batch complete (batchNumber=${inflight.batchNumber})`); - this.socket.ack(inflight.batchNumber); + this.namespaceEventStreamSocket.get(inflight.namespace)?.ack(inflight.batchNumber); + } + } + } + + send(namespace, payload: string) { + const clients = this.namespaceClients.get(namespace); + if (clients) { + // Randomly select a connected client for this namespace to distribute load + const selected = Math.floor(Math.random() * clients.size); + let i = 0; + for (let client of clients.keys()) { + if (i++ == selected) { + client.send(payload); + return; + } } } } diff --git a/src/eventstream-proxy/eventstream-proxy.interfaces.ts b/src/eventstream-proxy/eventstream-proxy.interfaces.ts index 20bfca3..8588292 100644 --- a/src/eventstream-proxy/eventstream-proxy.interfaces.ts +++ b/src/eventstream-proxy/eventstream-proxy.interfaces.ts @@ -30,6 +30,7 @@ export interface EventListener { } export interface WebSocketMessageWithId extends WebSocketMessage { + namespace: string; id: string; batchNumber: number | undefined; } diff --git a/src/main.ts b/src/main.ts index 549da08..6af6237 100644 --- a/src/main.ts +++ b/src/main.ts @@ -107,10 +107,6 @@ async function bootstrap() { .configure(ethConnectUrl, fftmUrl, username, password, passthroughHeaders, blockchainRetryCfg); app.get(AbiMapperService).configure(legacyERC20, legacyERC721); - if (autoInit.toLowerCase() !== 'false') { - await app.get(TokensService).init(newContext()); - } - const port = config.get('PORT', 3000); console.log(`Listening on port ${port}`); await app.listen(port); diff --git a/src/tokens/tokens.controller.ts b/src/tokens/tokens.controller.ts index 2daeeb2..356e96b 100644 --- a/src/tokens/tokens.controller.ts +++ b/src/tokens/tokens.controller.ts @@ -42,7 +42,7 @@ export class TokensController { @HttpCode(204) @ApiOperation({ summary: 'Perform one-time initialization (if not auto-initialized)' }) init(@RequestContext() ctx: Context) { - return this.service.init(ctx); + // Do nothing. Endpoint retained for backwards compatibility with older tooling. } @Post('createpool') diff --git a/src/tokens/tokens.interfaces.ts b/src/tokens/tokens.interfaces.ts index d174053..8d9fa99 100644 --- a/src/tokens/tokens.interfaces.ts +++ b/src/tokens/tokens.interfaces.ts @@ -139,6 +139,10 @@ export class TokenPoolConfig { } export class TokenPool { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty({ enum: TokenType }) @IsEnum(TokenType) type: TokenType; @@ -213,6 +217,10 @@ export class BlockchainEvent { } export class TokenPoolActivate { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty() @IsNotEmpty() poolLocator: string; @@ -227,6 +235,10 @@ export class TokenPoolActivate { } export class TokenPoolDeactivate { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty() @IsNotEmpty() poolLocator: string; @@ -271,6 +283,10 @@ export class CheckInterfaceResponse implements TokenAbi { } export class TokenTransfer { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty() @IsNotEmpty() poolLocator: string; @@ -330,6 +346,10 @@ export class TokenApprovalConfig { } export class TokenApproval { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty() @IsNotEmpty() poolLocator: string; @@ -366,6 +386,10 @@ export class TokenApproval { // Websocket notifications class tokenEventBase { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty() poolLocator: string; @@ -391,6 +415,10 @@ export class TokenPoolEventInfo { } export class TokenPoolEvent extends tokenEventBase { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty() type: TokenType; diff --git a/src/tokens/tokens.service.spec.ts b/src/tokens/tokens.service.spec.ts index 8f70a35..54e888e 100644 --- a/src/tokens/tokens.service.spec.ts +++ b/src/tokens/tokens.service.spec.ts @@ -230,6 +230,7 @@ describe('TokensService', () => { useValue: { addConnectionListener: jest.fn(), addEventListener: jest.fn(), + configure: jest.fn(), }, }, ], @@ -263,6 +264,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPool = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -276,6 +278,7 @@ describe('TokensService', () => { await service.createPool(ctx, request).then(resp => { expect(resp).toEqual({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: ERC20_NO_DATA_POOL_ID, standard: 'ERC20', @@ -296,6 +299,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPoolActivate = { + namespace: 'ns1', poolLocator: ERC20_NO_DATA_POOL_ID, poolData: 'ns1', }; @@ -306,6 +310,7 @@ describe('TokensService', () => { }; const response: TokenPoolEvent = { + namespace: 'ns1', poolData: 'ns1', poolLocator: ERC20_NO_DATA_POOL_ID, standard: 'ERC20', @@ -350,6 +355,7 @@ describe('TokensService', () => { }; const request: TokenMint = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, @@ -385,6 +391,7 @@ describe('TokensService', () => { }; const request: TokenTransfer = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, @@ -421,6 +428,7 @@ describe('TokensService', () => { }; const request: TokenBurn = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, @@ -455,6 +463,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPool = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -468,6 +477,7 @@ describe('TokensService', () => { await service.createPool(ctx, request).then(resp => { expect(resp).toEqual({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: ERC20_WITH_DATA_POOL_ID, standard: 'ERC20', @@ -488,6 +498,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPool = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -501,6 +512,7 @@ describe('TokensService', () => { await service.createPool(ctx, request).then(resp => { expect(resp).toEqual({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: ERC20_WITH_DATA_POOL_ID, standard: 'ERC20', @@ -521,6 +533,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPoolActivate = { + namespace: 'ns1', poolLocator: ERC20_WITH_DATA_POOL_ID, poolData: 'ns1', }; @@ -531,6 +544,7 @@ describe('TokensService', () => { }; const response: TokenPoolEvent = { + namespace: 'ns1', poolData: 'ns1', poolLocator: ERC20_WITH_DATA_POOL_ID, standard: 'ERC20', @@ -575,6 +589,7 @@ describe('TokensService', () => { }; const request: TokenMint = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_WITH_DATA_POOL_ID, @@ -610,6 +625,7 @@ describe('TokensService', () => { }; const request: TokenTransfer = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_WITH_DATA_POOL_ID, @@ -646,6 +662,7 @@ describe('TokensService', () => { }; const request: TokenBurn = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_WITH_DATA_POOL_ID, @@ -680,6 +697,7 @@ describe('TokensService', () => { it('should return ERC721NoData pool details successfully', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.NONFUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -695,6 +713,7 @@ describe('TokensService', () => { await service.createPool(ctx, request).then(resp => { expect(resp).toEqual({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: ERC721_NO_DATA_POOL_ID, standard: 'ERC721', @@ -715,6 +734,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPoolActivate = { + namespace: 'ns1', poolLocator: ERC721_NO_DATA_POOL_ID, poolData: 'ns1', }; @@ -725,6 +745,7 @@ describe('TokensService', () => { }; const response: TokenPoolEvent = { + namespace: 'ns1', poolData: 'ns1', poolLocator: ERC721_NO_DATA_POOL_ID, standard: 'ERC721', @@ -767,6 +788,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenMint = { + namespace: 'ns1', amount: '2', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, @@ -784,6 +806,7 @@ describe('TokensService', () => { }; const request: TokenMint = { + namespace: 'ns1', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, to: '0x123', @@ -818,6 +841,7 @@ describe('TokensService', () => { }; const request: TokenTransfer = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, @@ -856,6 +880,7 @@ describe('TokensService', () => { }; const request: TokenBurn = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, @@ -890,6 +915,7 @@ describe('TokensService', () => { it('should return ERC721WithData pool details successfully - implicit withData config', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.NONFUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -904,6 +930,7 @@ describe('TokensService', () => { await service.createPool(ctx, request).then(resp => { expect(resp).toEqual({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: ERC721_WITH_DATA_POOL_ID, standard: 'ERC721', @@ -924,6 +951,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPoolActivate = { + namespace: 'ns1', poolLocator: ERC721_WITH_DATA_POOL_ID, poolData: 'ns1', }; @@ -934,6 +962,7 @@ describe('TokensService', () => { }; const response: TokenPoolEvent = { + namespace: 'ns1', poolData: 'ns1', poolLocator: ERC721_WITH_DATA_POOL_ID, standard: 'ERC721', @@ -975,6 +1004,7 @@ describe('TokensService', () => { it('should not mint ERC721WithData token due to invalid amount', async () => { const ctx = newContext(); const request: TokenMint = { + namespace: 'ns1', amount: '2', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_POOL_ID, @@ -992,6 +1022,7 @@ describe('TokensService', () => { }; const request: TokenMint = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_V1_POOL_ID, @@ -1030,6 +1061,7 @@ describe('TokensService', () => { }; const request: TokenMint = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_V1_POOL_ID, @@ -1069,6 +1101,7 @@ describe('TokensService', () => { }; const request: TokenMint = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_V1_POOL_ID, @@ -1112,6 +1145,7 @@ describe('TokensService', () => { }; const request: TokenMint = { + namespace: 'ns1', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_POOL_ID, to: '0x123', @@ -1149,6 +1183,7 @@ describe('TokensService', () => { }; const request: TokenTransfer = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_POOL_ID, @@ -1185,6 +1220,7 @@ describe('TokensService', () => { }; const request: TokenBurn = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_POOL_ID, @@ -1219,6 +1255,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPoolActivate = { + namespace: 'ns1', poolLocator: 'address=0x123&standard=notAStandard&type=fungible', poolData: 'ns1', }; @@ -1231,6 +1268,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPoolActivate = { + namespace: 'ns1', poolLocator: 'address=0x123&type=fungible', poolData: 'ns1', }; diff --git a/src/tokens/tokens.service.ts b/src/tokens/tokens.service.ts index b82c961..d033351 100644 --- a/src/tokens/tokens.service.ts +++ b/src/tokens/tokens.service.ts @@ -71,7 +71,6 @@ export class TokensService { baseUrl: string; topic: string; - stream: EventStream; factoryAddress = ''; constructor( @@ -87,33 +86,25 @@ export class TokensService { this.factoryAddress = factoryAddress.toLowerCase(); this.proxy.addConnectionListener(this); this.proxy.addEventListener(new TokenListener(this.mapper, this.blockchain)); + const wsUrl = new URL('/ws', this.baseUrl.replace('http', 'ws')).href; + this.proxy.configure(wsUrl, this.topic); } async onConnect() { const wsUrl = new URL('/ws', this.baseUrl.replace('http', 'ws')).href; - const stream = await this.getStream(newContext()); - this.proxy.configure(wsUrl, stream.name); - } - - /** - * One-time initialization of event stream and base subscription. - */ - async init(ctx: Context) { - this.stream = await this.getStream(ctx); - if (this.factoryAddress !== '') { - await this.createFactorySubscription(ctx, this.factoryAddress); - } + this.proxy.configure(wsUrl, this.topic); } - private async createFactorySubscription(ctx: Context, address: string) { + private async getOrCreateFactorySubscription(ctx: Context, address: string, namespace) { const eventABI = this.mapper.getCreateEvent(); const methodABI = this.mapper.getCreateMethod(); - if (eventABI !== undefined && methodABI !== undefined) { + const stream = await this.getStream(ctx, namespace); + if (eventABI !== undefined && methodABI !== undefined && stream !== undefined) { await this.eventstream.getOrCreateSubscription( ctx, this.baseUrl, eventABI, - this.stream.id, + stream.id, packSubscriptionName(address, eventABI.name), address, [methodABI], @@ -122,15 +113,10 @@ export class TokensService { } } - private async getStream(ctx: Context) { - const stream = this.stream; - if (stream !== undefined) { - return stream; - } + private async getStream(ctx: Context, namespace: string) { await this.migrationCheck(ctx); this.logger.log('Creating stream with name ' + this.topic); - this.stream = await this.eventstream.createOrUpdateStream(ctx, this.topic, this.topic); - return this.stream; + return this.eventstream.createOrUpdateStream(ctx, `${this.topic}/${namespace}`, this.topic); } /** @@ -276,6 +262,7 @@ export class TokensService { } return { + namespace: dto.namespace, data: dto.data, poolLocator: packPoolLocator(poolLocator), standard: dto.type === TokenType.FUNGIBLE ? 'ERC20' : 'ERC721', @@ -296,7 +283,7 @@ export class TokensService { address: string, dto: TokenPool, ): Promise { - await this.createFactorySubscription(ctx, address); + await this.getOrCreateFactorySubscription(ctx, address, dto.namespace); const { method, params } = await this.mapper.getCreateMethodAndParams(ctx, address, dto); const response = await this.blockchain.sendTransaction( ctx, @@ -347,7 +334,7 @@ export class TokensService { poolLocator.type === TokenType.FUNGIBLE, ); const eventAbis = this.getEventAbis(poolLocator); - const stream = await this.getStream(ctx); + const stream = await this.getStream(ctx, dto.namespace); const promises = [ this.eventstream.getOrCreateSubscription( @@ -389,6 +376,7 @@ export class TokensService { const poolInfo = await this.queryPool(ctx, poolLocator); const tokenPoolEvent: TokenPoolEvent = { + namespace: dto.namespace, poolData: dto.poolData, poolLocator: dto.poolLocator, standard: poolLocator.type === TokenType.FUNGIBLE ? 'ERC20' : 'ERC721', @@ -412,7 +400,7 @@ export class TokensService { throw new BadRequestException('Invalid pool locator'); } - const stream = await this.getStream(ctx); + const stream = await this.getStream(ctx, dto.namespace); const eventAbis = this.getEventAbis(poolLocator); const promises = [ this.eventstream.deleteSubscriptionByName( diff --git a/src/websocket-events/websocket-events.base.ts b/src/websocket-events/websocket-events.base.ts index b25efa7..8b71689 100644 --- a/src/websocket-events/websocket-events.base.ts +++ b/src/websocket-events/websocket-events.base.ts @@ -50,6 +50,14 @@ export interface WebSocketMessage { data: any; } +export interface WebSocketActionBase { + type: 'start' | 'ack' | 'nack' | 'protocol_error'; +} + +export interface WebSocketStart extends WebSocketActionBase { + namespace: string; +} + /** * Base class for websocket gateways. * diff --git a/test/app.e2e-context.ts b/test/app.e2e-context.ts index 49a1f52..d17b7f4 100644 --- a/test/app.e2e-context.ts +++ b/test/app.e2e-context.ts @@ -27,16 +27,28 @@ export class TestContext { }; eventHandler: (events: EventBatch) => void; receiptHandler: (receipt: EventStreamReply) => void; + connected: Promise; + private resolveConnected: () => void; + private rejectConnected: () => void; + + resetConnectedPromise() { + this.connected = new Promise((resolve, reject) => { + this.resolveConnected = resolve; + this.rejectConnected = reject; + }); + } eventstream = { connect: ( url: string, topic: string, + namespace: string, handleEvents: (events: EventBatch) => void, handleReceipt: (receipt: EventStreamReply) => void, ) => { this.eventHandler = handleEvents; this.receiptHandler = handleReceipt; + this.resolveConnected(); }, getStreams: jest.fn(), @@ -85,6 +97,7 @@ export class TestContext { (this.app.getHttpServer() as Server).listen(); this.server = request(this.app.getHttpServer()); + this.resetConnectedPromise(); } async end() { diff --git a/test/suites/erc20.ts b/test/suites/erc20.ts index 65568ee..0ff305c 100644 --- a/test/suites/erc20.ts +++ b/test/suites/erc20.ts @@ -89,10 +89,10 @@ export default (context: TestContext) => { }), ); }; - describe('ERC20WithData', () => { it('Create pool - unrecognized fields', async () => { const request = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -102,8 +102,8 @@ export default (context: TestContext) => { symbol: SYMBOL, isBestPool: true, // will be stripped but will not cause an error }; - const expectedResponse = expect.objectContaining({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: `address=${CONTRACT_ADDRESS}&schema=${ERC20_WITH_DATA_SCHEMA}&type=${TokenType.FUNGIBLE}`, standard: 'ERC20', @@ -116,16 +116,14 @@ export default (context: TestContext) => { schema: ERC20_WITH_DATA_SCHEMA, }, }); - mockPoolQuery(true); context.http.get = jest.fn(() => new FakeObservable(expectedResponse)); - const response = await context.server.post('/createpool').send(request).expect(200); expect(response.body).toEqual(expectedResponse); }); - it('Create pool - invalid type', async () => { const request: TokenPool = { + namespace: 'ns1', type: 'funkible' as TokenType, requestId: REQUEST, signer: IDENTITY, @@ -134,19 +132,17 @@ export default (context: TestContext) => { name: NAME, symbol: SYMBOL, }; - const response = { statusCode: 400, message: ['type must be a valid enum value'], error: 'Bad Request', }; - context.http.post = jest.fn(() => new FakeObservable(response)); await context.server.post('/createpool').send(request).expect(400).expect(response); }); - it('Create pool - correct fields', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -155,8 +151,8 @@ export default (context: TestContext) => { name: NAME, symbol: SYMBOL, }; - const expectedResponse = expect.objectContaining({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: `address=${CONTRACT_ADDRESS}&schema=${ERC20_WITH_DATA_SCHEMA}&type=${TokenType.FUNGIBLE}`, standard: 'ERC20', @@ -169,22 +165,19 @@ export default (context: TestContext) => { schema: ERC20_WITH_DATA_SCHEMA, }, }); - mockPoolQuery(true); context.http.get = jest.fn(() => new FakeObservable(expectedResponse)); - const response = await context.server.post('/createpool').send(request).expect(200); expect(response.body).toEqual(expectedResponse); }); - it('Mint token', async () => { const request: TokenMint = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_WITH_DATA_POOL_ID, to: '0x123', }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -194,29 +187,24 @@ export default (context: TestContext) => { method: ERC20WithDataABI.abi.find(abi => abi.name === MINT_WITH_DATA) as IAbiMethod, params: ['0x123', '20', '0x00'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/mint').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Transfer token', async () => { const request: TokenTransfer = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_WITH_DATA_POOL_ID, to: '0x123', from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -226,28 +214,23 @@ export default (context: TestContext) => { method: ERC20WithDataABI.abi.find(abi => abi.name === TRANSFER_WITH_DATA) as IAbiMethod, params: [IDENTITY, '0x123', '20', '0x00'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/transfer').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Burn token', async () => { const request: TokenBurn = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_WITH_DATA_POOL_ID, from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -257,29 +240,24 @@ export default (context: TestContext) => { method: ERC20WithDataABI.abi.find(abi => abi.name === BURN_WITH_DATA) as IAbiMethod, params: [IDENTITY, '20', '0x00'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/burn').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Token approval', async () => { const request: TokenApproval = { + namespace: 'ns1', poolLocator: ERC20_WITH_DATA_POOL_ID, signer: IDENTITY, operator: '2', approved: true, config: { allowance: '100' }, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -289,24 +267,20 @@ export default (context: TestContext) => { method: ERC20WithDataABI.abi.find(abi => abi.name === APPROVE_WITH_DATA) as IAbiMethod, params: ['2', '100', '0x00'], }; - const response: EthConnectAsyncResponse = { id: '1', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/approval').send(request).expect(202).expect({ id: '1' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); }); - describe('ERC20NoData', () => { it('Create pool - unrecognized fields', async () => { const request = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -316,8 +290,8 @@ export default (context: TestContext) => { symbol: SYMBOL, isBestPool: true, // will be stripped but will not cause an error }; - const expectedResponse = expect.objectContaining({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: `address=${CONTRACT_ADDRESS}&schema=${ERC20_NO_DATA_SCHEMA}&type=${TokenType.FUNGIBLE}`, standard: 'ERC20', @@ -329,16 +303,14 @@ export default (context: TestContext) => { schema: ERC20_NO_DATA_SCHEMA, }, }); - mockPoolQuery(false); context.http.get = jest.fn(() => new FakeObservable(expectedResponse)); - const response = await context.server.post('/createpool').send(request).expect(200); expect(response.body).toEqual(expectedResponse); }); - it('Create pool - invalid type', async () => { const request: TokenPool = { + namespace: 'ns1', type: 'funkible' as TokenType, requestId: REQUEST, signer: IDENTITY, @@ -347,18 +319,16 @@ export default (context: TestContext) => { name: NAME, symbol: SYMBOL, }; - const response = { statusCode: 400, message: ['type must be a valid enum value'], error: 'Bad Request', }; - await context.server.post('/createpool').send(request).expect(400).expect(response); }); - it('Create pool - correct fields', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -367,8 +337,8 @@ export default (context: TestContext) => { name: NAME, symbol: SYMBOL, }; - const expectedResponse = expect.objectContaining({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: `address=${CONTRACT_ADDRESS}&schema=${ERC20_NO_DATA_SCHEMA}&type=${TokenType.FUNGIBLE}`, standard: 'ERC20', @@ -380,22 +350,19 @@ export default (context: TestContext) => { schema: ERC20_NO_DATA_SCHEMA, }, }); - mockPoolQuery(false); context.http.get = jest.fn(() => new FakeObservable(expectedResponse)); - const response = await context.server.post('/createpool').send(request).expect(200); expect(response.body).toEqual(expectedResponse); }); - it('Mint token', async () => { const request: TokenMint = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, to: '0x123', }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -405,29 +372,24 @@ export default (context: TestContext) => { method: ERC20NoDataABI.abi.find(abi => abi.name === MINT_NO_DATA) as IAbiMethod, params: ['0x123', '20'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/mint').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Transfer token', async () => { const request: TokenTransfer = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, to: '0x123', from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -437,28 +399,23 @@ export default (context: TestContext) => { method: ERC20NoDataABI.abi.find(abi => abi.name === TRANSFER_NO_DATA) as IAbiMethod, params: ['0x123', '20'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/transfer').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Burn token', async () => { const request: TokenBurn = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -468,29 +425,24 @@ export default (context: TestContext) => { method: ERC20NoDataABI.abi.find(abi => abi.name === BURN_NO_DATA) as IAbiMethod, params: ['20'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/burn').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Token approval', async () => { const request: TokenApproval = { + namespace: 'ns1', poolLocator: ERC20_NO_DATA_POOL_ID, signer: IDENTITY, operator: '2', approved: true, config: { allowance: '100' }, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -500,20 +452,15 @@ export default (context: TestContext) => { method: ERC20WithDataABI.abi.find(abi => abi.name === APPROVE_NO_DATA) as IAbiMethod, params: ['2', '100'], }; - const response: EthConnectAsyncResponse = { id: '1', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/approval').send(request).expect(202).expect({ id: '1' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Burn token - custom ABI', async () => { const burnMethods = [ { @@ -548,8 +495,8 @@ export default (context: TestContext) => { outputs: [], }, ]; - const request: TokenBurn = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, @@ -559,7 +506,6 @@ export default (context: TestContext) => { methods: burnMethods, }, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -569,20 +515,15 @@ export default (context: TestContext) => { method: burnMethods[0], params: ['20'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/burn').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Burn token - custom ABI, burn from other', async () => { const burnMethods = [ { @@ -617,8 +558,8 @@ export default (context: TestContext) => { outputs: [], }, ]; - const request: TokenBurn = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, @@ -628,7 +569,6 @@ export default (context: TestContext) => { methods: burnMethods, }, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -638,27 +578,21 @@ export default (context: TestContext) => { method: burnMethods[1], params: ['0x2', '20'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/burn').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Check interface', async () => { const request: CheckInterfaceRequest = { poolLocator: ERC20_NO_DATA_POOL_ID, format: InterfaceFormat.ABI, methods: ERC20NoDataABI.abi, }; - const response: CheckInterfaceResponse = { approval: { format: InterfaceFormat.ABI, @@ -682,7 +616,6 @@ export default (context: TestContext) => { ], }, }; - await context.server.post('/checkinterface').send(request).expect(200).expect(response); }); }); diff --git a/test/suites/erc721.ts b/test/suites/erc721.ts index bf41034..fa592df 100644 --- a/test/suites/erc721.ts +++ b/test/suites/erc721.ts @@ -83,7 +83,6 @@ export default (context: TestContext) => { }), ); }; - const mockURIQuery = (withURI: boolean) => { context.http.post.mockReturnValueOnce( new FakeObservable({ @@ -91,10 +90,10 @@ export default (context: TestContext) => { }), ); }; - describe('ERC721WithData', () => { it('Create pool - correct fields', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.NONFUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -103,7 +102,6 @@ export default (context: TestContext) => { name: NAME, symbol: SYMBOL, }; - const expectedResponse = expect.objectContaining({ data: `{"tx":${TX}}`, poolLocator: `address=${CONTRACT_ADDRESS}&schema=${ERC721_WITH_DATA_SCHEMA}&type=${TokenType.NONFUNGIBLE}`, @@ -116,17 +114,15 @@ export default (context: TestContext) => { schema: ERC721_WITH_DATA_SCHEMA, }, }); - mockURIQuery(true); mockPoolQuery(undefined); context.http.get = jest.fn(() => new FakeObservable(expectedResponse)); - const response = await context.server.post('/createpool').send(request).expect(200); expect(response.body).toEqual(expectedResponse); }); - it('Create pool - base URI', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.NONFUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -135,7 +131,6 @@ export default (context: TestContext) => { name: NAME, symbol: SYMBOL, }; - const expectedResponse = expect.objectContaining({ data: `{"tx":${TX}}`, poolLocator: `address=${CONTRACT_ADDRESS}&schema=${ERC721_WITH_DATA_SCHEMA}&type=${TokenType.NONFUNGIBLE}`, @@ -148,22 +143,19 @@ export default (context: TestContext) => { schema: ERC721_WITH_DATA_SCHEMA, }, }); - mockURIQuery(true); mockPoolQuery(undefined); context.http.get = jest.fn(() => new FakeObservable(expectedResponse)); - const response = await context.server.post('/createpool').send(request).expect(200); expect(response.body).toEqual(expectedResponse); }); - it('Mint token', async () => { const request: TokenMint = { + namespace: 'ns1', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_POOL_ID, to: '0x123', }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -173,29 +165,24 @@ export default (context: TestContext) => { method: ERC721WithDataV2ABI.abi.find(abi => abi.name === MINT_WITH_URI) as IAbiMethod, params: ['0x123', '0x00', ''], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/mint').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Transfer token', async () => { const request: TokenTransfer = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_POOL_ID, to: '0x123', from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -205,28 +192,23 @@ export default (context: TestContext) => { method: ERC721WithDataV2ABI.abi.find(abi => abi.name === TRANSFER_WITH_DATA) as IAbiMethod, params: [IDENTITY, '0x123', '721', '0x00'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/transfer').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Burn token', async () => { const request: TokenBurn = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_POOL_ID, from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -236,29 +218,24 @@ export default (context: TestContext) => { method: ERC721WithDataV2ABI.abi.find(abi => abi.name === BURN_WITH_DATA) as IAbiMethod, params: [IDENTITY, '721', '0x00'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/burn').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Token approval for all', async () => { const request: TokenApproval = { + namespace: 'ns1', poolLocator: ERC721_WITH_DATA_POOL_ID, signer: IDENTITY, operator: '2', approved: true, config: {}, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -270,29 +247,24 @@ export default (context: TestContext) => { ) as IAbiMethod, params: ['2', true, '0x00'], }; - const response: EthConnectAsyncResponse = { id: '1', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/approval').send(request).expect(202).expect({ id: '1' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Token approval for one', async () => { const request: TokenApproval = { + namespace: 'ns1', poolLocator: ERC721_WITH_DATA_POOL_ID, signer: IDENTITY, operator: '2', approved: true, config: { tokenIndex: '5' }, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -302,24 +274,20 @@ export default (context: TestContext) => { method: ERC721WithDataV2ABI.abi.find(abi => abi.name === APPROVE_WITH_DATA) as IAbiMethod, params: ['2', '5', '0x00'], }; - const response: EthConnectAsyncResponse = { id: '1', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/approval').send(request).expect(202).expect({ id: '1' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); }); - describe('ERC721NoData', () => { it('Create pool - correct fields', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.NONFUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -328,7 +296,6 @@ export default (context: TestContext) => { name: NAME, symbol: SYMBOL, }; - const expectedResponse = expect.objectContaining({ data: `{"tx":${TX}}`, poolLocator: `address=${CONTRACT_ADDRESS}&schema=${ERC721_NO_DATA_SCHEMA}&type=${TokenType.NONFUNGIBLE}`, @@ -341,23 +308,20 @@ export default (context: TestContext) => { schema: ERC721_NO_DATA_SCHEMA, }, }); - mockURIQuery(false); mockURIQuery(false); mockPoolQuery(false); context.http.get = jest.fn(() => new FakeObservable(expectedResponse)); - const response = await context.server.post('/createpool').send(request).expect(200); expect(response.body).toEqual(expectedResponse); }); - it('Mint token', async () => { const request: TokenMint = { + namespace: 'ns1', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, to: '0x123', }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -367,29 +331,24 @@ export default (context: TestContext) => { method: ERC721NoDataABI.abi.find(abi => abi.name === MINT_NO_DATA) as IAbiMethod, params: ['0x123'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/mint').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Transfer token', async () => { const request: TokenTransfer = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, to: '0x123', from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -401,28 +360,23 @@ export default (context: TestContext) => { ) as IAbiMethod, params: [IDENTITY, '0x123', '721', '0x00'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/transfer').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Burn token', async () => { const request: TokenBurn = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -432,29 +386,24 @@ export default (context: TestContext) => { method: ERC721NoDataABI.abi.find(abi => abi.name === BURN_NO_DATA) as IAbiMethod, params: ['721'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/burn').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Token approval for all', async () => { const request: TokenApproval = { + namespace: 'ns1', poolLocator: ERC721_NO_DATA_POOL_ID, signer: IDENTITY, operator: '2', approved: true, config: {}, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -464,29 +413,24 @@ export default (context: TestContext) => { method: ERC721NoDataABI.abi.find(abi => abi.name === APPROVE_FOR_ALL_NO_DATA) as IAbiMethod, params: ['2', true], }; - const response: EthConnectAsyncResponse = { id: '1', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/approval').send(request).expect(202).expect({ id: '1' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Token approval for one', async () => { const request: TokenApproval = { + namespace: 'ns1', poolLocator: ERC721_NO_DATA_POOL_ID, signer: IDENTITY, operator: '2', approved: true, config: { tokenIndex: '5' }, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -496,20 +440,15 @@ export default (context: TestContext) => { method: ERC721NoDataABI.abi.find(abi => abi.name === APPROVE_NO_DATA) as IAbiMethod, params: ['2', '5'], }; - const response: EthConnectAsyncResponse = { id: '1', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/approval').send(request).expect(202).expect({ id: '1' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Mint token - custom ABI', async () => { const safeMintAutoIndex = { name: 'safeMint', @@ -524,8 +463,8 @@ export default (context: TestContext) => { ], outputs: [], }; - const request: TokenMint = { + namespace: 'ns1', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, to: '0x123', @@ -534,7 +473,6 @@ export default (context: TestContext) => { methods: [safeMintAutoIndex], }, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -544,27 +482,21 @@ export default (context: TestContext) => { method: safeMintAutoIndex, params: ['0x123'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/mint').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Check interface', async () => { const request: CheckInterfaceRequest = { poolLocator: ERC721_NO_DATA_POOL_ID, format: InterfaceFormat.ABI, methods: ERC721NoDataABI.abi, }; - const response: CheckInterfaceResponse = { approval: { format: InterfaceFormat.ABI, @@ -593,7 +525,6 @@ export default (context: TestContext) => { ], }, }; - await context.server.post('/checkinterface').send(request).expect(200).expect(response); }); }); diff --git a/test/suites/websocket.ts b/test/suites/websocket.ts index 01c2bca..55e05d7 100644 --- a/test/suites/websocket.ts +++ b/test/suites/websocket.ts @@ -234,7 +234,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20MintTransferEvent], batchNumber: 12345 }); }) @@ -290,7 +295,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20MintTransferEvent] }); }) @@ -375,7 +385,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC721MintTransferEvent] }); }) @@ -429,7 +444,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20TransferEvent] }); }) @@ -516,7 +536,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC721TransferEvent] }); }) @@ -569,7 +594,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20BurnEvent] }); }) @@ -654,7 +684,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC721BurnEvent] }); }) @@ -713,7 +748,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20ApprovalEvent] }); }) @@ -772,7 +812,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC721ApprovalEvent] }); }) @@ -831,7 +876,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockApprovalForAllEvent] }); }) @@ -847,7 +897,12 @@ export default (context: TestContext) => { it('Success receipt', () => { return context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.receiptHandler).toBeDefined(); context.receiptHandler({ headers: { @@ -873,7 +928,12 @@ export default (context: TestContext) => { it('Error receipt', () => { return context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.receiptHandler).toBeDefined(); context.receiptHandler({ headers: { @@ -905,7 +965,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20MintTransferEvent] }); }) @@ -918,25 +983,16 @@ export default (context: TestContext) => { }) .close(); - await context.server.ws('/api/ws').expectJson(message => { - expect(message.id).toBeDefined(); - expect(message.event).toEqual('batch'); - expect(message.data.events).toHaveLength(1); - expect(message.data.events[0].event).toEqual('token-mint'); - return true; - }); - }); - - it('Client switchover', async () => { - context.eventstream.getSubscription.mockReturnValueOnce({ - name: packSubscriptionName(CONTRACT_ADDRESS, '', 'default'), - }); - - const ws1 = context.server.ws('/api/ws'); - const ws2 = context.server.ws('/api/ws'); + context.resetConnectedPromise(); - await ws1 - .exec(() => { + await context.server + .ws('/api/ws') + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20MintTransferEvent] }); }) @@ -946,15 +1002,6 @@ export default (context: TestContext) => { expect(message.data.events).toHaveLength(1); expect(message.data.events[0].event).toEqual('token-mint'); return true; - }) - .close(); - - await ws2.expectJson(message => { - expect(message.id).toBeDefined(); - expect(message.event).toEqual('batch'); - expect(message.data.events).toHaveLength(1); - expect(message.data.events[0].event).toEqual('token-mint'); - return true; - }); + }); }); };