Skip to content

Commit 9c61c4c

Browse files
[HUDI-7575] Avoid repeated fetching of pending replace instants (apache#10976)
1 parent e5e635f commit 9c61c4c

File tree

1 file changed

+24
-8
lines changed

1 file changed

+24
-8
lines changed

hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java

+24-8
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
6363
private List<HoodieInstant> instants;
6464
// for efficient #contains queries.
6565
private transient volatile Set<String> instantTimeSet;
66+
// for efficient #isPendingClusterInstant queries: null until first use, then holds the timestamps
// of the pending replace instants that ClusteringUtils.isClusteringInstant accepts.
67+
private transient volatile Set<String> pendingReplaceClusteringInstants;
6668
// for efficient #isBeforeTimelineStarts check.
6769
private transient volatile Option<HoodieInstant> firstNonSavepointCommit;
6870
private String timelineHash;
@@ -540,14 +542,7 @@ private Option<HoodieInstant> getLastOrFirstPendingClusterInstant(boolean isLast
540542

541543
/**
 * Reports whether {@code instantTime} is the timestamp of a pending clustering instant.
 *
 * <p>This hunk shows both sides of the change. The lines marked {@code -} are the previous body,
 * which filtered the pending replace timeline for the given timestamp on every call. The line
 * marked {@code +} is the new body, a membership lookup in the set returned by
 * {@code getOrCreatePendingClusteringInstantSet()}.
 *
 * @param instantTime timestamp to look up
 * @return {@code true} if the timestamp is in the pending clustering instant set
 * @throws IllegalStateException if more than one pending replace instant shares a timestamp;
 *         raised by the removed body for the queried timestamp, and by the set construction
 *         in the new body for any timestamp
 */
@Override
542544
public boolean isPendingClusterInstant(String instantTime) {
543-
HoodieTimeline potentialTimeline = getCommitsTimeline().filterPendingReplaceTimeline().filter(i -> i.getTimestamp().equals(instantTime));
544-
if (potentialTimeline.countInstants() == 0) {
545-
return false;
546-
}
547-
if (potentialTimeline.countInstants() > 1) {
548-
throw new IllegalStateException("Multiple instants with same timestamp: " + potentialTimeline);
549-
}
550-
return ClusteringUtils.isClusteringInstant(this, potentialTimeline.firstInstant().get());
545+
// New body: delegate to the lazily built set instead of re-filtering the timeline per call.
return getOrCreatePendingClusteringInstantSet().contains(instantTime);
551546
}
552547

553548
@Override
@@ -576,6 +571,27 @@ private Set<String> getOrCreateInstantSet() {
576571
return this.instantTimeSet;
577572
}
578573

574+
/**
 * Returns the set of timestamps of pending replace instants that are clustering instants,
 * building it on first use and storing it in {@code pendingReplaceClusteringInstants}.
 *
 * <p>The field is checked for null once without a lock and again inside
 * {@code synchronized (this)}, so the set is assigned only while the monitor is held.
 *
 * @return the cached set of pending clustering instant timestamps
 * @throws IllegalStateException if two or more pending replace instants share a timestamp
 */
private Set<String> getOrCreatePendingClusteringInstantSet() {
575+
// Unsynchronized first check: skip the lock once the set has been assigned.
if (this.pendingReplaceClusteringInstants == null) {
576+
synchronized (this) {
577+
// Re-check under the monitor in case another thread assigned the set meanwhile.
if (this.pendingReplaceClusteringInstants == null) {
578+
List<HoodieInstant> pendingReplaceInstants = getCommitsTimeline().filterPendingReplaceTimeline().getInstants();
579+
// Validate that there are no instants with same timestamp
// (the lambda parameter `instants` below shadows the class field of the same name)
580+
pendingReplaceInstants.stream().collect(Collectors.groupingBy(HoodieInstant::getTimestamp)).forEach((timestamp, instants) -> {
581+
if (instants.size() > 1) {
582+
throw new IllegalStateException("Multiple instants with same timestamp: " + timestamp + " instants: " + instants);
583+
}
584+
});
585+
// Filter replace commits down to those that are due to clustering
// NOTE(review): ClusteringUtils.isClusteringInstant runs for every pending replace instant while
// the monitor on `this` is held; if it reads instant metadata, lock hold time grows with the
// number of pending replace instants. Confirm this is acceptable.
586+
this.pendingReplaceClusteringInstants = pendingReplaceInstants.stream()
587+
.filter(instant -> ClusteringUtils.isClusteringInstant(this, instant))
588+
.map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
589+
}
590+
}
591+
}
592+
return this.pendingReplaceClusteringInstants;
593+
}
594+
579595
/**
580596
* Returns the first non savepoint commit on the timeline.
581597
*/

0 commit comments

Comments
 (0)