Skip to content

Commit

Permalink
[FLINK-20563][hive] Support built-in functions for Hive versions prior to 1.2.0
Browse files Browse the repository at this point in the history

This closes apache#14379
  • Loading branch information
lirui-apache authored and KurtYoung committed Mar 26, 2021
1 parent 30e0e1e commit 2aab2ef
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.FunctionInfo;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.FunctionUtils;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
Expand Down Expand Up @@ -85,6 +88,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

/** Shim for Hive version 1.0.0. */
public class HiveShimV100 implements HiveShim {
Expand Down Expand Up @@ -282,18 +286,42 @@ public List<FieldSchema> getFieldsFromDeserializer(

@Override
public Set<String> listBuiltInFunctions() {
    try {
        // FunctionRegistry.getFunctionNames(boolean) is not publicly accessible in
        // Hive versions prior to 1.2.0, so we must reach it via reflection.
        Method method =
                FunctionRegistry.class.getDeclaredMethod("getFunctionNames", boolean.class);
        method.setAccessible(true);
        // Pass false: don't search the HMS because we're only interested in built-in
        // functions registered locally in the FunctionRegistry.
        @SuppressWarnings("unchecked")
        Set<String> names = (Set<String>) method.invoke(null, false);

        // Keep only names that actually resolve to a built-in FunctionInfo; this
        // filters out anything that isn't a genuine built-in function.
        return names.stream()
                .filter(n -> getBuiltInFunctionInfo(n).isPresent())
                .collect(Collectors.toSet());
    } catch (Exception ex) {
        throw new CatalogException("Failed to invoke FunctionRegistry.getFunctionNames()", ex);
    }
}

@Override
public Optional<FunctionInfo> getBuiltInFunctionInfo(String name) {
    // Qualified (catalog) function names are not built-in functions and can cause
    // problems for tests, so reject them up front.
    if (isCatalogFunctionName(name)) {
        return Optional.empty();
    }
    try {
        Optional<FunctionInfo> functionInfo =
                Optional.ofNullable(FunctionRegistry.getFunctionInfo(name));
        if (functionInfo.isPresent() && isBuiltInFunctionInfo(functionInfo.get())) {
            return functionInfo;
        } else {
            return Optional.empty();
        }
    } catch (SemanticException e) {
        throw new FlinkHiveException(
                String.format("Failed getting function info for %s", name), e);
    } catch (NullPointerException e) {
        // NOTE(review): getFunctionInfo appears to throw NPE for some unknown names in
        // these Hive versions; treat that as "function not found" rather than an error.
        return Optional.empty();
    }
}

@Override
Expand Down Expand Up @@ -381,6 +409,14 @@ public BulkWriter.Factory<RowData> createOrcBulkWriterFactory(
return new OrcNoHiveBulkWriterFactory(conf, schema, fieldTypes);
}

// Hive versions prior to 1.2.0 have no FunctionInfo.isBuiltIn() API, so "native"
// is used as the proxy for built-in here. Package-private so version-specific
// shims can override it.
boolean isBuiltInFunctionInfo(FunctionInfo info) {
    return info.isNative();
}

// A qualified (db-prefixed) name refers to a catalog function, which is never a
// built-in function.
private static boolean isCatalogFunctionName(String funcName) {
    return FunctionUtils.isQualifiedFunctionName(funcName);
}

Optional<Writable> javaToWritable(@Nonnull Object value) {
Writable writable = null;
// in case value is already a Writable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.table.catalog.hive.client;

import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
import org.apache.flink.table.catalog.stats.Date;
Expand All @@ -32,14 +31,11 @@
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.FunctionInfo;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.FunctionUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.thrift.TException;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -183,33 +179,7 @@ public Set<String> listBuiltInFunctions() {
}

@Override
public Optional<FunctionInfo> getBuiltInFunctionInfo(String name) {
// filter out catalog functions since they're not built-in functions and can cause problems
// for tests
if (isCatalogFunctionName(name)) {
return Optional.empty();
}
try {
Optional<FunctionInfo> functionInfo =
Optional.ofNullable(FunctionRegistry.getFunctionInfo(name));
if (functionInfo.isPresent() && isBuiltInFunctionInfo(functionInfo.get())) {
return functionInfo;
} else {
return Optional.empty();
}
} catch (SemanticException e) {
throw new FlinkHiveException(
String.format("Failed getting function info for %s", name), e);
} catch (NullPointerException e) {
return Optional.empty();
}
}

private static boolean isCatalogFunctionName(String funcName) {
return FunctionUtils.isQualifiedFunctionName(funcName);
}

private boolean isBuiltInFunctionInfo(FunctionInfo info) {
boolean isBuiltInFunctionInfo(FunctionInfo info) {
try {
Method method = FunctionInfo.class.getMethod("isBuiltIn", null);
return (boolean) method.invoke(info);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,41 +31,62 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;

import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

import java.util.List;

import static org.apache.flink.table.HiveVersionTestUtil.HIVE_120_OR_LATER;
import static org.apache.flink.table.catalog.hive.client.HiveShimLoader.HIVE_VERSION_V1_2_0;
import static org.apache.flink.table.catalog.hive.client.HiveShimLoader.HIVE_VERSION_V1_0_1;
import static org.apache.flink.table.catalog.hive.client.HiveShimLoader.HIVE_VERSION_V1_1_0;
import static org.apache.flink.table.catalog.hive.client.HiveShimLoader.HIVE_VERSION_V1_2_1;
import static org.apache.flink.table.catalog.hive.client.HiveShimLoader.HIVE_VERSION_V2_0_0;
import static org.apache.flink.table.catalog.hive.client.HiveShimLoader.HIVE_VERSION_V2_1_1;
import static org.apache.flink.table.catalog.hive.client.HiveShimLoader.HIVE_VERSION_V2_2_0;
import static org.apache.flink.table.catalog.hive.client.HiveShimLoader.HIVE_VERSION_V2_3_4;
import static org.apache.flink.table.catalog.hive.client.HiveShimLoader.HIVE_VERSION_V3_1_1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assume.assumeTrue;
import static org.junit.Assert.fail;

/** Test for {@link HiveModule}. */
public class HiveModuleTest {

@BeforeClass
public static void init() {
assumeTrue(HIVE_120_OR_LATER);
}

@Rule public final LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;

@Test
public void testNumberOfBuiltinFunctions() {
    // Verify the expected built-in function count for the Hive version under test.
    String hiveVersion = HiveShimLoader.getHiveVersion();
    HiveModule hiveModule = new HiveModule(hiveVersion);
    verifyNumBuiltInFunctions(hiveVersion, hiveModule);

    // creating functions shouldn't change the number of built in functions
    TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
    tableEnv.executeSql("create function myudf as 'org.apache.hadoop.hive.ql.udf.UDFPI'");
    tableEnv.executeSql(
            "create function mygenericudf as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs'");
    tableEnv.executeSql(
            "create function myudaf as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax'");
    tableEnv.executeSql(
            "create function myudtf as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode'");
    verifyNumBuiltInFunctions(hiveVersion, hiveModule);
    // explicitly verify that HiveModule doesn't consider the created functions as built-in
    // functions
    assertFalse(hiveModule.getFunctionDefinition("myudf").isPresent());
    assertFalse(hiveModule.getFunctionDefinition("mygenericudf").isPresent());
    assertFalse(hiveModule.getFunctionDefinition("myudaf").isPresent());
    assertFalse(hiveModule.getFunctionDefinition("myudtf").isPresent());
}

private void verifyNumBuiltInFunctions(String hiveVersion, HiveModule hiveModule) {
switch (hiveVersion) {
case HIVE_VERSION_V1_2_0:
assertEquals(229, hiveModule.listFunctions().size());
case HIVE_VERSION_V1_0_1:
assertEquals(197, hiveModule.listFunctions().size());
break;
case HIVE_VERSION_V1_1_0:
assertEquals(202, hiveModule.listFunctions().size());
break;
case HIVE_VERSION_V1_2_1:
assertEquals(222, hiveModule.listFunctions().size());
break;
case HIVE_VERSION_V2_0_0:
assertEquals(233, hiveModule.listFunctions().size());
Expand All @@ -82,6 +103,8 @@ public void testNumberOfBuiltinFunctions() {
case HIVE_VERSION_V3_1_1:
assertEquals(296, hiveModule.listFunctions().size());
break;
default:
fail("Unknown test version " + hiveVersion);
}
}

Expand Down

0 comments on commit 2aab2ef

Please sign in to comment.