Skip to content

Commit

Permalink
HIVE-25413 : Optimise ObjectStore::alterPartitions to reduce DB calls…
Browse files Browse the repository at this point in the history
… for getPartitions (Mahesh Kumar Behera, reviewed by Rajesh Balamohan)
  • Loading branch information
maheshk114 authored Aug 17, 2021
1 parent baf8247 commit ed1882e
Showing 1 changed file with 77 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2738,7 +2738,7 @@ public Partition getPartition(String catName, String dbName, String tableName,
try {
openTransaction();
MTable table = this.getMTable(catName, dbName, tableName);
MPartition mpart = getMPartition(catName, dbName, tableName, part_vals);
MPartition mpart = getMPartition(catName, dbName, tableName, part_vals, table);
part = convertToPart(mpart, false);
committed = commitTransaction();
if (part == null) {
Expand Down Expand Up @@ -2781,7 +2781,7 @@ public Partition getPartition(String catName, String dbName, String tableName,
* @param part_vals The values defining the partition
* @return The MPartition object in the backend database
*/
private MPartition getMPartition(String catName, String dbName, String tableName, List<String> part_vals)
private MPartition getMPartition(String catName, String dbName, String tableName, List<String> part_vals, MTable mtbl)
throws MetaException {
catName = normalizeIdentifier(catName);
dbName = normalizeIdentifier(dbName);
Expand All @@ -2790,9 +2790,11 @@ private MPartition getMPartition(String catName, String dbName, String tableName
MPartition result = null;
try {
openTransaction();
MTable mtbl = getMTable(catName, dbName, tableName);
if (mtbl == null) {
return null;
mtbl = getMTable(catName, dbName, tableName);
if (mtbl == null) {
return null;
}
}
// Change the query to use part_vals instead of the name which is
// redundant TODO: callers of this often get part_vals out of name for no reason...
Expand Down Expand Up @@ -2952,7 +2954,7 @@ public boolean dropPartition(String catName, String dbName, String tableName,
boolean success = false;
try {
openTransaction();
MPartition part = getMPartition(catName, dbName, tableName, part_vals);
MPartition part = getMPartition(catName, dbName, tableName, part_vals, null);
dropPartitionCommon(part);
success = commitTransaction();
} finally {
Expand Down Expand Up @@ -3211,15 +3213,14 @@ public Partition getPartitionWithAuth(String catName, String dbName, String tblN
boolean success = false;
try {
openTransaction();
MPartition mpart = getMPartition(catName, dbName, tblName, partVals);
MPartition mpart = getMPartition(catName, dbName, tblName, partVals, null);
if (mpart == null) {
commitTransaction();
throw new NoSuchObjectException("partition values="
+ partVals.toString());
}
Partition part = null;
MTable mtbl = mpart.getTable();
part = convertToPart(mpart, false);
Partition part = convertToPart(mpart, false);
if ("TRUE".equalsIgnoreCase(mtbl.getParameters().get("PARTITION_LEVEL_PRIVILEGE"))) {
String partName = Warehouse.makePartName(this.convertToFieldSchemas(mtbl
.getPartitionKeys()), partVals);
Expand Down Expand Up @@ -4997,19 +4998,19 @@ private Partition alterPartitionNoTxn(String catName, String dbname, String name
List<String> part_vals, Partition newPart, String validWriteIds, Ref<MColumnDescriptor> oldCd)
throws InvalidObjectException, MetaException {
MTable table = this.getMTable(newPart.getCatName(), newPart.getDbName(), newPart.getTableName());
return alterPartitionNoTxn(catName, dbname, name, part_vals, newPart,
MPartition oldp = getMPartition(catName, dbname, name, part_vals, table);
return alterPartitionNoTxn(catName, dbname, name, oldp, newPart,
validWriteIds, oldCd, table);
}

private Partition alterPartitionNoTxn(String catName, String dbname,
String name, List<String> part_vals, Partition newPart,
String name, MPartition oldp, Partition newPart,
String validWriteIds,
Ref<MColumnDescriptor> oldCd, MTable table)
throws InvalidObjectException, MetaException {
catName = normalizeIdentifier(catName);
name = normalizeIdentifier(name);
dbname = normalizeIdentifier(dbname);
MPartition oldp = getMPartition(catName, dbname, name, part_vals);
MPartition newp = convertToMPart(newPart, table, false);
MColumnDescriptor oldCD = null;
MStorageDescriptor oldSD = oldp.getSd();
Expand Down Expand Up @@ -5096,37 +5097,74 @@ public Partition alterPartition(String catName, String dbname, String name, List
}

@Override
public List<Partition> alterPartitions(String catName, String dbname, String name,
public List<Partition> alterPartitions(String catName, String dbName, String tblName,
List<List<String>> part_vals, List<Partition> newParts,
long writeId, String queryWriteIdList)
throws InvalidObjectException, MetaException {
boolean success = false;
Exception e = null;
List<Partition> results = new ArrayList<>(newParts.size());
if (newParts.isEmpty()) {
return results;
}
try {
openTransaction();
Iterator<List<String>> part_val_itr = part_vals.iterator();
Set<MColumnDescriptor> oldCds = new HashSet<>();
Ref<MColumnDescriptor> oldCdRef = new Ref<>();
MTable table = null;
for (Partition tmpPart: newParts) {
List<String> tmpPartVals = part_val_itr.next();
if (writeId > 0) {
tmpPart.setWriteId(writeId);

MTable table = this.getMTable(catName, dbName, tblName);
List<String> partNames = new ArrayList<>();
for (List<String> partVal : part_vals) {
partNames.add(
Warehouse.makePartName(convertToFieldSchemas(table.getPartitionKeys()), partVal)
);
}

catName = normalizeIdentifier(catName);
dbName = normalizeIdentifier(dbName);
tblName = normalizeIdentifier(tblName);
List<MPartition> mPartitionList;

try (Query query = pm.newQuery(MPartition.class,
"table.tableName == t1 && table.database.name == t2 && t3.contains(partitionName) " +
" && table.database.catalogName == t4")) {
query.declareParameters("java.lang.String t1, java.lang.String t2, java.util.Collection t3, "
+ "java.lang.String t4");
mPartitionList = (List<MPartition>) query.executeWithArray(tblName, dbName, partNames, catName);
pm.retrieveAll(mPartitionList);

if (mPartitionList.size() > newParts.size()) {
throw new MetaException("Expecting only one partition but more than one partitions are found.");
}
oldCdRef.t = null;
if (table == null) {
table = this.getMTable(tmpPart.getCatName(), tmpPart.getDbName(), tmpPart.getTableName());

Map<List<String>, MPartition> mPartsMap = new HashMap();
for (MPartition mPartition : mPartitionList) {
mPartsMap.put(mPartition.getValues(), mPartition);
}
Partition result = alterPartitionNoTxn(
catName, dbname, name, tmpPartVals, tmpPart, queryWriteIdList, oldCdRef, table);
results.add(result);
if (oldCdRef.t != null) {
oldCds.add(oldCdRef.t);

Set<MColumnDescriptor> oldCds = new HashSet<>();
Ref<MColumnDescriptor> oldCdRef = new Ref<>();
for (Partition tmpPart : newParts) {
if (!tmpPart.getDbName().equalsIgnoreCase(dbName)) {
throw new MetaException("Invalid DB name : " + tmpPart.getDbName());
}

if (!tmpPart.getTableName().equalsIgnoreCase(tblName)) {
throw new MetaException("Invalid table name : " + tmpPart.getDbName());
}

if (writeId > 0) {
tmpPart.setWriteId(writeId);
}
oldCdRef.t = null;
Partition result = alterPartitionNoTxn(catName, dbName, tblName, mPartsMap.get(tmpPart.getValues()),
tmpPart, queryWriteIdList, oldCdRef, table);
results.add(result);
if (oldCdRef.t != null) {
oldCds.add(oldCdRef.t);
}
}
for (MColumnDescriptor oldCd : oldCds) {
removeUnusedColumnDescriptor(oldCd);
}
}
for (MColumnDescriptor oldCd : oldCds) {
removeUnusedColumnDescriptor(oldCd);
}
// commit the changes
success = commitTransaction();
Expand Down Expand Up @@ -6947,7 +6985,7 @@ public boolean grantPrivileges(PrivilegeBag privileges) throws InvalidObjectExce
}
} else if (hiveObject.getObjectType() == HiveObjectType.PARTITION) {
MPartition partObj = this.getMPartition(catName, hiveObject.getDbName(),
hiveObject.getObjectName(), hiveObject.getPartValues());
hiveObject.getObjectName(), hiveObject.getPartValues(), null);
String partName = null;
if (partObj != null) {
partName = partObj.getPartitionName();
Expand Down Expand Up @@ -6982,7 +7020,7 @@ public boolean grantPrivileges(PrivilegeBag privileges) throws InvalidObjectExce
MPartition partObj = null;
List<MPartitionColumnPrivilege> colPrivs = null;
partObj = this.getMPartition(catName, hiveObject.getDbName(), hiveObject
.getObjectName(), hiveObject.getPartValues());
.getObjectName(), hiveObject.getPartValues(), tblObj);
if (partObj == null) {
continue;
}
Expand Down Expand Up @@ -9649,9 +9687,10 @@ public Map<String, String> updatePartitionColumnStatistics(ColumnStatistics colS
List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
ColumnStatisticsDesc statsDesc = colStats.getStatsDesc();
String catName = statsDesc.isSetCatName() ? statsDesc.getCatName() : getDefaultCatalog(conf);
Table table = ensureGetTable(catName, statsDesc.getDbName(), statsDesc.getTableName());
MTable mTable = ensureGetMTable(catName, statsDesc.getDbName(), statsDesc.getTableName());
Table table = convertToTable(mTable);
Partition partition = convertToPart(getMPartition(
catName, statsDesc.getDbName(), statsDesc.getTableName(), partVals), false);
catName, statsDesc.getDbName(), statsDesc.getTableName(), partVals, mTable), false);
List<String> colNames = new ArrayList<>();

for(ColumnStatisticsObj statsObj : statsObjs) {
Expand All @@ -9662,7 +9701,7 @@ public Map<String, String> updatePartitionColumnStatistics(ColumnStatistics colS
.getPartName(), colNames, colStats.getEngine());

MPartition mPartition = getMPartition(
catName, statsDesc.getDbName(), statsDesc.getTableName(), partVals);
catName, statsDesc.getDbName(), statsDesc.getTableName(), partVals, mTable);
if (partition == null) {
throw new NoSuchObjectException("Partition for which stats is gathered doesn't exist.");
}
Expand Down Expand Up @@ -9978,7 +10017,7 @@ public List<ColumnStatistics> getPartitionColumnStatistics(
// TODO: this could be improved to get partitions in bulk
for (ColumnStatistics cs : allStats) {
MPartition mpart = getMPartition(catName, dbName, tableName,
Warehouse.getPartValuesFromPartName(cs.getStatsDesc().getPartName()));
Warehouse.getPartValuesFromPartName(cs.getStatsDesc().getPartName()), null);
if (mpart == null
|| !isCurrentStatsValidForTheQuery(mpart, writeIdList, false)) {
if (mpart != null) {
Expand Down Expand Up @@ -10307,7 +10346,7 @@ public boolean deletePartitionColumnStatistics(String catName, String dbName, St
}
// Note: this does not verify ACID state; called internally when removing cols/etc.
// Also called via an unused metastore API that checks for ACID tables.
MPartition mPartition = getMPartition(catName, dbName, tableName, partVals);
MPartition mPartition = getMPartition(catName, dbName, tableName, partVals, mTable);
if (mPartition == null) {
throw new NoSuchObjectException("Partition " + partName
+ " for which stats deletion is requested doesn't exist");
Expand Down

0 comments on commit ed1882e

Please sign in to comment.