Skip to content

Commit

Permalink
Merge pull request wso2#743 from akalankapagoda/MB-1860
Browse files Browse the repository at this point in the history
[MB-1860]Queue delete and purge in non durable path
  • Loading branch information
akalankapagoda authored Nov 9, 2016
2 parents 6ac81b8 + 626faff commit b2c7ee2
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.wso2.andes.server.binding.Binding;
import org.wso2.andes.server.exchange.Exchange;
import org.wso2.andes.server.message.AMQMessage;
import org.wso2.andes.server.message.ServerMessage;
import org.wso2.andes.server.queue.AMQQueue;
import org.wso2.andes.server.queue.IncomingMessage;
import org.wso2.andes.server.queue.QueueEntry;
Expand Down Expand Up @@ -514,7 +513,7 @@ public static void createBinding(Exchange exchange, AMQShortString routingKey,
Andes.getInstance().addBinding(binding);
} catch (AndesException e) {
log.error("error while creating binding", e);
throw new AMQInternalException("error while removing queue", e);
throw new AMQInternalException("error while creating binding", e);
}
}

Expand All @@ -533,6 +532,14 @@ public static void removeBinding(Binding binding) throws AndesException {
if (log.isDebugEnabled()) {
log.debug("Ignored binding for default exchange " + exchange.getNameShortString());
}
return;
} else if (!binding.getQueue().isDurable() &&
exchange.getNameShortString().equals(ExchangeDefaults.TOPIC_EXCHANGE_NAME)) {

if (log.isDebugEnabled()) {
log.debug("Ignored binding for non durable topic " + binding.getBindingKey());
}

return;
}

Expand Down Expand Up @@ -574,7 +581,7 @@ private static void addLocalSubscriptionsForAllBindingsOfQueue(AMQQueue queue, S
String boundExchangeName = b.getExchange().getName();
String bindingKey = b.getBindingKey();
String queueName = queue.getName();
boolean isDurable = b.getQueue().isDurable();
boolean isDurable = b.getQueue().isDurable();

String storageQueueToBind = AndesUtils.getStorageQueueForDestination(bindingKey,
boundExchangeName,queueName,isDurable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,13 +281,11 @@ public boolean checkIfQueueDeletable(String queueName) throws AndesException {
* Delete the queue from broker. This will remove all bounded subscriptions, notify and
* remove bindings before removing the queue.
*
* @param queueDeleteEvent name of the storage queue to delete
* @param storageQueue storage queue to delete
* @throws AndesException issue on deleting queue
*/
public void deleteQueue(InboundQueueEvent queueDeleteEvent) throws AndesException {

StorageQueue queueWithEvent = queueDeleteEvent.toStorageQueue();
String storageQueueName = queueWithEvent.getName();
public void deleteQueue(StorageQueue storageQueue) throws AndesException {
String storageQueueName = storageQueue.getName();

//remove all subscriptions to the queue
subscriptionManager.closeAllSubscriptionsBoundToQueue(storageQueueName);
Expand All @@ -300,7 +298,11 @@ public void deleteQueue(InboundQueueEvent queueDeleteEvent) throws AndesExceptio
}

//purge the queue cluster-wide. Other nodes will only delete messages buffered to memory on those nodes
handleQueuePurge(queueDeleteEvent);
int purgedMessageCount = storageQueue.purgeMessages();

if (log.isDebugEnabled()) {
log.debug(purgedMessageCount + " messages purged while deleting queue " + storageQueueName );
}

// Remove queue information from database
contextStore.deleteQueueInformation(storageQueueName);
Expand All @@ -309,8 +311,7 @@ public void deleteQueue(InboundQueueEvent queueDeleteEvent) throws AndesExceptio
messageStore.removeLocalQueueData(storageQueueName);

//identify storage queue, unbind it from router and delete from queue registry
StorageQueue storageQueue = AndesContext.getInstance().
getStorageQueueRegistry().removeStorageQueue(storageQueueName);
AndesContext.getInstance().getStorageQueueRegistry().removeStorageQueue(storageQueueName);

//Notify cluster to delete queue
clusterNotificationAgent.notifyQueueChange(storageQueue, ClusterNotificationListener.QueueChange.Deleted);
Expand Down Expand Up @@ -382,7 +383,7 @@ public void createBinding(InboundBindingEvent bindingEvent) throws AndesExceptio
AndesMessageRouter messageRouter = AndesContext.getInstance().
getMessageRouterRegistry().getMessageRouter(messageRouterName);

queue.bindQueueToMessageRouter(bindingKey,messageRouter);
queue.bindQueueToMessageRouter(bindingKey, messageRouter);

AndesBinding binding = new AndesBinding(bindingEvent.getBoundMessageRouterName(),
queue, bindingEvent.getBindingKey());
Expand Down Expand Up @@ -425,6 +426,20 @@ public void syncCreateBinding(InboundBindingSyncEvent bindingSyncEvent) throws A
public void removeBinding(InboundBindingEvent removeBindingEvent) throws AndesException {
String messageRouterName = removeBindingEvent.getBoundMessageRouterName();
String boundQueueName = removeBindingEvent.getBoundedQueue().getQueueName();
String bindingKey = removeBindingEvent.getBindingKey();

removeBinding(messageRouterName, boundQueueName, bindingKey);
}

/**
* Remove andes binding from andes kernel.
*
* @param messageRouterName Message router name of the binding
* @param boundQueueName The bound queue name of the binding
* @param bindingKey The binding key
* @throws AndesException
*/
public void removeBinding(String messageRouterName, String boundQueueName, String bindingKey) throws AndesException {

StorageQueue queue = AndesContext.getInstance().
getStorageQueueRegistry().getStorageQueue(boundQueueName);
Expand All @@ -438,20 +453,12 @@ public void removeBinding(InboundBindingEvent removeBindingEvent) throws AndesEx
* every internal queues it create for each subscriber
*/
if ((null != queue) && (null != messageRouter)) {
messageRouter.removeMapping(removeBindingEvent.getBindingKey(), queue);
messageRouter.removeMapping(bindingKey, queue);

AndesBinding removedBinding = amqpConstructStore.removeBinding(messageRouterName, boundQueueName, true);

clusterNotificationAgent.notifyBindingsChange(removedBinding,
ClusterNotificationListener.BindingChange.Deleted);

//if a non durable queue on binding removal delete the queue if there are no more
// subscribers. Delete call for non durable queues is prevented at Qpid-Andes bridge.
if (!queue.isDurable() && queue.getBoundSubscriptions().isEmpty()) {
InboundQueueEvent queueDeleteEvent = new InboundQueueEvent(queue.getName(),
queue.isDurable(), queue.isShared(), queue.getQueueOwner(), queue.isExclusive());
deleteQueue(queueDeleteEvent);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private void handleIsQueueDeletableEvent() {
private void handleQueueDeleteEvent() {
boolean isComplete = false;
try {
contextInformationManager.deleteQueue(this);
contextInformationManager.deleteQueue(toStorageQueue());
isComplete = true;
} catch (AndesException e) {
isEventComplete.setException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.commons.logging.LogFactory;
import org.wso2.andes.amqp.AMQPUtils;
import org.wso2.andes.kernel.AndesContext;
import org.wso2.andes.kernel.AndesContextInformationManager;
import org.wso2.andes.kernel.AndesContextStore;
import org.wso2.andes.kernel.AndesException;
import org.wso2.andes.kernel.ClusterNotificationListener;
Expand Down Expand Up @@ -247,7 +248,10 @@ private void removeLocalSubscriptionAndNotify(AndesSubscription subscription) th

subscriptionRegistry.removeSubscription(subscription);

subscription.getStorageQueue().unbindSubscription(subscription);
StorageQueue storageQueue = subscription.getStorageQueue();

storageQueue.unbindSubscription(subscription);

if (!storeUnavailable) {
try {
andesContextStore.removeDurableSubscription(subscription);
Expand All @@ -262,6 +266,15 @@ private void removeLocalSubscriptionAndNotify(AndesSubscription subscription) th
clusterNotificationAgent.notifySubscriptionsChange(subscription,
ClusterNotificationListener.SubscriptionChange.Closed);

// If there are no subscriptions for this queue, then delete it
if (!storageQueue.isDurable() && storageQueue.getBoundSubscriptions().isEmpty() ) {

AndesContextInformationManager contextInformationManager = AndesContext.getInstance()
.getAndesContextInformationManager();

contextInformationManager.deleteQueue(storageQueue);
}

log.info("Remove Local Subscription " + subscription.getProtocolType() + " " + subscription.toString());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.wso2.andes.configuration.AndesConfigurationManager;
import org.wso2.andes.configuration.enums.AndesConfiguration;
import org.wso2.andes.kernel.AndesContext;
import org.wso2.andes.kernel.AndesContextInformationManager;
import org.wso2.andes.kernel.AndesException;
import org.wso2.andes.kernel.DeliverableAndesMetadata;
import org.wso2.andes.kernel.MessageHandler;
Expand Down Expand Up @@ -308,11 +309,8 @@ public void unbindSubscription(AndesSubscription subscription) throws AndesExcep
//return slots back to coordinator
messageHandler.releaseAllSlots();
messageHandler.clearReadButUndeliveredMessages();
} else {
//On binding removal call we will do this
//unbindQueueFromMessageRouter();
purgeMessages();
}

messageHandler.stopMessageDelivery(this);
} else {
subscription.rebufferUnackedMessages();
Expand Down

0 comments on commit b2c7ee2

Please sign in to comment.