Skip to content

Commit

Permalink
feat: SDK like improvements
Browse files Browse the repository at this point in the history
Added 20k dev wallets
Updated to an sdk like feel
Added caching checks
Check for existing conversation
Moved to weighted batches
  • Loading branch information
Alex Risch authored and Alex Risch committed May 10, 2024
1 parent a344282 commit 39be7a5
Show file tree
Hide file tree
Showing 9 changed files with 20,300 additions and 71 deletions.
19,959 changes: 19,959 additions & 0 deletions addresses.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
"typescript": "^5.0.0"
},
"dependencies": {
"@redis/client": "^1.5.14",
"@xmtp/fs-persistence": "^0.0.4",
"@xmtp/grpc-api-client": "^0.2.4",
"@xmtp/proto": "^3.55.0",
"@xmtp/redis-persistence": "^0.0.4",
"@xmtp/xmtp-js": "11.6.0",
"cors": "^2.8.5",
"dotenv": "^16.4.5",
Expand Down
24 changes: 20 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import express, { type Request, type Response } from "express";
import cors from "cors";
import dotenv from "dotenv";
import { getXmtpClient, initializeClients } from "./lib/client";
import { listSubscribers } from "./lib/listSubscribers";
import { invitation } from "@xmtp/proto";
import { startBroadcast } from "./lib/startBroadcast";
import { addBroadcast } from "./lib/broadcasts";
import { addBroadcast, broadcastEntities } from "./lib/broadcasts";
import { broadCastConfigEntities } from "./lib/broadcasterConfigs";
import { base64ToBytes } from "./lib/utils/base64ToBytes";
import { getDevWalletAddresses } from "./lib/addresses";

const envPath = `.env.${process.env.NODE_ENV}`;
dotenv.config({ path: envPath });
Expand Down Expand Up @@ -111,14 +111,30 @@ app.post("/broadcast", async (req: Request, res: Response) => {
res.status(500).send("Client not initialized");
return;
}

const subscribers = await listSubscribers(client);
const subscribers = getDevWalletAddresses();
const broadcastId = addBroadcast(subscribers, text);
startBroadcast(client, subscribers, text, broadcastId);

res.status(200).send({ broadcastId });
});

app.get("/broadcast", async (req: Request, res: Response) => {
// Get broadcast id from params
const { broadcastId } = req.query;
console.log(broadcastId);
if (typeof broadcastId !== "string") {
res.status(400).send("BroadcastId must be a string");
return;
}
if (broadcastEntities.entities[broadcastId] === undefined) {
console.log(broadcastEntities.ids);
res.status(404).send("Broadcast not found");
return;
}
const broadcast = broadcastEntities.entities[broadcastId];
res.status(200).send(broadcast);
});

app.listen(PORT, () => {
console.log(`Listening on port ${PORT}...`);
});
7 changes: 7 additions & 0 deletions src/lib/addresses.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import * as fs from "fs";

export const getDevWalletAddresses = (): string[] => {
// Read from addresses.json
const addresses = JSON.parse(fs.readFileSync("addresses.json", "utf-8"));
return addresses;
};
15 changes: 9 additions & 6 deletions src/lib/broadcasts.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
interface Broadcast {
id: string;
message: string;
recipients: string[];
sent: string[];
recipients: number;
sent: number;
startTime: string;
endTime?: string;
status: "sending" | "waiting" | "completed" | "failed";
Expand Down Expand Up @@ -33,8 +33,8 @@ export const addBroadcast = (recipients: string[], message: string): string => {
const broadcast: Broadcast = {
id,
message,
recipients,
sent: [],
recipients: recipients.length,
sent: 0,
startTime: new Date().toISOString(),
status: "sending",
};
Expand All @@ -44,9 +44,12 @@ export const addBroadcast = (recipients: string[], message: string): string => {
return id;
};

export const updateBroadcastFromBatch = (id: string, batch: string[]): void => {
export const incrementBroadcastSent = (id: string): void => {
const broadcast = broadcastEntities.entities[id];
broadcast.sent.push(...batch);
const count = broadcast.sent + 1;
const total = broadcast.recipients;
broadcastEntities.entities[id].sent = broadcast.sent + 1;
console.log(`Message sent for broadcast ${id} ${count}/${total}`);
};

export const updateBroadcastStatus = (
Expand Down
11 changes: 8 additions & 3 deletions src/lib/client.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import { Client, type XmtpEnv } from "@xmtp/xmtp-js";
import { GrpcApiClient } from "@xmtp/grpc-api-client";
import { FsPersistence } from "@xmtp/fs-persistence";
import { RedisPersistence } from "@xmtp/redis-persistence";
import { createClient } from "@redis/client";
import { broadCastConfigEntities } from "./broadcasterConfigs";
import { base64ToBytes } from "./utils/base64ToBytes";

const redis = createClient({
url: process.env.REDIS_URL,
});
redis.connect();

let clientsInitialized = false;
const clients = new Map<string, Client>();
// Work around for some weirdness when deploying, could be solved by removing grpc though
Expand All @@ -23,12 +29,11 @@ export async function initializeClients() {
console.error(`Missing ${config.id}_FILE_PERSISTENCE_PATH`);
return;
}
console.log("About to initialize client for: ", address);
try {
const client = await Client.create(null, {
privateKeyOverride: base64ToBytes(keyBundle),
apiClientFactory: GrpcApiClient.fromOptions as any,
basePersistence: new FsPersistence(filePath),
basePersistence: new RedisPersistence(redis as any, "xmtp:"),
env: (process.env.XMTP_ENV as XmtpEnv) ?? "dev",
});
console.log(
Expand Down
121 changes: 63 additions & 58 deletions src/lib/startBroadcast.ts
Original file line number Diff line number Diff line change
@@ -1,76 +1,81 @@
import type { Client } from "@xmtp/xmtp-js";
import {
finishBroadcast,
updateBroadcastFromBatch,
incrementBroadcastSent,
updateBroadcastStatus,
} from "./broadcasts";
import { Broadcast } from "./utils/Broadcast";
const XMTP_RATE_LIMIT = 1000;
const XMTP_RATE_LIMIT_TIME = 60 * 1000; // 1 minute
const XMTP_RATE_LIMIT_TIME_INCREASE = XMTP_RATE_LIMIT_TIME * 5; // 5 minutes

const delay = (ms: number) =>
new Promise<void>((resolve) => setTimeout(resolve, ms));

let sendCount = 0;
let errorCount = 0;
let startTime: number;
export const startBroadcast = async (
client: Client,
broadcastAddresses: string[],
message: string,
broadcastId: string
): Promise<void> => {
const batches: string[][] = [];
let batch: string[] = [];
const canMessageAddresses = await client.canMessage(broadcastAddresses);
let errorCount = 0;
for (let i = 0; i < canMessageAddresses.length; i++) {
if (canMessageAddresses[i]) {
batch.push(broadcastAddresses[i]);
}
// Add a batch of 500 addresses to the batches array
// An introduction message is sent for new contacts, so each new message will actually be 2 messages in this case
// We want to send 1000 messages per minute, so we split the batches in half
// Additional optimization can be done to send messages to contacts that have already been introduced
if (batch.length === XMTP_RATE_LIMIT / 2) {
batches.push(batch);
batch = [];
}
}
if (batch.length > 0) {
batches.push(batch);
}

for (let i = 0; i < batches.length; i++) {
const batch: string[] = [];
const sentAddresses: string[] = [];
updateBroadcastStatus(broadcastId, "sending");
await Promise.all(
batches[i].map(async (address, index) => {
const conversation = await client.conversations.newConversation(
address
);
try {
await conversation.send(message);
console.log(
`Sent message for batch ${i} index ${index} to ${address}`
);
sentAddresses.push(address);
} catch (err) {
errorCount++;
console.error(err);
batch.push(address);
// Add error handling here
}
})
const onBroadcastComplete = () => {
let endTime = Date.now();
console.log(
`Broadcast ${broadcastId} completed Total time ${endTime - startTime}ms`
);
updateBroadcastFromBatch(broadcastId, sentAddresses);
if (i !== batches.length - 1) {
updateBroadcastStatus(broadcastId, "waiting");
// Wait between batches
console.log(`Waiting between batches ${i} and ${i + 1}`);
await delay(XMTP_RATE_LIMIT_TIME_INCREASE);
}
if (batch.length > 0) {
batches.push(batch);
}
finishBroadcast(broadcastId);
};
const onMessageFailed = (address: string) => {
errorCount++;
console.log(`Message failed for address ${errorCount} : ${address}`);
};
const onMessageSent = () => {
sendCount++;
incrementBroadcastSent(broadcastId);
};
const onCanMessageAddressesUpdate = (addresses: string[]) => {
console.log(`Can message addresses updated to ${addresses.length}`);
};
const onBatchComplete = (addresses: string[]) => {
console.log(`Batch complete for ${addresses.length} addresses`);
};
const onCantMessageAddress = (address: string) => {
console.log(`Can't message address ${address}`);
};
const onCanMessageAddreses = (addresses: string[]) => {
console.log(`Can message addresses ${addresses.length}`);
};
const onDelay = (ms: number) => {
console.log(`Delaying for ${ms}ms`);
updateBroadcastStatus(broadcastId, "waiting");
};
const onBatchStart = (addresses: string[]) => {
console.log(`Batch start for ${addresses.length} addresses`);
updateBroadcastStatus(broadcastId, "sending");
};
errorCount = 0;
sendCount = 0;
startTime = Date.now();
const broadcast = new Broadcast({
client,
addresses: broadcastAddresses,
cachedCanMessageAddresses: [],
rateLimitAmount: XMTP_RATE_LIMIT,
rateLimitTime: XMTP_RATE_LIMIT_TIME_INCREASE,
onBatchStart,
onBatchComplete,
onBroadcastComplete,
onCantMessageAddress,
onCanMessageAddreses,
onMessageFailed,
onMessageSent,
onCanMessageAddressesUpdate,
onDelay,
});
try {
await broadcast.broadcast({ message });
} catch (err) {
console.error(`Error broadcasting: ${err}`);
updateBroadcastStatus(broadcastId, "failed");
}
finishBroadcast(broadcastId);
};
Loading

0 comments on commit 39be7a5

Please sign in to comment.