Advertisement
aldikhan13

Golang RabbitMQ

Oct 26th, 2024 (edited)
395
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 14.66 KB | None | 0 0
  1. // Change to map for mapping corellationId as key, in my case in prod i use redis for get delivery temporary store from rpc
  2. // And for this case i use mutex for handle race condition, because map is not safe thread (type data)
  3. // Why this race condition when not use mutex, because this operation is async prosess
  4. // And i use infinite loop for check channel is empty or not, when channel empty use after time and throw timeout, when
  5. // consumer rpc is break, if you not undestand concept channel and select you can confuse, please learn this if you not
  6. // undestand, check my tutorial in my linkend about channel in last post
  7. // alter native you can use sync.Map for safe thread and you can't use mutex again
  8.  
  9. package pkg
  10.  
  11. import (
  12.     "crypto/tls"
  13.     "errors"
  14.     "fmt"
  15.     "math"
  16.     "os"
  17.     "os/signal"
  18.     "syscall"
  19.     "time"
  20.  
  21.     "dario.cat/mergo"
  22.     "github.com/lithammer/shortuuid"
  23.     "github.com/rabbitmq/amqp091-go"
  24.     amqp "github.com/wagslane/go-rabbitmq" // v0.12.4
  25.  
  26.     apkg "github.com/x/x/internal/domain/adapters/pkg"
  27.     "github.com/x/x/internal/domain/constant"
  28.     dto "github.com/x/x/internal/domain/dto/pkg"
  29.     "github.com/x/x/internal/infrastructure/common/helpers"
  30.     "github.com/x/x/internal/infrastructure/configs"
  31. )
  32.  
  33. type rabbit struct {
  34.     connection      *rabbitmq.Conn
  35.     env             *configs.Environtment
  36.     rpcExchange     string
  37.     rpcExchangeType string
  38.     rpcQueue        string
  39.     rpcReplyTo      string
  40.     rpcConsumerID   string
  41.     rpcConsumerRes  []byte
  42. }
  43.  
  44. var (
  45.     args             amqp091.Table          = amqp091.Table{}
  46.     publisherOptions []dto.PublisherOptions = []dto.PublisherOptions{}
  47.     delivery         chan []byte            = make(chan []byte, 1000)
  48.     mutex            *sync.RWMutex          = new(sync.RWMutex)
  49.     safeMap          *sync.Map              = new(sync.Map)
  50.     parser           ahelpers.Parser        = helpers.NewParser()
  51. )
  52.  
  53. func NewRabbitMQ(env *configs.Environtment) (apkg.RabbitMQ, error) {
  54.     connection, err := amqp.NewConn(env.AMQP_BSN,
  55.         amqp.WithConnectionOptionsConfig(amqp.Config{
  56.             Vhost:           env.AMQP_VHOST,
  57.             FrameSize:       208896,
  58.             Heartbeat:       time.Duration(time.Second * 3),
  59.             TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
  60.         }),
  61.         amqp.WithConnectionOptionsLogging,
  62.     )
  63.  
  64.     if err != nil {
  65.         connection.Close()
  66.         return nil, err
  67.     }
  68.  
  69.     Logrus("info", "RabbitMQ connection success")
  70.     return &rabbitmq{connection: connection, env: env}, nil
  71. }
  72.  
  73. /**
  74. * ===========================================
  75. * HANDLER METHOD - CONNECTION
  76. * ===========================================
  77. **/
  78. func (h *rabbitmq) Connection() *amqp.Conn {
  79.     return h.connection
  80. }
  81.  
  82. /**
  83. * ===========================================
  84. * HANDLER METHOD - PUBLISHER
  85. * ===========================================
  86. **/
  87.  
  88. func (h *rabbit) Publisher(options *dto.PublisherOptions) error {
  89.     parser := helpers.NewParser()
  90.  
  91.     if options.Exchange == "" {
  92.         options.Exchange = h.env.AMQP_EXCHANGE
  93.     }
  94.  
  95.     if options.ExchangeType == "" {
  96.         options.ExchangeType = constant.Direct
  97.     }
  98.  
  99.     if options.Args["x-error"] != nil {
  100.         args = amqp091.Table{"x-error-id": options.CorrelationID, "x-read": false}
  101.  
  102.         if err := mergo.Map(&args, options.Args); err != nil {
  103.             return err
  104.         }
  105.     } else {
  106.         args = amqp091.Table{"x-custom-corellation-id": options.CorrelationID, "x-read": true}
  107.  
  108.         if err := mergo.Map(&args, options.Args); err != nil {
  109.             return err
  110.         }
  111.     }
  112.  
  113.     options.Args = args
  114.     options.Timestamp = time.Now().Local()
  115.  
  116.     publisher, err := rabbitmq.NewPublisher(h.connection,
  117.         rabbitmq.WithPublisherOptionsExchangeName(options.Exchange),
  118.         rabbitmq.WithPublisherOptionsExchangeKind(options.ExchangeType),
  119.         rabbitmq.WithPublisherOptionsExchangeDeclare,
  120.         rabbitmq.WithPublisherOptionsExchangeDurable,
  121.         rabbitmq.WithPublisherOptionsExchangeNoWait,
  122.         rabbitmq.WithPublisherOptionsLogging,
  123.     )
  124.     if err != nil {
  125.         return err
  126.     }
  127.     defer h.closeConnection(publisher, nil, nil)
  128.  
  129.     bodyByte, err := parser.Marshal(options.Body)
  130.     if err != nil {
  131.         return err
  132.     }
  133.  
  134.     err = publisher.Publish(bodyByte, options.Queue,
  135.         rabbitmq.WithPublishOptionsExchange(options.Exchange),
  136.         rabbitmq.WithPublishOptionsTimestamp(options.Timestamp),
  137.         rabbitmq.WithPublishOptionsCorrelationID(options.CorrelationID),
  138.         rabbitmq.WithPublishOptionsHeaders(rabbitmq.Table(options.Args)),
  139.         rabbitmq.WithPublishOptionsPersistentDelivery,
  140.     )
  141.     if err != nil {
  142.         return err
  143.     }
  144.  
  145.     if options.CorrelationID != "" && options.Args["x-read"] != false {
  146.         if err := h.publisherError(&dto.ConsumerOptions{Queue: "reply.error", ConsumerID: options.CorrelationID}); err != nil {
  147.             return err
  148.         }
  149.     }
  150.  
  151.     return nil
  152. }
  153.  
  154. /**
  155. * ===========================================
  156. * HANDLER METHOD - CONSUMER
  157. * ===========================================
  158. **/
  159.  
  160. func (h *rabbitmq) Consumer(options *dto.ConsumerOptions, handler func(d amqp.Delivery) (action amqp.Action)) {
  161.  
  162.     if options.Exchange == "" {
  163.         options.Exchange = h.env.AMQP_EXCHANGE
  164.     }
  165.  
  166.     if options.ExchangeType == "" {
  167.         options.ExchangeType = constant.Direct
  168.     }
  169.  
  170.     options.ConsumerID = shortuuid.New()
  171.  
  172.     consumer, err := amqp.NewConsumer(h.connection, handler, options.Queue,
  173.         amqp.WithConsumerOptionsExchangeName(options.Exchange),
  174.         amqp.WithConsumerOptionsExchangeKind(options.ExchangeType),
  175.         amqp.WithConsumerOptionsBinding(amqp.Binding{
  176.             RoutingKey: options.Queue,
  177.             BindingOptions: amqp.BindingOptions{
  178.                 Declare: true,
  179.                 NoWait:  false,
  180.                 Args:    amqp.Table(args),
  181.             },
  182.         }),
  183.         amqp.WithConsumerOptionsConsumerName(options.ConsumerID),
  184.         amqp.WithConsumerOptionsConcurrency(h.env.AMQP_CONCURRENCY),
  185.         amqp.WithConsumerOptionsConsumerAutoAck(h.env.AMQP_ACK),
  186.         amqp.WithConsumerOptionsExchangeDurable,
  187.         amqp.WithConsumerOptionsQueueDurable,
  188.         amqp.WithConsumerOptionsLogging,
  189.     )
  190.  
  191.     if err != nil {
  192.         h.closeConnection(nil, consumer, h.connection)
  193.         Logrus("error", "Consumer Error: %v", err)
  194.         return
  195.     }
  196. }
  197.  
  198. /**
  199. * ===========================================
  200. * HANDLER METHOD - PUBLISHER RPC
  201. * ===========================================
  202. **/
  203.  
  204. func (h *rabbitmq) PublisherRPC(options *dto.PublisherOptions) ([]byte, error) {
  205.     if len(publisherOptions) > 0 {
  206.         publisherOptions = nil
  207.     }
  208.  
  209.     if options.Exchange == "" {
  210.         options.Exchange = h.env.AMQP_EXCHANGE
  211.     }
  212.  
  213.     if options.ExchangeType == "" {
  214.         options.ExchangeType = constant.Direct
  215.     }
  216.  
  217.     options.CorrelationID = shortuuid.New()
  218.     options.ReplyTo = fmt.Sprintf("rpc.%s", options.CorrelationID)
  219.     options.Timestamp = time.Now().Local()
  220.  
  221.     publisherOptions = append(publisherOptions, *options)
  222.     consumer, err := h.listeningConsumer(mutex, options)
  223.     if err != nil {
  224.         return nil, err
  225.     }
  226.  
  227.     publisher, err := amqp.NewPublisher(h.connection,
  228.         amqp.WithPublisherOptionsExchangeName(options.Exchange),
  229.         amqp.WithPublisherOptionsExchangeKind(options.ExchangeType),
  230.         amqp.WithPublisherOptionsExchangeDeclare,
  231.         amqp.WithPublisherOptionsExchangeDurable,
  232.         amqp.WithPublisherOptionsExchangeNoWait,
  233.         amqp.WithPublisherOptionsLogging,
  234.     )
  235.  
  236.     if err != nil {
  237.         return nil, err
  238.     }
  239.     defer h.closeConnection(publisher, consumer, h.connection)
  240.  
  241.     bodyByte, err := parser.Marshal(options.Body)
  242.     if err != nil {
  243.         return nil, err
  244.     }
  245.  
  246.     err = publisher.Publish(bodyByte, options.Queue,
  247.         amqp.WithPublishOptionsExchange(options.Exchange),
  248.         amqp.WithPublishOptionsCorrelationID(options.CorrelationID),
  249.         amqp.WithPublishOptionsReplyTo(options.ReplyTo),
  250.         amqp.WithPublishOptionsTimestamp(options.Timestamp),
  251.         amqp.WithPublishOptionsPersistentDelivery,
  252.     )
  253.     if err != nil {
  254.         return nil, err
  255.     }
  256.  
  257.     for {
  258.         timer := time.NewTimer(time.Duration(time.Second * 5))
  259.  
  260.         select {
  261.         case d := <-delivery:
  262.             mutex.RLock()
  263.             defer mutex.RUnlock()
  264.  
  265.             return d[options.CorrelationID], nil
  266.  
  267.         case <-timer.C:
  268.             delivery <- map[string][]byte{options.CorrelationID: []byte("Publisher RPC timeout")}
  269.  
  270.             mutex.RLock()
  271.             defer mutex.RUnlock()
  272.  
  273.             d := <-delivery
  274.  
  275.             return d[options.CorrelationID], nil
  276.         }
  277.     }
  278. }
  279.  
  280. /**
  281. * ===========================================
  282. * HANDLER METHOD - CONSUMER RPC
  283. * ===========================================
  284. **/
  285.  
  286. func (h *rabbitmq) ConsumerRPC(options *dto.ConsumerOptions, handler func(delivery amqp.Delivery) (action amqp.Action)) {
  287.  
  288.     if options.Exchange == "" {
  289.         options.Exchange = h.env.AMQP_EXCHANGE
  290.     }
  291.  
  292.     if options.ExchangeType == "" {
  293.         options.ExchangeType = constant.Direct
  294.     }
  295.  
  296.     h.rpcConsumerID = shortuuid.New()
  297.     h.rpcReplyTo = options.Queue
  298.     h.rpcExchange = options.Exchange
  299.     h.rpcExchangeType = options.ExchangeType
  300.  
  301.     consumer, err := amqp.NewConsumer(h.connection, handler, options.Queue,
  302.         amqp.WithConsumerOptionsBinding(amqp.Binding{
  303.             RoutingKey: options.Queue,
  304.             BindingOptions: amqp.BindingOptions{
  305.                 Declare: true,
  306.                 NoWait:  false,
  307.                 Args:    amqp.Table(args),
  308.             },
  309.         }),
  310.         amqp.WithConsumerOptionsExchangeName(options.Exchange),
  311.         amqp.WithConsumerOptionsExchangeKind(options.ExchangeType),
  312.         amqp.WithConsumerOptionsConcurrency(h.env.AMQP_CONCURRENCY),
  313.         amqp.WithConsumerOptionsConsumerAutoAck(h.env.AMQP_ACK),
  314.         amqp.WithConsumerOptionsConsumerName(h.rpcConsumerID),
  315.         amqp.WithConsumerOptionsExchangeDeclare,
  316.         amqp.WithConsumerOptionsExchangeDurable,
  317.         amqp.WithConsumerOptionsQueueDurable,
  318.         amqp.WithConsumerOptionsLogging,
  319.     )
  320.  
  321.     if err != nil {
  322.         h.closeConnection(nil, consumer, h.connection)
  323.         Logrus("error", "ConsumerRPC Error: %v", err)
  324.         return
  325.     }
  326. }
  327.  
  328. /**
  329. * ===========================================
  330. * HANDLER METHOD - PUBLISHER REPLY TO RPC
  331. * ===========================================
  332. **/
  333.  
  334. func (h *rabbitmq) ReplyToDeliveryPublisher(deliveryBodyTo []byte, d amqp.Delivery) {
  335.     mutex.Lock()
  336.     defer mutex.Unlock()
  337.  
  338.     if len(d.ReplyTo) > 0 {
  339.         h.rpcReplyTo = d.ReplyTo
  340.     }
  341.  
  342.     if deliveryBodyTo != nil {
  343.         h.rpcConsumerRes = deliveryBodyTo
  344.     } else {
  345.         h.rpcConsumerRes = d.Body
  346.     }
  347.  
  348.     publisher, err := amqp.NewPublisher(h.connection,
  349.         amqp.WithPublisherOptionsExchangeName(h.rpcExchange),
  350.         amqp.WithPublisherOptionsExchangeKind(h.rpcExchangeType),
  351.         amqp.WithPublisherOptionsExchangeDeclare,
  352.         amqp.WithPublisherOptionsExchangeDurable,
  353.         amqp.WithPublisherOptionsExchangeNoWait,
  354.         amqp.WithPublisherOptionsLogging,
  355.     )
  356.  
  357.     if err != nil {
  358.         Logrus("error", "ReplyToDeliveryPublisher Error: %v", err)
  359.         return
  360.     }
  361.  
  362.     err = publisher.Publish(h.rpcConsumerRes, []string{h.rpcReplyTo},
  363.         amqp.WithPublishOptionsCorrelationID(d.CorrelationId),
  364.         amqp.WithPublishOptionsContentType(d.ContentType),
  365.         amqp.WithPublishOptionsTimestamp(d.Timestamp),
  366.         amqp.WithPublishOptionsPersistentDelivery,
  367.     )
  368.  
  369.     if err != nil {
  370.         Logrus("error", "ReplyToDeliveryPublisher Error: %v", err)
  371.         return
  372.     }
  373. }
  374.  
  375. /**
  376. * ===========================================
  377. * HANDLER METHOD - PUBLISHER ERROR
  378. * ===========================================
  379. **/
  380.  
  381. func (h *rabbit) publisherError(options *dto.ConsumerOptions) error {
  382.     if options.Exchange == "" {
  383.         options.Exchange = h.env.AMQP_EXCHANGE
  384.     }
  385.  
  386.     if options.ExchangeType == "" {
  387.         options.ExchangeType = constant.Direct
  388.     }
  389.  
  390.     consumer, err := rabbitmq.NewConsumer(h.connection, func(d rabbitmq.Delivery) (action rabbitmq.Action) {
  391.         errorID := d.Headers["x-error-id"]
  392.         errorMessage := d.Headers["x-error"]
  393.  
  394.         if errorID != d.CorrelationId && errorMessage == nil {
  395.             delivery <- []byte("")
  396.             return rabbitmq.NackDiscard
  397.         }
  398.  
  399.         safeMap.Store(errorID, errorMessage)
  400.         loadSafeMap, ok := safeMap.LoadAndDelete(errorID)
  401.  
  402.         if ok {
  403.             delivery <- []byte(loadSafeMap.(string))
  404.         } else {
  405.             safeMap.Store(errorID, errorMessage)
  406.         }
  407.  
  408.         return rabbitmq.Ack
  409.     },
  410.         options.Queue,
  411.         rabbitmq.WithConsumerOptionsExchangeName(options.Exchange),
  412.         rabbitmq.WithConsumerOptionsExchangeKind(options.ExchangeType),
  413.         rabbitmq.WithConsumerOptionsConsumerName(options.ConsumerID),
  414.         rabbitmq.WithConsumerOptionsConcurrency(h.env.AMQP_CONCURRENCY),
  415.         rabbitmq.WithConsumerOptionsQOSPrefetch(h.env.AMQP_PREFECTH),
  416.         rabbitmq.WithConsumerOptionsConsumerAutoAck(h.env.AMQP_ACK),
  417.         rabbitmq.WithConsumerOptionsExchangeDeclare,
  418.         rabbitmq.WithConsumerOptionsExchangeDurable,
  419.         rabbitmq.WithConsumerOptionsQueueDurable,
  420.         rabbitmq.WithConsumerOptionsLogging,
  421.     )
  422.     if err != nil {
  423.         return err
  424.     }
  425.     defer h.closeConnection(nil, consumer, nil)
  426.  
  427.     select {
  428.     case d := <-delivery:
  429.         loadSafeMap, ok := safeMap.LoadAndDelete(options.ConsumerID)
  430.  
  431.         if ok {
  432.             return errors.New(loadSafeMap.(string))
  433.         } else {
  434.             return errors.New(string(d))
  435.         }
  436.     }
  437. }
  438.  
  439. func (h *rabbitmq) listeningConsumer(mutex *sync.RWMutex, options *dto.PublisherOptions) (*amqp.Consumer, error) {
  440.     consumer, err := amqp.NewConsumer(h.connection, func(d amqp.Delivery) (action amqp.Action) {
  441.         for _, option := range publisherOptions {
  442.             if option.CorrelationID != d.CorrelationId {
  443.                 mutex.Lock()
  444.                 defer mutex.Unlock()
  445.                 delivery <- map[string][]byte{d.CorrelationId: d.Body}
  446.  
  447.                 return amqp.NackDiscard
  448.             }
  449.         }
  450.  
  451.         mutex.Lock()
  452.         defer mutex.Unlock()
  453.         delivery <- map[string][]byte{d.CorrelationId: d.Body}
  454.  
  455.         return amqp.Ack
  456.     }, options.ReplyTo,
  457.         amqp.WithConsumerOptionsExchangeName(options.Exchange),
  458.         amqp.WithConsumerOptionsExchangeKind(options.ExchangeType),
  459.         amqp.WithConsumerOptionsConsumerName(options.CorrelationID),
  460.         amqp.WithConsumerOptionsConcurrency(h.env.AMQP_CONCURRENCY),
  461.         amqp.WithConsumerOptionsConsumerAutoAck(h.env.AMQP_ACK),
  462.         amqp.WithConsumerOptionsExchangeDeclare,
  463.         amqp.WithConsumerOptionsExchangeDurable,
  464.         amqp.WithConsumerOptionsQueueAutoDelete,
  465.         amqp.WithConsumerOptionsLogging,
  466.     )
  467.  
  468.     if err != nil {
  469.         return consumer, err
  470.     }
  471.  
  472.     return consumer, nil
  473. }
  474.  
  475. func (h *rabbitmq) cancelRPC() error {
  476.     timer := time.NewTimer(time.Duration(time.Second * 60))
  477.  
  478.     for {
  479.         select {
  480.         case <-timer.C:
  481.             defer timer.Reset(time.Second * 1)
  482.             return errors.New("cancelled")
  483.  
  484.         default:
  485.             return nil
  486.         }
  487.     }
  488. }
  489.  
  490. func (h *rabbitmq) closeConnection(publisher *amqp.Publisher, consumer *amqp.Consumer, connection *amqp.Conn) {
  491.     defer h.recovery()
  492.  
  493.     if publisher != nil && consumer != nil && connection != nil {
  494.         publisher.Close()
  495.         consumer.Close()
  496.         connection.Close()
  497.     } else if publisher != nil && consumer == nil && connection != nil {
  498.         publisher.Close()
  499.         connection.Close()
  500.     } else if publisher == nil && consumer != nil && connection != nil {
  501.         consumer.Close()
  502.         connection.Close()
  503.     } else {
  504.         closeChan := make(chan os.Signal, 1)
  505.         signal.Notify(closeChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGALRM, syscall.SIGABRT, syscall.SIGUSR1)
  506.  
  507.         go func() {
  508.             for {
  509.                 select {
  510.                 case <-closeChan:
  511.                     publisher.Close()
  512.                     consumer.Close()
  513.                     connection.Close()
  514.                 default:
  515.                     return
  516.                 }
  517.             }
  518.         }()
  519.     }
  520. }
  521.  
  522. func (h *rabbitmq) recovery() {
  523.     if err := recover(); err != nil {
  524.         return
  525.     }
  526. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement