Skip to content

Commit

Permalink
[core] Introduce gentle lookup compaction mode to reduce overall comp…
Browse files Browse the repository at this point in the history
…action frequency
  • Loading branch information
xiangyuf committed Feb 28, 2025
1 parent 1d128b7 commit e5af0e5
Show file tree
Hide file tree
Showing 17 changed files with 545 additions and 30 deletions.
12 changes: 12 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,18 @@
<td>Integer</td>
<td>The maximal fan-in for external merge sort. It limits the number of file handles. If it is too small, may cause intermediate merging. But if it is too large, it will cause too many files opened at the same time, consume memory and lead to random reading.</td>
</tr>
<tr>
<td><h5>lookup-compact</h5></td>
<td style="word-wrap: break-word;">RADICAL</td>
<td><p>Enum</p></td>
<td>Lookup compact mode used for lookup compaction.<br /><br />Possible values:<ul><li>"RADICAL"</li><li>"GENTLE"</li></ul></td>
</tr>
<tr>
<td><h5>lookup-compact.max-interval</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The max interval for a gentle mode lookup compaction to be triggered. For every interval, a forced lookup compaction will be performed to flush L0 files to higher level. This option is only valid when lookup-compact mode is gentle.</td>
</tr>
<tr>
<td><h5>lookup-wait</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down
44 changes: 44 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1523,6 +1523,21 @@ public class CoreOptions implements Serializable {
.withDescription(
"When need to lookup, commit will wait for compaction by lookup.");

public static final ConfigOption<LookupCompactMode> LOOKUP_COMPACT =
key("lookup-compact")
.enumType(LookupCompactMode.class)
.defaultValue(LookupCompactMode.RADICAL)
.withDescription("Lookup compact mode used for lookup compaction.");

public static final ConfigOption<Integer> LOOKUP_COMPACT_MAX_INTERVAL =
key("lookup-compact.max-interval")
.intType()
.noDefaultValue()
.withDescription(
"The max interval for a gentle mode lookup compaction to be triggered. For every interval, "
+ "a forced lookup compaction will be performed to flush L0 files to higher level. "
+ "This option is only valid when lookup-compact mode is gentle.");

public static final ConfigOption<Integer> DELETE_FILE_THREAD_NUM =
key("delete-file.thread-num")
.intType()
Expand Down Expand Up @@ -2519,6 +2534,23 @@ public boolean prepareCommitWaitCompaction() {
return options.get(LOOKUP_WAIT);
}

public boolean statefulLookup() {
return needLookup()
&& (!options.get(LOOKUP_WAIT) || LookupCompactMode.GENTLE.equals(lookupCompact()));
}

public LookupCompactMode lookupCompact() {
return options.get(LOOKUP_COMPACT);
}

public int lookupCompactMaxInterval() {
Integer maxInterval = options.get(LOOKUP_COMPACT_MAX_INTERVAL);
if (maxInterval == null) {
maxInterval = MathUtils.multiplySafely(numSortedRunCompactionTrigger(), 2);
}
return Math.max(numSortedRunCompactionTrigger(), maxInterval);
}

public boolean asyncFileWrite() {
return options.get(ASYNC_FILE_WRITE);
}
Expand Down Expand Up @@ -3280,4 +3312,16 @@ public static OrderType of(String orderType) {
throw new IllegalArgumentException("cannot match type: " + orderType + " for ordering");
}
}

/** The compact mode for lookup compaction. */
public enum LookupCompactMode {
/**
* Lookup compaction will use ForceUpLevel0Compaction strategy to radically compact new
* files.
*/
RADICAL,

/** Lookup compaction will use UniversalCompaction strategy to gently compact new files. */
GENTLE
}
}
11 changes: 11 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,15 @@ public static int addSafely(int a, int b) {
return Integer.MAX_VALUE;
}
}

/**
* Safely multiply the given int value by another int value, ensuring that no overflow occurs.
*/
public static int multiplySafely(int a, int b) {
try {
return Math.multiplyExact(a, b);
} catch (ArithmeticException e) {
return Integer.MAX_VALUE;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ Optional<CompactResult> getCompactionResult(boolean blocking)
/** Cancel currently running compaction task. */
void cancelCompaction();

/** Check if a compaction is in progress, or if a compaction result remains to be fetched. */
/**
* Check if a compaction is in progress, or if a compaction result remains to be fetched, or if
* a compaction should be triggered later.
*/
boolean isCompacting();
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private long newSequenceNumber() {
}

@VisibleForTesting
CompactManager compactManager() {
public CompactManager compactManager() {
return compactManager;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,6 @@ public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
return pick;
}

// collect all level 0 files
int candidateCount = 0;
for (int i = candidateCount; i < runs.size(); i++) {
if (runs.get(i).level() > 0) {
break;
}
candidateCount++;
}

return candidateCount == 0
? Optional.empty()
: Optional.of(
universal.pickForSizeRatio(numLevels - 1, runs, candidateCount, true));
return universal.forcePickL0(numLevels, runs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class MergeTreeCompactManager extends CompactFutureManager {
@Nullable private final CompactionMetrics.Reporter metricsReporter;
@Nullable private final DeletionVectorsMaintainer dvMaintainer;
private final boolean lazyGenDeletionFile;
private final boolean needLookup;

public MergeTreeCompactManager(
ExecutorService executor,
Expand All @@ -74,7 +75,8 @@ public MergeTreeCompactManager(
CompactRewriter rewriter,
@Nullable CompactionMetrics.Reporter metricsReporter,
@Nullable DeletionVectorsMaintainer dvMaintainer,
boolean lazyGenDeletionFile) {
boolean lazyGenDeletionFile,
boolean needLookup) {
this.executor = executor;
this.levels = levels;
this.strategy = strategy;
Expand All @@ -85,6 +87,7 @@ public MergeTreeCompactManager(
this.metricsReporter = metricsReporter;
this.dvMaintainer = dvMaintainer;
this.lazyGenDeletionFile = lazyGenDeletionFile;
this.needLookup = needLookup;

MetricUtils.safeCall(this::reportMetrics, LOG);
}
Expand Down Expand Up @@ -240,6 +243,11 @@ public Optional<CompactResult> getCompactionResult(boolean blocking)
return result;
}

@Override
public boolean isCompacting() {
return super.isCompacting() || (needLookup && !levels().level0().isEmpty());
}

private void reportMetrics() {
if (metricsReporter != null) {
metricsReporter.reportLevel0FileCount(levels.level0().size());
Expand All @@ -254,4 +262,9 @@ public void close() throws IOException {
MetricUtils.safeCall(metricsReporter::unregister, LOG);
}
}

@VisibleForTesting
public CompactStrategy getStrategy() {
return strategy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Universal Compaction Style is a compaction style, targeting the use cases requiring lower write
Expand All @@ -50,6 +51,9 @@ public class UniversalCompaction implements CompactStrategy {
@Nullable private final Long opCompactionInterval;
@Nullable private Long lastOptimizedCompaction;

@Nullable private final Integer maxLookupCompactInterval;
@Nullable private final AtomicInteger lookupCompactTriggerCount;

public UniversalCompaction(int maxSizeAmp, int sizeRatio, int numRunCompactionTrigger) {
this(maxSizeAmp, sizeRatio, numRunCompactionTrigger, null);
}
Expand All @@ -59,11 +63,22 @@ public UniversalCompaction(
int sizeRatio,
int numRunCompactionTrigger,
@Nullable Duration opCompactionInterval) {
this(maxSizeAmp, sizeRatio, numRunCompactionTrigger, opCompactionInterval, null);
}

public UniversalCompaction(
int maxSizeAmp,
int sizeRatio,
int numRunCompactionTrigger,
@Nullable Duration opCompactionInterval,
@Nullable Integer maxLookupCompactInterval) {
this.maxSizeAmp = maxSizeAmp;
this.sizeRatio = sizeRatio;
this.numRunCompactionTrigger = numRunCompactionTrigger;
this.opCompactionInterval =
opCompactionInterval == null ? null : opCompactionInterval.toMillis();
this.maxLookupCompactInterval = maxLookupCompactInterval;
this.lookupCompactTriggerCount = new AtomicInteger(0);
}

@Override
Expand Down Expand Up @@ -107,9 +122,44 @@ public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
return Optional.ofNullable(pickForSizeRatio(maxLevel, runs, candidateCount));
}

// 4 checking if a forced L0 compact should be triggered
if (maxLookupCompactInterval != null && lookupCompactTriggerCount != null) {
lookupCompactTriggerCount.getAndIncrement();
if (lookupCompactTriggerCount.compareAndSet(maxLookupCompactInterval, 0)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Universal compaction due to max lookup compaction interval {}.",
maxLookupCompactInterval);
}
return forcePickL0(numLevels, runs);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Skip universal compaction due to lookup compaction trigger count {} is less than the max interval {}.",
lookupCompactTriggerCount.get(),
maxLookupCompactInterval);
}
}
}

return Optional.empty();
}

Optional<CompactUnit> forcePickL0(int numLevels, List<LevelSortedRun> runs) {
// collect all level 0 files
int candidateCount = 0;
for (int i = candidateCount; i < runs.size(); i++) {
if (runs.get(i).level() > 0) {
break;
}
candidateCount++;
}

return candidateCount == 0
? Optional.empty()
: Optional.of(pickForSizeRatio(numLevels - 1, runs, candidateCount, true));
}

@VisibleForTesting
CompactUnit pickForSizeAmp(int maxLevel, List<LevelSortedRun> runs) {
if (runs.size() < numRunCompactionTrigger) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ Function<WriterContainer<T>, Boolean> createConflictAwareWriterCleanChecker(
//
// Condition 2: No compaction is in progress. That is, no more changelog will be
// produced.
//
// Condition 3: The writer has no postponed compaction like gentle lookup compaction.
return writerContainer ->
writerContainer.lastModifiedCommitIdentifier < latestCommittedIdentifier
&& !writerContainer.writer.isCompacting();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,16 +196,7 @@ protected MergeTreeWriter createWriter(
writerFactoryBuilder.build(partition, bucket, options);
Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
UniversalCompaction universalCompaction =
new UniversalCompaction(
options.maxSizeAmplificationPercent(),
options.sortedRunSizeRatio(),
options.numSortedRunCompactionTrigger(),
options.optimizedCompactionInterval());
CompactStrategy compactStrategy =
options.needLookup()
? new ForceUpLevel0Compaction(universalCompaction)
: universalCompaction;
CompactStrategy compactStrategy = createCompactStrategy(options);
CompactManager compactManager =
createCompactManager(
partition, bucket, compactStrategy, compactExecutor, levels, dvMaintainer);
Expand All @@ -232,6 +223,32 @@ public boolean bufferSpillable() {
return options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode);
}

private CompactStrategy createCompactStrategy(CoreOptions options) {
if (options.needLookup()) {
if (CoreOptions.LookupCompactMode.RADICAL.equals(options.lookupCompact())) {
return new ForceUpLevel0Compaction(
new UniversalCompaction(
options.maxSizeAmplificationPercent(),
options.sortedRunSizeRatio(),
options.numSortedRunCompactionTrigger(),
options.optimizedCompactionInterval()));
} else if (CoreOptions.LookupCompactMode.GENTLE.equals(options.lookupCompact())) {
return new UniversalCompaction(
options.maxSizeAmplificationPercent(),
options.sortedRunSizeRatio(),
options.numSortedRunCompactionTrigger(),
options.optimizedCompactionInterval(),
options.lookupCompactMaxInterval());
}
}

return new UniversalCompaction(
options.maxSizeAmplificationPercent(),
options.sortedRunSizeRatio(),
options.numSortedRunCompactionTrigger(),
options.optimizedCompactionInterval());
}

private CompactManager createCompactManager(
BinaryRow partition,
int bucket,
Expand Down Expand Up @@ -264,7 +281,8 @@ private CompactManager createCompactManager(
? null
: compactionMetrics.createReporter(partition, bucket),
dvMaintainer,
options.prepareCommitWaitCompaction());
options.prepareCommitWaitCompaction(),
options.needLookup());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ public interface RecordWriter<T> {
*/
CommitIncrement prepareCommit(boolean waitCompaction) throws Exception;

/** Check if a compaction is in progress, or if a compaction result remains to be fetched. */
/**
* Check if a compaction is in progress, or if a compaction result remains to be fetched, or if
* a compaction should be triggered later.
*/
boolean isCompacting();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,8 @@ private MergeTreeCompactManager createCompactManager(
new TestRewriter(),
null,
null,
false);
false,
options.needLookup());
}

static class MockFailResultCompactionManager extends MergeTreeCompactManager {
Expand All @@ -562,6 +563,7 @@ public MockFailResultCompactionManager(
rewriter,
null,
null,
false,
false);
}

Expand Down
Loading

0 comments on commit e5af0e5

Please sign in to comment.