 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.compact.CompactHelpers;
 import org.apache.hudi.table.action.rollback.RollbackUtils;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 
 import com.codahale.metrics.Timer;
-import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -165,7 +165,7 @@ protected void setPendingInflightAndRequestedInstants(Set<String> pendingInfligh
   protected void preCommit(HoodieCommitMetadata metadata) {
     // Create a Hoodie table after startTxn which encapsulated the commits and files visible.
     // Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload
-    HoodieTable table = createTable(config, hadoopConf);
+    HoodieTable table = createTable(config, storageConf);
     resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
   }
 
@@ -198,7 +198,7 @@ private void inlineCompaction(HoodieTable table, Option<Map<String, String>> ext
    * @return Collection of Write Status
    */
   protected HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime, boolean shouldComplete) {
-    HoodieTable<?, I, ?, T> table = createTable(config, context.getStorageConf().unwrapAs(Configuration.class));
+    HoodieTable<?, I, ?, T> table = createTable(config, context.getStorageConf());
 
     // Check if a commit or compaction instant with a greater timestamp is on the timeline.
     // If an instant is found then abort log compaction, since it is no longer needed.
@@ -288,7 +288,7 @@ public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetada
    * @return Collection of Write Status
    */
   protected HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean shouldComplete) {
-    HoodieTable<?, I, ?, T> table = createTable(config, context.getStorageConf().unwrapAs(Configuration.class));
+    HoodieTable<?, I, ?, T> table = createTable(config, context.getStorageConf());
     HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
     HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
     if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
@@ -313,7 +313,7 @@ protected HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean s
    */
   public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
     extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
-    completeCompaction(metadata, createTable(config, context.getStorageConf().unwrapAs(Configuration.class)), compactionInstantTime);
+    completeCompaction(metadata, createTable(config, context.getStorageConf()), compactionInstantTime);
   }
 
   /**
@@ -446,7 +446,7 @@ public boolean scheduleClusteringAtInstant(String instantTime, Option<Map<String
    * @return Collection of Write Status
    */
   public HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldComplete) {
-    HoodieTable<?, I, ?, T> table = createTable(config, context.getStorageConf().unwrapAs(Configuration.class));
+    HoodieTable<?, I, ?, T> table = createTable(config, context.getStorageConf());
     HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceOrClusteringTimeline();
     Option<HoodieInstant> inflightInstantOpt = ClusteringUtils.getInflightClusteringInstant(clusteringInstant, table.getActiveTimeline());
     if (inflightInstantOpt.isPresent()) {
@@ -481,7 +481,7 @@ public HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldCo
   }
 
   public boolean purgePendingClustering(String clusteringInstant) {
-    HoodieTable<?, I, ?, T> table = createTable(config, context.getStorageConf().unwrapAs(Configuration.class));
+    HoodieTable<?, I, ?, T> table = createTable(config, context.getStorageConf());
     Option<HoodieInstant> inflightInstantOpt = ClusteringUtils.getInflightClusteringInstant(clusteringInstant, table.getActiveTimeline());
     if (inflightInstantOpt.isPresent()) {
       table.rollbackInflightClustering(inflightInstantOpt.get(), commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), true);
@@ -498,7 +498,7 @@ public boolean purgePendingClustering(String clusteringInstant) {
    * @return HoodieWriteMetadata
    */
   public HoodieWriteMetadata<T> managePartitionTTL(String instantTime) {
-    HoodieTable<?, I, ?, T> table = createTable(config, context.getStorageConf().unwrapAs(Configuration.class));
+    HoodieTable<?, I, ?, T> table = createTable(config, context.getStorageConf());
     return table.managePartitionTTL(context, instantTime);
   }
 
@@ -633,7 +633,7 @@ protected Option<String> scheduleTableServiceInternal(String instantTime, Option
     }
 
     Option<String> option = Option.empty();
-    HoodieTable<?, ?, ?, ?> table = createTable(config, hadoopConf);
+    HoodieTable<?, ?, ?, ?> table = createTable(config, storageConf);
 
     switch (tableServiceType) {
       case ARCHIVE:
@@ -675,7 +675,7 @@ protected Option<String> scheduleTableServiceInternal(String instantTime, Option
     return option;
   }
 
-  protected abstract HoodieTable<?, I, ?, T> createTable(HoodieWriteConfig config, Configuration hadoopConf);
+  protected abstract HoodieTable<?, I, ?, T> createTable(HoodieWriteConfig config, StorageConfiguration<?> storageConf);
 
   /**
    * Executes a clustering plan on a table, serially before or after an insert/upsert action.
@@ -754,7 +754,7 @@ public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline
     CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
         HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites());
 
-    HoodieTable table = createTable(config, hadoopConf);
+    HoodieTable table = createTable(config, storageConf);
     if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
       LOG.info("Cleaner started");
       // proceed only if multiple clean schedules are enabled or if there are no pending cleans.
@@ -892,7 +892,7 @@ protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos
    * @return {@code true} if rollback happens; {@code false} otherwise.
    */
   protected boolean rollbackFailedIndexingCommits() {
-    HoodieTable table = createTable(config, hadoopConf);
+    HoodieTable table = createTable(config, storageConf);
     List<String> instantsToRollback = getFailedIndexingCommitsToRollbackForMetadataTable(table.getMetaClient());
     Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient());
     instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
@@ -929,7 +929,7 @@ && isIndexingCommit(dataIndexTimeline, instant.getTimestamp()))
    * @return true if rollback was triggered. false otherwise.
    */
   protected Boolean rollbackFailedWrites() {
-    HoodieTable table = createTable(config, hadoopConf);
+    HoodieTable table = createTable(config, storageConf);
     List<String> instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), Option.empty());
     Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient());
     instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
@@ -1051,7 +1051,7 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
     LOG.info("Begin rollback of instant " + commitInstantTime);
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
-      HoodieTable table = createTable(config, hadoopConf);
+      HoodieTable table = createTable(config, storageConf);
       Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
           .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
           .findFirst());
@@ -1096,7 +1096,7 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
    */
   public void rollbackFailedBootstrap() {
     LOG.info("Rolling back pending bootstrap if present");
-    HoodieTable table = createTable(config, hadoopConf);
+    HoodieTable table = createTable(config, storageConf);
     HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction();
     Option<String> instant = Option.fromJavaOptional(
         inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
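Implementation note: the abstract createTable now takes the engine-agnostic StorageConfiguration<?> instead of Hadoop's Configuration, so every engine-specific subclass has to update its override to the new signature. A minimal sketch of what such an override could look like; createEngineSpecificTable is a hypothetical factory used purely for illustration and is not part of this diff:

  @Override
  protected HoodieTable<?, I, ?, T> createTable(HoodieWriteConfig config, StorageConfiguration<?> storageConf) {
    // StorageConfiguration is a storage-agnostic wrapper. A subclass that still
    // needs a Hadoop Configuration internally can recover it from the wrapper,
    // mirroring the unwrapAs(Configuration.class) calls removed at the call sites above:
    //   Configuration hadoopConf = storageConf.unwrapAs(Configuration.class);
    return createEngineSpecificTable(config, storageConf); // hypothetical engine-specific factory
  }

With this shape, call sites such as compact and cluster can pass context.getStorageConf() directly, as the hunks above show, instead of unwrapping to a Hadoop Configuration at every call.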