Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- use lapin::{Channel, ConsumerDelegate, Connection, ConnectionProperties};
- use lapin::message::DeliveryResult;
- use lapin::options::{BasicAckOptions, QueueDeclareOptions, BasicConsumeOptions, BasicPublishOptions};
- use lapin::types::FieldTable;
- use lapin::protocol::BasicProperties;
- #[derive(Clone, Debug)]
- struct Subscriber {
- channel: Channel,
- }
- impl ConsumerDelegate for Subscriber {
- fn on_new_delivery(&self, delivery: DeliveryResult) {
- if let Some(delivery) = delivery.unwrap() {
- self.channel
- .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
- .wait()
- .expect("basic_ack");
- }
- }
- }
- fn my_test() {
- let addr =
- std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
- let conn = Connection::connect(&addr, ConnectionProperties::default())
- .wait()
- .expect("connection error");
- println!("CONNECTED");
- let channel_a = conn.create_channel().wait().expect("create_channel");
- let channel_b = conn.create_channel().wait().expect("create_channel");
- channel_a
- .queue_declare(
- "hello",
- QueueDeclareOptions::default(),
- FieldTable::default(),
- )
- .wait()
- .expect("queue_declare");
- let _queue = channel_b
- .queue_declare(
- "hello",
- QueueDeclareOptions::default(),
- FieldTable::default(),
- )
- .wait()
- .expect("queue_declare");
- println!("will consume");
- channel_b
- .clone()
- .basic_consume(
- "hello",
- "my_consumer",
- BasicConsumeOptions::default(),
- FieldTable::default(),
- )
- .wait()
- .expect("basic_consume")
- .set_delegate(Box::new(Subscriber { channel: channel_b }));
- let payload = b"Hello world!";
- loop {
- channel_a
- .basic_publish(
- "",
- "hello",
- BasicPublishOptions::default(),
- payload.to_vec(),
- BasicProperties::default(),
- )
- .wait()
- .expect("basic_publish");
- }
- }
Add Comment
Please, Sign In to add comment