Advertisement
aldikhan13

NestJs RabbitMQ

Oct 22nd, 2024
411
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
TypeScript 5.74 KB | Source Code | 0 0
  import { Injectable, Logger } from '@nestjs/common';
  import { Options, ConsumeMessage, Channel } from 'amqplib';
  import {
    AmqpConnection,
    MessageHandlerErrorBehavior,
    MessageHandlerOptions,
    Nack,
    QueueOptions,
    RequestOptions,
    SubscriptionResult,
  } from '@golevelup/nestjs-rabbitmq';
  import { randomUUID } from 'crypto';
  import { ConfigService } from '@nestjs/config';
  import { IRmqService } from 'src/infrastructure/common/helpers/rmq.helper';
  14.  
  /**
   * Convenience wrapper around `AmqpConnection` that publishes to, and consumes
   * from, a single exchange using the queue name as the routing key.
   *
   * The exchange name is read from the `RMQ_EXCHANGE_NAME` configuration key and
   * falls back to `amq.topic` when that key is missing or empty.
   */
  @Injectable()
  export class RmqService implements IRmqService {
    private readonly exchangeName: string;
    private readonly logger: Logger = new Logger('RmqService');

    constructor(
      private readonly configService: ConfigService,
      private readonly amqpConnection: AmqpConnection,
    ) {
      // `||` (not `??`) is deliberate: an empty string also falls back to the default.
      this.exchangeName = this.configService.get<string>('RMQ_EXCHANGE_NAME') || 'amq.topic';
    }

    /**
     * Writes a blank-line-delimited group of log lines describing one message.
     *
     * @param queueName     Queue / routing key the message relates to.
     * @param message       Payload; logged through `JSON.stringify`.
     * @param source        One of `consumer-rpc`, `publisher-rpc`, `consumer`,
     *                      `publisher`. Any other value logs nothing.
     * @param rawMessage    Raw delivery; only read for `consumer-rpc`.
     * @param correlationId Correlation id; only read for `publisher-rpc`.
     */
    private rmqLogger(
      queueName: string,
      message?: any,
      source?: string,
      rawMessage?: ConsumeMessage,
      correlationId?: string,
    ): void {
      const lines: string[] = [];

      switch (source) {
        case 'consumer-rpc':
          lines.push(
            `Consumer RPC Name: ${queueName}`,
            `Consumer RPC Corellation ID: ${rawMessage?.properties?.correlationId}`,
            `Consumer RPC Reply To: ${rawMessage?.properties?.replyTo}`,
            `Consumer RPC Message: ${JSON.stringify(message)}`,
          );
          break;
        case 'publisher-rpc':
          lines.push(
            `Publisher RPC Name: ${queueName}`,
            `Publisher RPC Corellation ID: ${correlationId}`,
            `Publisher RPC Message: ${JSON.stringify(message)}`,
          );
          break;
        case 'consumer':
          lines.push(`Consumer Name: ${queueName}`, `Consumer Message: ${JSON.stringify(message)}`);
          break;
        case 'publisher':
          lines.push(`Publisher Name: ${queueName}`, `Publisher Message: ${JSON.stringify(message)}`);
          break;
        default:
          return;
      }

      this.logger.log('\n');
      for (const line of lines) {
        this.logger.log(line);
      }
      this.logger.log('\n');
    }

    /**
     * Logs a queue-assertion failure instead of letting it propagate.
     *
     * Declared as an arrow-function property because it is handed to the
     * library as a bare callback; a regular method would lose `this` and crash
     * on `this.logger`.
     *
     * @returns The requested queue name, which the library is expected to use as
     *          the queue to bind/consume from — NOTE(review): confirm against the
     *          installed @golevelup/nestjs-rabbitmq version.
     */
    private readonly assertQueueErrorHandler = (
      _channel: Channel,
      queueName: string,
      _queueOptions: QueueOptions | undefined,
      error: unknown,
    ): string => {
      const reason = error instanceof Error ? error.message : String(error);
      this.logger.error(`Queue Name: ${queueName} Error: ${reason}`);
      return queueName;
    };

    /**
     * Publishes `message` to the configured exchange with `queueName` as routing key.
     *
     * The message is marked persistent, given priority 10 and a freshly
     * generated `messageId` (which `consumer` requires on incoming messages).
     *
     * @returns Whatever `AmqpConnection.publish` resolves to.
     */
    async publish(queueName: string, message: any): Promise<boolean> {
      this.rmqLogger(queueName, message, 'publisher');

      const publishOptions: Options.Publish = {
        persistent: true,
        deliveryMode: true,
        priority: 10,
        messageId: randomUUID(),
      };

      return this.amqpConnection.publish(this.exchangeName, queueName, message, publishOptions);
    }

    /**
     * Registers a subscriber on a durable queue named `queueName`, bound to the
     * configured exchange with the same routing key.
     *
     * Messages without a `messageId` property are rejected without requeue (they
     * could never become valid, so requeueing would redeliver them forever).
     * Otherwise `handler`, when supplied, is invoked and awaited so that a
     * rejected async handler is surfaced to the library's error handling.
     *
     * Registration is fire-and-forget; a registration failure is logged.
     */
    consumer(queueName: string, handler?: (message: any, rawMessage?: ConsumeMessage) => void): void {
      const consumerOptions: MessageHandlerOptions = {
        exchange: this.exchangeName,
        routingKey: queueName,
        queue: queueName,
        createQueueIfNotExists: true,
        queueOptions: {
          durable: true,
          consumerOptions: { noAck: false },
        },
        assertQueueErrorHandler: this.assertQueueErrorHandler,
      };

      this.amqpConnection
        .createSubscriber(
          async (message: any, rawMessage?: ConsumeMessage, _headers?: any) => {
            this.rmqLogger(queueName, message, 'consumer', rawMessage);

            if (!rawMessage?.properties?.messageId) {
              this.logger.warn('consumer required messageId property');
              // Let the library settle the delivery; nacking the channel by hand
              // and then returning normally would settle the same delivery twice.
              return new Nack(false);
            }

            if (typeof handler === 'function') {
              await handler(message, rawMessage);
            }
            return undefined;
          },
          consumerOptions,
          queueName,
        )
        .catch((error: unknown) => {
          const reason = error instanceof Error ? error.message : String(error);
          this.logger.error(`Consumer Name: ${queueName} Error: ${reason}`);
        });
    }

    /**
     * Sends an RPC request to the configured exchange with `queueName` as routing
     * key and resolves with the reply.
     *
     * A fresh correlation id and an `X-Request-ID` header are generated per call.
     */
    publishRPC(queueName: string, message: any): Promise<any> {
      const correlationId = randomUUID();
      const requestId = randomUUID();

      const requestOptions: RequestOptions = {
        exchange: this.exchangeName,
        routingKey: queueName,
        correlationId,
        headers: {
          'X-Request-ID': requestId,
        },
        publishOptions: {
          persistent: true,
          deliveryMode: true,
          priority: 10,
        },
        payload: message,
      };

      this.rmqLogger(queueName, message, 'publisher-rpc', undefined, correlationId);

      return this.amqpConnection.request(requestOptions);
    }

    /**
     * Registers an RPC responder on a durable queue named `queueName`.
     *
     * Requests lacking `correlationId` or `replyTo` are rejected without requeue.
     * Otherwise the reply is the incoming payload, with `overwriteMessage` merged
     * into it (in place, via `Object.assign`) when both are truthy. `handler`,
     * when supplied, is invoked with the reply and awaited before it is returned.
     *
     * @returns The promise returned by `AmqpConnection.createRpc`.
     */
    consumerRPC(
      queueName: string,
      overwriteMessage?: any,
      handler?: (message: any, rawMessage?: ConsumeMessage) => void,
    ): Promise<SubscriptionResult> {
      const rpcOptions: MessageHandlerOptions = {
        exchange: this.exchangeName,
        routingKey: queueName,
        queue: queueName,
        createQueueIfNotExists: true,
        queueOptions: {
          durable: true,
          consumerOptions: { noAck: false },
        },
        errorBehavior: MessageHandlerErrorBehavior.NACK,
        assertQueueErrorHandler: this.assertQueueErrorHandler,
      };

      return this.amqpConnection.createRpc(
        async (message: any, rawMessage?: ConsumeMessage, _headers?: any) => {
          this.rmqLogger(queueName, message, 'consumer-rpc', rawMessage);

          const correlationId = rawMessage?.properties?.correlationId;
          const replyTo = rawMessage?.properties?.replyTo;

          if (!correlationId || !replyTo) {
            if (!correlationId) {
              this.logger.warn('consumerRPC required correlationId property');
            }
            if (!replyTo) {
              this.logger.warn('consumerRPC required replyTo property');
            }
            // Returning Nack lets the library reject the delivery exactly once.
            return new Nack(false);
          }

          const reply = message && overwriteMessage ? Object.assign(message, overwriteMessage) : message;

          if (typeof handler === 'function') {
            await handler(reply, rawMessage);
          }
          return reply;
        },
        rpcOptions,
      );
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement