Skip to content

Commit

Permalink
[apache#832]feat(jdbc-postgresql): Support for postgresql catalog in …
Browse files Browse the repository at this point in the history
…Gravitino (apache#900)

### What changes were proposed in this pull request?
We need to support PostgreSql as the storage protocol for the catalog in
Gravetino

### Why are the changes needed?
Fix: apache#832

### Does this PR introduce any user-facing change?
NO

### How was this patch tested?
UT

---------

Co-authored-by: Clearvive <[email protected]>
  • Loading branch information
Clearvive and Clearvive authored Dec 6, 2023
1 parent 4cd6021 commit c9d18da
Show file tree
Hide file tree
Showing 34 changed files with 1,603 additions and 191 deletions.
3 changes: 2 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,8 @@ tasks {
dependsOn(
":catalogs:catalog-hive:copyLibAndConfig",
":catalogs:catalog-lakehouse-iceberg:copyLibAndConfig",
":catalogs:catalog-jdbc-mysql:copyLibAndConfig"
":catalogs:catalog-jdbc-mysql:copyLibAndConfig",
":catalogs:catalog-jdbc-postgresql:copyLibAndConfig"
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ public void initialize(Map<String, String> conf) throws RuntimeException {

JdbcConfig jdbcConfig = new JdbcConfig(resultConf);
this.dataSource = DataSourceUtils.createDataSource(jdbcConfig);
this.databaseOperation.initialize(dataSource, exceptionConverter);
this.tableOperation.initialize(dataSource, exceptionConverter, jdbcTypeConverter);
this.databaseOperation.initialize(dataSource, exceptionConverter, resultConf);
this.tableOperation.initialize(dataSource, exceptionConverter, jdbcTypeConverter, resultConf);
this.jdbcTablePropertiesMetadata = new JdbcTablePropertiesMetadata();
this.jdbcSchemaPropertiesMetadata = new JdbcSchemaPropertiesMetadata();
}
Expand Down Expand Up @@ -281,6 +281,7 @@ public Table loadTable(NameIdentifier tableIdent) throws NoSuchTableException {
.withAuditInfo(load.auditInfo())
.withName(tableName)
.withColumns(load.columns())
.withAuditInfo(load.auditInfo())
// Remove id from comment
.withComment(StringIdentifier.removeIdFromComment(load.comment()))
.withProperties(properties)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class JdbcCatalogPropertiesMetadata extends BaseCatalogPropertiesMetadata
private static final List<String> JDBC_PROPERTIES =
ImmutableList.of(
JdbcConfig.JDBC_URL.getKey(),
JdbcConfig.JDBC_DATABASE.getKey(),
JdbcConfig.USERNAME.getKey(),
JdbcConfig.PASSWORD.getKey(),
JdbcConfig.POOL_MIN_SIZE.getKey(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ public class JdbcConfig extends Config {
.stringConf()
.createWithDefault(null);

public static final ConfigEntry<String> JDBC_DATABASE =
new ConfigBuilder("jdbc-database")
.doc("The database of the jdbc connection")
.version("0.3.0")
.stringConf()
.createWithDefault(null);

public static final ConfigEntry<String> USERNAME =
new ConfigBuilder("jdbc-user")
.doc("The username of the Jdbc connection")
Expand Down Expand Up @@ -51,6 +58,10 @@ public String getJdbcUrl() {
return get(JDBC_URL);
}

public String getJdbcDatabase() {
return get(JDBC_DATABASE);
}

public String getUsername() {
return get(USERNAME);
}
Expand All @@ -70,6 +81,5 @@ public int getPoolMaxSize() {
public JdbcConfig(Map<String, String> properties) {
super(false);
loadFromMap(properties, k -> true);
assert null != getJdbcUrl();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ public interface DatabaseOperation {
*
* @param dataSource The data source to use for the operations.
* @param exceptionMapper The exception mapper to use for the operations.
* @param conf The configuration for the operations.
*/
void initialize(DataSource dataSource, JdbcExceptionConverter exceptionMapper);
void initialize(
DataSource dataSource, JdbcExceptionConverter exceptionMapper, Map<String, String> conf);

/**
* Creates a database with the given name and comment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public abstract class JdbcDatabaseOperations implements DatabaseOperation {
protected JdbcExceptionConverter exceptionMapper;

@Override
public void initialize(DataSource dataSource, JdbcExceptionConverter exceptionMapper) {
public void initialize(
DataSource dataSource, JdbcExceptionConverter exceptionMapper, Map<String, String> conf) {
this.dataSource = dataSource;
this.exceptionMapper = exceptionMapper;
}
Expand All @@ -38,7 +39,7 @@ public void initialize(DataSource dataSource, JdbcExceptionConverter exceptionMa
public void create(String databaseName, String comment, Map<String, String> properties)
throws SchemaAlreadyExistsException {
LOG.info("Beginning to create database {}", databaseName);
try (final Connection connection = this.dataSource.getConnection()) {
try (final Connection connection = getConnection()) {
JdbcConnectorUtils.executeUpdate(
connection, generateCreateDatabaseSql(databaseName, comment, properties));
LOG.info("Finished creating database {}", databaseName);
Expand All @@ -50,7 +51,7 @@ public void create(String databaseName, String comment, Map<String, String> prop
@Override
public void delete(String databaseName, boolean cascade) throws NoSuchSchemaException {
LOG.info("Beginning to drop database {}", databaseName);
try (final Connection connection = this.dataSource.getConnection()) {
try (final Connection connection = getConnection()) {
JdbcConnectorUtils.executeUpdate(connection, generateDropDatabaseSql(databaseName, cascade));
LOG.info("Finished dropping database {}", databaseName);
} catch (final SQLException se) {
Expand Down Expand Up @@ -89,4 +90,8 @@ protected abstract String generateCreateDatabaseSql(
* @return the SQL statement to drop a database with the given name.
*/
protected abstract String generateDropDatabaseSql(String databaseName, boolean cascade);

protected Connection getConnection() throws SQLException {
return dataSource.getConnection();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@

/** Operations for managing tables in a JDBC data store. */
public abstract class JdbcTableOperations implements TableOperation {
public static final String PRIMARY_KEY = "PRIMARY KEY";

public static final String COMMENT = "COMMENT";
public static final String SPACE = " ";

public static final String NOT_NULL = "NOT NULL";
public static final String DEFAULT = "DEFAULT";

protected static final Logger LOG = LoggerFactory.getLogger(JdbcTableOperations.class);

Expand All @@ -40,7 +47,8 @@ public abstract class JdbcTableOperations implements TableOperation {
public void initialize(
DataSource dataSource,
JdbcExceptionConverter exceptionMapper,
JdbcTypeConverter jdbcTypeConverter) {
JdbcTypeConverter jdbcTypeConverter,
Map<String, String> conf) {
this.dataSource = dataSource;
this.exceptionMapper = exceptionMapper;
this.typeConverter = jdbcTypeConverter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ public interface TableOperation {
* @param dataSource The data source to use for the operations.
* @param exceptionMapper The exception mapper to use for the operations.
* @param jdbcTypeConverter The type converter to use for the operations.
* @param conf The configuration to use for the operations.
*/
void initialize(
DataSource dataSource,
JdbcExceptionConverter exceptionMapper,
JdbcTypeConverter jdbcTypeConverter);
JdbcTypeConverter jdbcTypeConverter,
Map<String, String> conf);

/**
* @param databaseName The name of the database.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.NoSuchElementException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand All @@ -15,7 +14,6 @@ public class TestJdbcConfig {
@Test
public void testCreateDataSourceConfig() {
HashMap<String, String> properties = Maps.newHashMap();
Assertions.assertThrows(NoSuchElementException.class, () -> new JdbcConfig(properties));
properties.put(JdbcConfig.JDBC_URL.getKey(), "jdbc:sqlite::memory:");
Assertions.assertDoesNotThrow(() -> new JdbcConfig(properties));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import javax.sql.DataSource;
Expand Down Expand Up @@ -65,7 +66,7 @@ private static void createDataSource() {

private static void createJdbcDatabaseOperations() {
JDBC_DATABASE_OPERATIONS = new SqliteDatabaseOperations(BASE_FILE_DIR.getPath());
JDBC_DATABASE_OPERATIONS.initialize(DATA_SOURCE, EXCEPTION_MAPPER);
JDBC_DATABASE_OPERATIONS.initialize(DATA_SOURCE, EXCEPTION_MAPPER, Collections.emptyMap());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -86,7 +87,8 @@ private static void createDataSource() {

private static void createJdbcDatabaseOperations() {
JDBC_TABLE_OPERATIONS = new SqliteTableOperations();
JDBC_TABLE_OPERATIONS.initialize(DATA_SOURCE, EXCEPTION_CONVERTER, TYPE_CONVERTER);
JDBC_TABLE_OPERATIONS.initialize(
DATA_SOURCE, EXCEPTION_CONVERTER, TYPE_CONVERTER, Collections.emptyMap());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public String generateCreateDatabaseSql(
// Append options
if (MapUtils.isNotEmpty(properties)) {
// TODO #804 Properties will be unified in the future.
LOG.warn("Ignoring properties option on database create.");
throw new UnsupportedOperationException("Properties are not supported yet");
// sqlBuilder.append("\n");
// sqlBuilder.append(
// properties.entrySet().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,6 @@
public class MysqlTableOperations extends JdbcTableOperations {

public static final String AUTO_INCREMENT = "AUTO_INCREMENT";
public static final String PRIMARY_KEY = "PRIMARY KEY";

private static final String COMMENT = "COMMENT";
private static final String SPACE = " ";

private static final String NOT_NULL = "NOT NULL";
private static final String DEFAULT = "DEFAULT";

@Override
public JdbcTable load(String databaseName, String tableName) throws NoSuchTableException {
Expand Down Expand Up @@ -244,7 +237,7 @@ protected String generateCreateTableSql(
// Add table properties if any
if (MapUtils.isNotEmpty(properties)) {
// TODO #804 Properties will be unified in the future.
LOG.warn("Ignoring properties option on table create.");
throw new UnsupportedOperationException("Properties are not supported yet");
// StringJoiner joiner = new StringJoiner(SPACE + SPACE);
// for (Map.Entry<String, String> entry : properties.entrySet()) {
// joiner.add(entry.getKey() + "=" + entry.getValue());
Expand Down Expand Up @@ -303,7 +296,7 @@ protected String generateAlterTableSql(
setProperties.add(((TableChange.SetProperty) change));
} else if (change instanceof TableChange.RemoveProperty) {
// mysql does not support deleting table attributes, it can be replaced by Set Property
throw new UnsupportedOperationException("Remove property is not supported yet");
throw new IllegalArgumentException("Remove property is not supported yet");
} else if (change instanceof TableChange.AddColumn) {
TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
lazyLoadCreateTable = getOrCreateTable(databaseName, tableName, lazyLoadCreateTable);
Expand Down Expand Up @@ -343,12 +336,9 @@ protected String generateAlterTableSql(
CreateTable createTable = getOrCreateTable(databaseName, tableName, lazyLoadCreateTable);
Map<String, String> properties =
parseOrderedKeyValuePairs(createTable.getTableOptionsStrings().toArray(new String[0]));
String oldComment = properties.get(COMMENT);
if (StringUtils.isNotEmpty(oldComment)) {
StringIdentifier identifier = StringIdentifier.fromComment(oldComment);
if (null != identifier) {
newComment = StringIdentifier.addToComment(identifier, newComment);
}
StringIdentifier identifier = StringIdentifier.fromComment(properties.get(COMMENT));
if (null != identifier) {
newComment = StringIdentifier.addToComment(identifier, newComment);
}
}
alterSql.add("COMMENT '" + newComment + "'");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
# Copyright 2023 Datastrato.
# This software is licensed under the Apache License version 2.
#
# jdbc-url: jdbc:postgresql://localhost:5432/strato
# jdbc-url: jdbc:mysql://localhost:3306/
# jdbc-user: strato
# jdbc-password: strato
50 changes: 50 additions & 0 deletions catalogs/catalog-jdbc-postgresql/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
description = "catalog-jdbc-postgresql"

plugins {
`maven-publish`
id("java")
id("idea")
}

dependencies {
implementation(project(":common"))
implementation(project(":core"))
implementation(project(":api"))
implementation(project(":catalogs:catalog-jdbc-common"))
implementation(libs.guava)
implementation(libs.bundles.log4j)
implementation(libs.commons.lang3)
implementation(libs.commons.collections4)
implementation(libs.jsqlparser)
}

tasks {
val copyDepends by registering(Copy::class) {
from(configurations.runtimeClasspath)
into("build/libs")
}
val copyCatalogLibs by registering(Copy::class) {
dependsOn(copyDepends, "build")
from("build/libs")
into("$rootDir/distribution/package/catalogs/jdbc-postgresql/libs")
}

val copyCatalogConfig by registering(Copy::class) {
from("src/main/resources")
into("$rootDir/distribution/package/catalogs/jdbc-postgresql/conf")

include("jdbc-postgresql.conf")

exclude { details ->
details.file.isDirectory()
}
}

val copyLibAndConfig by registering(Copy::class) {
dependsOn(copyCatalogLibs, copyCatalogConfig)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.postgresql;

import com.datastrato.gravitino.catalog.jdbc.JdbcCatalog;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
import com.datastrato.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations;
import com.datastrato.gravitino.catalog.jdbc.operation.JdbcTableOperations;
import com.datastrato.gravitino.catalog.postgresql.converter.PostgreSqlExceptionConverter;
import com.datastrato.gravitino.catalog.postgresql.converter.PostgreSqlTypeConverter;
import com.datastrato.gravitino.catalog.postgresql.operation.PostgreSqlSchemaOperations;
import com.datastrato.gravitino.catalog.postgresql.operation.PostgreSqlTableOperations;

public class PostgreSqlCatalog extends JdbcCatalog {

@Override
public String shortName() {
return "jdbc-postgresql";
}

@Override
protected JdbcExceptionConverter createExceptionConverter() {
return new PostgreSqlExceptionConverter();
}

@Override
protected JdbcTypeConverter createJdbcTypeConverter() {
return new PostgreSqlTypeConverter();
}

@Override
protected JdbcDatabaseOperations createJdbcDatabaseOperations() {
return new PostgreSqlSchemaOperations();
}

@Override
protected JdbcTableOperations createJdbcTableOperations() {
return new PostgreSqlTableOperations();
}
}
Loading

0 comments on commit c9d18da

Please sign in to comment.