Skip to content

Commit

Permalink
Merge pull request wso2#729 from pumudu88/master
Browse files Browse the repository at this point in the history
[MB-1846] Optimize mb slot recovery process during startup
  • Loading branch information
pumudu88 authored Oct 14, 2016
2 parents dd1d712 + 3f1808b commit a3fde24
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.gs.collections.impl.map.mutable.primitive.LongObjectHashMap;
import org.wso2.andes.configuration.util.ConfigurationProperties;
import org.wso2.andes.kernel.slot.Slot;
import org.wso2.andes.kernel.slot.RecoverySlotCreator;
import org.wso2.andes.store.HealthAwareStore;

import java.util.List;
Expand Down Expand Up @@ -162,17 +163,17 @@ List<AndesMessageMetadata> getNextNMessageMetadataFromQueue(final String storage
int count) throws AndesException;

/**
* Read a list of message ids from store specifying a starting message id
* and a count
* Recover slots for given queue. Will update slot message ids during startup.
*
* @param storageQueueName name of the queue
* @param firstMsgId first id
* @param count how many messages to read
* @return list of messageIds
* @param storageQueueName storage queue name
* @param firstMsgId first message id
* @param messageLimitPerSlot slot size
* @param callBack callBack for slot creator
* @return total number of recovered message count
* @throws AndesException
*/
public LongArrayList getNextNMessageIdsFromQueue(final String storageQueueName, long firstMsgId, int count)
throws AndesException;
int recoverSlotsForQueue(final String storageQueueName, long firstMsgId, int messageLimitPerSlot,
RecoverySlotCreator.CallBack callBack) throws AndesException;

/**
* Retrieve a metadata list from dead letter channel for a specific queue specifying a starting message id and a
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.wso2.andes.kernel.slot;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.andes.kernel.AndesContext;
import org.wso2.andes.kernel.AndesException;

import java.sql.SQLException;

/**
 * Holder for the callback that a message store invokes while recovering the slots of a storage queue.
 */
public class RecoverySlotCreator {

    /**
     * Class logger. Declared final so the shared static reference cannot be reassigned.
     */
    private static final Log log = LogFactory.getLog(RecoverySlotCreator.class);

    /**
     * Callback invoked once per recovered slot; it registers the slot's message id range with the
     * slot manager that matches the current deployment mode (clustered or standalone).
     */
    public static class CallBack {

        /**
         * Register one recovered slot for the given storage queue.
         * <p>
         * When clustering is enabled the range is reported to {@link SlotManagerClusterMode} together with
         * the local node identifier; otherwise only the last message id is reported to
         * {@link SlotManagerStandalone}.
         *
         * @param storageQueueName storage queue name
         * @param firstMessageID   first message id of the slot (only used in clustered mode)
         * @param lastMessageID    last message id of the slot
         * @param messageCount     message count for the slot; only used in the debug log message
         * @throws SQLException   declared for callers; nothing in this method body throws it directly
         * @throws AndesException declared for callers; presumably raised by the slot managers - verify
         */
        public void initializeSlotMapForQueue(String storageQueueName, long firstMessageID, long lastMessageID,
                int messageCount) throws SQLException, AndesException {

            if (AndesContext.getInstance().isClusteringEnabled()) {
                // lastMessageID is intentionally passed twice (4th and 5th argument), exactly as the
                // pre-refactoring SlotCreator did. NOTE(review): meaning of the 5th argument is not
                // visible here - confirm against SlotManagerClusterMode#updateMessageID.
                SlotManagerClusterMode.getInstance().updateMessageID(storageQueueName,
                        AndesContext.getInstance().getClusterAgent().getLocalNodeIdentifier(),
                        firstMessageID, lastMessageID, lastMessageID);
            } else {
                SlotManagerStandalone.getInstance().updateMessageID(storageQueueName, lastMessageID);
            }
            if (log.isDebugEnabled()) {
                log.debug("Created a slot with " + messageCount + " messages for queue (" + storageQueueName + ")");
            }
        }
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,20 @@

package org.wso2.andes.kernel.slot;

import com.gs.collections.impl.list.mutable.primitive.LongArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.andes.configuration.AndesConfigurationManager;
import org.wso2.andes.configuration.enums.AndesConfiguration;
import org.wso2.andes.kernel.AndesContext;
import org.wso2.andes.kernel.AndesException;
import org.wso2.andes.kernel.MessageStore;

import java.sql.SQLException;

/**
* SlotCreator is used to recover slots belonging to a storage queue when the cluster is restarted.
*/
public class SlotCreator implements Runnable {

/**
* Interval between two consecutive stat logs in milliseconds
*/
private static final int STAT_PUBLISHING_INTERVAL = 10 * 1000;

/**
* Class logger
*/
Expand Down Expand Up @@ -76,62 +71,17 @@ public void run() {
}

/**
* Iteratively recover messages for the storage queue
* Recover messages for the storage queue
*
* @throws AndesException
* @throws ConnectionException
* @throws SQLException
*/
private void initializeSlotMapForQueue() throws AndesException, ConnectionException {
int databaseReadsCounter = 0;
int restoreMessagesCounter = 0;
long messageCountOfQueue = messageStore.getMessageCountForQueue(queueName);

LongArrayList messageIdList = messageStore.getNextNMessageIdsFromQueue(queueName, 0, slotSize);
int numberOfMessages = messageIdList.size();

databaseReadsCounter++;
restoreMessagesCounter = restoreMessagesCounter + messageIdList.size();
private void initializeSlotMapForQueue() throws AndesException, SQLException {

long lastMessageID;
long firstMessageID;
long lastStatPublishTime = System.currentTimeMillis();
RecoverySlotCreator.CallBack slotCreatorCallBack = new RecoverySlotCreator.CallBack();
int restoreMessagesCounter = messageStore.recoverSlotsForQueue(queueName, 0, slotSize, slotCreatorCallBack);

while (numberOfMessages > 0) {
int lastMessageArrayIndex = numberOfMessages - 1;
lastMessageID = messageIdList.get(lastMessageArrayIndex);
firstMessageID = messageIdList.get(0);

if (log.isDebugEnabled()) {
log.debug("Created a slot with " + messageIdList.size() + " messages for queue (" + queueName + ")");
}

if (AndesContext.getInstance().isClusteringEnabled()) {
SlotManagerClusterMode.getInstance().updateMessageID(queueName,
AndesContext.getInstance().getClusterAgent().getLocalNodeIdentifier(), firstMessageID,
lastMessageID, lastMessageID);
} else {
SlotManagerStandalone.getInstance().updateMessageID(queueName, lastMessageID);
}

long currentTimeInMillis = System.currentTimeMillis();
if (currentTimeInMillis - lastStatPublishTime > STAT_PUBLISHING_INTERVAL) {
// messageCountOfQueue is multiplied by 1.0 to convert it to double
double recoveredPercentage = (restoreMessagesCounter / (messageCountOfQueue * 1.0)) * 100.0;
log.info(restoreMessagesCounter + "/" + messageCountOfQueue + " (" + Math.round(recoveredPercentage)
+ "%) messages recovered for queue \"" + queueName + "\"");
lastStatPublishTime = currentTimeInMillis;
}

// We need to increment lastMessageID since the getNextNMessageMetadataFromQueue returns message list
// including the given starting ID.
messageIdList = messageStore.getNextNMessageIdsFromQueue(queueName, lastMessageID + 1, slotSize);
numberOfMessages = messageIdList.size();
//increase value of counters
databaseReadsCounter++;
restoreMessagesCounter = restoreMessagesCounter + messageIdList.size();
}
log.info("Recovered " + restoreMessagesCounter + " messages for queue \"" + queueName + "\".");

log.info("Recovered " + restoreMessagesCounter + " messages for queue \"" + queueName + "\" using "
+ databaseReadsCounter + " database calls");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.wso2.andes.kernel.DurableStoreConnection;
import org.wso2.andes.kernel.MessageStore;
import org.wso2.andes.kernel.slot.Slot;
import org.wso2.andes.kernel.slot.RecoverySlotCreator;
import org.wso2.andes.tools.utils.MessageTracer;

import java.util.List;
Expand Down Expand Up @@ -241,10 +242,10 @@ public long getMessageCountForQueueInRange(final String storageQueueName, long f
/**
* {@inheritDoc}
*/
public LongArrayList getNextNMessageIdsFromQueue(final String storageQueueName, long firstMsgId, int count)
throws AndesException {
public int recoverSlotsForQueue(final String storageQueueName, long firstMsgId, int count,
RecoverySlotCreator.CallBack callBack) throws AndesException {
try {
return wrappedInstance.getNextNMessageIdsFromQueue(storageQueueName, firstMsgId, count);
return wrappedInstance.recoverSlotsForQueue(storageQueueName,firstMsgId,count,callBack);
} catch (AndesStoreUnavailableException exception) {
notifyFailures(exception);
throw exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.wso2.andes.kernel.DurableStoreConnection;
import org.wso2.andes.kernel.MessageStore;
import org.wso2.andes.kernel.slot.Slot;
import org.wso2.andes.kernel.slot.RecoverySlotCreator;
import org.wso2.andes.metrics.MetricsConstants;
import org.wso2.andes.server.queue.DLCQueueUtils;
import org.wso2.andes.store.AndesDataIntegrityViolationException;
Expand Down Expand Up @@ -89,6 +90,11 @@ public class RDBMSMessageStoreImpl implements MessageStore {
*/
private AndesMessageCache messageCache;

/**
* Interval between two consecutive stat logs in milliseconds for slot recovery process
*/
private static final int STAT_PUBLISHING_INTERVAL = 10 * 1000;

/**
* Partially created prepared statement to retrieve content of multiple messages using IN operator
* this will be completed on the fly when the request comes
Expand Down Expand Up @@ -743,6 +749,7 @@ public List<DeliverableAndesMetadata> getMetadataList(Slot slot, final String st
Connection connection = null;
PreparedStatement preparedStatement = null;
ResultSet resultSet = null;
long getMetadataListExecutionStart = 0;

Context metaListRetrievalContext = MetricManager.timer(MetricsConstants.GET_META_DATA_LIST, Level.INFO).start();
Context contextRead = MetricManager.timer(MetricsConstants.DB_READ, Level.INFO).start();
Expand All @@ -754,7 +761,16 @@ public List<DeliverableAndesMetadata> getMetadataList(Slot slot, final String st
preparedStatement.setLong(2, firstMsgId);
preparedStatement.setLong(3, lastMsgID);

if (log.isDebugEnabled()) {
log.debug("Started executing get metadata range query for queue :" + storageQueueName);
getMetadataListExecutionStart = System.currentTimeMillis();
}
resultSet = preparedStatement.executeQuery();
if (log.isDebugEnabled()) {
log.debug("Time elapsed for execute get metadata range query "+ storageQueueName + " : " +
(System.currentTimeMillis() - getMetadataListExecutionStart)+" milliseconds.");
}

while (resultSet.next()) {
DeliverableAndesMetadata md = new DeliverableAndesMetadata(slot,
resultSet.getLong(RDBMSConstants.MESSAGE_ID), resultSet.getBytes(RDBMSConstants.METADATA),
Expand Down Expand Up @@ -826,15 +842,18 @@ public long getMessageCountForQueueInRange(final String storageQueueName, long f
/**
* {@inheritDoc}
*/
public LongArrayList getNextNMessageIdsFromQueue(final String storageQueueName, long firstMsgId, int count)
throws AndesException {
LongArrayList mdList = new LongArrayList(count);
public int recoverSlotsForQueue(final String storageQueueName, long firstMsgId, int messageLimitPerSlot,
RecoverySlotCreator.CallBack callBack) throws AndesException {

long messageCountForQueue = getMessageCountForQueue(storageQueueName);

Connection connection = null;
PreparedStatement preparedStatement = null;
ResultSet results = null;
int restoreMessagesCounter = 0;

Context nextMessageIdsRetrievalContext = MetricManager
.timer(Level.INFO, MetricsConstants.GET_NEXT_MESSAGE_IDS_FROM_QUEUE).start();
.timer(MetricsConstants.GET_NEXT_MESSAGE_IDS_FROM_QUEUE, Level.INFO).start();
Context contextRead = MetricManager.timer(MetricsConstants.DB_READ, Level.INFO).start();

try {
Expand All @@ -844,17 +863,35 @@ public LongArrayList getNextNMessageIdsFromQueue(final String storageQueueName,
preparedStatement.setInt(2, getCachedQueueID(storageQueueName));

results = preparedStatement.executeQuery();
int resultCount = 0;

long lastStatPublishTime = System.currentTimeMillis();

long batchStartMessageID = 0;
int currentBatchCount = 0;
long currentMessageId = 0;
while (results.next()) {
currentMessageId = results.getLong(RDBMSConstants.MESSAGE_ID);

if (resultCount == count) {
break;
if (currentBatchCount == 0) {
batchStartMessageID = currentMessageId;
}

long messageId = results.getLong(RDBMSConstants.MESSAGE_ID);
currentBatchCount++;

mdList.add(messageId);
resultCount++;
if (currentBatchCount == messageLimitPerSlot) {
callBack.initializeSlotMapForQueue(storageQueueName, batchStartMessageID, currentMessageId,
messageLimitPerSlot);
restoreMessagesCounter = restoreMessagesCounter + currentBatchCount;
currentBatchCount = 0;
}
lastStatPublishTime = publishStat(storageQueueName, messageCountForQueue, restoreMessagesCounter,
lastStatPublishTime);
}

if (currentBatchCount < messageLimitPerSlot) {
restoreMessagesCounter = restoreMessagesCounter + currentBatchCount;
callBack.initializeSlotMapForQueue(storageQueueName, batchStartMessageID, currentMessageId,
messageLimitPerSlot);
}
} catch (SQLException e) {
throw rdbmsStoreUtils.convertSQLException("error occurred while retrieving message ids from queue ", e);
Expand All @@ -863,7 +900,29 @@ public LongArrayList getNextNMessageIdsFromQueue(final String storageQueueName,
contextRead.stop();
close(connection, preparedStatement, results, RDBMSConstants.TASK_RETRIEVING_NEXT_N_IDS_FROM_QUEUE);
}
return mdList;
return restoreMessagesCounter;
}

/**
 * Log the progress of the slot recovery process, at most once per {@code STAT_PUBLISHING_INTERVAL}.
 * <p>
 * Nothing is logged (and the given timestamp is returned unchanged) when less than the interval has
 * elapsed since {@code lastStatPublishTime}.
 *
 * @param storageQueueName       storage queue name
 * @param messageCountForQueue   total message count of the queue, used as the denominator of the percentage
 * @param restoreMessagesCounter number of messages recovered so far
 * @param lastStatPublishTime    time (in milliseconds) at which progress was last logged
 * @return the time progress was last logged: the current time if a line was logged by this call,
 *         otherwise {@code lastStatPublishTime}
 */
private long publishStat(String storageQueueName, long messageCountForQueue,
        int restoreMessagesCounter, long lastStatPublishTime) {
    long currentTimeInMillis = System.currentTimeMillis();
    if (currentTimeInMillis - lastStatPublishTime <= STAT_PUBLISHING_INTERVAL) {
        // Interval not elapsed yet; keep the previous publish time.
        return lastStatPublishTime;
    }

    // Guard against a zero (or negative) total: dividing by 0.0 yields NaN/Infinity, and
    // Math.round(Infinity) is Long.MAX_VALUE, which would be logged as a nonsensical percentage.
    long recoveredPercentage = 0;
    if (messageCountForQueue > 0) {
        // messageCountForQueue is multiplied by 1.0 to convert it to double
        recoveredPercentage = Math.round((restoreMessagesCounter / (messageCountForQueue * 1.0)) * 100.0);
    }
    log.info(restoreMessagesCounter + "/" + messageCountForQueue + " (" + recoveredPercentage
            + "%) messages recovered for queue \"" + storageQueueName + "\"");
    return currentTimeInMillis;
}

/**
Expand Down

0 comments on commit a3fde24

Please sign in to comment.