aldikhan13

Rascal Custom Method For RabbitMQ

Apr 14th, 2022 (edited)
405
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. /**
  2. * CUSTOM LIBS SETUP RASCAL FOR RABBITMQ BY RESTU WAHYU SAPUTRA
  3. **/
  4.  
  5. import 'dotenv/config'
  6. import { CustomError } from '@helpers/helper.error'
  7. import Broker, { PublicationSession, SubscriberSessionAsPromised } from 'rascal'
  8.  
  9. interface SetterConfig {
  10.     key: string
  11.     prefix: string
  12. }
  13.  
  14. export class RabbitMQ {
  15.     private brokerConfig: Broker.BrokerConfig
  16.     private key: string
  17.     private prefix: string
  18.  
  19.     constructor(key: string, prefix: string) {
  20.         this.key = key
  21.         this.prefix = prefix
  22.         this.setConfig({ key, prefix })
  23.     }
  24.  
  25.     private setConfig(config: SetterConfig) {
  26.         const pubConfig: Broker.PublicationConfig = {
  27.             vhost: process.env.RABBITMQ_VHOST,
  28.             exchange: `rabbitmq_ex:${config.prefix}`,
  29.             routingKey: 'a.b.c',
  30.             options: {
  31.                 persistent: false,
  32.                 mandatory: true,
  33.                 priority: 10,
  34.                 expiration: new Date().getMilliseconds() + 1000 * 5
  35.             },
  36.             autoCreated: true
  37.         }
  38.  
  39.         const subConfig: Broker.PublicationConfig = {
  40.             vhost: process.env.RABBITMQ_VHOST,
  41.             queue: `rabbitmq_eq:${config.prefix}`,
  42.             options: {
  43.                 persistent: false,
  44.                 mandatory: true,
  45.                 priority: 10,
  46.                 expiration: new Date().getMilliseconds() + 1000 * 5
  47.             },
  48.             autoCreated: true
  49.         }
  50.  
  51.         const newPubConfig: Record<string, any> = {}
  52.         Object.assign(newPubConfig, { [`rabbitmq_pub:${config.key}:${config.prefix}`]: pubConfig })
  53.  
  54.         const newSubConfig: Record<string, any> = {}
  55.         Object.assign(newSubConfig, { [`rabbitmq_sub:${config.key}:${config.prefix}`]: subConfig })
  56.  
  57.         this.brokerConfig = {
  58.             vhosts: {
  59.                 [`${process.env.RABBITMQ_VHOST}`]: {
  60.                     connectionStrategy: 'fixed',
  61.                     connection: {
  62.                         vhost: process.env.RABBITMQ_VHOST,
  63.                         hostname: process.env.RABBITMQ_HOST,
  64.                         user: process.env.RABBITMQ_USERNAME,
  65.                         password: process.env.RABBITMQ_PASSWORD,
  66.                         port: process.env.RABBITMQ_PORT,
  67.                         protocol: process.env.RABBITMQ_PROTOCOL,
  68.                         options: {
  69.                             heartbeat: 5, // recommended heartbeat count,
  70.                             timeout: new Date().getMilliseconds() + 1000 * 30 // connection timeout
  71.                         }
  72.                     },
  73.                     publications: newPubConfig,
  74.                     subscriptions: newSubConfig,
  75.                     exchanges: {
  76.                         [`rabbitmq_ex:${config.prefix}`]: {
  77.                             options: {
  78.                                 durable: false, // don`t keep messages not delivered after broker restart,
  79.                                 autoDelete: true // auto delete after messages consume
  80.                             },
  81.                             type: 'topic'
  82.                         }
  83.                     },
  84.                     queues: {
  85.                         [`rabbitmq_eq:${config.prefix}`]: {
  86.                             options: {
  87.                                 durable: false, // don`t keep messages not delivered after broker restart,
  88.                                 maxPriority: 10, // max priority queue recommended max 255,
  89.                                 maxLength: 1000 // keep 1000 message in queue and delete other queue if queue not active
  90.                             }
  91.                         }
  92.                     },
  93.                     bindings: {
  94.                         [`rabbitmq_ex:${config.prefix}[a.b.c] -> rabbitmq_eq:${config.prefix}`]: {
  95.                             source: `rabbitmq_ex:${config.prefix}`,
  96.                             destination: `rabbitmq_eq:${config.prefix}`,
  97.                             destinationType: 'queue'
  98.                         }
  99.                     },
  100.                     publicationChannelPools: {
  101.                         regularPool: {
  102.                             max: 10,
  103.                             min: 5,
  104.                             evictionRunIntervalMillis: new Date().getMilliseconds() + 1000 * 30,
  105.                             idleTimeoutMillis: new Date().getMilliseconds() + 1000 * 60,
  106.                             autostart: true
  107.                         },
  108.                         confirmPool: {
  109.                             max: 10,
  110.                             min: 5,
  111.                             evictionRunIntervalMillis: new Date().getMilliseconds() + 1000 * 30,
  112.                             idleTimeoutMillis: new Date().getMilliseconds() + 1000 * 60,
  113.                             autostart: true
  114.                         }
  115.                     }
  116.                 }
  117.             },
  118.             recovery: {
  119.                 [`rabbitmq_pub:${config.key}:${config.prefix}`]: [
  120.                     { strategy: 'republish', requeue: true, defer: 1000, attempts: 10 },
  121.                     { strategy: 'ack' }
  122.                 ],
  123.                 [`rabbitmq_sub:${config.key}:${config.prefix}`]: { strategy: 'ack' }
  124.             }
  125.         }
  126.     }
  127.  
  128.     private getConfig(): Broker.BrokerConfig {
  129.         return this.brokerConfig
  130.     }
  131.  
  132.     private async connection(): Promise<any> {
  133.         try {
  134.             const broker: Broker.BrokerAsPromised = await Broker.BrokerAsPromised.create(this.getConfig())
  135.             broker.on('error', console.error)
  136.             return broker
  137.         } catch (err: any) {
  138.             return new CustomError(err.message)
  139.         }
  140.     }
  141.  
  142.     async publisher(data: Record<string, any> | Record<string, any>[]): Promise<any> {
  143.         try {
  144.             const connection: Broker.BrokerAsPromised = await this.connection()
  145.             const publisher: PublicationSession = await connection.publish(`rabbitmq_pub:${this.key}:${this.prefix}`, data)
  146.  
  147.             console.info('RabbitMQ publisher is called')
  148.  
  149.             publisher.on('success', (jobId: string) => console.log(`job ${jobId} is success`))
  150.             publisher.on('error', (_err: Error, jobId: string) => {
  151.                 console.log(`job ${jobId} is error`)
  152.                 publisher.abort()
  153.             })
  154.  
  155.             return true
  156.         } catch (err: any) {
  157.             return new CustomError(err.message)
  158.         }
  159.     }
  160.  
  161.     async subscriber(cb: (content: any, error?: Error) => any): Promise<void> {
  162.         try {
  163.             const connection: Broker.BrokerAsPromised = await this.connection()
  164.             const subscriber: SubscriberSessionAsPromised = await connection.subscribe(`rabbitmq_sub:${this.key}:${this.prefix}`)
  165.  
  166.             console.info('RabbitMQ subscriber is called')
  167.  
  168.             subscriber
  169.                 .on('message', (_message: any, content: any, ackOrNack: Broker.AckOrNack): void => {
  170.                     cb(content)
  171.                     ackOrNack()
  172.                 })
  173.                 .on('error', console.error)
  174.         } catch (err: any) {
  175.             cb(null, err)
  176.         }
  177.     }
  178. }
  179.  
  180. // publisher demo here
  181. import { RabbitMQ } from '@libs/lib.rabbitmq'
  182. ;(async () => {
  183.     try {
  184.         const broker: InstanceType<typeof RabbitMQ> = new RabbitMQ('message:text', 'google')
  185.         let i: number = 0
  186.         setInterval(async () => {
  187.             const res = await broker.publisher({ message: `${i++} hello wordl from publisher:${new Date().getTime()}` })
  188.             console.log(res)
  189.         }, 2000)
  190.     } catch (err) {
  191.         console.error(err)
  192.     }
  193. })()
  194.  
  195. // subscriber demo here
  196. import { RabbitMQ } from '@libs/lib.rabbitmq'
  197. ;(async () => {
  198.     try {
  199.         const broker: InstanceType<typeof RabbitMQ> = new RabbitMQ('message:text', 'google')
  200.         broker.subscriber((content: string, error: Error) => {
  201.             if (!error) console.log(content)
  202.         })
  203.     } catch (err) {
  204.         console.error(err)
  205.     }
  206. })()
Add Comment
Please, Sign In to add comment