Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BC-8403 Code improvements #43

Merged
merged 69 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from 68 commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
38ba503
Add callback to stop the worker service if the client is destroyed.
CeEv Dec 4, 2024
81c2de8
seperate job code
CeEv Dec 4, 2024
898d7ff
Move wait into loop and make it configurable.
CeEv Dec 4, 2024
b94bccd
Outsource handling to private methods.
CeEv Dec 5, 2024
1fc05d4
Add todo
CeEv Dec 5, 2024
98a4d5b
Add comments
CeEv Dec 5, 2024
01e1f49
BC-8403 - WIP: refactor Redis and YRedis modules
SevenWaysDP Dec 6, 2024
07203c3
Rename api to yRedisClient to match with our implicit naming guidlines.
CeEv Dec 9, 2024
23a89e7
Cleanup worker start calling.
CeEv Dec 9, 2024
8b0bf5c
Simplify test setup
CeEv Dec 9, 2024
cbdd9ac
Rename User to YRedisUser.
CeEv Dec 9, 2024
3cedcae
Outsource YRedisUser in additonal file.
CeEv Dec 9, 2024
b760ae3
Extract code example as show case to seperate y-redis logic from uws …
CeEv Dec 9, 2024
a6adeaa
Update y-redis.client.ts
CeEv Dec 9, 2024
cf82a85
Refactor y-redis service structure and add YRedisService implementation
SevenWaysDP Dec 9, 2024
6ba75bb
fixup! Refactor y-redis service structure and add YRedisService imple…
SevenWaysDP Dec 9, 2024
c96f60e
Outsource some small methods.
CeEv Dec 9, 2024
535f2c0
Merge branch 'BC-8403-code-improvments' of https://github.com/hpi-sch…
CeEv Dec 9, 2024
35e0666
Add RedisUserFactory
CeEv Dec 9, 2024
dc0575b
replace primitives
CeEv Dec 9, 2024
f7c01d2
Add todo
CeEv Dec 9, 2024
bf396e0
Refactor logger methods and update YRedisClient to use Logger for war…
SevenWaysDP Dec 10, 2024
37bc8b6
Switch from array type to single and use [] instead
CeEv Dec 10, 2024
e85a9aa
Merge branch 'BC-8403-code-improvments' of https://github.com/hpi-sch…
CeEv Dec 10, 2024
039f805
Fix import path for WorkerService in tldraw-worker.app.ts
SevenWaysDP Dec 10, 2024
9cd9a40
Code style changes
CeEv Dec 10, 2024
0fdefde
Merge branch 'BC-8403-code-improvments' of https://github.com/hpi-sch…
CeEv Dec 10, 2024
0ceffca
Move auth error logging to gateway.
CeEv Dec 10, 2024
f3dc7c2
wrap subscriber behind y-redis-service interface
CeEv Dec 10, 2024
24e414d
Change data flow to avoid double execution and import from helpers.
CeEv Dec 10, 2024
f57083d
Code style and add try catch for proceed data.
CeEv Dec 10, 2024
ba15d2a
Fix for tests
CeEv Dec 10, 2024
f54b098
Refactor y-redis subscriber service and update environment variables
SevenWaysDP Dec 11, 2024
a2f6205
add tests for y-redis.service
SevenWaysDP Dec 11, 2024
52f555a
Implement OnModuleInit in YRedisClient to create Redis group on initi…
SevenWaysDP Dec 11, 2024
4c3103f
temp
SevenWaysDP Dec 11, 2024
c53f425
revert .env.test
SevenWaysDP Dec 11, 2024
a576ea5
fixup! revert .env.test
SevenWaysDP Dec 11, 2024
dc5d908
Refactor YRedisModule to support server and worker configurations
SevenWaysDP Dec 11, 2024
2bb2a07
Add worker test cases
bischofmax Dec 11, 2024
a92b775
Merge branch 'BC-8403-code-improvments' of github.com:hpi-schul-cloud…
bischofmax Dec 11, 2024
7979661
Refactor YRedisDoc interface and implementation; introduce YRedisDocF…
SevenWaysDP Dec 12, 2024
a88ddb6
Enhance SubscriberService to wait if no streams are available; refact…
SevenWaysDP Dec 12, 2024
032ce7b
TESTS: Update websocket configuration to use port 3399; refactor test…
SevenWaysDP Dec 12, 2024
d451606
code review
SevenWaysDP Dec 12, 2024
d20fa3f
Merge remote-tracking branch 'origin/main' into BC-8403-code-improvments
SevenWaysDP Dec 12, 2024
6df6b03
Update TLDRAW_WEBSOCKET_URL to use port 3399 in Tldraw-Config API tests
SevenWaysDP Dec 12, 2024
91ab698
Minor naming fixes
bischofmax Dec 12, 2024
8e96c8a
Merge branch 'BC-8403-code-improvments' of github.com:hpi-schul-cloud…
bischofmax Dec 12, 2024
5664f91
Code review
SevenWaysDP Dec 12, 2024
0b2c66f
Rename deleteStorageReferencesIfExist
bischofmax Dec 12, 2024
715b14e
Rename helper methods in worker
bischofmax Dec 12, 2024
3e98e71
Merge branch 'BC-8403-code-improvments' of github.com:hpi-schul-cloud…
bischofmax Dec 12, 2024
a821b40
Improve authorization handling in WebsocketGateway by sending error r…
SevenWaysDP Dec 13, 2024
0743c46
revert renaming
SevenWaysDP Dec 13, 2024
54c469c
delete unused file
SevenWaysDP Dec 13, 2024
cbf5f6c
code review
SevenWaysDP Dec 13, 2024
0f4eee0
Refactor Redis service to factory pattern and update imports accordingly
SevenWaysDP Dec 13, 2024
6d5bb3c
code review
SevenWaysDP Dec 13, 2024
2bbfa1c
code review
SevenWaysDP Dec 13, 2024
e086f6b
Refactor StreamMessageReply interface to use Buffer for message prope…
SevenWaysDP Dec 13, 2024
e322f84
Refactor YRedisClient and WorkerService to remove destroyedCallback a…
SevenWaysDP Dec 13, 2024
be25869
Refactor logger usage to replace log method with info for improved cl…
SevenWaysDP Dec 13, 2024
f20d5cd
code review
SevenWaysDP Dec 13, 2024
a737787
Fix import path for YRedisDocProps to use correct filename
SevenWaysDP Dec 13, 2024
0d6ba28
add tests
SevenWaysDP Dec 13, 2024
7456a2c
Rename y-redis testing files to redis for consistency and update impo…
SevenWaysDP Dec 13, 2024
83186b2
Refactor RedisFactory tests to use consistent naming and improve clarity
SevenWaysDP Dec 13, 2024
68a2670
Refactor Redis testing factories for consistency and clarity
SevenWaysDP Dec 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ S3_SECRET_KEY=miniouser
S3_SSL=false

FEATURE_TLDRAW_ENABLED=true
TLDRAW_WEBSOCKET_URL=ws://localhost:3345
TLDRAW_WEBSOCKET_URL=ws://localhost:3399
TLDRAW_WEBSOCKET_PORT=3399

X_API_ALLOWED_KEYS=randomString
4 changes: 2 additions & 2 deletions src/apps/tldraw-server.app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ async function bootstrap(): Promise<void> {
await metricsApp.listen(metricsPort, async () => {
const logger = await metricsApp.resolve(Logger);
logger.setContext('METRICS');
logger.log(`Metrics server is running on port ${metricsPort}`);
logger.info(`Metrics server is running on port ${metricsPort}`);
});

await nestApp.listen(httpPort, async () => {
const logger = await nestApp.resolve(Logger);
logger.setContext('TLDRAW');
logger.log(`Server is running on port ${httpPort}`);
logger.info(`Server is running on port ${httpPort}`);
});
}
bootstrap();
10 changes: 10 additions & 0 deletions src/apps/tldraw-worker.app.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
import { NestFactory } from '@nestjs/core';
import { WorkerModule } from '../modules/worker/worker.module.js';
import { WorkerService } from '../modules/worker/worker.service.js';

async function bootstrap(): Promise<void> {
const nestApp = await NestFactory.createApplicationContext(WorkerModule);

await nestApp.init();
const workerService = await nestApp.resolve(WorkerService);

try {
workerService.start();
} catch (error) {
console.error(error);
workerService.stop();
process.exit(1);
}
}
bootstrap();
1 change: 0 additions & 1 deletion src/infra/authorization/authorization.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ export class AuthorizationService {

private createErrorResponsePayload(code: number, reason: string): ResponsePayload {
const response = ResponsePayloadBuilder.buildWithError(code, reason);
this.logger.log(`Error: ${code} - ${reason}`);

return response;
}
Expand Down
14 changes: 7 additions & 7 deletions src/infra/logger/logger.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { RequestLoggingBody } from './interfaces/logger.interface.js';
import { Logger } from './logger.js';

describe('Logger', () => {
let service: Logger;
let logger: Logger;
let processStdoutWriteSpy: jest.SpyInstance<
boolean,
[str: string | Uint8Array, encoding?: BufferEncoding | undefined, cb?: ((err?: Error) => void) | undefined],
Expand All @@ -30,7 +30,7 @@ describe('Logger', () => {
],
}).compile();

service = await module.resolve(Logger);
logger = await module.resolve(Logger);
winstonLogger = module.get(WINSTON_MODULE_PROVIDER);
});

Expand All @@ -44,26 +44,26 @@ describe('Logger', () => {
processStderrWriteSpy.mockRestore();
});

describe('WHEN log logging', () => {
describe('WHEN info logging', () => {
it('should call winstonLogger.info', () => {
const error = new Error('custom error');
service.log(error.message, error.stack);
logger.info(error.message, error.stack);
expect(winstonLogger.info).toHaveBeenCalled();
});
});

describe('WHEN warn logging', () => {
it('should call winstonLogger.warning', () => {
const error = new Error('custom error');
service.warn(error.message, error.stack);
logger.warning(error.message, error.stack);
expect(winstonLogger.warning).toHaveBeenCalled();
});
});

describe('WHEN debug logging', () => {
it('should call winstonLogger.debug', () => {
const error = new Error('custom error');
service.debug(error.message, error.stack);
logger.debug(error.message, error.stack);
expect(winstonLogger.debug).toHaveBeenCalled();
});
});
Expand All @@ -81,7 +81,7 @@ describe('Logger', () => {
},
error,
};
service.http(message, error.stack);
logger.http(message, error.stack);
expect(winstonLogger.notice).toHaveBeenCalled();
});
});
Expand Down
4 changes: 2 additions & 2 deletions src/infra/logger/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ export class Logger {

public constructor(@Inject(WINSTON_MODULE_PROVIDER) private readonly logger: winston.Logger) {}

public log(message: unknown, context?: string): void {
public info(message: unknown, context?: string): void {
this.logger.info(this.createMessage(message, context));
}

public warn(message: unknown, context?: string): void {
public warning(message: unknown, context?: string): void {
SevenWaysDP marked this conversation as resolved.
Show resolved Hide resolved
this.logger.warning(this.createMessage(message, context));
}

Expand Down
4 changes: 2 additions & 2 deletions src/infra/metrics/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export * from './metrics.module.js';
export * from './metrics.service.js';
export { MetricsModule } from './metrics.module.js';
export { MetricsService } from './metrics.service.js';
2 changes: 1 addition & 1 deletion src/infra/redis/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export * from './redis.factory.js';
export * from './redis.module.js';
export * from './redis.service.js';
6 changes: 3 additions & 3 deletions src/infra/redis/interfaces/redis-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ export interface RedisAdapter {
exists(stream: string): Promise<number>;
createGroup(): Promise<void>;
quit(): Promise<void>;
readStreams(streams: StreamNameClockPair[]): Promise<StreamMessagesReply>;
readMessagesFromStream(streamName: string): Promise<StreamMessagesReply>;
readStreams(streams: StreamNameClockPair[]): Promise<StreamMessagesReply[]>;
readMessagesFromStream(streamName: string): Promise<StreamMessagesReply[]>;
reclaimTasks(consumerName: string, redisTaskDebounce: number, tryClaimCount: number): Promise<XAutoClaimResponse>;
getDeletedDocEntries(): Promise<StreamMessageReply[]>;
deleteDeleteDocEntry(id: string): Promise<number>;
deleteDeletedDocEntry(id: string): Promise<number>;
tryClearTask(task: Task): Promise<number>;
tryDeduplicateTask(task: Task, lastId: number, redisMinMessageLifetime: number): Promise<void>;
}
14 changes: 4 additions & 10 deletions src/infra/redis/interfaces/stream-message-reply.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,17 @@
import { RedisKey } from 'ioredis';

interface Message {
key: RedisKey;
m?: RedisKey;
m?: Buffer;
docName?: string;
compact?: string;
compact?: Buffer;
}

export interface StreamMessageReply {
id: RedisKey;
message: Record<keyof Message, RedisKey>;
message: Message;
}

export interface StreamMessagesSingleReply {
export interface StreamMessagesReply {
name: string;
messages: StreamMessageReply[] | null;
}

export type StreamMessagesReply = {
name: string;
messages: StreamMessageReply[] | null;
}[];
6 changes: 3 additions & 3 deletions src/infra/redis/ioredis.adapter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ describe(IoRedisAdapter.name, () => {

await redisAdapter.createGroup();

expect(logger.log).toHaveBeenCalledWith(error);
expect(logger.info).toHaveBeenCalledWith(error);
});
});

Expand Down Expand Up @@ -412,15 +412,15 @@ describe(IoRedisAdapter.name, () => {
it('should call redis xdel with correct values', async () => {
const { id, xdelSpy, expectedProps, redisAdapter } = await setup();

await redisAdapter.deleteDeleteDocEntry(id);
await redisAdapter.deleteDeletedDocEntry(id);

expect(xdelSpy).toHaveBeenCalledWith(...expectedProps);
});

it('should return correct value', async () => {
const { id, redisAdapter } = await setup();

const result = await redisAdapter.deleteDeleteDocEntry(id);
const result = await redisAdapter.deleteDeletedDocEntry(id);

expect(result).toBe(1);
});
Expand Down
8 changes: 4 additions & 4 deletions src/infra/redis/ioredis.adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export class IoRedisAdapter implements RedisAdapter {
try {
await this.redis.xgroup('CREATE', this.redisWorkerStreamName, this.redisWorkerGroupName, '0', 'MKSTREAM');
} catch (e) {
this.logger.log(e);
this.logger.info(e);
// It is okay when the group already exists, so we can ignore this error.
if (e.message !== 'BUSYGROUP Consumer Group name already exists') {
throw e;
Expand All @@ -88,7 +88,7 @@ export class IoRedisAdapter implements RedisAdapter {
await this.redis.quit();
}

public async readStreams(streams: StreamNameClockPair[]): Promise<StreamMessagesReply> {
public async readStreams(streams: StreamNameClockPair[]): Promise<StreamMessagesReply[]> {
const reads = await this.redis.xreadBuffer(
'COUNT',
1000,
Expand All @@ -104,7 +104,7 @@ export class IoRedisAdapter implements RedisAdapter {
return streamReplyRes;
}

public async readMessagesFromStream(streamName: string): Promise<StreamMessagesReply> {
public async readMessagesFromStream(streamName: string): Promise<StreamMessagesReply[]> {
const reads = await this.redis.xreadBuffer('STREAMS', streamName, '0');

const streamReplyRes = mapToStreamMessagesReply(reads);
Expand Down Expand Up @@ -140,7 +140,7 @@ export class IoRedisAdapter implements RedisAdapter {
return transformedDeletedTasks;
}

public deleteDeleteDocEntry(id: string): Promise<number> {
public deleteDeletedDocEntry(id: string): Promise<number> {
const result = this.redis.xdel(this.redisDeleteStreamName, id);

return result;
Expand Down
2 changes: 1 addition & 1 deletion src/infra/redis/mapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export function mapToStreamMessagesReplies(messages: XItems | unknown): StreamMe
return result;
}

export function mapToStreamMessagesReply(streamReply: XReadBufferReply | unknown): StreamMessagesReply {
export function mapToStreamMessagesReply(streamReply: XReadBufferReply | unknown): StreamMessagesReply[] {
if (streamReply === null) {
return [];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import * as util from 'util';
import { Logger } from '../logger/index.js';
import { IoRedisAdapter } from './ioredis.adapter.js';
import { RedisConfig } from './redis.config.js';
import { RedisService } from './redis.service.js';
import { RedisFactory } from './redis.factory.js';

jest.mock('ioredis', () => {
return {
Expand All @@ -14,7 +14,7 @@ jest.mock('ioredis', () => {

jest.mock<IoRedisAdapter>('./ioredis.adapter.js');

describe('Redis Service', () => {
describe(RedisFactory.name, () => {
beforeEach(() => {
jest.resetAllMocks();
});
Expand Down Expand Up @@ -50,7 +50,7 @@ describe('Redis Service', () => {
const constructorSpy = jest.spyOn(Redis.prototype, 'constructor');

const logger = createMock<Logger>();
const service = new RedisService(config, logger);
const factory = new RedisFactory(config, logger);

const expectedProps = {
sentinels: [
Expand All @@ -62,29 +62,29 @@ describe('Redis Service', () => {
name: 'sentinelName',
};

return { resolveSrv, sentinelServiceName, service, constructorSpy, expectedProps };
return { resolveSrv, sentinelServiceName, factory, constructorSpy, expectedProps };
};

it('calls resolveSrv', async () => {
const { resolveSrv, sentinelServiceName, service } = setup();
const { resolveSrv, sentinelServiceName, factory } = setup();

await service.createRedisInstance();
await factory.createRedisInstance();

expect(resolveSrv).toHaveBeenLastCalledWith(sentinelServiceName);
});

it('create new Redis instance with correctly props', async () => {
const { service, constructorSpy, expectedProps } = setup();
const { factory, constructorSpy, expectedProps } = setup();

await service.createRedisInstance();
await factory.createRedisInstance();

expect(constructorSpy).toHaveBeenCalledWith(expectedProps);
});

it('creates a new Redis instance', async () => {
const { service } = setup();
const { factory } = setup();

const redisInstance = await service.createRedisInstance();
const redisInstance = await factory.createRedisInstance();

expect(redisInstance).toBeInstanceOf(IoRedisAdapter);
});
Expand All @@ -105,33 +105,33 @@ describe('Redis Service', () => {
const constructorSpy = jest.spyOn(Redis.prototype, 'constructor');

const logger = createMock<Logger>();
const service = new RedisService(config, logger);
const factory = new RedisFactory(config, logger);

const expectedProps = redisUrl;

return { resolveSrv, service, redisMock, constructorSpy, expectedProps };
return { resolveSrv, factory, redisMock, constructorSpy, expectedProps };
};

it('calls resolveSrv', async () => {
const { resolveSrv, service } = setup();
const { resolveSrv, factory } = setup();

await service.createRedisInstance();
await factory.createRedisInstance();

expect(resolveSrv).not.toHaveBeenCalled();
});

it('create new Redis instance with correctly props', async () => {
const { service, constructorSpy, expectedProps } = setup();
const { factory, constructorSpy, expectedProps } = setup();

await service.createRedisInstance();
await factory.createRedisInstance();

expect(constructorSpy).toHaveBeenCalledWith(expectedProps);
});

it('creates a new Redis instance', async () => {
const { service } = setup();
const { factory } = setup();

const redisInstance = await service.createRedisInstance();
const redisInstance = await factory.createRedisInstance();

expect(redisInstance).toBeInstanceOf(IoRedisAdapter);
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { Injectable } from '@nestjs/common';
import * as dns from 'dns';
import { Redis } from 'ioredis';
import * as util from 'util';
Expand All @@ -7,14 +6,11 @@ import { RedisAdapter } from './interfaces/index.js';
import { IoRedisAdapter } from './ioredis.adapter.js';
import { RedisConfig } from './redis.config.js';

@Injectable()
export class RedisService {
export class RedisFactory {
public constructor(
private readonly config: RedisConfig,
private readonly logger: Logger,
) {
this.logger.setContext(RedisService.name);
}
) {}

public async createRedisInstance(): Promise<RedisAdapter> {
let redisInstance: Redis;
Expand All @@ -39,7 +35,7 @@ export class RedisService {
const sentinelName = this.config.REDIS_SENTINEL_NAME;
const sentinelPassword = this.config.REDIS_SENTINEL_PASSWORD;
const sentinels = await this.discoverSentinelHosts();
this.logger.log(`Discovered sentinels: ${JSON.stringify(sentinels)}`);
this.logger.info(`Discovered sentinels: ${JSON.stringify(sentinels)}`);

const redisInstance = new Redis({
sentinels,
Expand All @@ -63,7 +59,7 @@ export class RedisService {

return hosts;
} catch (err) {
this.logger.log('Error during service discovery:', err);
this.logger.info('Error during service discovery:', err);
throw err;
}
}
Expand Down
Loading
Loading