Skip to content

Commit

Permalink
refactor(bus): log the data flow (#92)
Browse files Browse the repository at this point in the history
closes #91
  • Loading branch information
derevnjuk authored May 18, 2022
1 parent 053d9d9 commit 3a2adab
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 29 deletions.
9 changes: 6 additions & 3 deletions packages/bus/src/dispatchers/HttpCommandDispatcher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { HttpRequest } from '../commands';
import { HttpCommandDispatcher } from './HttpCommandDispatcher';
import { HttpCommandDispatcherConfig } from './HttpCommandDispatcherConfig';
import { HttpCommandError } from '../exceptions';
import { RetryStrategy } from '@sec-tester/core';
import { Logger, RetryStrategy } from '@sec-tester/core';
import {
anyFunction,
instance,
Expand All @@ -16,16 +16,18 @@ import nock from 'nock';

describe('HttpCommandDispatcher', () => {
const mockedRetryStrategy = mock<RetryStrategy>();
const mockedLogger = mock<Logger>();

beforeAll(() => {
nock.disableNetConnect();
nock.enableNetConnect('127.0.0.1');
});

afterEach(() => {
reset<RetryStrategy | HttpCommandDispatcherConfig>(
reset<RetryStrategy | HttpCommandDispatcherConfig | Logger>(
spiedOptions,
mockedRetryStrategy
mockedRetryStrategy,
mockedLogger
);
nock.cleanAll();
nock.restore();
Expand Down Expand Up @@ -53,6 +55,7 @@ describe('HttpCommandDispatcher', () => {
);

axiosDispatcher = new HttpCommandDispatcher(
instance(mockedLogger),
instance(mockedRetryStrategy),
options
);
Expand Down
24 changes: 22 additions & 2 deletions packages/bus/src/dispatchers/HttpCommandDispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { HttpCommandDispatcherConfig } from './HttpCommandDispatcherConfig';
import { HttpRequest } from '../commands';
import { HttpCommandError } from '../exceptions';
import { CommandDispatcher, RetryStrategy } from '@sec-tester/core';
import { CommandDispatcher, Logger, RetryStrategy } from '@sec-tester/core';
import { inject, injectable } from 'tsyringe';
import axios, { AxiosRequestConfig, AxiosResponse } from 'axios';
import rateLimit, { RateLimitedAxiosInstance } from 'axios-rate-limit';
Expand All @@ -14,6 +14,7 @@ export class HttpCommandDispatcher implements CommandDispatcher {
private readonly client: RateLimitedAxiosInstance;

constructor(
private readonly logger: Logger,
@inject(RetryStrategy)
private readonly retryStrategy: RetryStrategy,
@inject(HttpCommandDispatcherConfig)
Expand All @@ -25,6 +26,11 @@ export class HttpCommandDispatcher implements CommandDispatcher {
public async execute<T, R>(
command: HttpRequest<T, R>
): Promise<R | undefined> {
this.logger.debug(
'Executing an incoming command (%s): %j',
command.correlationId,
command
);
const response = await this.retryStrategy.acquire(() =>
this.performHttpRequest(command)
);
Expand All @@ -34,6 +40,12 @@ export class HttpCommandDispatcher implements CommandDispatcher {
response.data.on('readable', response.data.read.bind(response.data));
await promisify(finished)(response.data);
} else {
this.logger.debug(
'Received a response to the command (%s): %j',
command.correlationId,
response.data
);

return response.data;
}
}
Expand Down Expand Up @@ -63,7 +75,15 @@ export class HttpCommandDispatcher implements CommandDispatcher {
...(!expectReply ? { responseType: 'stream' } : {})
});
} catch (e) {
throw new HttpCommandError(e);
const httpError = new HttpCommandError(e);

this.logger.debug(
'Command (%s) has been failed:',
correlationId,
httpError
);

throw httpError;
}
}

Expand Down
7 changes: 7 additions & 0 deletions packages/bus/src/dispatchers/RMQEventBus.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
Command,
Event,
EventHandler,
Logger,
NoResponse,
RetryStrategy
} from '@sec-tester/core';
Expand Down Expand Up @@ -79,6 +80,7 @@ describe('RMQEventBus', () => {
const mockedConnectionManager = mock<AmqpConnectionManager>();
const mockedChannelWrapper = mock<ChannelWrapper>();
const mockedChannel = mock<Channel>();
const mockedLogger = mock<Logger>();
const mockedDependencyContainer = mock<DependencyContainer>();
const mockedRetryStrategy = mock<RetryStrategy>();
const options: RMQEventBusConfig = {
Expand Down Expand Up @@ -111,6 +113,9 @@ describe('RMQEventBus', () => {
when(mockedRetryStrategy.acquire(anyFunction())).thenCall(
(callback: (...args: unknown[]) => unknown) => callback()
);
when(mockedDependencyContainer.resolve(Logger)).thenReturn(
instance(mockedLogger)
);
rmq = new RMQEventBus(
instance(mockedDependencyContainer),
instance(mockedRetryStrategy),
Expand All @@ -125,6 +130,7 @@ describe('RMQEventBus', () => {
| AmqpConnectionManager
| Channel
| RMQEventBusConfig
| Logger
| DependencyContainer
| RetryStrategy
>(
Expand All @@ -133,6 +139,7 @@ describe('RMQEventBus', () => {
mockedChannel,
spiedOptions,
mockedDependencyContainer,
mockedLogger,
mockedRetryStrategy
);
jest.resetModules();
Expand Down
88 changes: 72 additions & 16 deletions packages/bus/src/dispatchers/RMQEventBus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
EventHandlerConstructor,
EventHandlerNotFound,
IllegalOperation,
Logger,
NoResponse,
NoSubscriptionsFound,
RetryStrategy
Expand Down Expand Up @@ -58,13 +59,15 @@ export class RMQEventBus implements EventBus {
private readonly consumerTags: string[] = [];

private readonly REPLY_QUEUE_NAME = 'amq.rabbitmq.reply-to';
private readonly logger: Logger;

constructor(
private readonly container: DependencyContainer,
@inject(RetryStrategy)
private readonly retryStrategy: RetryStrategy,
@inject(RMQEventBusConfig) private readonly options: RMQEventBusConfig
) {
this.logger = container.resolve(Logger);
this.subject.setMaxListeners(Infinity);
}

Expand All @@ -83,6 +86,8 @@ export class RMQEventBus implements EventBus {
});

await this.createConsumerChannel();

this.logger.debug('Event bus connected to %s', this.options.url);
}
}

Expand Down Expand Up @@ -148,23 +153,32 @@ export class RMQEventBus implements EventBus {
}

public async destroy(): Promise<void> {
if (this.channel) {
await Promise.all(
this.consumerTags.map(consumerTag => this.channel?.cancel(consumerTag))
);
await this.channel.close();
}
try {
if (this.channel) {
await Promise.all(
this.consumerTags.map(consumerTag =>
this.channel?.cancel(consumerTag)
)
);
await this.channel.close();
}

if (this.client) {
await (this.client as unknown as EventEmitter).removeAllListeners();
await this.client.close();
}
if (this.client) {
await (this.client as unknown as EventEmitter).removeAllListeners();
await this.client.close();
}

delete this.channel;
delete this.client;
delete this.channel;
delete this.client;

this.consumerTags.splice(0, this.consumerTags.length);
this.subject.removeAllListeners();
this.consumerTags.splice(0, this.consumerTags.length);
this.subject.removeAllListeners();
this.logger.debug('Event bus disconnected from %s', this.options.url);
} catch (e) {
this.logger.error('Cannot terminate event bus gracefully');
this.logger.debug('Event bus terminated');
this.logger.debug('Error on disconnect: %s', e.message);
}
}

private async subscribe<T, R>(
Expand All @@ -182,6 +196,12 @@ export class RMQEventBus implements EventBus {
}

private async bindQueue(eventName: string): Promise<void> {
this.logger.debug(
'Bind the queue (%s) to the exchange (%s) by the routing key (%s).',
this.options.clientQueue,
this.options.exchange,
eventName
);
await this.getChannel().addSetup((channel: Channel) =>
channel.bindQueue(
this.options.clientQueue,
Expand Down Expand Up @@ -212,6 +232,12 @@ export class RMQEventBus implements EventBus {
}

private async unbindQueue(eventName: string) {
this.logger.debug(
'Unbind the queue (%s) to the exchange (%s) by the routing key (%s).',
this.options.clientQueue,
this.options.exchange,
eventName
);
await this.getChannel().removeSetup((channel: Channel) =>
channel.unbindQueue(
this.options.clientQueue,
Expand Down Expand Up @@ -320,7 +346,18 @@ export class RMQEventBus implements EventBus {
this.parseConsumeMessage(message);

if (event?.correlationId) {
this.logger.debug(
'Received a reply (%s) with following payload: %j',
event.correlationId,
event.payload
);

this.subject.emit(event.correlationId, event.payload);
} else {
this.logger.debug(
'Error while processing a reply. The correlation ID not found. Reply: %j',
event
);
}
}

Expand All @@ -329,6 +366,12 @@ export class RMQEventBus implements EventBus {
this.parseConsumeMessage(message);

if (event) {
this.logger.debug(
'Received a event (%s) with following payload: %j',
event.name,
event.payload
);

const handlers = this.handlers.get(event.name);

if (!handlers) {
Expand All @@ -349,14 +392,25 @@ export class RMQEventBus implements EventBus {
const response = await handler.handle(event.payload);

if (response && event.replyTo) {
this.logger.debug(
'Sending a reply (%s) back with following payload: %j',
event.name,
event.payload
);

await this.tryToSendMessage({
payload: response,
routingKey: event.replyTo,
correlationId: event.correlationId
});
}
} catch {
// noop
} catch (e) {
this.logger.debug(
'Error while processing a message (%s) due to error occurred: %s. Event: %j',
event.correlationId,
e.message,
event
);
}
}

Expand All @@ -375,6 +429,8 @@ export class RMQEventBus implements EventBus {
timestamp = new Date()
} = options;

this.logger.debug('Send a message with following parameters: %j', options);

await this.getChannel().publish(
exchange ?? '',
routingKey,
Expand Down
2 changes: 1 addition & 1 deletion packages/core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ The default configuration is as follows:

```js
{
logLevel: LogLevel.NOTICE,
logLevel: LogLevel.ERROR,
credentialProviders: [new EnvCredentialProvider()];
}
```
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/configuration/Configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export class Configuration {
constructor({
cluster,
credentials,
logLevel = LogLevel.NOTICE,
logLevel = LogLevel.ERROR,
credentialProviders = [new EnvCredentialProvider()]
}: ConfigurationOptions) {
if (!credentials && !credentialProviders?.length) {
Expand Down
10 changes: 4 additions & 6 deletions packages/repeater/src/lib/Repeater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export class Repeater {

private readonly bus: EventBus;
private readonly configuration: Configuration;
private readonly logger: Logger | undefined;
private readonly logger: Logger;

private timer?: Timer;

Expand All @@ -46,9 +46,7 @@ export class Repeater {
this.configuration = configuration;

const { container } = this.configuration;
if (container.isRegistered(Logger, true)) {
this.logger = container.resolve(Logger);
}
this.logger = container.resolve(Logger);

this.setupShutdown();
}
Expand Down Expand Up @@ -132,7 +130,7 @@ export class Repeater {
try {
await this.stop();
} catch (e) {
this.logger?.error(e.message);
this.logger.error(e.message);
}
});
});
Expand All @@ -145,7 +143,7 @@ export class Repeater {
this.handleRegisterError(payload.error);
} else {
if (gt(payload.version, this.configuration.repeaterVersion)) {
this.logger?.warn(
this.logger.warn(
'%s: A new Repeater version (%s) is available, please update @sec-tester.',
chalk.yellow('(!) IMPORTANT'),
payload.version
Expand Down

0 comments on commit 3a2adab

Please sign in to comment.