Skip to content

Commit

Permalink
[Feature](TrinoConnector-Catalog) Fe supports trino-connector catal…
Browse files Browse the repository at this point in the history
…og (apache#31768)
  • Loading branch information
BePPPower authored Mar 6, 2024
1 parent bd9a74a commit 1c2cefa
Show file tree
Hide file tree
Showing 30 changed files with 1,739 additions and 20 deletions.
4 changes: 4 additions & 0 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then
modules+=("be-java-extensions/java-udf")
modules+=("be-java-extensions/jdbc-scanner")
modules+=("be-java-extensions/paimon-scanner")
# modules+=("be-java-extensions/trino-connector-scanner")
modules+=("be-java-extensions/max-compute-scanner")
modules+=("be-java-extensions/avro-scanner")
modules+=("be-java-extensions/preload-extensions")
Expand Down Expand Up @@ -674,6 +675,7 @@ if [[ "${BUILD_FE}" -eq 1 ]]; then
mkdir -p "${DORIS_OUTPUT}/fe/log"
mkdir -p "${DORIS_OUTPUT}/fe/doris-meta"
mkdir -p "${DORIS_OUTPUT}/fe/conf/ssl"
mkdir -p "${DORIS_OUTPUT}/fe/lib/connectors"
fi

if [[ "${BUILD_SPARK_DPP}" -eq 1 ]]; then
Expand Down Expand Up @@ -756,6 +758,7 @@ EOF
extensions_modules+=("jdbc-scanner")
extensions_modules+=("hudi-scanner")
extensions_modules+=("paimon-scanner")
# extensions_modules+=("trino-connector-scanner")
extensions_modules+=("max-compute-scanner")
extensions_modules+=("avro-scanner")
extensions_modules+=("preload-extensions")
Expand All @@ -776,6 +779,7 @@ EOF
mkdir -p "${DORIS_OUTPUT}/be/log"
mkdir -p "${DORIS_OUTPUT}/be/log/tracing"
mkdir -p "${DORIS_OUTPUT}/be/storage"
mkdir -p "${DORIS_OUTPUT}/be/lib/connectors"
fi

if [[ "${BUILD_BROKER}" -eq 1 ]]; then
Expand Down
3 changes: 0 additions & 3 deletions fe/check/checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,5 @@ under the License.
default="checkstyle-xpath-suppressions.xml" />
<property name="optional" value="true"/>
</module>
<module name="IllegalImport">
<property name="illegalPkgs" value="io.trino"/>
</module>
</module>
</module>
4 changes: 4 additions & 0 deletions fe/fe-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ under the License.
</exclusions>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-main</artifactId>
</dependency>
</dependencies>
<build>
<finalName>doris-fe-common</finalName>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2516,6 +2516,10 @@ public class Config extends ConfigBase {
options = {"default", "ranger-doris"})
public static String access_controller_type = "default";

@ConfField(mutable = true, masterOnly = false, description = {"指定 trino-connector catalog 的插件默认加载路径",
"Specify the default plugins loading path for the trino-connector catalog"})
public static String trino_connector_plugin_dir = EnvUtils.getDorisHome() + "/connectors";

//==========================================================================
// begin of cloud config
//==========================================================================
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.trinoconnector;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.spi.type.Type;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

public class TrinoColumnMetadata {
private final String name;
private final Type type;
private final boolean nullable;
private final String comment;
private final String extraInfo;
private final boolean hidden;
private final Map<String, Object> properties;

@JsonCreator
public TrinoColumnMetadata(
@JsonProperty("name") String name,
@JsonProperty("type") Type type,
@JsonProperty("nullable") boolean nullable,
@JsonProperty("comment") String comment,
@JsonProperty("extraInfo") String extraInfo,
@JsonProperty("hidden") boolean hidden,
@JsonProperty("properties") Map<String, Object> properties) {
Objects.requireNonNull(name, "name is null");
Objects.requireNonNull(type, "type is null");
Objects.requireNonNull(properties, "properties is null");

this.name = name.toLowerCase(Locale.ENGLISH);
this.type = type;
this.comment = comment;
this.extraInfo = extraInfo;
this.hidden = hidden;
this.properties = properties.isEmpty() ? Collections.emptyMap() :
Collections.unmodifiableMap(new LinkedHashMap<>(properties));
this.nullable = nullable;
}

@JsonProperty
public String getName() {
return name;
}

@JsonProperty
public Type getType() {
return type;
}

@JsonProperty
public boolean isNullable() {
return nullable;
}

@JsonProperty
public String getComment() {
return comment;
}

@JsonProperty
public String getExtraInfo() {
return extraInfo;
}

@JsonProperty
public boolean isHidden() {
return hidden;
}

@JsonProperty
public Map<String, Object> getProperties() {
return properties;
}


@Override
public String toString() {
StringBuilder sb = new StringBuilder("TrinoColumnMetadata{");
sb.append("name='").append(name).append('\'');
sb.append(", type=").append(type);
sb.append(", ").append(nullable ? "nullable" : "nonnull");
if (comment != null) {
sb.append(", comment='").append(comment).append('\'');
}
if (extraInfo != null) {
sb.append(", extraInfo='").append(extraInfo).append('\'');
}
if (hidden) {
sb.append(", hidden");
}
if (!properties.isEmpty()) {
sb.append(", properties=").append(properties);
}
sb.append('}');
return sb.toString();
}

@Override
public int hashCode() {
return Objects.hash(name, type, nullable, comment, extraInfo, hidden);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
TrinoColumnMetadata other = (TrinoColumnMetadata) obj;
return Objects.equals(this.name, other.name)
&& Objects.equals(this.type, other.type)
&& Objects.equals(this.nullable, other.nullable)
&& Objects.equals(this.comment, other.comment)
&& Objects.equals(this.extraInfo, other.extraInfo)
&& Objects.equals(this.hidden, other.hidden);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// copied from https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/server/PluginManager.java

package org.apache.doris.trinoconnector;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.trino.connector.ConnectorName;
import io.trino.metadata.HandleResolver;
import io.trino.metadata.TypeRegistry;
import io.trino.server.PluginClassLoader;
import io.trino.server.PluginInstaller;
import io.trino.server.PluginManager.PluginsProvider;
import io.trino.spi.Plugin;
import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.connector.ConnectorFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
public class TrinoConnectorPluginManager implements PluginInstaller {
private static final Logger LOG = LogManager.getLogger(TrinoConnectorPluginManager.class);

private static final ImmutableList<String> SPI_PACKAGES = ImmutableList.<String>builder()
.add("io.trino.spi.")
.add("com.fasterxml.jackson.annotation.")
.add("io.airlift.slice.")
.add("org.openjdk.jol.")
.add("io.opentelemetry.api.")
.add("io.opentelemetry.context.")
.build();

private final ConcurrentMap<ConnectorName, ConnectorFactory> connectorFactories = new ConcurrentHashMap();
private final PluginsProvider pluginsProvider;
private final TypeRegistry typeRegistry;
private final HandleResolver handleResolver;
private final AtomicBoolean pluginsLoading = new AtomicBoolean();

public TrinoConnectorPluginManager(
PluginsProvider pluginsProvider,
TypeRegistry typeRegistry,
HandleResolver handleResolver) {
this.pluginsProvider = Objects.requireNonNull(pluginsProvider, "pluginsProvider is null");
this.typeRegistry = Objects.requireNonNull(typeRegistry, "typeRegistry is null");
this.handleResolver = Objects.requireNonNull(handleResolver, "handleResolver is null");
}

@Override
public void loadPlugins() {
if (!pluginsLoading.compareAndSet(false, true)) {
return;
}

pluginsProvider.loadPlugins(this::loadPlugin, TrinoConnectorPluginManager::createClassLoader);

typeRegistry.verifyTypes();
}

private void loadPlugin(String plugin, Supplier<PluginClassLoader> createClassLoader) {
LOG.info("-- Loading plugin {} --", plugin);

PluginClassLoader pluginClassLoader = createClassLoader.get();

LOG.debug("Classpath for plugin:");
for (URL url : pluginClassLoader.getURLs()) {
LOG.debug(" {}", url.getPath());
}

handleResolver.registerClassLoader(pluginClassLoader);
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(pluginClassLoader)) {
loadPlugin(pluginClassLoader);
}

LOG.info("-- Finished loading plugin {} --", plugin);
}

private void loadPlugin(PluginClassLoader pluginClassLoader) {
ServiceLoader<Plugin> serviceLoader = ServiceLoader.load(Plugin.class, pluginClassLoader);
List<Plugin> plugins = ImmutableList.copyOf(serviceLoader);
Preconditions.checkState(!plugins.isEmpty(), "No service providers of type %s in the classpath: %s",
Plugin.class.getName(), Arrays.asList(pluginClassLoader.getURLs()));

for (Plugin plugin : plugins) {
LOG.info("Installing {}", plugin.getClass().getName());
installPlugin(plugin);
}
}

@Override
public void installPlugin(Plugin plugin) {
installPluginInternal(plugin);
typeRegistry.verifyTypes();
}

private void installPluginInternal(Plugin plugin) {
for (ConnectorFactory connectorFactory : plugin.getConnectorFactories()) {
LOG.info("Registering connector {}", connectorFactory.getName());
ConnectorFactory existingConnectorFactory = connectorFactories.putIfAbsent(
new ConnectorName(connectorFactory.getName()), connectorFactory);
Preconditions.checkArgument(existingConnectorFactory == null,
"Connector '%s' is already registered", connectorFactory.getName());
}
}

public static PluginClassLoader createClassLoader(String pluginName, List<URL> urls) {
ClassLoader parent = TrinoConnectorPluginManager.class.getClassLoader();
return new PluginClassLoader(pluginName, urls, parent, SPI_PACKAGES);
}

public ConcurrentMap<ConnectorName, ConnectorFactory> getConnectorFactories() {
return connectorFactories;
}

public TypeRegistry getTypeRegistry() {
return typeRegistry;
}

public HandleResolver getHandleResolver() {
return handleResolver;
}
}
Loading

0 comments on commit 1c2cefa

Please sign in to comment.