Skip to content

Commit

Permalink
futures: add queue_delete method
Browse files Browse the repository at this point in the history
  • Loading branch information
dfaust committed Apr 7, 2017
1 parent d5c83aa commit 2b22d2c
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 7 deletions.
45 changes: 41 additions & 4 deletions futures/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,14 @@ impl<T: AsyncRead+AsyncWrite+'static> Channel<T> {
}
}

/// purges a queue
pub fn queue_purge(&self, name: &str) -> Box<Future<Item = (), Error = io::Error>> {
/// Purge a queue.
///
/// This method removes all messages from a queue which are not awaiting acknowledgment.
pub fn queue_purge(&self, queue_name: &str) -> Box<Future<Item = (), Error = io::Error>> {
let cl_transport = self.transport.clone();

if let Ok(mut transport) = self.transport.lock() {
match transport.conn.queue_purge(self.id, 0, name.to_string(), false) {
match transport.conn.queue_purge(self.id, 0, queue_name.to_string(), false) {
Err(e) => Box::new(
future::err(Error::new(ErrorKind::ConnectionAborted, format!("could not purge queue: {:?}", e)))
),
Expand All @@ -333,7 +335,42 @@ impl<T: AsyncRead+AsyncWrite+'static> Channel<T> {
} else {
//FIXME: if we're there, it means the mutex failed
Box::new(future::err(
Error::new(ErrorKind::ConnectionAborted, format!("could not purge queue {}", name))
Error::new(ErrorKind::ConnectionAborted, format!("could not purge queue {}", queue_name))
))
}
}

/// Delete a queue.
///
/// This method deletes a queue. When a queue is deleted any pending messages are sent to a dead-letter queue
/// if this is defined in the server configuration, and all consumers on the queue are cancelled.
///
/// If `if_unused` is set, the server will only delete the queue if it has no consumers.
/// If the queue has consumers the server does not delete it but raises a channel exception instead.
///
/// If `if_empty` is set, the server will only delete the queue if it has no messages.
pub fn queue_delete(&self, queue_name: &str, if_unused: bool, if_empty: bool) -> Box<Future<Item = (), Error = io::Error>> {
let cl_transport = self.transport.clone();

if let Ok(mut transport) = self.transport.lock() {
match transport.conn.queue_delete(self.id, 0, queue_name.to_string(), if_unused, if_empty, false) {
Err(e) => Box::new(
future::err(Error::new(ErrorKind::ConnectionAborted, format!("could not delete queue: {:?}", e)))
),
Ok(request_id) => {
trace!("delete request id: {}", request_id);
transport.send_frames();

transport.handle_frames();

trace!("delete returning closure");
wait_for_answer(cl_transport, request_id)
},
}
} else {
//FIXME: if we're there, it means the mutex failed
Box::new(future::err(
Error::new(ErrorKind::ConnectionAborted, format!("could not delete queue {}", queue_name))
))
}
}
Expand Down
9 changes: 6 additions & 3 deletions futures/tests/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ fn connection() {
let id = channel.id;
info!("created channel with id: {}", id);

let ch = channel.clone();
let ch1 = channel.clone();
let ch2 = channel.clone();
channel.queue_declare("hello", &QueueDeclareOptions::default(), FieldTable::new()).and_then(move |_| {
info!("channel {} declared queue {}", id, "hello");

Expand All @@ -56,9 +57,11 @@ fn connection() {
let msg = message.unwrap();
info!("got message: {:?}", msg);
assert_eq!(msg.data, b"hello from tokio");
ch.basic_ack(msg.delivery_tag);
ch1.basic_ack(msg.delivery_tag);
Ok(())
}).map_err(|(err, _)| err)
}).map_err(|(err, _)| err).and_then(move |_| {
ch2.queue_delete("hello", false, false)
})
})
})
})
Expand Down

0 comments on commit 2b22d2c

Please sign in to comment.