Skip to content

Commit

Permalink
Merge pull request hornetq#1969 from clebertsuconic/master
Browse files Browse the repository at this point in the history
  • Loading branch information
Justin Bertram committed Dec 9, 2014
2 parents 3610059 + 5fda312 commit 75a74b9
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,8 @@ public void deliverAsync()
// no-op
scheduledRunners.decrementAndGet();
}

checkDepage();
}

}
Expand Down Expand Up @@ -2184,12 +2186,29 @@ else if (status == HandleStatus.NO_MATCH)
}
}

if (pageIterator != null && messageReferences.size() == 0 && pageSubscription.isPaging() && pageIterator.hasNext() && !depagePending)
checkDepage();
}

private void checkDepage()
{
if (pageIterator != null && pageSubscription.isPaging() && !depagePending && needsDepage() && pageIterator.hasNext())
{
scheduleDepage(false);
}
}

/**
* This is a common check we do before scheduling depaging.. or while depaging.
* Before scheduling a depage runnable we verify if it fits / needs depaging.
* We also check for while needsDepage While depaging.
* This is just to avoid a copy & paste dependency
* @return
*/
private boolean needsDepage()
{
return queueMemorySize.get() < pageSubscription.getPagingStore().getMaxSize();
}

private SimpleString extractGroupID(MessageReference ref)
{
if (internalQueue)
Expand Down Expand Up @@ -2264,7 +2283,7 @@ private void depage(final boolean scheduleExpiry)
this.directDeliver = false;

int depaged = 0;
while (timeout > System.currentTimeMillis() && queueMemorySize.get() < maxSize && pageIterator.hasNext())
while (timeout > System.currentTimeMillis() && needsDepage() && pageIterator.hasNext())
{
depaged++;
PagedReference reference = pageIterator.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public ServerConsumerImpl(final long id,

if (browseOnly)
{
browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
browserDeliverer = new BrowserDeliverer(messageQueue.totalIterator());
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5854,6 +5854,119 @@ public void testPageHole() throws Throwable

}

@Test
public void testMultiFiltersBrowsing() throws Throwable
{
internalTestMultiFilters(true);
}

@Test
public void testMultiFiltersRegularConsumer() throws Throwable
{
internalTestMultiFilters(false);
}

public void internalTestMultiFilters(boolean browsing) throws Throwable
{
clearDataRecreateServerDirs();

Configuration config = createDefaultConfig();
config.setJournalSyncNonTransactional(false);

server = createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());

server.start();

try
{
ServerLocator locator = createInVMNonHALocator();
locator.setBlockOnDurableSend(true);
ClientSessionFactory sf = locator.createSessionFactory();
ClientSession session = sf.createSession(true, true, 0);

session.createQueue(ADDRESS.toString(), "Q1", null, true);

PagingStore store = server.getPagingManager().getPageStore(ADDRESS);

ClientProducer prod = session.createProducer(ADDRESS);

ClientMessage msg = null;
store.startPaging();

for (int i = 0; i < 100; i++)
{
msg = session.createMessage(true);
msg.putStringProperty("color", "red");
msg.putIntProperty("count", i);
prod.send(msg);

if (i > 0 && i % 10 == 0)
{
store.startPaging();
store.forceAnotherPage();
}
}

for (int i = 0; i < 100; i++)
{
msg = session.createMessage(true);
msg.putStringProperty("color", "green");
msg.putIntProperty("count", i);
prod.send(msg);

if (i > 0 && i % 10 == 0)
{
store.startPaging();
store.forceAnotherPage();
}
}

session.commit();

session.close();

session = sf.createSession(false, false, 0);
session.start();


ClientConsumer cons1;

if (browsing)
{
cons1 = session.createConsumer("Q1", "color='green'", true);
}
else
{
cons1 = session.createConsumer("Q1", "color='red'", false);
}

for (int i = 0; i < 100; i++)
{
msg = cons1.receive(5000);

System.out.println("Received " + msg);
assertNotNull(msg);
if (!browsing)
{
msg.acknowledge();
}
}

session.commit();

session.close();
}
finally
{
server.stop();
}

}


@Test
public void testPendingACKOutOfOrder() throws Throwable
Expand Down

0 comments on commit 75a74b9

Please sign in to comment.