Skip to content

Commit

Permalink
BC-8587 - change types
Browse files Browse the repository at this point in the history
  • Loading branch information
SevenWaysDP committed Jan 6, 2025
1 parent c6d4adb commit 34fffc5
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 28 deletions.
15 changes: 9 additions & 6 deletions src/infra/redis/interfaces/stream-message-reply.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import { RedisKey } from 'ioredis';

interface Message {
m?: Buffer;
docName?: string;
compact?: Buffer;
export interface M {
m: Buffer;
}
export interface DocName {
docName: string;
}
export interface Compact {
compact: Buffer;
}

export interface StreamMessageReply {
id: RedisKey;
message: Message;
message: M | DocName | Compact;
}

export interface StreamMessagesReply {
name: string;
messages: StreamMessageReply[] | null;
Expand Down
8 changes: 5 additions & 3 deletions src/infra/redis/mapper.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { RedisKey } from 'ioredis';
import { RedisGuard, TypeGuard } from './guards/index.js';
import {
Compact,
DocName,
M,
StreamMessageReply,
StreamMessagesReply,
XAutoClaimResponse,
Expand Down Expand Up @@ -69,8 +71,8 @@ function mapToStreamMessageReply(value: XItem): StreamMessageReply {
return { id: id.toString(), message: transformTuplesReply(fields) };
}

function transformTuplesReply(reply: RedisKey[]): Record<string, RedisKey> {
const message: Record<string, RedisKey> = Object.create(null);
function transformTuplesReply(reply: string[] | Buffer[]): M | DocName | Compact {
const message = Object.create(null);

for (let i = 0; i < reply.length; i += 2) {
message[reply[i].toString()] = reply[i + 1];
Expand Down
2 changes: 1 addition & 1 deletion src/infra/y-redis/helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export const extractMessagesFromStreamReply = (
messages: [] as Uint8Array[],
}));
docStreamReply.messages?.forEach((m: StreamMessageReply) => {
if (m.message.m != null) {
if ('m' in m.message && m.message.m != null) {
const unit8ArrayRedisKey = castRedisKeyToUnit8Array(m.message.m);
docMessages.messages.push(unit8ArrayRedisKey);
}
Expand Down
8 changes: 7 additions & 1 deletion src/infra/y-redis/y-redis.client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,13 @@ describe(YRedisClient.name, () => {
},
];

const expectedMessages = messages?.map((message) => message.message.m).filter((m) => m != null);
const expectedMessages: Buffer[] = [];

messages?.forEach((message) => {
if ('m' in message.message && message.message.m) {
expectedMessages.push(message.message.m);
}
});

return { spyMergeMessages, expectedResult, expectedMessages, props };
};
Expand Down
8 changes: 7 additions & 1 deletion src/infra/y-redis/y-redis.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,13 @@ export class YRedisClient implements OnModuleInit {
return [];
}

const filteredMessages = messages.map((message) => message.message.m).filter((m) => m != null);
const filteredMessages: Buffer[] = [];

messages.forEach((message) => {
if ('m' in message.message && Buffer.isBuffer(message.message.m) && message.message.m) {
filteredMessages.push(message.message.m);
}
});

return filteredMessages;
}
Expand Down
16 changes: 11 additions & 5 deletions src/modules/worker/worker.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Test, TestingModule } from '@nestjs/testing';
import { Awareness } from 'y-protocols/awareness';
import { Doc } from 'yjs';
import { Logger } from '../../infra/logger/logger.js';
import { RedisAdapter, StreamMessageReply } from '../../infra/redis/interfaces/index.js';
import { RedisAdapter, StreamMessageReply, Task } from '../../infra/redis/interfaces/index.js';
import { streamMessageReplyFactory } from '../../infra/redis/testing/stream-message-reply.factory.js';
import { xAutoClaimResponseFactory } from '../../infra/redis/testing/x-auto-claim-response.factory.js';
import { StorageService } from '../../infra/storage/storage.service.js';
Expand All @@ -14,10 +14,16 @@ import { REDIS_FOR_WORKER } from './worker.const.js';
import { WorkerService } from './worker.service.js';

const mapStreamMessageRepliesToTask = (streamMessageReplies: StreamMessageReply[]) => {
const tasks = streamMessageReplies.map((message) => ({
stream: message.message.compact?.toString(),
id: message.id.toString(),
}));
const tasks: Task[] = [];

streamMessageReplies.forEach((message) => {
if ('compact' in message.message) {
tasks.push({
stream: message.message.compact.toString(),
id: message.id.toString(),
});
}
});

return tasks;
};
Expand Down
27 changes: 16 additions & 11 deletions src/modules/worker/worker.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,14 @@ export class WorkerService implements Job, OnModuleDestroy {
`Stream still empty, removing recurring task from queue ${JSON.stringify({ stream: task.stream })}`,
);

const deleteEntryId = deletedDocEntries.find((entry) => entry.message.docName === task.stream)?.id.toString();
const deleteEntry = deletedDocEntries.find(
(entry) => 'docName' in entry.message && entry.message.docName === task.stream,
);

if (deleteEntryId) {
if (deleteEntry) {
const roomStreamInfos = decodeRedisRoomStreamName(task.stream.toString(), this.redis.redisPrefix);
await Promise.all([
this.redis.deleteDeletedDocEntry(deleteEntryId),
this.redis.deleteDeletedDocEntry(deleteEntry.id.toString()),
this.storageService.deleteDocument(roomStreamInfos.room, roomStreamInfos.docid),
]);
}
Expand All @@ -157,9 +159,10 @@ export class WorkerService implements Job, OnModuleDestroy {
// helper
private mapReclaimTaskToTask(reclaimedTasks: XAutoClaimResponse): Task[] {
const tasks: Task[] = [];
reclaimedTasks.messages?.forEach((m) => {
const stream = m?.message.compact;
stream && tasks.push({ stream: stream.toString(), id: m?.id.toString() });
reclaimedTasks.messages?.forEach((entry) => {
if ('compact' in entry.message && entry.message.compact) {
tasks.push({ stream: entry.message.compact.toString(), id: entry?.id.toString() });
}
});

if (tasks.length > 0) {
Expand All @@ -176,11 +179,13 @@ export class WorkerService implements Job, OnModuleDestroy {
}

private extractDocNamesFromStreamMessageReply(docEntries: StreamMessageReply[]): string[] {
const docNames = docEntries
.map((entry) => {
return entry.message.docName;
})
.filter((docName) => docName !== undefined);
const docNames: string[] = [];

docEntries.forEach((entry) => {
if ('docName' in entry.message && typeof entry.message.docName === 'string' && entry.message.docName) {
docNames.push(entry.message.docName);
}
});

return docNames;
}
Expand Down

0 comments on commit 34fffc5

Please sign in to comment.