Skip to content

Commit

Permalink
Add multiple support to ack/nack
Browse files Browse the repository at this point in the history
  • Loading branch information
whitfin authored and Keruspe committed Jun 18, 2018
1 parent 8bc2707 commit 0fd5665
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 9 deletions.
4 changes: 2 additions & 2 deletions futures/examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ fn main() {
channel.basic_get("hello", BasicGetOptions::default()).and_then(move |message| {
info!("got message: {:?}", message);
info!("decoded message: {:?}", std::str::from_utf8(&message.delivery.data).unwrap());
channel.basic_ack(message.delivery.delivery_tag)
channel.basic_ack(message.delivery.delivery_tag, false)
}).and_then(move |_| {
ch.basic_consume(&queue, "my_consumer", BasicConsumeOptions::default(), FieldTable::new())
})
Expand All @@ -79,7 +79,7 @@ fn main() {
stream.for_each(move |message| {
debug!("got message: {:?}", message);
info!("decoded message: {:?}", std::str::from_utf8(&message.data).unwrap());
c.basic_ack(message.delivery_tag)
c.basic_ack(message.delivery_tag, false)
})
})
})
Expand Down
2 changes: 1 addition & 1 deletion futures/examples/consumers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn create_consumer<T: AsyncRead + AsyncWrite + Sync + Send + 'static>(client: &C
info!("got stream for consumer {}", n);
stream.for_each(move |message| {
println!("consumer '{}' got '{}'", n, std::str::from_utf8(&message.data).unwrap());
channel.basic_ack(message.delivery_tag)
channel.basic_ack(message.delivery_tag, false)
})
}).map(|_| ()).map_err(move |err| eprintln!("got error in consumer '{}': {:?}", n, err))
}
Expand Down
8 changes: 4 additions & 4 deletions futures/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,20 +394,20 @@ impl<T: AsyncRead+AsyncWrite+Send+Sync+'static> Channel<T> {
}

/// acks a message
pub fn basic_ack(&self, delivery_tag: u64) -> impl Future<Item = (), Error = io::Error> + Send + 'static {
pub fn basic_ack(&self, delivery_tag: u64, multiple: bool) -> impl Future<Item = (), Error = io::Error> + Send + 'static {
let channel_id = self.id;

self.run_on_locked_transport("basic_ack", "Could not ack message", move |transport| {
transport.conn.basic_ack(channel_id, delivery_tag, false).map(|_| None)
transport.conn.basic_ack(channel_id, delivery_tag, multiple).map(|_| None)
}).map(|_| ())
}

/// nacks a message
pub fn basic_nack(&self, delivery_tag: u64, requeue: bool) -> impl Future<Item = (), Error = io::Error> + Send + 'static {
pub fn basic_nack(&self, delivery_tag: u64, multiple: bool, requeue: bool) -> impl Future<Item = (), Error = io::Error> + Send + 'static {
let channel_id = self.id;

self.run_on_locked_transport("basic_nack", "Could not nack message", move |transport| {
transport.conn.basic_nack(channel_id, delivery_tag, false, requeue).map(|_| None)
transport.conn.basic_nack(channel_id, delivery_tag, multiple, requeue).map(|_| None)
}).map(|_| ())
}

Expand Down
2 changes: 1 addition & 1 deletion futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
//! stream.for_each(move |message| {
//! debug!("got message: {:?}", message);
//! info!("decoded message: {:?}", std::str::from_utf8(&message.data).unwrap());
//! ch.basic_ack(message.delivery_tag);
//! ch.basic_ack(message.delivery_tag, false);
//! Ok(())
//! })
//! })
Expand Down
2 changes: 1 addition & 1 deletion futures/tests/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ fn connection() {
let msg = message.unwrap();
info!("got message: {:?}", msg);
assert_eq!(msg.data, b"hello from tokio");
ch1.basic_ack(msg.delivery_tag)
ch1.basic_ack(msg.delivery_tag, false)
}).and_then(move |_| {
ch2.queue_delete("hello", QueueDeleteOptions::default())
})
Expand Down

0 comments on commit 0fd5665

Please sign in to comment.