Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- using RabbitMQ;
- using RabbitMQ.Client;
- using System.Diagnostics;
- using RabbitMQ.Client.Events;
- // Download the RabbitMQ client build for UWP here: https://drive.google.com/file/d/0B8YLfSLDHjT-amVpcnBWaEZ2Yjg/view?usp=sharing
- namespace HMD_Holo
- {
/// <summary>
/// Small RPC-style client on top of RabbitMQ: publishes a request to a routing key on the
/// default exchange and waits on a private, server-named reply queue for the response that
/// carries the same correlation id.
/// </summary>
/// <remarks>
/// Not safe for concurrent use: <see cref="Call"/> discards every reply whose correlation id
/// does not match its own request, so two overlapping calls would drop each other's replies.
/// </remarks>
class AMQPService : IDisposable
{
    // Defaults used by the parameterless constructor (same values the class always used).
    private const string DefaultHostName = "localhost";
    private const int DefaultPort = 5672;
    private const string DefaultUser = "guest";
    private const string DefaultPassword = "guest";
    private const string DefaultVirtualHost = "/";
    private const string DefaultRoutingKey = "kfx_queue";

    private readonly string _hostName;
    private readonly string _routingKey;
    private readonly ConnectionFactory _connectionFactory;

    private IConnection _connection;
    private IModel _channel;
    private string _replyQueueName;
    private QueueingBasicConsumer _consumer;

    /// <summary>
    /// Connects to the broker on localhost:5672 as guest/guest on vhost "/" and targets the
    /// "kfx_queue" routing key.
    /// </summary>
    public AMQPService()
        : this(DefaultHostName, DefaultPort, DefaultUser, DefaultPassword, DefaultVirtualHost, DefaultRoutingKey)
    {
    }

    /// <summary>
    /// Connects to the given broker and, if the connection is open, prepares the channel and
    /// the reply queue used by <see cref="Call"/>.
    /// </summary>
    /// <param name="hostname">Broker host name.</param>
    /// <param name="port">Broker AMQP port.</param>
    /// <param name="user">User name used to authenticate.</param>
    /// <param name="pass">Password used to authenticate.</param>
    /// <param name="vhost">Virtual host to connect to.</param>
    /// <param name="routingKey">Routing key requests are published with.</param>
    /// <remarks>
    /// Any exception thrown while creating the connection or the channel propagates to the
    /// caller of the constructor.
    /// </remarks>
    public AMQPService(string hostname, int port, string user, string pass, string vhost, string routingKey)
    {
        _hostName = hostname;
        _routingKey = routingKey;

        _connectionFactory = new ConnectionFactory
        {
            HostName = hostname,
            Port = port,
            UserName = user,
            Password = pass,
            VirtualHost = vhost
        };

        Connect();
        if (_connection.IsOpen)
        {
            CreateChannel();
        }
    }

    /// <summary>
    /// Last-resort cleanup for instances that were never closed. Must never throw: an
    /// exception escaping a finalizer terminates the process.
    /// </summary>
    ~AMQPService()
    {
        try
        {
            Close();
        }
        catch (Exception ex)
        {
            Debug.WriteLine("Error while closing connection in finalizer : " + ex.Message);
        }
    }

    /// <summary>Opens the broker connection using the configured factory.</summary>
    private void Connect()
    {
        Debug.WriteLine("Attempt to connect to server : " + _hostName);
        _connection = _connectionFactory.CreateConnection();
        Debug.WriteLine("Connection Status : " + _connection.IsOpen);
    }

    /// <summary>
    /// Creates the channel, limits prefetch to one message, and starts consuming a
    /// server-named reply queue with automatic acknowledgement.
    /// </summary>
    private void CreateChannel()
    {
        Debug.WriteLine("Create Channel");
        _channel = _connection.CreateModel();
        Debug.WriteLine("Setup Basic QOS");
        _channel.BasicQos(0, 1, false);

        if (!_channel.IsOpen)
        {
            return;
        }

        _replyQueueName = _channel.QueueDeclare().QueueName;
        Debug.WriteLine("Declare Queue : " + _replyQueueName);
        _consumer = new QueueingBasicConsumer(_channel);
        Debug.WriteLine("Consume Queue");
        _channel.BasicConsume(queue: _replyQueueName,
                              noAck: true,
                              consumer: _consumer);
        Debug.WriteLine("Queue Consumed");
    }

    /// <summary>
    /// Publishes <paramref name="message"/> as a UTF-8 request and waits for the reply whose
    /// correlation id matches the request.
    /// </summary>
    /// <param name="message">Request payload.</param>
    /// <returns>The reply body decoded as UTF-8.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// The channel or reply consumer is not available (the connection never opened, or the
    /// service has been closed).
    /// </exception>
    /// <remarks>
    /// This method blocks the calling thread until a matching reply arrives and has no
    /// timeout. (The original author asked how to make it asynchronous: until the class is
    /// reworked, callers that must stay responsive should invoke it from a background
    /// thread, one call at a time.)
    /// </remarks>
    public string Call(string message)
    {
        if (message == null)
        {
            throw new ArgumentNullException("message");
        }

        if (_channel == null || _consumer == null || !_channel.IsOpen)
        {
            throw new InvalidOperationException("AMQP channel is not open; cannot send the request.");
        }

        var corrId = Guid.NewGuid().ToString();
        Debug.WriteLine("Corr ID : " + corrId);

        var props = _channel.CreateBasicProperties();
        Debug.WriteLine("Props : " + props.ToString());
        props.ReplyTo = _replyQueueName;
        Debug.WriteLine("Reply To : " + _replyQueueName);
        props.CorrelationId = corrId;

        var messageBytes = Encoding.UTF8.GetBytes(message);
        Debug.WriteLine("Get Message");
        _channel.BasicPublish(exchange: "",
                              routingKey: _routingKey,
                              basicProperties: props,
                              body: messageBytes);
        Debug.WriteLine("Publish Message");

        while (true)
        {
            var ea = (BasicDeliverEventArgs)_consumer.Queue.Dequeue();

            // Replies belonging to any other request are dropped.
            if (ea.BasicProperties.CorrelationId == corrId)
            {
                var reply = Encoding.UTF8.GetString(ea.Body);
                Debug.WriteLine("Message Content : " + reply);
                return reply;
            }
        }
    }

    /// <summary>
    /// Closes the broker connection if it is open. Safe to call more than once and safe to
    /// call when the connection was never established.
    /// </summary>
    public void Close()
    {
        // Explicit cleanup makes the finalizer unnecessary.
        GC.SuppressFinalize(this);

        IConnection current = _connection;

        // Drop the references first so later Call() invocations fail with a clear error
        // and a repeated Close() becomes a no-op.
        _connection = null;
        _channel = null;
        _consumer = null;

        if (current != null && current.IsOpen)
        {
            current.Close();
        }
    }

    /// <summary>Releases the broker connection; equivalent to <see cref="Close"/>.</summary>
    public void Dispose()
    {
        Close();
    }
}
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement