Advertisement
vitareinforce

Tambahan di Class RMQ

Apr 10th, 2019
184
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.32 KB | None | 0 0
  1. /**
  2. * Fungsi untuk subscribe data RMQ
  3. * @param handler
  4. * @param subscribeThread
  5. */
  6. public void subscribeNotification(final Handler handler, Thread subscribeThread)
  7. {
  8. subscribeThread = new Thread(new Runnable() {
  9. @Override
  10. public void run() {
  11. while(true) {
  12. try {
  13. Connection connection = factory.newConnection();
  14. Channel channel = connection.createChannel();
  15. channel.basicQos(0);
  16. channel.queueBind("panwaslu_notification", "amq.fanout", "*");
  17. QueueingConsumer consumer = new QueueingConsumer(channel);
  18. channel.basicConsume("panwaslu_notification", true, consumer);
  19. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  20.  
  21. if (delivery != null){
  22.  
  23. try{
  24.  
  25. String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
  26. Log.d("ConsumeDataRMQ", "MessageConsumed" + message);
  27.  
  28. Message msg = handler.obtainMessage();
  29. Bundle bundle = new Bundle();
  30.  
  31. bundle.putString("msg", message);
  32. msg.setData(bundle);
  33. handler.sendMessage(msg);
  34.  
  35. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  36.  
  37. }catch (Exception e){
  38. channel.basicReject(delivery.getEnvelope().getDeliveryTag(),true);
  39. }
  40. }
  41. } catch (InterruptedException e) {
  42. break;
  43. } catch (Exception e1) {
  44. Log.d("", "Connection broken: " + e1.getClass().getName());
  45. try {
  46. Thread.sleep(4000); //sleep and then try again
  47. } catch (InterruptedException e) {
  48. break;
  49. }
  50. }
  51. }
  52. }
  53. });
  54. subscribeThread.start();
  55. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement