Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import { Injectable, Logger } from '@nestjs/common';
- import { Options, ConsumeMessage, Channel } from 'amqplib';
- import {
- AmqpConnection,
- MessageHandlerErrorBehavior,
- MessageHandlerOptions,
- QueueOptions,
- RequestOptions,
- SubscriptionResult,
- } from '@golevelup/nestjs-rabbitmq';
- import { randomUUID } from 'crypto';
- import { ConfigService } from '@nestjs/config';
- import { IRmqService } from 'src/infrastructure/common/helpers/rmq.helper';
- @Injectable()
- export class RmqService implements IRmqService {
- private exchangeName: string;
- private logger: Logger = new Logger('RmqService');
- constructor( private readonly configService: ConfigService, private readonly amqpConnection: AmqpConnection) {
- this.exchangeName = this.configService.get('RMQ_EXCHANGE_NAME') || 'amq.topic';
- }
- private rmqLogger(queueName: string, message? :any, source?: string, rawMessage?: ConsumeMessage, correlationId?: string): void {
- if(source == 'consumer-rpc') {
- this.logger.log('\n');
- this.logger.log(`Consumer RPC Name: ${queueName}`);
- this.logger.log(`Consumer RPC Corellation ID: ${rawMessage?.properties?.correlationId}`);
- this.logger.log(`Consumer RPC Reply To: ${rawMessage?.properties?.replyTo}`);
- this.logger.log(`Consumer RPC Message: ${JSON.stringify(message)}`);
- this.logger.log('\n');
- }
- if(source == 'publisher-rpc') {
- this.logger.log('\n');
- this.logger.log(`Publisher RPC Name: ${queueName}`);
- this.logger.log(`Publisher RPC Corellation ID: ${correlationId}`);
- this.logger.log(`Publisher RPC Message: ${JSON.stringify(message)}`);
- this.logger.log('\n');
- }
- if(source == 'consumer') {
- this.logger.log('\n');
- this.logger.log(`Consumer Name: ${queueName}`);
- this.logger.log(`Consumer Message: ${JSON.stringify(message)}`);
- this.logger.log('\n');
- }
- if(source == 'publisher') {
- this.logger.log('\n');
- this.logger.log(`Publisher Name: ${queueName}`);
- this.logger.log(`Publisher Message: ${JSON.stringify(message)}`);
- this.logger.log('\n');
- }
- }
- private assertQueueErrorHandler(_channel: Channel, queueName: string, _queueOptions: QueueOptions | undefined, error: any): any {
- this.logger.error(`Queue Name: ${queueName} Error: ${error?.message}`);
- }
- async publish(queueName: string, message: any): Promise<boolean> {
- this.rmqLogger(queueName, message, 'publisher', message);
- const messageId: string = randomUUID();
- const publishOptions: Options.Publish = {
- persistent: true,
- deliveryMode: true,
- priority: 10,
- messageId
- };
- return this.amqpConnection.publish(this.exchangeName, queueName, message, publishOptions);
- }
- consumer(queueName: string, handler?: (message: any, rawMessage?: ConsumeMessage) => void): void {
- const consumerOptions: MessageHandlerOptions = {
- exchange: this.exchangeName,
- routingKey: queueName,
- queue: queueName,
- createQueueIfNotExists: true,
- queueOptions: {
- durable: true,
- consumerOptions: { noAck: false },
- },
- assertQueueErrorHandler: this.assertQueueErrorHandler,
- };
- this.amqpConnection.createSubscriber((message: any, rawMessage?: ConsumeMessage, _headers?: any) => {
- this.rmqLogger(queueName, message, 'consumer', rawMessage);
- if(!rawMessage?.properties?.messageId) {
- this.amqpConnection.channel.nack(message)
- }
- if(handler !== undefined && handler instanceof Function) handler(message, rawMessage)
- return message;
- },
- consumerOptions,
- queueName,
- );
- }
- publishRPC(queueName: string, message: any): Promise<any> {
- const correlationId: string = randomUUID();
- const requestId: string = randomUUID();
- const requestOptions: RequestOptions = {
- exchange: this.exchangeName,
- routingKey: queueName,
- correlationId,
- headers: {
- 'X-Request-ID': requestId,
- },
- publishOptions: {
- persistent: true,
- deliveryMode: true,
- priority: 10,
- },
- payload: message,
- };
- this.rmqLogger(queueName, message, 'publisher-rpc', undefined, requestOptions.correlationId);
- return this.amqpConnection.request(requestOptions);
- }
- consumerRPC(queueName: string, overwriteMessage?: any, handler?: (message: any, rawMessage?: ConsumeMessage) => void): Promise<SubscriptionResult> {
- const rpcOptions: MessageHandlerOptions = {
- exchange: this.exchangeName,
- routingKey: queueName,
- queue: queueName,
- createQueueIfNotExists: true,
- queueOptions: {
- durable: true,
- consumerOptions: { noAck: false },
- },
- errorBehavior: MessageHandlerErrorBehavior.NACK,
- assertQueueErrorHandler: this.assertQueueErrorHandler,
- };
- return this.amqpConnection.createRpc((message: any, rawMessage?: ConsumeMessage, _headers?: any) => {
- this.rmqLogger(queueName, message, 'consumer-rpc', rawMessage);
- if (!rawMessage?.properties?.correlationId || !rawMessage?.properties?.replyTo) {
- this.logger.warn('consumerRPC required correlationId property');
- this.logger.warn('consumerRPC required replyTo property');
- this.amqpConnection.channel.nack(message)
- }
- if (message && overwriteMessage) {
- message = Object.assign(message, overwriteMessage);
- }
- if(handler !== undefined && handler instanceof Function) handler(message, rawMessage)
- return message;
- },
- rpcOptions,
- );
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement