Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // Deliveries are now kept in a map keyed by correlation ID; in production I use Redis as the temporary store for RPC deliveries.
- // A mutex guards the map in this version to avoid race conditions, because Go maps are not safe for concurrent use.
- // Without the mutex there would be a race condition, because these operations run asynchronously.
- // An infinite loop with select checks whether the channel has a delivery; if the channel stays empty (for example because
- // the RPC consumer is down), a timer fires and a timeout is reported. If you are not familiar with channels and select this
- // can be confusing, so please study them first; see the channel tutorial in my latest LinkedIn post.
- // Alternatively, you can use sync.Map, which is safe for concurrent use, and then the mutex is no longer needed.
- package pkg
import (
	"crypto/tls"
	"errors"
	"fmt"
	"math"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"dario.cat/mergo"
	"github.com/lithammer/shortuuid"
	"github.com/rabbitmq/amqp091-go"
	amqp "github.com/wagslane/go-rabbitmq" // v0.12.4
	apkg "github.com/x/x/internal/domain/adapters/pkg"
	"github.com/x/x/internal/domain/constant"
	dto "github.com/x/x/internal/domain/dto/pkg"
	"github.com/x/x/internal/infrastructure/common/helpers"
	"github.com/x/x/internal/infrastructure/configs"
)
- type rabbit struct {
- connection *rabbitmq.Conn
- env *configs.Environtment
- rpcExchange string
- rpcExchangeType string
- rpcQueue string
- rpcReplyTo string
- rpcConsumerID string
- rpcConsumerRes []byte
- }
- var (
- args amqp091.Table = amqp091.Table{}
- publisherOptions []dto.PublisherOptions = []dto.PublisherOptions{}
- delivery chan []byte = make(chan []byte, 1000)
- mutex *sync.RWMutex = new(sync.RWMutex)
- safeMap *sync.Map = new(sync.Map)
- parser ahelpers.Parser = helpers.NewParser()
- )
- func NewRabbitMQ(env *configs.Environtment) (apkg.RabbitMQ, error) {
- connection, err := amqp.NewConn(env.AMQP_BSN,
- amqp.WithConnectionOptionsConfig(amqp.Config{
- Vhost: env.AMQP_VHOST,
- FrameSize: 208896,
- Heartbeat: time.Duration(time.Second * 3),
- TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
- }),
- amqp.WithConnectionOptionsLogging,
- )
- if err != nil {
- connection.Close()
- return nil, err
- }
- Logrus("info", "RabbitMQ connection success")
- return &rabbitmq{connection: connection, env: env}, nil
- }
- /**
- * ===========================================
- * HANDLER METHOD - CONNECTION
- * ===========================================
- **/
- func (h *rabbitmq) Connection() *amqp.Conn {
- return h.connection
- }
- /**
- * ===========================================
- * HANDLER METHOD - PUBLISHER
- * ===========================================
- **/
- func (h *rabbit) Publisher(options *dto.PublisherOptions) error {
- parser := helpers.NewParser()
- if options.Exchange == "" {
- options.Exchange = h.env.AMQP_EXCHANGE
- }
- if options.ExchangeType == "" {
- options.ExchangeType = constant.Direct
- }
- if options.Args["x-error"] != nil {
- args = amqp091.Table{"x-error-id": options.CorrelationID, "x-read": false}
- if err := mergo.Map(&args, options.Args); err != nil {
- return err
- }
- } else {
- args = amqp091.Table{"x-custom-corellation-id": options.CorrelationID, "x-read": true}
- if err := mergo.Map(&args, options.Args); err != nil {
- return err
- }
- }
- options.Args = args
- options.Timestamp = time.Now().Local()
- publisher, err := rabbitmq.NewPublisher(h.connection,
- rabbitmq.WithPublisherOptionsExchangeName(options.Exchange),
- rabbitmq.WithPublisherOptionsExchangeKind(options.ExchangeType),
- rabbitmq.WithPublisherOptionsExchangeDeclare,
- rabbitmq.WithPublisherOptionsExchangeDurable,
- rabbitmq.WithPublisherOptionsExchangeNoWait,
- rabbitmq.WithPublisherOptionsLogging,
- )
- if err != nil {
- return err
- }
- defer h.closeConnection(publisher, nil, nil)
- bodyByte, err := parser.Marshal(options.Body)
- if err != nil {
- return err
- }
- err = publisher.Publish(bodyByte, options.Queue,
- rabbitmq.WithPublishOptionsExchange(options.Exchange),
- rabbitmq.WithPublishOptionsTimestamp(options.Timestamp),
- rabbitmq.WithPublishOptionsCorrelationID(options.CorrelationID),
- rabbitmq.WithPublishOptionsHeaders(rabbitmq.Table(options.Args)),
- rabbitmq.WithPublishOptionsPersistentDelivery,
- )
- if err != nil {
- return err
- }
- if options.CorrelationID != "" && options.Args["x-read"] != false {
- if err := h.publisherError(&dto.ConsumerOptions{Queue: "reply.error", ConsumerID: options.CorrelationID}); err != nil {
- return err
- }
- }
- return nil
- }
- /**
- * ===========================================
- * HANDLER METHOD - CONSUMER
- * ===========================================
- **/
- func (h *rabbitmq) Consumer(options *dto.ConsumerOptions, handler func(d amqp.Delivery) (action amqp.Action)) {
- if options.Exchange == "" {
- options.Exchange = h.env.AMQP_EXCHANGE
- }
- if options.ExchangeType == "" {
- options.ExchangeType = constant.Direct
- }
- options.ConsumerID = shortuuid.New()
- consumer, err := amqp.NewConsumer(h.connection, handler, options.Queue,
- amqp.WithConsumerOptionsExchangeName(options.Exchange),
- amqp.WithConsumerOptionsExchangeKind(options.ExchangeType),
- amqp.WithConsumerOptionsBinding(amqp.Binding{
- RoutingKey: options.Queue,
- BindingOptions: amqp.BindingOptions{
- Declare: true,
- NoWait: false,
- Args: amqp.Table(args),
- },
- }),
- amqp.WithConsumerOptionsConsumerName(options.ConsumerID),
- amqp.WithConsumerOptionsConcurrency(h.env.AMQP_CONCURRENCY),
- amqp.WithConsumerOptionsConsumerAutoAck(h.env.AMQP_ACK),
- amqp.WithConsumerOptionsExchangeDurable,
- amqp.WithConsumerOptionsQueueDurable,
- amqp.WithConsumerOptionsLogging,
- )
- if err != nil {
- h.closeConnection(nil, consumer, h.connection)
- Logrus("error", "Consumer Error: %v", err)
- return
- }
- }
- /**
- * ===========================================
- * HANDLER METHOD - PUBLISHER RPC
- * ===========================================
- **/
- func (h *rabbitmq) PublisherRPC(options *dto.PublisherOptions) ([]byte, error) {
- if len(publisherOptions) > 0 {
- publisherOptions = nil
- }
- if options.Exchange == "" {
- options.Exchange = h.env.AMQP_EXCHANGE
- }
- if options.ExchangeType == "" {
- options.ExchangeType = constant.Direct
- }
- options.CorrelationID = shortuuid.New()
- options.ReplyTo = fmt.Sprintf("rpc.%s", options.CorrelationID)
- options.Timestamp = time.Now().Local()
- publisherOptions = append(publisherOptions, *options)
- consumer, err := h.listeningConsumer(mutex, options)
- if err != nil {
- return nil, err
- }
- publisher, err := amqp.NewPublisher(h.connection,
- amqp.WithPublisherOptionsExchangeName(options.Exchange),
- amqp.WithPublisherOptionsExchangeKind(options.ExchangeType),
- amqp.WithPublisherOptionsExchangeDeclare,
- amqp.WithPublisherOptionsExchangeDurable,
- amqp.WithPublisherOptionsExchangeNoWait,
- amqp.WithPublisherOptionsLogging,
- )
- if err != nil {
- return nil, err
- }
- defer h.closeConnection(publisher, consumer, h.connection)
- bodyByte, err := parser.Marshal(options.Body)
- if err != nil {
- return nil, err
- }
- err = publisher.Publish(bodyByte, options.Queue,
- amqp.WithPublishOptionsExchange(options.Exchange),
- amqp.WithPublishOptionsCorrelationID(options.CorrelationID),
- amqp.WithPublishOptionsReplyTo(options.ReplyTo),
- amqp.WithPublishOptionsTimestamp(options.Timestamp),
- amqp.WithPublishOptionsPersistentDelivery,
- )
- if err != nil {
- return nil, err
- }
- for {
- timer := time.NewTimer(time.Duration(time.Second * 5))
- select {
- case d := <-delivery:
- mutex.RLock()
- defer mutex.RUnlock()
- return d[options.CorrelationID], nil
- case <-timer.C:
- delivery <- map[string][]byte{options.CorrelationID: []byte("Publisher RPC timeout")}
- mutex.RLock()
- defer mutex.RUnlock()
- d := <-delivery
- return d[options.CorrelationID], nil
- }
- }
- }
- /**
- * ===========================================
- * HANDLER METHOD - CONSUMER RPC
- * ===========================================
- **/
- func (h *rabbitmq) ConsumerRPC(options *dto.ConsumerOptions, handler func(delivery amqp.Delivery) (action amqp.Action)) {
- if options.Exchange == "" {
- options.Exchange = h.env.AMQP_EXCHANGE
- }
- if options.ExchangeType == "" {
- options.ExchangeType = constant.Direct
- }
- h.rpcConsumerID = shortuuid.New()
- h.rpcReplyTo = options.Queue
- h.rpcExchange = options.Exchange
- h.rpcExchangeType = options.ExchangeType
- consumer, err := amqp.NewConsumer(h.connection, handler, options.Queue,
- amqp.WithConsumerOptionsBinding(amqp.Binding{
- RoutingKey: options.Queue,
- BindingOptions: amqp.BindingOptions{
- Declare: true,
- NoWait: false,
- Args: amqp.Table(args),
- },
- }),
- amqp.WithConsumerOptionsExchangeName(options.Exchange),
- amqp.WithConsumerOptionsExchangeKind(options.ExchangeType),
- amqp.WithConsumerOptionsConcurrency(h.env.AMQP_CONCURRENCY),
- amqp.WithConsumerOptionsConsumerAutoAck(h.env.AMQP_ACK),
- amqp.WithConsumerOptionsConsumerName(h.rpcConsumerID),
- amqp.WithConsumerOptionsExchangeDeclare,
- amqp.WithConsumerOptionsExchangeDurable,
- amqp.WithConsumerOptionsQueueDurable,
- amqp.WithConsumerOptionsLogging,
- )
- if err != nil {
- h.closeConnection(nil, consumer, h.connection)
- Logrus("error", "ConsumerRPC Error: %v", err)
- return
- }
- }
- /**
- * ===========================================
- * HANDLER METHOD - PUBLISHER REPLY TO RPC
- * ===========================================
- **/
- func (h *rabbitmq) ReplyToDeliveryPublisher(deliveryBodyTo []byte, d amqp.Delivery) {
- mutex.Lock()
- defer mutex.Unlock()
- if len(d.ReplyTo) > 0 {
- h.rpcReplyTo = d.ReplyTo
- }
- if deliveryBodyTo != nil {
- h.rpcConsumerRes = deliveryBodyTo
- } else {
- h.rpcConsumerRes = d.Body
- }
- publisher, err := amqp.NewPublisher(h.connection,
- amqp.WithPublisherOptionsExchangeName(h.rpcExchange),
- amqp.WithPublisherOptionsExchangeKind(h.rpcExchangeType),
- amqp.WithPublisherOptionsExchangeDeclare,
- amqp.WithPublisherOptionsExchangeDurable,
- amqp.WithPublisherOptionsExchangeNoWait,
- amqp.WithPublisherOptionsLogging,
- )
- if err != nil {
- Logrus("error", "ReplyToDeliveryPublisher Error: %v", err)
- return
- }
- err = publisher.Publish(h.rpcConsumerRes, []string{h.rpcReplyTo},
- amqp.WithPublishOptionsCorrelationID(d.CorrelationId),
- amqp.WithPublishOptionsContentType(d.ContentType),
- amqp.WithPublishOptionsTimestamp(d.Timestamp),
- amqp.WithPublishOptionsPersistentDelivery,
- )
- if err != nil {
- Logrus("error", "ReplyToDeliveryPublisher Error: %v", err)
- return
- }
- }
- /**
- * ===========================================
- * HANDLER METHOD - PUBLISHER ERROR
- * ===========================================
- **/
- func (h *rabbit) publisherError(options *dto.ConsumerOptions) error {
- if options.Exchange == "" {
- options.Exchange = h.env.AMQP_EXCHANGE
- }
- if options.ExchangeType == "" {
- options.ExchangeType = constant.Direct
- }
- consumer, err := rabbitmq.NewConsumer(h.connection, func(d rabbitmq.Delivery) (action rabbitmq.Action) {
- errorID := d.Headers["x-error-id"]
- errorMessage := d.Headers["x-error"]
- if errorID != d.CorrelationId && errorMessage == nil {
- delivery <- []byte("")
- return rabbitmq.NackDiscard
- }
- safeMap.Store(errorID, errorMessage)
- loadSafeMap, ok := safeMap.LoadAndDelete(errorID)
- if ok {
- delivery <- []byte(loadSafeMap.(string))
- } else {
- safeMap.Store(errorID, errorMessage)
- }
- return rabbitmq.Ack
- },
- options.Queue,
- rabbitmq.WithConsumerOptionsExchangeName(options.Exchange),
- rabbitmq.WithConsumerOptionsExchangeKind(options.ExchangeType),
- rabbitmq.WithConsumerOptionsConsumerName(options.ConsumerID),
- rabbitmq.WithConsumerOptionsConcurrency(h.env.AMQP_CONCURRENCY),
- rabbitmq.WithConsumerOptionsQOSPrefetch(h.env.AMQP_PREFECTH),
- rabbitmq.WithConsumerOptionsConsumerAutoAck(h.env.AMQP_ACK),
- rabbitmq.WithConsumerOptionsExchangeDeclare,
- rabbitmq.WithConsumerOptionsExchangeDurable,
- rabbitmq.WithConsumerOptionsQueueDurable,
- rabbitmq.WithConsumerOptionsLogging,
- )
- if err != nil {
- return err
- }
- defer h.closeConnection(nil, consumer, nil)
- select {
- case d := <-delivery:
- loadSafeMap, ok := safeMap.LoadAndDelete(options.ConsumerID)
- if ok {
- return errors.New(loadSafeMap.(string))
- } else {
- return errors.New(string(d))
- }
- }
- }
- func (h *rabbitmq) listeningConsumer(mutex *sync.RWMutex, options *dto.PublisherOptions) (*amqp.Consumer, error) {
- consumer, err := amqp.NewConsumer(h.connection, func(d amqp.Delivery) (action amqp.Action) {
- for _, option := range publisherOptions {
- if option.CorrelationID != d.CorrelationId {
- mutex.Lock()
- defer mutex.Unlock()
- delivery <- map[string][]byte{d.CorrelationId: d.Body}
- return amqp.NackDiscard
- }
- }
- mutex.Lock()
- defer mutex.Unlock()
- delivery <- map[string][]byte{d.CorrelationId: d.Body}
- return amqp.Ack
- }, options.ReplyTo,
- amqp.WithConsumerOptionsExchangeName(options.Exchange),
- amqp.WithConsumerOptionsExchangeKind(options.ExchangeType),
- amqp.WithConsumerOptionsConsumerName(options.CorrelationID),
- amqp.WithConsumerOptionsConcurrency(h.env.AMQP_CONCURRENCY),
- amqp.WithConsumerOptionsConsumerAutoAck(h.env.AMQP_ACK),
- amqp.WithConsumerOptionsExchangeDeclare,
- amqp.WithConsumerOptionsExchangeDurable,
- amqp.WithConsumerOptionsQueueAutoDelete,
- amqp.WithConsumerOptionsLogging,
- )
- if err != nil {
- return consumer, err
- }
- return consumer, nil
- }
- func (h *rabbitmq) cancelRPC() error {
- timer := time.NewTimer(time.Duration(time.Second * 60))
- for {
- select {
- case <-timer.C:
- defer timer.Reset(time.Second * 1)
- return errors.New("cancelled")
- default:
- return nil
- }
- }
- }
- func (h *rabbitmq) closeConnection(publisher *amqp.Publisher, consumer *amqp.Consumer, connection *amqp.Conn) {
- defer h.recovery()
- if publisher != nil && consumer != nil && connection != nil {
- publisher.Close()
- consumer.Close()
- connection.Close()
- } else if publisher != nil && consumer == nil && connection != nil {
- publisher.Close()
- connection.Close()
- } else if publisher == nil && consumer != nil && connection != nil {
- consumer.Close()
- connection.Close()
- } else {
- closeChan := make(chan os.Signal, 1)
- signal.Notify(closeChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGALRM, syscall.SIGABRT, syscall.SIGUSR1)
- go func() {
- for {
- select {
- case <-closeChan:
- publisher.Close()
- consumer.Close()
- connection.Close()
- default:
- return
- }
- }
- }()
- }
- }
- func (h *rabbitmq) recovery() {
- if err := recover(); err != nil {
- return
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement