Skip to content

Commit

Permalink
Memtable memory allocations may deadlock
Browse files Browse the repository at this point in the history
patch by Benedict; reviewed by Blake Egglestone for CASSANDRA-15367
  • Loading branch information
belliottsmith committed Apr 7, 2020
1 parent aebd33f commit e3f54d4
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
3.0.21
* Memtable memory allocations may deadlock (CASSANDRA-15367)
* Run evictFromMembership in GossipStage (CASSANDRA-15592)
Merged from 2.2:
* Disable JMX rebinding (CASSANDRA-15653)
Expand Down
5 changes: 2 additions & 3 deletions src/java/org/apache/cassandra/db/Memtable.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.cassandra.io.sstable.SSTableTxnWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.DiskAwareRunnable;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
Expand All @@ -70,7 +69,7 @@ public class Memtable implements Comparable<Memtable>
private final AtomicLong liveDataSize = new AtomicLong(0);
private final AtomicLong currentOperations = new AtomicLong(0);

// the write barrier for directing writes to this memtable during a switch
// the write barrier for directing writes to this memtable or the next during a switch
private volatile OpOrder.Barrier writeBarrier;
// the precise upper bound of ReplayPosition owned by this memtable
private volatile AtomicReference<ReplayPosition> commitLogUpperBound;
Expand Down Expand Up @@ -381,7 +380,7 @@ private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
continue;

if (trackContention && partition.usePessimisticLocking())
if (trackContention && partition.useLock())
heavilyContendedRowCount++;

if (!partition.isEmpty())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,7 @@ public long[] addAllWithSizeDelta(final PartitionUpdate update, OpOrder.Group wr
boolean monitorOwned = false;
try
{
if (usePessimisticLocking())
{
Locks.monitorEnterUnsafe(this);
monitorOwned = true;
}

monitorOwned = maybeLock(writeOp);
indexer.start();

while (true)
Expand Down Expand Up @@ -162,16 +157,7 @@ public long[] addAllWithSizeDelta(final PartitionUpdate update, OpOrder.Group wr
}
else if (!monitorOwned)
{
boolean shouldLock = usePessimisticLocking();
if (!shouldLock)
{
shouldLock = updateWastedAllocationTracker(updater.heapSize);
}
if (shouldLock)
{
Locks.monitorEnterUnsafe(this);
monitorOwned = true;
}
monitorOwned = maybeLock(updater.heapSize, writeOp);
}
}
}
Expand All @@ -181,10 +167,38 @@ else if (!monitorOwned)
if (monitorOwned)
Locks.monitorExitUnsafe(this);
}
}

/**
 * Lock attempt made before the update loop starts.
 *
 * Tries to take the lock only when {@link #useLock()} already reports true; the
 * attempt itself is delegated to {@code lockIfOldest}.
 *
 * @param writeOp the operation group the current write belongs to
 * @return true if the monitor on this object was entered and the caller must release it
 */
private boolean maybeLock(OpOrder.Group writeOp)
{
    // short-circuit: lockIfOldest is never consulted unless useLock() is true
    return useLock() && lockIfOldest(writeOp);
}

/**
 * Lock attempt made after an update attempt that did not hold the monitor.
 *
 * Feeds {@code addWaste} to {@code updateWastedAllocationTracker}; only if that call
 * returns true is the lock attempt delegated to {@code lockIfOldest}.
 * NOTE(review): presumably a true result means the tracker now calls for locking -
 * confirm against updateWastedAllocationTracker.
 *
 * @param addWaste amount passed to the waste tracker (the caller supplies updater.heapSize)
 * @param writeOp  the operation group the current write belongs to
 * @return true if the monitor on this object was entered and the caller must release it
 */
private boolean maybeLock(long addWaste, OpOrder.Group writeOp)
{
    // the tracker is always updated; lockIfOldest runs only when it returns true
    return updateWastedAllocationTracker(addWaste) && lockIfOldest(writeOp);
}

/**
 * Enters the monitor on this object, but only if {@code writeOp} is the oldest live
 * operation group. If it is not, the thread yields once and checks a second time
 * before giving up.
 * NOTE(review): presumably the restriction to the oldest group is what prevents the
 * deadlock this change addresses (CASSANDRA-15367) - confirm.
 *
 * @param writeOp the operation group the current write belongs to
 * @return true if the monitor was entered; false if the group was not the oldest on
 *         either check, in which case no lock is held
 */
private boolean lockIfOldest(OpOrder.Group writeOp)
{
    boolean oldest = writeOp.isOldestLiveGroup();
    if (!oldest)
    {
        // one yield, then a single re-check; no further retries
        Thread.yield();
        oldest = writeOp.isOldestLiveGroup();
    }

    if (!oldest)
        return false;

    Locks.monitorEnterUnsafe(this);
    return true;
}

public boolean usePessimisticLocking()
/**
 * @return true when the waste tracker currently holds the
 *         {@code TRACKER_PESSIMISTIC_LOCKING} value
 */
public boolean useLock()
{
    return TRACKER_PESSIMISTIC_LOCKING == wasteTracker;
}
Expand Down
58 changes: 34 additions & 24 deletions src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,37 @@ else if (runningUpdater.compareAndSet(this, current, current - 1))
}
}

/**
 * @return true when the group after this one has a null {@code prev} link.
 *         NOTE(review): presumably that link is cleared once this group and all
 *         earlier ones have completed - confirm against the unlink logic.
 */
public boolean isFinished()
{
    final Group successor = next;
    return successor.prev == null;
}

/**
 * @return true when this group has no predecessor, i.e. its {@code prev} link is null
 */
public boolean isOldestLiveGroup()
{
    return null == prev;
}

/**
 * Blocks the calling thread until {@link #isFinished()} returns true, waiting on
 * signals registered with this group's {@code waiting} queue via
 * {@code awaitUninterruptibly()}.
 */
public void await()
{
while (!isFinished())
{
// register before checking again, so the second isFinished() call below
// observes any completion that happened after the loop condition was evaluated
WaitQueue.Signal signal = waiting.register();
if (isFinished())
{
// already finished: withdraw the registration instead of waiting on it
signal.cancel();
return;
}
else
signal.awaitUninterruptibly();
}
// sanity check, evaluated only when assertions are enabled
assert running == FINISHED;
}

/**
 * @return the current value of this group's {@code prev} link; null when there is
 *         no predecessor
 */
public OpOrder.Group prev()
{
    return this.prev;
}

/**
* called once we know all operations started against this Ordered have completed,
* however we do not know if operations against its ancestors have completed, or
Expand Down Expand Up @@ -390,35 +421,14 @@ public WaitQueue.Signal register()
}

/**
* @return true if all operations started prior to barrier.issue() have completed
* wait for all operations started prior to issuing the barrier to complete
*/
public boolean allPriorOpsAreFinished()
public void await()
{
Group current = orderOnOrBefore;
if (current == null)
throw new IllegalStateException("This barrier needs to have issue() called on it before prior operations can complete");
if (current.next.prev == null)
return true;
return false;
}

/**
* wait for all operations started prior to issuing the barrier to complete
*/
public void await()
{
while (!allPriorOpsAreFinished())
{
WaitQueue.Signal signal = register();
if (allPriorOpsAreFinished())
{
signal.cancel();
return;
}
else
signal.awaitUninterruptibly();
}
assert orderOnOrBefore.running == FINISHED;
current.await();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ public void allocate(long size, OpOrder.Group opGroup)
acquired(size);
return;
}
if (opGroup.isBlocking())
{
allocated(size);
return;
}
WaitQueue.Signal signal = opGroup.isBlockingSignal(parent.hasRoom().register(parent.blockedTimerContext()));
boolean allocated = parent.tryAllocate(size);
if (allocated || opGroup.isBlocking())
Expand Down
2 changes: 1 addition & 1 deletion test/unit/org/apache/cassandra/index/CustomIndexTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ public void indexBuildingPagesLargePartitions() throws Throwable
assertTrue(index.writeGroups.size() > 1);
assertFalse(index.readOrderingAtFinish.isBlocking());
index.writeGroups.forEach(group -> assertFalse(group.isBlocking()));
index.barriers.forEach(OpOrder.Barrier::allPriorOpsAreFinished);
index.barriers.forEach(b -> assertTrue(b.getSyncPoint().isFinished()));
}

@Test
Expand Down

0 comments on commit e3f54d4

Please sign in to comment.