From 03c56b8e202c47ef389d49b9dec233ce9011297a Mon Sep 17 00:00:00 2001 From: Francisco Borges Date: Mon, 25 Mar 2013 11:53:01 +0100 Subject: [PATCH] HORNETQ-720 Remove the concurrent largeMessage deletion, add a stateful cache :-S --- .../core/journal/SequentialFileFactory.java | 8 +++ .../impl/journal/JournalStorageManager.java | 72 ++++++++++++++++--- .../core/replication/ReplicationEndpoint.java | 2 - .../core/replication/ReplicationManager.java | 60 +++------------- .../failover/BackupSyncLargeMessageTest.java | 29 ++------ 5 files changed, 85 insertions(+), 86 deletions(-) diff --git a/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFileFactory.java b/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFileFactory.java index ddd1f725993..91611ba630c 100644 --- a/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFileFactory.java +++ b/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFileFactory.java @@ -28,6 +28,14 @@ public interface SequentialFileFactory { SequentialFile createSequentialFile(String fileName, int maxIO); + /** + * Lists files that end with the given extension. + *

+ * This method inserts a "." before the extension. + * @param extension + * @return + * @throws Exception + */ List listFiles(String extension) throws Exception; boolean isSupportsCallbacks(); diff --git a/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java b/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java index 371eb1a2357..c5a169a3d82 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java +++ b/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java @@ -33,6 +33,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -214,6 +215,8 @@ public static JournalContent getType(byte type) private final Map mapPersistedAddressSettings = new ConcurrentHashMap(); + private final Set largeMessagesToDelete = new HashSet(); + public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory) { this(config, executorFactory, null); @@ -433,8 +436,8 @@ public void startReplication(ReplicationManager replicationManager, PagingManage if (replicator != null) { replicator.sendSynchronizationDone(nodeID); + performCachedLargeMessageDeletes(); } - // XXX HORNETQ-720 SEND a compare journals message? } finally { @@ -485,7 +488,6 @@ public static String md5(File file) @Override public void stopReplication() { - storageManagerLock.writeLock().lock(); try { @@ -502,6 +504,10 @@ public void stopReplication() HornetQServerLogger.LOGGER.errorStoppingReplicationManager(e); } replicator = null; + // delete inside the writeLock. Avoids a lot of state checking and races with + // startReplication. 
+ // This method should not be called under normal circumstances + performCachedLargeMessageDeletes(); } finally { @@ -509,6 +515,30 @@ public void stopReplication() } } + /** + * Assumption is that this is only called with a writeLock on the StorageManager. + */ + private void performCachedLargeMessageDeletes() + { + for (Long largeMsgId : largeMessagesToDelete) + { + SequentialFile msg = createFileForLargeMessage(largeMsgId, LargeMessageExtension.DURABLE); + try + { + msg.delete(); + } + catch (Exception e) + { + HornetQServerLogger.LOGGER.journalErrorDeletingMessage(e, largeMsgId); + } + if (replicator != null) + { + replicator.largeMessageDelete(largeMsgId); + } + } + largeMessagesToDelete.clear(); + } + /** * @param pageFilesToSync * @throws Exception @@ -581,8 +611,8 @@ private long getLargeMessageIdFromFilename(String filename) private void getLargeMessageInformation() throws Exception { Map> largeMessages = new HashMap>(); - // only send durable messages... - List filenames = largeMessagesFactory.listFiles(LargeMessageExtension.DURABLE.getExtension()); + // only send durable messages... // listFiles prepends a "." to the extension... 
+ List filenames = largeMessagesFactory.listFiles("msg"); List idList = new ArrayList(); for (String filename : filenames) @@ -2248,10 +2278,12 @@ public synchronized void stop(boolean ioCriticalError) throws Exception return; } - if (!ioCriticalError && journalLoaded && idGenerator != null) + if (!ioCriticalError) { + performCachedLargeMessageDeletes(); // Must call close to make sure last id is persisted - idGenerator.persistCurrentID(); + if (journalLoaded && idGenerator != null) + idGenerator.persistCurrentID(); } final CountDownLatch latch = new CountDownLatch(1); @@ -2347,13 +2379,33 @@ public Journal getBindingsJournal() // Package protected --------------------------------------------- // This should be accessed from this package only - void deleteLargeMessageFile(final LargeServerMessage largeServerMessageImpl) throws HornetQException + void deleteLargeMessageFile(final LargeServerMessage largeServerMessage) throws HornetQException { - final SequentialFile file = largeServerMessageImpl.getFile(); + final SequentialFile file = largeServerMessage.getFile(); if (file == null) { return; } + + if (largeServerMessage.isDurable() && isReplicated()) + { + readLock(); + try + { + if (isReplicated() && replicator.isSynchronizing()) + { + synchronized (largeMessagesToDelete) + { + largeMessagesToDelete.add(Long.valueOf(largeServerMessage.getMessageID())); + } + return; + } + } + finally + { + readUnLock(); + } + } Runnable deleteAction = new Runnable() { public void run() @@ -2365,7 +2417,7 @@ public void run() { if (replicator != null) { - replicator.largeMessageDelete(largeServerMessageImpl.getMessageID()); + replicator.largeMessageDelete(largeServerMessage.getMessageID()); } file.delete(); } @@ -2376,7 +2428,7 @@ public void run() } catch (Exception e) { - HornetQServerLogger.LOGGER.journalErrorDeletingMessage(e, largeServerMessageImpl.getMessageID()); + HornetQServerLogger.LOGGER.journalErrorDeletingMessage(e, largeServerMessage.getMessageID()); } } diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java b/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java index 4c87904a45e..370656e0ea2 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java +++ b/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java @@ -606,9 +606,7 @@ private void handleStartReplicationSynchronization(final ReplicationStartSyncMes private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet) { - HornetQServerLogger.LOGGER.info("REND LM DELETE " + packet.getMessageId()); final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true); - if (message != null) { executor.execute(new Runnable() diff --git a/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationManager.java b/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationManager.java index e17628a5f92..50aadd5fae1 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationManager.java +++ b/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationManager.java @@ -25,12 +25,10 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Semaphore; import org.hornetq.api.core.HornetQBuffer; import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.HornetQExceptionType; -import org.hornetq.api.core.HornetQInterruptedException; import org.hornetq.api.core.Pair; import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.client.SessionFailureListener; @@ -126,10 +124,6 @@ public static ADD_OPERATION_TYPE toOperation(boolean isUpdate) private volatile boolean inSync = true; - private Long largeMessageCurrentlySendingId = Long.valueOf(-1); - private volatile boolean largeMessageInterruptSend = false; - private final Semaphore largeMessageSemaphore = new Semaphore(1); - 
/** * @param remotingConnection */ @@ -258,29 +252,6 @@ public void largeMessageDelete(final Long messageId) { if (enabled) { - if (inSync) - { - synchronized (largeMessageSyncGuard) - { - if (largeMessageCurrentlySendingId.equals(messageId)) - { - // deal with it - largeMessageInterruptSend = true; - try { - largeMessageSemaphore.acquire(); - largeMessageSemaphore.release(); - } - catch (InterruptedException e) - { - throw new HornetQInterruptedException(e); - } - } - else if (largeMessagesToSync.containsKey(messageId)) - { - largeMessagesToSync.remove(messageId); - } - } - } sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId)); } } @@ -531,7 +502,6 @@ public void syncJournalFile(JournalFile jf, JournalContent content) throws Excep */ public Map.Entry> getNextLargeMessageToSync() { - synchronized (largeMessageSyncGuard) { Iterator>> iter = largeMessagesToSync.entrySet().iterator(); @@ -542,7 +512,6 @@ public Map.Entry> getNextLargeMessageToSync() Entry> entry = iter.next(); iter.remove(); - largeMessageCurrentlySendingId = entry.getKey(); return entry; } } @@ -551,19 +520,7 @@ public void syncLargeMessageFile(SequentialFile file, long size, long id) throws { if (enabled) { - largeMessageSemaphore.acquire(); - try - { - sendLargeFile(null, null, id, file, size); - } - finally - { - largeMessageSemaphore.release(); - synchronized (largeMessageSyncGuard) - { - largeMessageCurrentlySendingId = Long.valueOf(-1); - } - } + sendLargeFile(null, null, id, file, size); } } @@ -620,13 +577,6 @@ private void sendLargeFile(JournalContent content, SimpleString pageStore, final } buffer.rewind(); - if (largeMessageInterruptSend) - { - // if the file is open at the backup, this will close it - sendReplicatePacket(new ReplicationSyncFileMessage(null, null, id, -1, null)); - return; - } - // sending -1 or 0 bytes will close the file at the backup sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer)); if (bytesRead == -1 || 
bytesRead == 0 || maxBytesToSend == 0) @@ -718,4 +668,12 @@ public CoreRemotingConnection getBackupTransportConnection() { return remotingConnection; } + + /** + * @return + */ + public boolean isSynchronizing() + { + return inSync; + } } diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java index d18cf23bd74..8b35d437aa2 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java @@ -5,6 +5,7 @@ import java.util.TreeSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import junit.framework.Assert; @@ -63,31 +64,10 @@ public void testDeleteLargeMessages() throws Exception assertEquals("we really ought to delete these after delivery", target, getAllMessageFileIds(dir).size()); } - public void testDeleteLargeMessagesDuringSync() throws Exception - { - File backupLMDir = new File(backupServer.getServer().getConfiguration().getLargeMessagesDirectory()); - File liveLMDir = new File(liveServer.getServer().getConfiguration().getLargeMessagesDirectory()); - assertEquals("Should not have any large messages... 
previous test failed to clean up?", 0, - getAllMessageFileIds(backupLMDir).size()); - createProducerSendSomeMessages(); - - backupServer.start(); - waitForComponent(backupServer.getServer(), 5); - receiveMsgsInRange(0, getNumberOfMessages() / 2); - - finishSyncAndFailover(); - backupServer.stop(); - Set backupLM = getAllMessageFileIds(backupLMDir); - Set liveLM = getAllMessageFileIds(liveLMDir); - assertEquals("live and backup should have the same files ", liveLM, backupLM); - assertEquals("we really ought to delete these after delivery: " + backupLM, getNumberOfMessages() / 2, - backupLM.size()); - } - /** * @throws Exception */ - public void testDeleteLargeMessagesDuringSyncLarger() throws Exception + public void testDeleteLargeMessagesDuringSync() throws Exception { setNumberOfMessages(200); File backupLMdir = new File(backupServer.getServer().getConfiguration().getLargeMessagesDirectory()); @@ -100,7 +80,10 @@ public void testDeleteLargeMessagesDuringSyncLarger() throws Exception waitForComponent(backupServer.getServer(), 5); receiveMsgsInRange(0, getNumberOfMessages() / 2); - finishSyncAndFailover(); + startBackupFinishSyncing(); + Thread.sleep(500); + liveServer.getServer().stop(); + backupServer.getServer().waitForActivation(10, TimeUnit.SECONDS); backupServer.stop(); Set backupLM = getAllMessageFileIds(backupLMdir);