83
83
84
84
import java .io .FileNotFoundException ;
85
85
import java .io .IOException ;
86
+ import java .util .ArrayDeque ;
86
87
import java .util .ArrayList ;
87
88
import java .util .Collections ;
88
89
import java .util .HashMap ;
89
90
import java .util .LinkedList ;
90
91
import java .util .List ;
91
92
import java .util .Map ;
93
+ import java .util .Queue ;
92
94
import java .util .Set ;
93
95
import java .util .function .Function ;
94
96
import java .util .stream .Collectors ;
@@ -761,7 +763,10 @@ private HoodieTableMetaClient initializeMetaClient() throws IOException {
761
763
* @return List consisting of {@code DirectoryInfo} for each partition found.
762
764
*/
763
765
private List <DirectoryInfo > listAllPartitionsFromFilesystem (String initializationTime , Set <String > pendingDataInstants ) {
764
- List <StoragePath > pathsToList = new LinkedList <>();
766
+ if (dataMetaClient .getActiveTimeline ().countInstants () == 0 ) {
767
+ return Collections .emptyList ();
768
+ }
769
+ Queue <StoragePath > pathsToList = new ArrayDeque <>();
765
770
pathsToList .add (new StoragePath (dataWriteConfig .getBasePath ()));
766
771
767
772
List <DirectoryInfo > partitionsToBootstrap = new LinkedList <>();
@@ -773,16 +778,18 @@ private List<DirectoryInfo> listAllPartitionsFromFilesystem(String initializatio
773
778
while (!pathsToList .isEmpty ()) {
774
779
// In each round we will list a section of directories
775
780
int numDirsToList = Math .min (fileListingParallelism , pathsToList .size ());
781
+ List <StoragePath > pathsToProcess = new ArrayList <>(numDirsToList );
782
+ for (int i = 0 ; i < numDirsToList ; i ++) {
783
+ pathsToProcess .add (pathsToList .poll ());
784
+ }
776
785
// List all directories in parallel
777
786
engineContext .setJobStatus (this .getClass ().getSimpleName (), "Listing " + numDirsToList + " partitions from filesystem" );
778
- List <DirectoryInfo > processedDirectories = engineContext .map (pathsToList . subList ( 0 , numDirsToList ) , path -> {
787
+ List <DirectoryInfo > processedDirectories = engineContext .map (pathsToProcess , path -> {
779
788
HoodieStorage storage = new HoodieHadoopStorage (path , storageConf );
780
789
String relativeDirPath = FSUtils .getRelativePartitionPath (storageBasePath , path );
781
790
return new DirectoryInfo (relativeDirPath , storage .listDirectEntries (path ), initializationTime , pendingDataInstants );
782
791
}, numDirsToList );
783
792
784
- pathsToList = new LinkedList <>(pathsToList .subList (numDirsToList , pathsToList .size ()));
785
-
786
793
// If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
787
794
// the results.
788
795
for (DirectoryInfo dirInfo : processedDirectories ) {
@@ -815,10 +822,10 @@ private List<DirectoryInfo> listAllPartitionsFromFilesystem(String initializatio
815
822
* @return List consisting of {@code DirectoryInfo} for each partition found.
816
823
*/
817
824
private List <DirectoryInfo > listAllPartitionsFromMDT (String initializationTime , Set <String > pendingDataInstants ) throws IOException {
818
- List <DirectoryInfo > dirinfoList = new LinkedList <>();
819
825
List <String > allPartitionPaths = metadata .getAllPartitionPaths ().stream ()
820
826
.map (partitionPath -> dataWriteConfig .getBasePath () + StoragePath .SEPARATOR_CHAR + partitionPath ).collect (Collectors .toList ());
821
827
Map <String , List <StoragePathInfo >> partitionFileMap = metadata .getAllFilesInPartitions (allPartitionPaths );
828
+ List <DirectoryInfo > dirinfoList = new ArrayList <>(partitionFileMap .size ());
822
829
for (Map .Entry <String , List <StoragePathInfo >> entry : partitionFileMap .entrySet ()) {
823
830
dirinfoList .add (new DirectoryInfo (entry .getKey (), entry .getValue (), initializationTime , pendingDataInstants ));
824
831
}
0 commit comments