Skip to content

Commit

Permalink
[FLINK-12649][hive] Add a shim layer to support multiple versions of Hive Metastore
Browse files Browse the repository at this point in the history

Add a shim layer for the HMS client in order to support different versions of HMS.

This closes apache#8564.
  • Loading branch information
lirui-apache authored and bowenli86 committed May 31, 2019
1 parent c691c13 commit 038ab38
Show file tree
Hide file tree
Showing 8 changed files with 562 additions and 24 deletions.
21 changes: 20 additions & 1 deletion flink-connectors/flink-connector-hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ under the License.
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hive</groupId>
Expand Down Expand Up @@ -385,7 +386,6 @@ under the License.
<include>commons-beanutils:commons-beanutils</include>
<include>com.fasterxml.jackson.core:*</include>
<include>com.jolbox:bonecp</include>
<include>org.apache.hive:*</include>
<include>org.apache.thrift:libthrift</include>
<include>org.datanucleus:*</include>
<include>org.antlr:antlr-runtime</include>
Expand Down Expand Up @@ -423,4 +423,23 @@ under the License.
</plugin>
</plugins>
</build>

<profiles>
<!-- Activate this profile with -Phive-1.2.1 to build and test against hive-1.2.1 -->
<profile>
<id>hive-1.2.1</id>
<properties>
<hive.version>1.2.1</hive.version>
<hivemetastore.hadoop.version>2.6.0</hivemetastore.hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>javax.jdo</groupId>
<artifactId>jdo-api</artifactId>
<version>3.0.1</version>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,7 @@
import org.apache.flink.util.StringUtils;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
Expand Down Expand Up @@ -108,7 +105,7 @@ public class HiveCatalog extends AbstractCatalog {

protected final HiveConf hiveConf;

protected IMetaStoreClient client;
protected HiveMetastoreClientWrapper client;

public HiveCatalog(String catalogName, String hivemetastoreURI) {
this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
Expand All @@ -133,23 +130,10 @@ private static HiveConf getHiveConf(String hiveMetastoreURI) {
return hiveConf;
}

private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
try {
return RetryingMetaStoreClient.getProxy(
hiveConf,
null,
null,
HiveMetaStoreClient.class.getName(),
true);
} catch (MetaException e) {
throw new CatalogException("Failed to create Hive metastore client", e);
}
}

@Override
public void open() throws CatalogException {
if (client == null) {
client = getMetastoreClient(hiveConf);
client = HiveMetastoreClientFactory.create(hiveConf);
LOG.info("Connected to Hive metastore");
}

Expand Down Expand Up @@ -444,10 +428,7 @@ public List<String> listViews(String databaseName) throws DatabaseNotExistExcept
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");

try {
return client.getTables(
databaseName,
null, // table pattern
TableType.VIRTUAL_VIEW);
return client.getViews(databaseName);
} catch (UnknownDBException e) {
throw new DatabaseNotExistException(getName(), databaseName);
} catch (TException e) {
Expand Down Expand Up @@ -996,7 +977,8 @@ public List<String> listFunctions(String databaseName) throws DatabaseNotExistEx
}

try {
return client.getFunctions(databaseName, null);
// hive-1.x requires the pattern not being null, so pass a pattern that matches any name
return client.getFunctions(databaseName, ".*");
} catch (TException e) {
throw new CatalogException(
String.format("Failed to list functions in database %s", databaseName), e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.catalog.hive;

import org.apache.hadoop.hive.conf.HiveConf;

/**
 * Factory for obtaining {@link HiveMetastoreClientWrapper} instances.
 */
public class HiveMetastoreClientFactory {

	/** Non-instantiable: this class only exposes the static {@link #create(HiveConf)} method. */
	private HiveMetastoreClientFactory() {
	}

	/**
	 * Creates a metastore client wrapper backed by the given Hive configuration.
	 *
	 * @param hiveConf Hive configuration used to connect to the metastore
	 * @return a new {@link HiveMetastoreClientWrapper}
	 */
	public static HiveMetastoreClientWrapper create(HiveConf hiveConf) {
		return new HiveMetastoreClientWrapper(hiveConf);
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.catalog.hive;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.InvalidInputException;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

/**
 * Wrapper class for Hive Metastore Client, which embeds a HiveShim layer to handle different Hive versions.
 * Methods provided mostly conform to IMetaStoreClient interfaces except those that require shims.
 *
 * <p>Most methods simply delegate to the underlying {@link IMetaStoreClient}; the methods in the
 * "shimmed methods" section route through a {@link HiveShim} so that version-specific metastore
 * APIs can be handled per Hive version.
 */
@Internal
public class HiveMetastoreClientWrapper implements AutoCloseable {

	private static final Logger LOG = LoggerFactory.getLogger(HiveMetastoreClientWrapper.class);

	// Resolved once per wrapper instead of on every shimmed call. NOTE(review): assumes
	// HiveShimLoader.loadHiveShim() is deterministic for the lifetime of this wrapper
	// (it is keyed off the Hive version on the classpath) — confirm against the loader.
	private final HiveShim hiveShim;
	private final IMetaStoreClient client;
	private final HiveConf hiveConf;

	public HiveMetastoreClientWrapper(HiveConf hiveConf) {
		this.hiveConf = Preconditions.checkNotNull(hiveConf, "HiveConf cannot be null");
		// Load the shim before creating the client, since client creation is version-dependent.
		this.hiveShim = HiveShimLoader.loadHiveShim();
		client = createMetastoreClient();
	}

	@Override
	public void close() {
		client.close();
	}

	public List<String> getDatabases(String pattern) throws MetaException, TException {
		return client.getDatabases(pattern);
	}

	public List<String> getAllDatabases() throws MetaException, TException {
		return client.getAllDatabases();
	}

	public List<String> getAllTables(String databaseName) throws MetaException, TException, UnknownDBException {
		return client.getAllTables(databaseName);
	}

	public void dropTable(String databaseName, String tableName)
			throws MetaException, TException, NoSuchObjectException {
		client.dropTable(databaseName, tableName);
	}

	public void dropTable(String dbName, String tableName, boolean deleteData, boolean ignoreUnknownTable)
			throws MetaException, NoSuchObjectException, TException {
		client.dropTable(dbName, tableName, deleteData, ignoreUnknownTable);
	}

	public boolean tableExists(String databaseName, String tableName)
			throws MetaException, TException, UnknownDBException {
		return client.tableExists(databaseName, tableName);
	}

	public Database getDatabase(String name) throws NoSuchObjectException, MetaException, TException {
		return client.getDatabase(name);
	}

	public Table getTable(String databaseName, String tableName)
			throws MetaException, NoSuchObjectException, TException {
		return client.getTable(databaseName, tableName);
	}

	public Partition add_partition(Partition partition)
			throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
		return client.add_partition(partition);
	}

	public int add_partitions(List<Partition> partitionList)
			throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
		return client.add_partitions(partitionList);
	}

	public Partition getPartition(String databaseName, String tableName, List<String> list)
			throws NoSuchObjectException, MetaException, TException {
		return client.getPartition(databaseName, tableName, list);
	}

	public List<String> listPartitionNames(String databaseName, String tableName, short maxPartitions)
			throws MetaException, TException {
		return client.listPartitionNames(databaseName, tableName, maxPartitions);
	}

	public List<String> listPartitionNames(String databaseName, String tableName, List<String> partitionValues,
			short maxPartitions) throws MetaException, TException, NoSuchObjectException {
		return client.listPartitionNames(databaseName, tableName, partitionValues, maxPartitions);
	}

	public void createTable(Table table)
			throws AlreadyExistsException, InvalidObjectException, MetaException, NoSuchObjectException, TException {
		client.createTable(table);
	}

	public void alter_table(String databaseName, String tableName, Table table)
			throws InvalidOperationException, MetaException, TException {
		client.alter_table(databaseName, tableName, table);
	}

	public void createDatabase(Database database)
			throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
		client.createDatabase(database);
	}

	public void dropDatabase(String name, boolean deleteData, boolean ignoreIfNotExists)
			throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
		client.dropDatabase(name, deleteData, ignoreIfNotExists);
	}

	public void alterDatabase(String name, Database database) throws NoSuchObjectException, MetaException, TException {
		client.alterDatabase(name, database);
	}

	public boolean dropPartition(String databaseName, String tableName, List<String> partitionValues, boolean deleteData)
			throws NoSuchObjectException, MetaException, TException {
		return client.dropPartition(databaseName, tableName, partitionValues, deleteData);
	}

	public void alter_partition(String databaseName, String tableName, Partition partition)
			throws InvalidOperationException, MetaException, TException {
		client.alter_partition(databaseName, tableName, partition);
	}

	public void renamePartition(String databaseName, String tableName, List<String> partitionValues, Partition partition)
			throws InvalidOperationException, MetaException, TException {
		client.renamePartition(databaseName, tableName, partitionValues, partition);
	}

	public void createFunction(Function function) throws InvalidObjectException, MetaException, TException {
		client.createFunction(function);
	}

	public void alterFunction(String databaseName, String functionName, Function function)
			throws InvalidObjectException, MetaException, TException {
		client.alterFunction(databaseName, functionName, function);
	}

	public void dropFunction(String databaseName, String functionName)
			throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException, TException {
		client.dropFunction(databaseName, functionName);
	}

	public List<String> getFunctions(String databaseName, String pattern) throws MetaException, TException {
		return client.getFunctions(databaseName, pattern);
	}

	List<ColumnStatisticsObj> getTableColumnStatistics(String databaseName, String tableName, List<String> columnNames)
			throws NoSuchObjectException, MetaException, TException {
		return client.getTableColumnStatistics(databaseName, tableName, columnNames);
	}

	Map<String, List<ColumnStatisticsObj>> getPartitionColumnStatistics(String dbName, String tableName,
			List<String> partNames, List<String> colNames)
			throws NoSuchObjectException, MetaException, TException {
		return client.getPartitionColumnStatistics(dbName, tableName, partNames, colNames);
	}

	public boolean updateTableColumnStatistics(ColumnStatistics columnStatistics)
			throws NoSuchObjectException, InvalidObjectException, MetaException, TException, InvalidInputException {
		return client.updateTableColumnStatistics(columnStatistics);
	}

	public List<Partition> listPartitions(String dbName, String tblName, List<String> partVals, short max) throws TException {
		return client.listPartitions(dbName, tblName, partVals, max);
	}

	public List<Partition> listPartitions(String dbName, String tblName, short max) throws TException {
		return client.listPartitions(dbName, tblName, max);
	}

	//-------- Start of shimmed methods ----------

	/** Lists the names of all views in the given database, via the version-specific shim. */
	public List<String> getViews(String databaseName) throws UnknownDBException, TException {
		return hiveShim.getViews(client, databaseName);
	}

	/** Creates the underlying metastore client through the shim, so client creation matches the Hive version. */
	private IMetaStoreClient createMetastoreClient() {
		return hiveShim.getHiveMetastoreClient(hiveConf);
	}

	/** Retrieves a function definition from the metastore, via the version-specific shim. */
	public Function getFunction(String databaseName, String functionName) throws MetaException, TException {
		return hiveShim.getFunction(client, databaseName, functionName);
	}
}
Loading

0 comments on commit 038ab38

Please sign in to comment.