Advertisement
MatrixDeity

Untitled

Feb 13th, 2024
883
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 1.06 KB | None | 0 0
  1. func (s *Subscriber) SubscribeToRidsStatuses(
  2.     ctx context.Context, wg *sync.WaitGroup, processor registrar.RidsStatusesProcessor,
  3. ) error {
  4.     log := s.log.
  5.         AddName("receiving_rids_statuses").
  6.         AddName(ctxparam.GoID(ctx)).
  7.         AddData(logs.XRequestID, ctxparam.RequestID(ctx))
  8.  
  9.     messages, err := s.kafka.Subscribe(ctx, s.config.SridsStatusesTopic)
  10.     if err != nil {
  11.         return fmt.Errorf("kafka subscribing: %w", err)
  12.     }
  13.  
  14.     wg.Add(1)
  15.     go func() {
  16.         defer wg.Done()
  17.         for m := range messages {
  18.             var err error
  19.  
  20.             pbUpdate := &pb.EventRIDStatusUpdate{}
  21.             err = protojson.Unmarshal(m.Payload, pbUpdate)
  22.             if err != nil {
  23.                 log.Errorf("unmarshalling %q rid status update: %s", m.UUID, err)
  24.                 continue
  25.             }
  26.  
  27.             if err := validateRIDStatusUpdate(pbUpdate); err != nil {
  28.                 log.Errorf("validation rid status update: %s", err)
  29.                 continue
  30.             }
  31.  
  32.             msg := registrar.NewMessage(pbUpdate.Srid.Value, int(pbUpdate.Status.Value))
  33.             wg.Add(1)
  34.             go func() {
  35.                 defer wg.Done()
  36.                 processor(ctxparam.WithNewGoID(ctx), msg)
  37.             }()
  38.         }
  39.     }()
  40.  
  41.     return nil
  42. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement