Skip to content

Commit 28f67ff

Browse files
[HUDI-7551] Avoid loading all partitions in CleanPlanner when MDT is enabled (apache#10928)
1 parent 136d075 commit 28f67ff

File tree

10 files changed

+164
-17
lines changed

10 files changed

+164
-17
lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java

+6
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.Map;
4949
import java.util.stream.Collectors;
5050

51+
import static org.apache.hudi.client.utils.MetadataTableUtils.shouldUseBatchLookup;
5152
import static org.apache.hudi.common.util.MapUtils.nonEmpty;
5253
import static org.apache.hudi.table.action.clean.CleanPlanner.SAVEPOINTED_TIMESTAMPS;
5354

@@ -122,10 +123,15 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
122123

123124
Map<String, List<HoodieCleanFileInfo>> cleanOps = new HashMap<>();
124125
List<String> partitionsToDelete = new ArrayList<>();
126+
boolean shouldUseBatchLookup = shouldUseBatchLookup(table.getMetaClient().getTableConfig(), config);
125127
for (int i = 0; i < partitionsToClean.size(); i += cleanerParallelism) {
126128
// Handles at most 'cleanerParallelism' number of partitions once at a time to avoid overlarge memory pressure to the timeline server
127129
// (remote or local embedded), thus to reduce the risk of an OOM exception.
128130
List<String> subPartitionsToClean = partitionsToClean.subList(i, Math.min(i + cleanerParallelism, partitionsToClean.size()));
131+
if (shouldUseBatchLookup) {
132+
LOG.info("Load partitions and files into file system view in advance. Paths: {}", subPartitionsToClean);
133+
table.getHoodieView().loadPartitions(subPartitionsToClean);
134+
}
129135
Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
130136
.map(subPartitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean, earliestInstant)), cleanerParallelism)
131137
.stream()

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java

+3-10
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,6 @@
6464
import java.util.stream.Collectors;
6565
import java.util.stream.Stream;
6666

67-
import static org.apache.hudi.client.utils.MetadataTableUtils.shouldUseBatchLookup;
68-
6967
/**
7068
* Cleaner is responsible for garbage collecting older files in a given partition path. Such that
7169
* <p>
@@ -108,14 +106,9 @@ public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieT
108106
.map(entry -> Pair.of(new HoodieFileGroupId(entry.getValue().getPartitionPath(), entry.getValue().getFileId()), entry.getValue()))
109107
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
110108

111-
// load all partitions in advance if necessary.
112-
if (shouldUseBatchLookup(hoodieTable.getMetaClient().getTableConfig(), config)) {
113-
LOG.info("Load all partitions and files into file system view in advance.");
114-
fileSystemView.loadAllPartitions();
115-
}
116-
// collect savepointed timestamps to be assist with incremental cleaning. For non-partitioned and metadata table, we may not need this.
117-
this.savepointedTimestamps = hoodieTable.isMetadataTable() ? Collections.EMPTY_LIST : (hoodieTable.isPartitioned() ? hoodieTable.getSavepointTimestamps().stream().collect(Collectors.toList())
118-
: Collections.EMPTY_LIST);
109+
// collect savepointed timestamps to assist with incremental cleaning. For non-partitioned and metadata table, we may not need this.
110+
this.savepointedTimestamps = hoodieTable.isMetadataTable() ? Collections.emptyList() : (hoodieTable.isPartitioned() ? new ArrayList<>(hoodieTable.getSavepointTimestamps())
111+
: Collections.emptyList());
119112
}
120113

121114
/**

hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -801,11 +801,20 @@ public final Stream<HoodieBaseFile> getLatestBaseFilesInRange(List<String> commi
801801
}
802802

803803
@Override
804-
public Void loadAllPartitions() {
804+
public void loadAllPartitions() {
805805
try {
806806
readLock.lock();
807807
ensureAllPartitionsLoadedCorrectly();
808-
return null;
808+
} finally {
809+
readLock.unlock();
810+
}
811+
}
812+
813+
@Override
814+
public void loadPartitions(List<String> partitionPaths) {
815+
try {
816+
readLock.lock();
817+
ensurePartitionsLoadedCorrectly(partitionPaths);
809818
} finally {
810819
readLock.unlock();
811820
}

hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java

+23-2
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,29 @@ public Stream<HoodieBaseFile> getLatestBaseFilesInRange(List<String> commitsToRe
168168
}
169169

170170
@Override
171-
public Void loadAllPartitions() {
172-
return execute(preferredView::loadAllPartitions, secondaryView::loadAllPartitions);
171+
public void loadAllPartitions() {
172+
execute(
173+
() -> {
174+
preferredView.loadAllPartitions();
175+
return null;
176+
},
177+
() -> {
178+
secondaryView.loadAllPartitions();
179+
return null;
180+
});
181+
}
182+
183+
@Override
184+
public void loadPartitions(List<String> partitionPaths) {
185+
execute(
186+
() -> {
187+
preferredView.loadPartitions(partitionPaths);
188+
return null;
189+
},
190+
() -> {
191+
secondaryView.loadPartitions(partitionPaths);
192+
return null;
193+
});
173194
}
174195

175196
@Override

hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,10 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
127127
// POST Requests
128128
public static final String REFRESH_TABLE = String.format("%s/%s", BASE_URL, "refresh/");
129129
public static final String LOAD_ALL_PARTITIONS_URL = String.format("%s/%s", BASE_URL, "loadallpartitions/");
130+
public static final String LOAD_PARTITIONS_URL = String.format("%s/%s", BASE_URL, "loadpartitions/");
130131

131132
public static final String PARTITION_PARAM = "partition";
133+
public static final String PARTITIONS_PARAM = "partitions";
132134
public static final String BASEPATH_PARAM = "basepath";
133135
public static final String INSTANT_PARAM = "instant";
134136
public static final String MAX_INSTANT_PARAM = "maxinstant";
@@ -526,11 +528,21 @@ public boolean refresh() {
526528
}
527529

528530
@Override
529-
public Void loadAllPartitions() {
531+
public void loadAllPartitions() {
530532
Map<String, String> paramsMap = getParams();
531533
try {
532534
executeRequest(LOAD_ALL_PARTITIONS_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST);
533-
return null;
535+
} catch (IOException e) {
536+
throw new HoodieRemoteException(e);
537+
}
538+
}
539+
540+
@Override
541+
public void loadPartitions(List<String> partitionPaths) {
542+
try {
543+
Map<String, String> paramsMap = getParams();
544+
paramsMap.put(PARTITIONS_PARAM, OBJECT_MAPPER.writeValueAsString(partitionPaths));
545+
executeRequest(LOAD_PARTITIONS_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST);
534546
} catch (IOException e) {
535547
throw new HoodieRemoteException(e);
536548
}

hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -246,5 +246,11 @@ interface SliceView extends SliceViewWithLatestSlice {
246246
/**
247247
* Load all partition and file slices into view
248248
*/
249-
Void loadAllPartitions();
249+
void loadAllPartitions();
250+
251+
/**
252+
* Load all partition and file slices into view for the provided partition paths
253+
* @param partitionPaths List of partition paths to load
254+
*/
255+
void loadPartitions(List<String> partitionPaths);
250256
}

hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java

+55
Original file line numberDiff line numberDiff line change
@@ -633,6 +633,61 @@ void testFileSlicingWithMultipleDeltaWriters() throws Exception {
633633
assertEquals(deltaFile3, logFiles.get(0).getFileName(), "Log File Order check");
634634
}
635635

636+
@Test
637+
void testLoadPartitions_unPartitioned() throws Exception {
638+
String partitionPath = "";
639+
Paths.get(basePath, partitionPath).toFile().mkdirs();
640+
String fileId = UUID.randomUUID().toString();
641+
642+
String instantTime1 = "1";
643+
String fileName1 =
644+
FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0, TEST_WRITE_TOKEN);
645+
646+
Paths.get(basePath, partitionPath, fileName1).toFile().createNewFile();
647+
HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
648+
HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, instantTime1);
649+
650+
saveAsComplete(commitTimeline, instant1, Option.empty());
651+
refreshFsView();
652+
653+
// Assert that no base files are returned without the partitions being loaded
654+
assertEquals(0, fsView.getLatestFileSliceInRange(Collections.singletonList("1")).count());
655+
// Assert that load does not fail for un-partitioned tables
656+
fsView.loadPartitions(Collections.singletonList(partitionPath));
657+
// Assert that base files are returned after the empty-string partition is loaded
658+
assertEquals(1, fsView.getLatestFileSliceInRange(Collections.singletonList("1")).count());
659+
}
660+
661+
@Test
662+
void testLoadPartitions_partitioned() throws Exception {
663+
String partitionPath1 = "2016/05/01";
664+
String partitionPath2 = "2016/05/02";
665+
Paths.get(basePath, partitionPath1).toFile().mkdirs();
666+
Paths.get(basePath, partitionPath2).toFile().mkdirs();
667+
String fileId1 = UUID.randomUUID().toString();
668+
String fileId2 = UUID.randomUUID().toString();
669+
String instantTime1 = "1";
670+
String fileName1 =
671+
FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0, TEST_WRITE_TOKEN);
672+
String fileName2 =
673+
FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0, TEST_WRITE_TOKEN);
674+
675+
Paths.get(basePath, partitionPath1, fileName1).toFile().createNewFile();
676+
Paths.get(basePath, partitionPath2, fileName2).toFile().createNewFile();
677+
HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
678+
HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, instantTime1);
679+
680+
saveAsComplete(commitTimeline, instant1, Option.empty());
681+
refreshFsView();
682+
683+
// Assert that no base files are returned without the partitions being loaded
684+
assertEquals(0, fsView.getLatestFileSliceInRange(Collections.singletonList("1")).count());
685+
// Only load a single partition path
686+
fsView.loadPartitions(Collections.singletonList(partitionPath1));
687+
// Assert that base file is returned for partitionPath1 only
688+
assertEquals(1, fsView.getLatestFileSliceInRange(Collections.singletonList("1")).count());
689+
}
690+
636691
/**
637692
* Returns all file-slices including uncommitted ones.
638693
*

hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java

+24
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@
5353
import static org.junit.jupiter.api.Assertions.assertEquals;
5454
import static org.junit.jupiter.api.Assertions.assertThrows;
5555
import static org.junit.jupiter.api.Assertions.assertTrue;
56+
import static org.mockito.ArgumentMatchers.any;
57+
import static org.mockito.Mockito.doThrow;
58+
import static org.mockito.Mockito.never;
5659
import static org.mockito.Mockito.reset;
5760
import static org.mockito.Mockito.times;
5861
import static org.mockito.Mockito.verify;
@@ -698,6 +701,27 @@ public void testGetLatestFileSlice() {
698701
});
699702
}
700703

704+
@Test
705+
public void testLoadPartitions() {
706+
String partitionPath = "/table2";
707+
708+
fsView.loadPartitions(Collections.singletonList(partitionPath));
709+
verify(primary, times(1)).loadPartitions(Collections.singletonList(partitionPath));
710+
verify(secondary, never()).loadPartitions(any());
711+
712+
resetMocks();
713+
doThrow(new RuntimeException()).when(primary).loadPartitions(Collections.singletonList(partitionPath));
714+
fsView.loadPartitions(Collections.singletonList(partitionPath));
715+
verify(primary, times(1)).loadPartitions(Collections.singletonList(partitionPath));
716+
verify(secondary, times(1)).loadPartitions(Collections.singletonList(partitionPath));
717+
718+
resetMocks();
719+
doThrow(new RuntimeException()).when(secondary).loadPartitions(Collections.singletonList(partitionPath));
720+
assertThrows(RuntimeException.class, () -> {
721+
fsView.loadPartitions(Collections.singletonList(partitionPath));
722+
});
723+
}
724+
701725
@Test
702726
public void testGetPreferredView() {
703727
assertEquals(primary, fsView.getPreferredView());

hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java

+16
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,15 @@
3737
import org.apache.hudi.common.util.HoodieTimer;
3838
import org.apache.hudi.common.util.Option;
3939
import org.apache.hudi.exception.HoodieException;
40+
import org.apache.hudi.exception.HoodieIOException;
4041
import org.apache.hudi.timeline.service.handlers.BaseFileHandler;
4142
import org.apache.hudi.timeline.service.handlers.FileSliceHandler;
4243
import org.apache.hudi.timeline.service.handlers.InstantStateHandler;
4344
import org.apache.hudi.timeline.service.handlers.MarkerHandler;
4445
import org.apache.hudi.timeline.service.handlers.TimelineHandler;
4546

4647
import com.fasterxml.jackson.core.JsonProcessingException;
48+
import com.fasterxml.jackson.core.type.TypeReference;
4749
import com.fasterxml.jackson.databind.ObjectMapper;
4850
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
4951
import io.javalin.Javalin;
@@ -72,6 +74,7 @@ public class RequestHandler {
7274

7375
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new AfterburnerModule());
7476
private static final Logger LOG = LoggerFactory.getLogger(RequestHandler.class);
77+
private static final TypeReference<List<String>> LIST_TYPE_REFERENCE = new TypeReference<List<String>>() {};
7578

7679
private final TimelineService.Config timelineServiceConfig;
7780
private final FileSystemViewManager viewManager;
@@ -444,6 +447,19 @@ private void registerFileSlicesAPI() {
444447
writeValueAsString(ctx, success);
445448
}, false));
446449

450+
app.post(RemoteHoodieTableFileSystemView.LOAD_PARTITIONS_URL, new ViewHandler(ctx -> {
451+
metricsRegistry.add("LOAD_PARTITIONS", 1);
452+
String basePath = ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid"));
453+
try {
454+
List<String> partitionPaths = OBJECT_MAPPER.readValue(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITIONS_PARAM, String.class)
455+
.getOrThrow(e -> new HoodieException("Partitions param is invalid")), LIST_TYPE_REFERENCE);
456+
boolean success = sliceHandler.loadPartitions(basePath, partitionPaths);
457+
writeValueAsString(ctx, success);
458+
} catch (IOException e) {
459+
throw new HoodieIOException("Failed to parse request parameter", e);
460+
}
461+
}, false));
462+
447463
app.post(RemoteHoodieTableFileSystemView.LOAD_ALL_PARTITIONS_URL, new ViewHandler(ctx -> {
448464
metricsRegistry.add("LOAD_ALL_PARTITIONS", 1);
449465
boolean success = sliceHandler

hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java

+5
Original file line numberDiff line numberDiff line change
@@ -163,4 +163,9 @@ public boolean loadAllPartitions(String basePath) {
163163
viewManager.getFileSystemView(basePath).loadAllPartitions();
164164
return true;
165165
}
166+
167+
public boolean loadPartitions(String basePath, List<String> partitionPaths) {
168+
viewManager.getFileSystemView(basePath).loadPartitions(partitionPaths);
169+
return true;
170+
}
166171
}

0 commit comments

Comments
 (0)