Skip to content

Commit

Permalink
HORNETQ-720 Remove the concurrent largeMessage deletion, add a statef…
Browse files Browse the repository at this point in the history
…ul cache

:-S
  • Loading branch information
Francisco Borges committed Mar 25, 2013
1 parent 49e70dd commit 03c56b8
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ public interface SequentialFileFactory
{
SequentialFile createSequentialFile(String fileName, int maxIO);

/**
* Lists files that end with the given extension.
* <p>
* This method inserts a ".' before the extension.
* @param extension
* @return
* @throws Exception
*/
List<String> listFiles(String extension) throws Exception;

boolean isSupportsCallbacks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -214,6 +215,8 @@ public static JournalContent getType(byte type)
private final Map<SimpleString, PersistedAddressSetting> mapPersistedAddressSettings =
new ConcurrentHashMap<SimpleString, PersistedAddressSetting>();

private final Set<Long> largeMessagesToDelete = new HashSet<Long>();

public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory)
{
this(config, executorFactory, null);
Expand Down Expand Up @@ -433,8 +436,8 @@ public void startReplication(ReplicationManager replicationManager, PagingManage
if (replicator != null)
{
replicator.sendSynchronizationDone(nodeID);
performCachedLargeMessageDeletes();
}
// XXX HORNETQ-720 SEND a compare journals message?
}
finally
{
Expand Down Expand Up @@ -485,7 +488,6 @@ public static String md5(File file)
@Override
public void stopReplication()
{

storageManagerLock.writeLock().lock();
try
{
Expand All @@ -502,13 +504,41 @@ public void stopReplication()
HornetQServerLogger.LOGGER.errorStoppingReplicationManager(e);
}
replicator = null;
// delete inside the writeLock. Avoids a lot of state checking and races with
// startReplication.
// This method should not be called under normal circumstances
performCachedLargeMessageDeletes();
}
finally
{
storageManagerLock.writeLock().unlock();
}
}

/**
* Assumption is that this is only called with a writeLock on the StorageManager.
*/
private void performCachedLargeMessageDeletes()
{
for (Long largeMsgId : largeMessagesToDelete)
{
SequentialFile msg = createFileForLargeMessage(largeMsgId, LargeMessageExtension.DURABLE);
try
{
msg.delete();
}
catch (Exception e)
{
HornetQServerLogger.LOGGER.journalErrorDeletingMessage(e, largeMsgId);
}
if (replicator != null)
{
replicator.largeMessageDelete(largeMsgId);
}
}
largeMessagesToDelete.clear();
}

/**
* @param pageFilesToSync
* @throws Exception
Expand Down Expand Up @@ -581,8 +611,8 @@ private long getLargeMessageIdFromFilename(String filename)
private void getLargeMessageInformation() throws Exception
{
Map<Long, Pair<String, Long>> largeMessages = new HashMap<Long, Pair<String, Long>>();
// only send durable messages...
List<String> filenames = largeMessagesFactory.listFiles(LargeMessageExtension.DURABLE.getExtension());
// only send durable messages... // listFiles append a "." to anything...
List<String> filenames = largeMessagesFactory.listFiles("msg");

List<Long> idList = new ArrayList<Long>();
for (String filename : filenames)
Expand Down Expand Up @@ -2248,10 +2278,12 @@ public synchronized void stop(boolean ioCriticalError) throws Exception
return;
}

if (!ioCriticalError && journalLoaded && idGenerator != null)
if (!ioCriticalError)
{
performCachedLargeMessageDeletes();
// Must call close to make sure last id is persisted
idGenerator.persistCurrentID();
if (journalLoaded && idGenerator != null)
idGenerator.persistCurrentID();
}

final CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -2347,13 +2379,33 @@ public Journal getBindingsJournal()
// Package protected ---------------------------------------------

// This should be accessed from this package only
void deleteLargeMessageFile(final LargeServerMessage largeServerMessageImpl) throws HornetQException
void deleteLargeMessageFile(final LargeServerMessage largeServerMessage) throws HornetQException
{
final SequentialFile file = largeServerMessageImpl.getFile();
final SequentialFile file = largeServerMessage.getFile();
if (file == null)
{
return;
}

if (largeServerMessage.isDurable() && isReplicated())
{
readLock();
try
{
if (isReplicated() && replicator.isSynchronizing())
{
synchronized (largeMessagesToDelete)
{
largeMessagesToDelete.add(Long.valueOf(largeServerMessage.getMessageID()));
}
return;
}
}
finally
{
readUnLock();
}
}
Runnable deleteAction = new Runnable()
{
public void run()
Expand All @@ -2365,7 +2417,7 @@ public void run()
{
if (replicator != null)
{
replicator.largeMessageDelete(largeServerMessageImpl.getMessageID());
replicator.largeMessageDelete(largeServerMessage.getMessageID());
}
file.delete();
}
Expand All @@ -2376,7 +2428,7 @@ public void run()
}
catch (Exception e)
{
HornetQServerLogger.LOGGER.journalErrorDeletingMessage(e, largeServerMessageImpl.getMessageID());
HornetQServerLogger.LOGGER.journalErrorDeletingMessage(e, largeServerMessage.getMessageID());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,9 +606,7 @@ private void handleStartReplicationSynchronization(final ReplicationStartSyncMes

private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet)
{
HornetQServerLogger.LOGGER.info("REND LM DELETE " + packet.getMessageId());
final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true);

if (message != null)
{
executor.execute(new Runnable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.HornetQExceptionType;
import org.hornetq.api.core.HornetQInterruptedException;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.SessionFailureListener;
Expand Down Expand Up @@ -126,10 +124,6 @@ public static ADD_OPERATION_TYPE toOperation(boolean isUpdate)

private volatile boolean inSync = true;

private Long largeMessageCurrentlySendingId = Long.valueOf(-1);
private volatile boolean largeMessageInterruptSend = false;
private final Semaphore largeMessageSemaphore = new Semaphore(1);

/**
* @param remotingConnection
*/
Expand Down Expand Up @@ -258,29 +252,6 @@ public void largeMessageDelete(final Long messageId)
{
if (enabled)
{
if (inSync)
{
synchronized (largeMessageSyncGuard)
{
if (largeMessageCurrentlySendingId.equals(messageId))
{
// deal with it
largeMessageInterruptSend = true;
try {
largeMessageSemaphore.acquire();
largeMessageSemaphore.release();
}
catch (InterruptedException e)
{
throw new HornetQInterruptedException(e);
}
}
else if (largeMessagesToSync.containsKey(messageId))
{
largeMessagesToSync.remove(messageId);
}
}
}
sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId));
}
}
Expand Down Expand Up @@ -531,7 +502,6 @@ public void syncJournalFile(JournalFile jf, JournalContent content) throws Excep
*/
public Map.Entry<Long, Pair<String, Long>> getNextLargeMessageToSync()
{

synchronized (largeMessageSyncGuard)
{
Iterator<Entry<Long, Pair<String, Long>>> iter = largeMessagesToSync.entrySet().iterator();
Expand All @@ -542,7 +512,6 @@ public Map.Entry<Long, Pair<String, Long>> getNextLargeMessageToSync()

Entry<Long, Pair<String, Long>> entry = iter.next();
iter.remove();
largeMessageCurrentlySendingId = entry.getKey();
return entry;
}
}
Expand All @@ -551,19 +520,7 @@ public void syncLargeMessageFile(SequentialFile file, long size, long id) throws
{
if (enabled)
{
largeMessageSemaphore.acquire();
try
{
sendLargeFile(null, null, id, file, size);
}
finally
{
largeMessageSemaphore.release();
synchronized (largeMessageSyncGuard)
{
largeMessageCurrentlySendingId = Long.valueOf(-1);
}
}
sendLargeFile(null, null, id, file, size);
}
}

Expand Down Expand Up @@ -620,13 +577,6 @@ private void sendLargeFile(JournalContent content, SimpleString pageStore, final
}
buffer.rewind();

if (largeMessageInterruptSend)
{
// if the file is open at the backup, this will close it
sendReplicatePacket(new ReplicationSyncFileMessage(null, null, id, -1, null));
return;
}

// sending -1 or 0 bytes will close the file at the backup
sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer));
if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
Expand Down Expand Up @@ -718,4 +668,12 @@ public CoreRemotingConnection getBackupTransportConnection()
{
return remotingConnection;
}

/**
* @return
*/
public boolean isSynchronizing()
{
return inSync;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import junit.framework.Assert;
Expand Down Expand Up @@ -63,31 +64,10 @@ public void testDeleteLargeMessages() throws Exception
assertEquals("we really ought to delete these after delivery", target, getAllMessageFileIds(dir).size());
}

public void testDeleteLargeMessagesDuringSync() throws Exception
{
File backupLMDir = new File(backupServer.getServer().getConfiguration().getLargeMessagesDirectory());
File liveLMDir = new File(liveServer.getServer().getConfiguration().getLargeMessagesDirectory());
assertEquals("Should not have any large messages... previous test failed to clean up?", 0,
getAllMessageFileIds(backupLMDir).size());
createProducerSendSomeMessages();

backupServer.start();
waitForComponent(backupServer.getServer(), 5);
receiveMsgsInRange(0, getNumberOfMessages() / 2);

finishSyncAndFailover();
backupServer.stop();
Set<Long> backupLM = getAllMessageFileIds(backupLMDir);
Set<Long> liveLM = getAllMessageFileIds(liveLMDir);
assertEquals("live and backup should have the same files ", liveLM, backupLM);
assertEquals("we really ought to delete these after delivery: " + backupLM, getNumberOfMessages() / 2,
backupLM.size());
}

/**
* @throws Exception
*/
public void testDeleteLargeMessagesDuringSyncLarger() throws Exception
public void testDeleteLargeMessagesDuringSync() throws Exception
{
setNumberOfMessages(200);
File backupLMdir = new File(backupServer.getServer().getConfiguration().getLargeMessagesDirectory());
Expand All @@ -100,7 +80,10 @@ public void testDeleteLargeMessagesDuringSyncLarger() throws Exception
waitForComponent(backupServer.getServer(), 5);
receiveMsgsInRange(0, getNumberOfMessages() / 2);

finishSyncAndFailover();
startBackupFinishSyncing();
Thread.sleep(500);
liveServer.getServer().stop();
backupServer.getServer().waitForActivation(10, TimeUnit.SECONDS);
backupServer.stop();

Set<Long> backupLM = getAllMessageFileIds(backupLMdir);
Expand Down

0 comments on commit 03c56b8

Please sign in to comment.