From 2288c07983322017bfb63fa22c4e998a25dca2c5 Mon Sep 17 00:00:00 2001 From: fhan Date: Tue, 7 Jan 2025 15:42:16 +0800 Subject: [PATCH] [HUDI-8782] BulkInsertWriterHelper parallel close (#12518) * parralel close draft * update awaitTermination to 10 minutes in close * deal with empty handles in close * hard code close to 10 max parallelism --------- Co-authored-by: fhan --- .../sink/bulk/BulkInsertWriterHelper.java | 34 +++++++++++++++++-- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java index 9b9b85ba6e3d..dd2368b1ff52 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java @@ -25,6 +25,7 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle; import org.apache.hudi.metrics.FlinkStreamWriteMetrics; import org.apache.hudi.table.HoodieTable; @@ -45,6 +46,13 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.util.FutureUtils.allOf; /** * Helper class for bulk insert used by Flink. @@ -164,9 +172,29 @@ private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath) throw } public void close() throws IOException { - for (HoodieRowDataCreateHandle rowCreateHandle : handles.values()) { - LOG.info("Closing bulk insert file " + rowCreateHandle.getFileName()); - writeStatusList.add(closeWriteHandle(rowCreateHandle)); + if (handles.isEmpty()) { + return; + } + int handsSize = Math.min(handles.size(), 10); + ExecutorService executorService = Executors.newFixedThreadPool(handsSize); + allOf(handles.values().stream() + .map(rowCreateHandle -> CompletableFuture.supplyAsync(() -> { + try { + LOG.info("Closing bulk insert file " + rowCreateHandle.getFileName()); + return rowCreateHandle.close(); + } catch (IOException e) { + throw new HoodieIOException("IOE during rowCreateHandle.close()", e); + } + }, executorService)) + .collect(Collectors.toList()) + ).whenComplete((result, throwable) -> { + writeStatusList.addAll(result); + }).join(); + try { + executorService.shutdown(); + executorService.awaitTermination(10, TimeUnit.MINUTES); + } catch (InterruptedException e) { + throw new RuntimeException(e); } handles.clear(); handle = null;