Fix refresh cache issue for hudi table (StarRocks#5382) (StarRocks#5384)
(cherry picked from commit 844870d)

Co-authored-by: miomiocat <[email protected]>
mergify[bot] and miomiocat authored Apr 22, 2022
1 parent 985f427 commit 76a150f
Show file tree
Hide file tree
Showing 13 changed files with 40 additions and 61 deletions.
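
In short: the cache key classes (HivePartitionKey, HivePartitionKeysKey, HiveTableColumnsKey) already carry a TableType field, but the convenience constructors and static gen(...) helpers removed in this commit hardcoded TableType.HIVE. Cache maintenance that went through those helpers therefore built HIVE-typed keys, which never matched the HUDI-typed entries stored for a Hudi table. A minimal sketch of the mismatch, assuming tableType participates in the keys' equals()/hashCode() (those methods are outside the hunks shown):

    // Illustrative only, not part of the commit.
    HivePartitionKey cached = new HivePartitionKey("db", "tbl", Table.TableType.HUDI, partValues); // stored by refreshTable()
    HivePartitionKey lookup = new HivePartitionKey("db", "tbl", Table.TableType.HIVE, partValues); // what the old gen(...) built
    // cached.equals(lookup) is false, so invalidation misses the entry and stale Hudi metadata survives.

The fix threads the real table type from HiveMetaStoreTableInfo through every key construction, as the hunks below show.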
@@ -532,8 +532,7 @@ public void onCreate() {
     @Override
     public void onDrop() {
         if (this.resourceName != null) {
-            Catalog.getCurrentCatalog().getHiveRepository().
-                    clearCache(this.resourceName, this.hiveDb, this.hiveTable);
+            Catalog.getCurrentCatalog().getHiveRepository().clearCache(hmsTableInfo);
             Catalog.getCurrentCatalog().getMetastoreEventsProcessor().unregisterTable(this);
         }
     }
@@ -501,8 +501,7 @@ public void readFields(DataInput in) throws IOException {
     @Override
     public void onDrop() {
         if (this.resourceName != null) {
-            Catalog.getCurrentCatalog().getHiveRepository().
-                    clearCache(this.resourceName, this.db, this.table, true);
+            Catalog.getCurrentCatalog().getHiveRepository().clearCache(hmsTableInfo);
         }
     }

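Both onDrop hunks above pass the table's hmsTableInfo field, a HiveMetaStoreTableInfo that bundles everything the cache layer needs. Its definition is outside the hunks shown; the sketch below infers a plausible shape purely from the getters this commit uses and is not the actual class:

    // Assumed shape, for orientation only (inferred from getResourceName(),
    // getDb(), getTable(), getTableType() used elsewhere in this diff).
    public class HiveMetaStoreTableInfo {
        private final String resourceName;
        private final String db;
        private final String table;
        private final Table.TableType tableType; // HIVE or HUDI; the bit this fix threads through
        // ... getters
    }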
@@ -293,11 +293,12 @@ public void refreshTable(HiveMetaStoreTableInfo hmsTable)
             throws DdlException {
         String dbName = hmsTable.getDb();
         String tableName = hmsTable.getTable();
+        Table.TableType tableType = hmsTable.getTableType();
         List<Column> partColumns = getPartitionColumns(hmsTable);
         List<String> columnNames = getAllColumnNames(hmsTable);
-        HivePartitionKeysKey hivePartitionKeysKey = HivePartitionKeysKey.gen(dbName, tableName, partColumns);
+        HivePartitionKeysKey hivePartitionKeysKey = new HivePartitionKeysKey(dbName, tableName, tableType, partColumns);
         HiveTableKey hiveTableKey = HiveTableKey.gen(dbName, tableName);
-        HiveTableColumnsKey hiveTableColumnsKey = HiveTableColumnsKey.gen(dbName, tableName, partColumns, columnNames);
+        HiveTableColumnsKey hiveTableColumnsKey = new HiveTableColumnsKey(dbName, tableName, partColumns, columnNames, tableType);
         Catalog.getCurrentCatalog().getMetastoreEventsProcessor().getEventProcessorLock().writeLock().lock();
         try {
             ImmutableMap<PartitionKey, Long> partitionKeys = loadPartitionKeys(hivePartitionKeysKey);
@@ -307,8 +308,7 @@ public void refreshTable(HiveMetaStoreTableInfo hmsTable)

             // for unpartition table, refresh the partition info, because there is only one partition
             if (partColumns.size() <= 0) {
-                HivePartitionKey hivePartitionKey = new HivePartitionKey(dbName, tableName,
-                        hmsTable.getTableType(), new ArrayList<>());
+                HivePartitionKey hivePartitionKey = new HivePartitionKey(dbName, tableName, tableType, new ArrayList<>());
                 partitionsCache.put(hivePartitionKey, loadPartition(hivePartitionKey));
                 partitionStatsCache.put(hivePartitionKey, loadPartitionStats(hivePartitionKey));
             }
@@ -344,23 +344,28 @@ public void refreshColumnStats(HiveMetaStoreTableInfo hmsTable)
         List<String> columnNames = getAllColumnNames(hmsTable);
         try {
             HiveTableColumnsKey hiveTableColumnsKey =
-                    HiveTableColumnsKey.gen(hmsTable.getDb(), hmsTable.getTable(), partColumns, columnNames);
+                    new HiveTableColumnsKey(hmsTable.getDb(), hmsTable.getTable(),
+                            partColumns, columnNames, hmsTable.getTableType());
             tableColumnStatsCache.put(hiveTableColumnsKey, loadTableColumnStats(hiveTableColumnsKey));
         } catch (Exception e) {
             throw new DdlException("refresh table column statistic cached failed: " + e.getMessage());
         }
     }

-    public void clearCache(String dbName, String tableName, boolean isHudiTable) {
-        HivePartitionKeysKey hivePartitionKeysKey = HivePartitionKeysKey.gen(dbName, tableName, null);
+    public void clearCache(HiveMetaStoreTableInfo hmsTable) {
+        String dbName = hmsTable.getDb();
+        String tableName = hmsTable.getTable();
+        Table.TableType tableType = hmsTable.getTableType();
+        HivePartitionKeysKey hivePartitionKeysKey = new HivePartitionKeysKey(dbName, tableName, tableType, null);
         ImmutableMap<PartitionKey, Long> partitionKeys = partitionKeysCache.getIfPresent(hivePartitionKeysKey);
         partitionKeysCache.invalidate(hivePartitionKeysKey);
         tableStatsCache.invalidate(HiveTableKey.gen(dbName, tableName));
-        tableColumnStatsCache.invalidate(HiveTableColumnsKey.gen(dbName, tableName, null, null));
+        tableColumnStatsCache.invalidate(new HiveTableColumnsKey(dbName, tableName, null, null, tableType));
         if (partitionKeys != null) {
             for (Map.Entry<PartitionKey, Long> entry : partitionKeys.entrySet()) {
                 HivePartitionKey pKey =
-                        HivePartitionKey.gen(dbName, tableName, Utils.getPartitionValues(entry.getKey(), isHudiTable));
+                        new HivePartitionKey(dbName, tableName, tableType,
+                                Utils.getPartitionValues(entry.getKey(), tableType == Table.TableType.HUDI));
                 partitionsCache.invalidate(pKey);
                 partitionStatsCache.invalidate(pKey);
             }
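Note that the rewritten clearCache derives the Hudi flag internally (tableType == Table.TableType.HUDI) instead of taking it as a parameter. Under the old signature the flag only controlled how partition values were extracted, while the keys themselves kept the HIVE default, so even a correct isHudiTable = true from the Hudi table's onDrop could not hit the HUDI-typed entries. The call-site difference, sketched (repo stands for the HiveRepository instance):

    repo.clearCache(resourceName, dbName, tableName, true); // before: caller picks the overload and the flag
    repo.clearCache(hmsTableInfo);                          // after: the table type travels inside the descriptor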
@@ -13,21 +13,13 @@ public class HivePartitionKey {
     private final List<String> partitionValues;
     private final TableType tableType;

-    public HivePartitionKey(String databaseName, String tableName, List<String> partitionValues) {
-        this(databaseName, tableName, TableType.HIVE, partitionValues);
-    }
-
     public HivePartitionKey(String databaseName, String tableName, TableType tableType, List<String> partitionValues) {
         this.databaseName = databaseName;
         this.tableName = tableName;
         this.partitionValues = partitionValues;
         this.tableType = tableType;
     }

-    public static HivePartitionKey gen(String databaseName, String tableName, List<String> partitionValues) {
-        return new HivePartitionKey(databaseName, tableName, partitionValues);
-    }
-
     public String getTableName() {
         return tableName;
     }
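Deleting the HIVE-defaulting constructor and the static gen(...) helper is the enforcement half of the fix: every call site must now name a table type, so a Hudi path can no longer be keyed as HIVE by accident. A Hudi-side construction now has to read (illustrative):

    new HivePartitionKey(dbName, tblName, Table.TableType.HUDI, partitionValues);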
@@ -24,14 +24,6 @@ public HivePartitionKeysKey(String databaseName, String tableName,
         this.tableType = tableType;
     }

-    public HivePartitionKeysKey(String databaseName, String tableName, List<Column> partitionColumns) {
-        this(databaseName, tableName, TableType.HIVE, partitionColumns);
-    }
-
-    public static HivePartitionKeysKey gen(String databaseName, String tableName, List<Column> partitionColumns) {
-        return new HivePartitionKeysKey(databaseName, tableName, partitionColumns);
-    }
-
     public String getDatabaseName() {
         return databaseName;
     }
@@ -193,16 +193,12 @@ public void refreshTableColumnStats(HiveMetaStoreTableInfo hmsTable)
         metaCache.refreshColumnStats(hmsTable);
     }

-    public void clearCache(String resourceName, String dbName, String tableName) {
-        clearCache(resourceName, dbName, tableName, false);
-    }
-
-    public void clearCache(String resourceName, String dbName, String tableName, boolean isHudiTable) {
+    public void clearCache(HiveMetaStoreTableInfo hmsTable) {
         try {
-            HiveMetaCache metaCache = getMetaCache(resourceName);
-            metaCache.clearCache(dbName, tableName, isHudiTable);
+            HiveMetaCache metaCache = getMetaCache(hmsTable.getResourceName());
+            metaCache.clearCache(hmsTable);
         } catch (DdlException e) {
-            LOG.warn("clean table {}.{} cache failed.", dbName, tableName, e);
+            LOG.warn("clean table {}.{} cache failed.", hmsTable.getDb(), hmsTable.getTable(), e);
         }
     }

@@ -27,16 +27,6 @@ public HiveTableColumnsKey(String databaseName, String tableName, List<Column> p
         this.tableType = tableType;
     }

-    public HiveTableColumnsKey(String databaseName, String tableName, List<Column> partitionColumns,
-                               List<String> columnNames) {
-        this(databaseName, tableName, partitionColumns, columnNames, TableType.HIVE);
-    }
-
-    public static HiveTableColumnsKey gen(String databaseName, String tableName, List<Column> partitionColumns,
-                                          List<String> columnNames) {
-        return new HiveTableColumnsKey(databaseName, tableName, partitionColumns, columnNames);
-    }
-
     public String getDatabaseName() {
         return databaseName;
     }
@@ -6,6 +6,7 @@
 import com.google.common.collect.Lists;
 import com.starrocks.catalog.Column;
 import com.starrocks.catalog.PartitionKey;
+import com.starrocks.catalog.Table;
 import com.starrocks.external.hive.HiveMetaCache;
 import com.starrocks.external.hive.HivePartitionKey;
 import com.starrocks.external.hive.HivePartitionKeysKey;
@@ -50,7 +51,7 @@ private AddPartitionEvent(NotificationEvent event,
             this.partCols = partCols;
             hmsTbl = addPartitionMessage.getTableObj();
             hivePartitionKeys.clear();
-            hivePartitionKeys.add(HivePartitionKey.gen(dbName, tblName, addedPartition.getValues()));
+            hivePartitionKeys.add(new HivePartitionKey(dbName, tblName, Table.TableType.HIVE, addedPartition.getValues()));
         } catch (Exception ex) {
             throw new MetastoreNotificationException(ex);
         }
@@ -107,7 +108,7 @@ protected void process() throws MetastoreNotificationException {
             return;
         }
         try {
-            HivePartitionKeysKey partitionKeysKey = HivePartitionKeysKey.gen(dbName, tblName, partCols);
+            HivePartitionKeysKey partitionKeysKey = new HivePartitionKeysKey(dbName, tblName, Table.TableType.HIVE, partCols);
             PartitionKey partitionKey = Utils.createPartitionKey(addedPartition.getValues(), partCols);
             cache.addPartitionKeyByEvent(partitionKeysKey, partitionKey, getHivePartitionKey());
         } catch (Exception e) {
@@ -4,6 +4,7 @@

 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.starrocks.catalog.Table;
 import com.starrocks.external.hive.HiveMetaCache;
 import com.starrocks.external.hive.HivePartitionKey;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -38,7 +39,7 @@ private AlterPartitionEvent(NotificationEvent event, HiveMetaCache metaCache) {
             partitionAfter = Preconditions.checkNotNull(alterPartitionMessage.getPtnObjAfter());
             hmsTbl = alterPartitionMessage.getTableObj();
             hivePartitionKeys.clear();
-            hivePartitionKeys.add(HivePartitionKey.gen(dbName, tblName, partitionAfter.getValues()));
+            hivePartitionKeys.add(new HivePartitionKey(dbName, tblName, Table.TableType.HIVE, partitionAfter.getValues()));
         } catch (Exception e) {
             throw new MetastoreNotificationException(
                     debugString("Unable to parse the alter partition message"), e);
@@ -6,6 +6,7 @@
 import com.google.common.collect.Lists;
 import com.starrocks.catalog.Column;
 import com.starrocks.catalog.PartitionKey;
+import com.starrocks.catalog.Table;
 import com.starrocks.external.hive.HiveMetaCache;
 import com.starrocks.external.hive.HivePartitionKey;
 import com.starrocks.external.hive.HivePartitionKeysKey;
@@ -46,7 +47,8 @@ private DropPartitionEvent(NotificationEvent event,
             Preconditions.checkState(!partCols.isEmpty());
             this.partCols = partCols;
             hivePartitionKeys.clear();
-            hivePartitionKeys.add(HivePartitionKey.gen(dbName, tblName, Lists.newArrayList(droppedPartition.values())));
+            hivePartitionKeys.add(new HivePartitionKey(dbName, tblName, Table.TableType.HIVE,
+                    Lists.newArrayList(droppedPartition.values())));
         } catch (Exception ex) {
             throw new MetastoreNotificationException(
                     debugString("Could not parse drop event message. "), ex);
@@ -96,7 +98,7 @@ protected boolean isSupported() {
     @Override
     protected void process() throws MetastoreNotificationException {
         try {
-            HivePartitionKeysKey partitionKeysKey = HivePartitionKeysKey.gen(dbName, tblName, partCols);
+            HivePartitionKeysKey partitionKeysKey = new HivePartitionKeysKey(dbName, tblName, Table.TableType.HIVE, partCols);
             PartitionKey partitionKey = Utils.createPartitionKey(Lists.newArrayList(droppedPartition.values()), partCols);
             cache.dropPartitionKeyByEvent(partitionKeysKey, partitionKey, getHivePartitionKey());
         } catch (Exception e) {
@@ -4,6 +4,7 @@

 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.starrocks.catalog.Table;
 import com.starrocks.external.hive.HiveMetaCache;
 import com.starrocks.external.hive.HivePartitionKey;
 import com.starrocks.external.hive.HiveTableKey;
@@ -37,7 +38,7 @@ private InsertEvent(NotificationEvent event, HiveMetaCache metaCache) {
             insertPartition = insertMessage.getPtnObj();
             if (insertPartition != null) {
                 hivePartitionKeys.clear();
-                hivePartitionKeys.add(HivePartitionKey.gen(dbName, tblName, insertPartition.getValues()));
+                hivePartitionKeys.add(new HivePartitionKey(dbName, tblName, Table.TableType.HIVE, insertPartition.getValues()));
             }
         } catch (Exception e) {
             LOG.warn("The InsertEvent of the current hive version cannot be parsed, " +
@@ -76,7 +77,7 @@ protected boolean existInCache() {
             return cache.tableExistInCache(tableKey);
         } else {
             List<String> partVals = insertPartition.getValues();
-            HivePartitionKey partitionKey = HivePartitionKey.gen(dbName, tblName, partVals);
+            HivePartitionKey partitionKey = new HivePartitionKey(dbName, tblName, Table.TableType.HIVE, partVals);
             return cache.partitionExistInCache(partitionKey);
         }
     }
@@ -5,6 +5,7 @@
 import com.clearspring.analytics.util.Lists;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.starrocks.catalog.Table.TableType;
 import com.starrocks.external.hive.HiveMetaCache;
 import com.starrocks.external.hive.HivePartitionKey;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -31,7 +32,7 @@ protected MetastoreTableEvent(NotificationEvent event, HiveMetaCache metaCache)
         Preconditions.checkNotNull(dbName, "Database name cannot be null");
         tblName = Preconditions.checkNotNull(event.getTableName());

-        HivePartitionKey hivePartitionKey = HivePartitionKey.gen(dbName, tblName, Lists.newArrayList());
+        HivePartitionKey hivePartitionKey = new HivePartitionKey(dbName, tblName, TableType.HIVE, Lists.newArrayList());
         hivePartitionKeys.add(hivePartitionKey);
     }

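(All the event classes above pass TableType.HIVE explicitly rather than a dynamic type. That appears deliberate: these notifications are emitted by the Hive metastore for Hive tables, while Hudi metadata is maintained through the refreshTable/clearCache paths instead.)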
@@ -141,10 +141,10 @@ public void testAddPartitionByEvent() throws Exception {
         Assert.assertTrue(
                 partitionKeys.containsKey(Utils.createPartitionKey(Lists.newArrayList("1", "2", "5"), partColumns)));

-        HivePartitionKeysKey newPartitionKeysKey = HivePartitionKeysKey.gen("db", "tbl", partColumns);
+        HivePartitionKeysKey newPartitionKeysKey = new HivePartitionKeysKey("db", "tbl", Table.TableType.HIVE, partColumns);
         List<String> partValues = Lists.newArrayList("11", "22", "33");
         PartitionKey newPartitionKey = Utils.createPartitionKey(partValues, partColumns);
-        HivePartitionKey newHivePartitionKey = HivePartitionKey.gen("db", "tbl", partValues);
+        HivePartitionKey newHivePartitionKey = new HivePartitionKey("db", "tbl", Table.TableType.HIVE, partValues);
         metaCache.addPartitionKeyByEvent(newPartitionKeysKey, newPartitionKey, newHivePartitionKey);
         partitionKeys = metaCache.getPartitionKeys(hmsTable);
         Assert.assertEquals(4, partitionKeys.size());
@@ -176,7 +176,7 @@ public void testAlterPartitionByEvent() throws Exception {
         Map<String, String> params = Maps.newHashMap();
         params.put("numRows", "5");
         List<String> partValues = Lists.newArrayList("1", "2", "3");
-        HivePartitionKey partitionKey = HivePartitionKey.gen("db", "tbl", partValues);
+        HivePartitionKey partitionKey = new HivePartitionKey("db", "tbl", Table.TableType.HIVE, partValues);
         StorageDescriptor sd = new StorageDescriptor();
         sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat");
         SerDeInfo serDeInfo = new SerDeInfo();
@@ -211,10 +211,10 @@ public void testDropPartitionByEvent() throws Exception {
         Assert.assertTrue(
                 partitionKeys.containsKey(Utils.createPartitionKey(Lists.newArrayList("1", "2", "5"), partColumns)));

-        HivePartitionKeysKey dropPartitionKeysKey = HivePartitionKeysKey.gen("db", "tbl", partColumns);
+        HivePartitionKeysKey dropPartitionKeysKey = new HivePartitionKeysKey("db", "tbl", Table.TableType.HIVE, partColumns);
         List<String> partValues = Lists.newArrayList("1", "2", "3");
         PartitionKey dropPartitionKey = Utils.createPartitionKey(partValues, partColumns);
-        HivePartitionKey dropHivePartitionKey = HivePartitionKey.gen("db", "tbl", partValues);
+        HivePartitionKey dropHivePartitionKey = new HivePartitionKey("db", "tbl", Table.TableType.HIVE, partValues);
         metaCache.dropPartitionKeyByEvent(dropPartitionKeysKey, dropPartitionKey, dropHivePartitionKey);
         partitionKeys = metaCache.getPartitionKeys(hmsTable);
         Assert.assertEquals(2, partitionKeys.size());
@@ -240,7 +240,7 @@ public void clearCache() throws Exception {
         Assert.assertEquals(1, clientMethodGetTableStatsCalledTimes);
         Assert.assertEquals(1, clientMethodGetPartitionStatsCalledTimes);

-        metaCache.clearCache("db", "tbl", false);
+        metaCache.clearCache(hmsTable);

         metaCache.getPartitionKeys(hmsTable);
         metaCache.getPartition(hmsTable,
