@@ -109,10 +109,6 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
109
109
110
110
private BootstrapIndex bootstrapIndex ;
111
111
112
- private String getPartitionPathFor (HoodieBaseFile baseFile ) {
113
- return FSUtils .getRelativePartitionPath (metaClient .getBasePathV2 (), baseFile .getStoragePath ().getParent ());
114
- }
115
-
116
112
/**
117
113
* Initialize the view.
118
114
*/
@@ -158,10 +154,21 @@ public Option<String> getCompletionTime(String instantTime) {
158
154
159
155
/**
160
156
* Adds the provided statuses into the file system view, and also caches it inside this object.
157
+ * If the file statuses are limited to a single partition, use {@link #addFilesToView(String, List)} instead.
161
158
*/
162
159
public List <HoodieFileGroup > addFilesToView (List <StoragePathInfo > statuses ) {
160
+ Map <String , List <StoragePathInfo >> statusesByPartitionPath = statuses .stream ()
161
+ .collect (Collectors .groupingBy (fileStatus -> FSUtils .getRelativePartitionPath (metaClient .getBasePathV2 (), fileStatus .getPath ().getParent ())));
162
+ return statusesByPartitionPath .entrySet ().stream ().map (entry -> addFilesToView (entry .getKey (), entry .getValue ()))
163
+ .flatMap (List ::stream ).collect (Collectors .toList ());
164
+ }
165
+
166
+ /**
167
+ * Adds the provided statuses into the file system view for a single partition, and also caches it inside this object.
168
+ */
169
+ public List <HoodieFileGroup > addFilesToView (String partitionPath , List <StoragePathInfo > statuses ) {
163
170
HoodieTimer timer = HoodieTimer .start ();
164
- List <HoodieFileGroup > fileGroups = buildFileGroups (statuses , visibleCommitsAndCompactionTimeline , true );
171
+ List <HoodieFileGroup > fileGroups = buildFileGroups (partitionPath , statuses , visibleCommitsAndCompactionTimeline , true );
165
172
long fgBuildTimeTakenMs = timer .endTimer ();
166
173
timer .startTimer ();
167
174
// Group by partition for efficient updates for both InMemory and DiskBased structures.
@@ -191,37 +198,28 @@ public List<HoodieFileGroup> addFilesToView(List<StoragePathInfo> statuses) {
191
198
/**
192
199
* Build FileGroups from passed in file-status.
193
200
*/
194
- protected List <HoodieFileGroup > buildFileGroups (List <StoragePathInfo > statuses , HoodieTimeline timeline ,
201
+ protected List <HoodieFileGroup > buildFileGroups (String partition , List <StoragePathInfo > statuses , HoodieTimeline timeline ,
195
202
boolean addPendingCompactionFileSlice ) {
196
- return buildFileGroups (convertFileStatusesToBaseFiles (statuses ), convertFileStatusesToLogFiles (statuses ),
203
+ return buildFileGroups (partition , convertFileStatusesToBaseFiles (statuses ), convertFileStatusesToLogFiles (statuses ),
197
204
timeline ,
198
205
addPendingCompactionFileSlice );
199
206
}
200
207
201
- protected List <HoodieFileGroup > buildFileGroups (Stream <HoodieBaseFile > baseFileStream ,
208
+ protected List <HoodieFileGroup > buildFileGroups (String partition , Stream <HoodieBaseFile > baseFileStream ,
202
209
Stream <HoodieLogFile > logFileStream , HoodieTimeline timeline , boolean addPendingCompactionFileSlice ) {
203
- Map <Pair <String , String >, List <HoodieBaseFile >> baseFiles =
204
- baseFileStream .collect (Collectors .groupingBy (baseFile -> {
205
- String partitionPathStr = getPartitionPathFor (baseFile );
206
- return Pair .of (partitionPathStr , baseFile .getFileId ());
207
- }));
208
-
209
- Map <Pair <String , String >, List <HoodieLogFile >> logFiles = logFileStream .collect (Collectors .groupingBy ((logFile ) -> {
210
- String partitionPathStr =
211
- FSUtils .getRelativePartitionPath (metaClient .getBasePathV2 (), logFile .getPath ().getParent ());
212
- return Pair .of (partitionPathStr , logFile .getFileId ());
213
- }));
214
-
215
- Set <Pair <String , String >> fileIdSet = new HashSet <>(baseFiles .keySet ());
210
+ Map <String , List <HoodieBaseFile >> baseFiles =
211
+ baseFileStream .collect (Collectors .groupingBy (HoodieBaseFile ::getFileId ));
212
+
213
+ Map <String , List <HoodieLogFile >> logFiles = logFileStream .collect (Collectors .groupingBy (HoodieLogFile ::getFileId ));
214
+
215
+ Set <String > fileIdSet = new HashSet <>(baseFiles .keySet ());
216
216
fileIdSet .addAll (logFiles .keySet ());
217
217
218
- List <HoodieFileGroup > fileGroups = new ArrayList <>();
219
- fileIdSet .forEach (pair -> {
220
- String fileId = pair .getValue ();
221
- String partitionPath = pair .getKey ();
222
- HoodieFileGroup group = new HoodieFileGroup (partitionPath , fileId , timeline );
223
- if (baseFiles .containsKey (pair )) {
224
- baseFiles .get (pair ).forEach (group ::addBaseFile );
218
+ List <HoodieFileGroup > fileGroups = new ArrayList <>(fileIdSet .size ());
219
+ fileIdSet .forEach (fileId -> {
220
+ HoodieFileGroup group = new HoodieFileGroup (partition , fileId , timeline );
221
+ if (baseFiles .containsKey (fileId )) {
222
+ baseFiles .get (fileId ).forEach (group ::addBaseFile );
225
223
}
226
224
if (addPendingCompactionFileSlice ) {
227
225
// pending compaction file slice must be added before log files so that
@@ -235,8 +233,8 @@ protected List<HoodieFileGroup> buildFileGroups(Stream<HoodieBaseFile> baseFileS
235
233
group .addNewFileSliceAtInstant (pendingCompaction .get ().getKey ());
236
234
}
237
235
}
238
- if (logFiles .containsKey (pair )) {
239
- logFiles .get (pair ).stream ().sorted (HoodieLogFile .getLogFileComparator ()).forEach (logFile -> group .addLogFile (completionTimeQueryView , logFile ));
236
+ if (logFiles .containsKey (fileId )) {
237
+ logFiles .get (fileId ).stream ().sorted (HoodieLogFile .getLogFileComparator ()).forEach (logFile -> group .addLogFile (completionTimeQueryView , logFile ));
240
238
}
241
239
fileGroups .add (group );
242
240
});
@@ -379,9 +377,9 @@ private void ensurePartitionsLoadedCorrectly(List<String> partitionList) {
379
377
LOG .debug ("Time taken to list partitions " + partitionSet + " =" + (endLsTs - beginLsTs ));
380
378
pathInfoMap .forEach ((partitionPair , statuses ) -> {
381
379
String relativePartitionStr = partitionPair .getLeft ();
382
- List <HoodieFileGroup > groups = addFilesToView (statuses );
380
+ List <HoodieFileGroup > groups = addFilesToView (relativePartitionStr , statuses );
383
381
if (groups .isEmpty ()) {
384
- storePartitionView (relativePartitionStr , new ArrayList <> ());
382
+ storePartitionView (relativePartitionStr , Collections . emptyList ());
385
383
}
386
384
LOG .debug ("#files found in partition (" + relativePartitionStr + ") =" + statuses .size ());
387
385
});
@@ -469,7 +467,7 @@ protected void ensurePartitionLoadedCorrectly(String partition) {
469
467
// Not loaded yet
470
468
try {
471
469
LOG .info ("Building file system view for partition (" + partitionPathStr + ")" );
472
- List <HoodieFileGroup > groups = addFilesToView (getAllFilesInPartition (partitionPathStr ));
470
+ List <HoodieFileGroup > groups = addFilesToView (partitionPathStr , getAllFilesInPartition (partitionPathStr ));
473
471
if (groups .isEmpty ()) {
474
472
storePartitionView (partitionPathStr , new ArrayList <>());
475
473
}
@@ -541,11 +539,10 @@ private Stream<HoodieLogFile> convertFileStatusesToLogFiles(List<StoragePathInfo
541
539
* With async compaction, it is possible to see partial/complete base-files due to inflight-compactions, Ignore those
542
540
* base-files.
543
541
*
542
+ * @param partitionPath partition path for the base file
544
543
* @param baseFile base File
545
544
*/
546
- protected boolean isBaseFileDueToPendingCompaction (HoodieBaseFile baseFile ) {
547
- final String partitionPath = getPartitionPathFor (baseFile );
548
-
545
+ protected boolean isBaseFileDueToPendingCompaction (String partitionPath , HoodieBaseFile baseFile ) {
549
546
Option <Pair <String , CompactionOperation >> compactionWithInstantTime =
550
547
getPendingCompactionOperationWithInstant (new HoodieFileGroupId (partitionPath , baseFile .getFileId ()));
551
548
return (compactionWithInstantTime .isPresent ()) && (null != compactionWithInstantTime .get ().getKey ())
@@ -745,7 +742,7 @@ private Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOnFromCache(String part
745
742
.map (fileGroup -> Option .fromJavaOptional (fileGroup .getAllBaseFiles ()
746
743
.filter (baseFile -> HoodieTimeline .compareTimestamps (baseFile .getCommitTime (), HoodieTimeline .LESSER_THAN_OR_EQUALS , maxCommitTime
747
744
))
748
- .filter (df -> !isBaseFileDueToPendingCompaction (df ) && !isBaseFileDueToPendingClustering (df )).findFirst ()))
745
+ .filter (df -> !isBaseFileDueToPendingCompaction (partitionPath , df ) && !isBaseFileDueToPendingClustering (df )).findFirst ()))
749
746
.filter (Option ::isPresent ).map (Option ::get )
750
747
.map (df -> addBootstrapBaseFileIfPresent (new HoodieFileGroupId (partitionPath , df .getFileId ()), df ));
751
748
}
@@ -761,7 +758,7 @@ public final Option<HoodieBaseFile> getBaseFileOn(String partitionStr, String in
761
758
} else {
762
759
return fetchHoodieFileGroup (partitionPath , fileId ).map (fileGroup -> fileGroup .getAllBaseFiles ()
763
760
.filter (baseFile -> HoodieTimeline .compareTimestamps (baseFile .getCommitTime (), HoodieTimeline .EQUALS ,
764
- instantTime )).filter (df -> !isBaseFileDueToPendingCompaction (df ) && !isBaseFileDueToPendingClustering (df )).findFirst ().orElse (null ))
761
+ instantTime )).filter (df -> !isBaseFileDueToPendingCompaction (partitionPath , df ) && !isBaseFileDueToPendingClustering (df )).findFirst ().orElse (null ))
765
762
.map (df -> addBootstrapBaseFileIfPresent (new HoodieFileGroupId (partitionPath , fileId ), df ));
766
763
}
767
764
} finally {
@@ -797,7 +794,7 @@ public final Stream<HoodieBaseFile> getLatestBaseFilesInRange(List<String> commi
797
794
.filter (fileGroup -> !isFileGroupReplacedBeforeAny (fileGroup .getFileGroupId (), commitsToReturn ))
798
795
.map (fileGroup -> Pair .of (fileGroup .getFileGroupId (), Option .fromJavaOptional (
799
796
fileGroup .getAllBaseFiles ().filter (baseFile -> commitsToReturn .contains (baseFile .getCommitTime ())
800
- && !isBaseFileDueToPendingCompaction (baseFile ) && !isBaseFileDueToPendingClustering (baseFile )).findFirst ()))).filter (p -> p .getValue ().isPresent ())
797
+ && !isBaseFileDueToPendingCompaction (fileGroup . getPartitionPath (), baseFile ) && !isBaseFileDueToPendingClustering (baseFile )).findFirst ()))).filter (p -> p .getValue ().isPresent ())
801
798
.map (p -> addBootstrapBaseFileIfPresent (p .getKey (), p .getValue ().get ()));
802
799
} finally {
803
800
readLock .unlock ();
@@ -833,7 +830,7 @@ public final Stream<HoodieBaseFile> getAllBaseFiles(String partitionStr) {
833
830
return fetchAllBaseFiles (partitionPath )
834
831
.filter (df -> !isFileGroupReplaced (partitionPath , df .getFileId ()))
835
832
.filter (df -> visibleCommitsAndCompactionTimeline .containsOrBeforeTimelineStarts (df .getCommitTime ()))
836
- .filter (df -> !isBaseFileDueToPendingCompaction (df ) && !isBaseFileDueToPendingClustering (df ))
833
+ .filter (df -> !isBaseFileDueToPendingCompaction (partitionPath , df ) && !isBaseFileDueToPendingClustering (df ))
837
834
.map (df -> addBootstrapBaseFileIfPresent (new HoodieFileGroupId (partitionPath , df .getFileId ()), df ));
838
835
} finally {
839
836
readLock .unlock ();
@@ -875,7 +872,7 @@ public final Stream<FileSlice> getLatestFileSlicesStateless(String partitionStr)
875
872
return getLatestFileSlices (partition );
876
873
} else {
877
874
try {
878
- Stream <FileSlice > fileSliceStream = buildFileGroups (getAllFilesInPartition (partition ), visibleCommitsAndCompactionTimeline , true ).stream ()
875
+ Stream <FileSlice > fileSliceStream = buildFileGroups (partition , getAllFilesInPartition (partition ), visibleCommitsAndCompactionTimeline , true ).stream ()
879
876
.filter (fg -> !isFileGroupReplaced (fg ))
880
877
.map (HoodieFileGroup ::getLatestFileSlice )
881
878
.filter (Option ::isPresent ).map (Option ::get )
@@ -1090,7 +1087,7 @@ public final Stream<HoodieFileGroup> getAllFileGroupsStateless(String partitionS
1090
1087
return getAllFileGroups (partition );
1091
1088
} else {
1092
1089
try {
1093
- Stream <HoodieFileGroup > fileGroupStream = buildFileGroups (getAllFilesInPartition (partition ), visibleCommitsAndCompactionTimeline , true ).stream ()
1090
+ Stream <HoodieFileGroup > fileGroupStream = buildFileGroups (partition , getAllFilesInPartition (partition ), visibleCommitsAndCompactionTimeline , true ).stream ()
1094
1091
.filter (fg -> !isFileGroupReplaced (fg ));
1095
1092
if (bootstrapIndex .useIndex ()) {
1096
1093
final Map <HoodieFileGroupId , BootstrapBaseFileMapping > bootstrapBaseFileMappings = getBootstrapBaseFileMappings (partition );
@@ -1430,7 +1427,7 @@ public Stream<HoodieBaseFile> fetchLatestBaseFiles(final String partitionPath) {
1430
1427
1431
1428
protected Option <HoodieBaseFile > getLatestBaseFile (HoodieFileGroup fileGroup ) {
1432
1429
return Option
1433
- .fromJavaOptional (fileGroup .getAllBaseFiles ().filter (df -> !isBaseFileDueToPendingCompaction (df ) && !isBaseFileDueToPendingClustering (df )).findFirst ());
1430
+ .fromJavaOptional (fileGroup .getAllBaseFiles ().filter (df -> !isBaseFileDueToPendingCompaction (fileGroup . getPartitionPath (), df ) && !isBaseFileDueToPendingClustering (df )).findFirst ());
1434
1431
}
1435
1432
1436
1433
/**
0 commit comments