Skip to content

Commit

Permalink
[HUDI-8782] BulkInsertWriterHelper parallel close (apache#12518)
Browse files Browse the repository at this point in the history
* parralel close draft
* update awaitTermination to 10 minutes in close
* deal with empty handles in close
* hard code close to 10 max parallelism

---------

Co-authored-by: fhan <[email protected]>
  • Loading branch information
fhan688 and fhan authored Jan 7, 2025
1 parent 3617661 commit 2288c07
Showing 1 changed file with 31 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
import org.apache.hudi.table.HoodieTable;
Expand All @@ -45,6 +46,13 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.FutureUtils.allOf;

/**
* Helper class for bulk insert used by Flink.
Expand Down Expand Up @@ -164,9 +172,29 @@ private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath) throw
}

public void close() throws IOException {
for (HoodieRowDataCreateHandle rowCreateHandle : handles.values()) {
LOG.info("Closing bulk insert file " + rowCreateHandle.getFileName());
writeStatusList.add(closeWriteHandle(rowCreateHandle));
if (handles.isEmpty()) {
return;
}
int handsSize = Math.min(handles.size(), 10);
ExecutorService executorService = Executors.newFixedThreadPool(handsSize);
allOf(handles.values().stream()
.map(rowCreateHandle -> CompletableFuture.supplyAsync(() -> {
try {
LOG.info("Closing bulk insert file " + rowCreateHandle.getFileName());
return rowCreateHandle.close();
} catch (IOException e) {
throw new HoodieIOException("IOE during rowCreateHandle.close()", e);
}
}, executorService))
.collect(Collectors.toList())
).whenComplete((result, throwable) -> {
writeStatusList.addAll(result);
}).join();
try {
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
handles.clear();
handle = null;
Expand Down

0 comments on commit 2288c07

Please sign in to comment.