Skip to content

Commit

Permalink
Split airbyte-db and move db dev commands to gradle (airbytehq#5616)
Browse files Browse the repository at this point in the history
# Summary

- A follow-up PR for airbytehq#5543.
- This PR splits the `airbyte-db` project into two modules:
  - `lib` is the original `airbyte-db`.
  - `jooq` is for jOOQ code generation.
- This split is necessary because the jOOQ generator requires a custom database implementation that runs the Flyway migrations, so the code generation logic must depend on the compiled output of the original `airbyte-db` project.

# Commits
* Separate db to lib and jooq modules
* Update dependencies
* Add jobs db migrator test
* Fix compose build
* Add migration dev center
* Add schema dump task
* Update airbyte-db/lib/README.md
  * Co-authored-by: Davin Chia <[email protected]>
* Update readme
* Remove bom dependency
* Update readme
* Use jooq code in db config persistence
* Remove AirbyteConfigsTable

Co-authored-by: Davin Chia <[email protected]>
  • Loading branch information
tuliren and davinchia authored Aug 26, 2021
1 parent b0be38f commit b9e1997
Show file tree
Hide file tree
Showing 122 changed files with 629 additions and 236 deletions.
3 changes: 2 additions & 1 deletion airbyte-config/persistence/build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
dependencies {
implementation group: 'commons-io', name: 'commons-io', version: '2.7'

implementation project(':airbyte-db')
implementation project(':airbyte-db:lib')
implementation project(':airbyte-db:jooq')
implementation project(':airbyte-config:models')
implementation project(':airbyte-config:init')
implementation project(':airbyte-json-validation')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,7 @@

package io.airbyte.config.persistence;

import static io.airbyte.db.instance.configs.AirbyteConfigsTable.AIRBYTE_CONFIGS;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.CONFIG_BLOB;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.CONFIG_ID;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.CONFIG_TYPE;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.CREATED_AT;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.UPDATED_AT;
import static io.airbyte.db.instance.configs.jooq.Tables.AIRBYTE_CONFIGS;
import static org.jooq.impl.DSL.asterisk;
import static org.jooq.impl.DSL.select;

Expand All @@ -42,8 +37,7 @@
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -73,7 +67,7 @@ public DatabaseConfigPersistence(Database database) {
*/
public DatabaseConfigPersistence loadData(ConfigPersistence seedConfigPersistence) throws IOException {
database.transaction(ctx -> {
boolean isInitialized = ctx.fetchExists(select().from(AIRBYTE_CONFIGS));
boolean isInitialized = ctx.fetchExists(select().from(AIRBYTE_CONFIGS).where());
if (isInitialized) {
LOGGER.info("Config database is not empty; skipping config seeding and copying");
return null;
Expand All @@ -86,7 +80,7 @@ public DatabaseConfigPersistence loadData(ConfigPersistence seedConfigPersistenc
} catch (IOException e) {
throw new SQLException(e);
}
Timestamp timestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
OffsetDateTime timestamp = OffsetDateTime.now();

int insertionCount = seedConfigs.entrySet().stream().map(entry -> {
String configType = entry.getKey();
Expand All @@ -111,7 +105,7 @@ public <T> T getConfig(AirbyteConfig configType, String configId, Class<T> clazz
throws ConfigNotFoundException, JsonValidationException, IOException {
Result<Record> result = database.query(ctx -> ctx.select(asterisk())
.from(AIRBYTE_CONFIGS)
.where(CONFIG_TYPE.eq(configType.name()), CONFIG_ID.eq(configId))
.where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(configType.name()), AIRBYTE_CONFIGS.CONFIG_ID.eq(configId))
.fetch());

if (result.isEmpty()) {
Expand All @@ -120,18 +114,18 @@ public <T> T getConfig(AirbyteConfig configType, String configId, Class<T> clazz
throw new IllegalStateException(String.format("Multiple %s configs found for ID %s: %s", configType, configId, result));
}

return Jsons.deserialize(result.get(0).get(CONFIG_BLOB).data(), clazz);
return Jsons.deserialize(result.get(0).get(AIRBYTE_CONFIGS.CONFIG_BLOB).data(), clazz);
}

@Override
public <T> List<T> listConfigs(AirbyteConfig configType, Class<T> clazz) throws IOException {
Result<Record> results = database.query(ctx -> ctx.select(asterisk())
.from(AIRBYTE_CONFIGS)
.where(CONFIG_TYPE.eq(configType.name()))
.orderBy(CONFIG_TYPE, CONFIG_ID)
.where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(configType.name()))
.orderBy(AIRBYTE_CONFIGS.CONFIG_TYPE, AIRBYTE_CONFIGS.CONFIG_ID)
.fetch());
return results.stream()
.map(record -> Jsons.deserialize(record.get(CONFIG_BLOB).data(), clazz))
.map(record -> Jsons.deserialize(record.get(AIRBYTE_CONFIGS.CONFIG_BLOB).data(), clazz))
.collect(Collectors.toList());
}

Expand All @@ -142,15 +136,15 @@ public <T> void writeConfig(AirbyteConfig configType, String configId, T config)
database.transaction(ctx -> {
boolean isExistingConfig = ctx.fetchExists(select()
.from(AIRBYTE_CONFIGS)
.where(CONFIG_TYPE.eq(configType.name()), CONFIG_ID.eq(configId)));
.where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(configType.name()), AIRBYTE_CONFIGS.CONFIG_ID.eq(configId)));

Timestamp timestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
OffsetDateTime timestamp = OffsetDateTime.now();

if (isExistingConfig) {
int updateCount = ctx.update(AIRBYTE_CONFIGS)
.set(CONFIG_BLOB, JSONB.valueOf(Jsons.serialize(config)))
.set(UPDATED_AT, timestamp)
.where(CONFIG_TYPE.eq(configType.name()), CONFIG_ID.eq(configId))
.set(AIRBYTE_CONFIGS.CONFIG_BLOB, JSONB.valueOf(Jsons.serialize(config)))
.set(AIRBYTE_CONFIGS.UPDATED_AT, timestamp)
.where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(configType.name()), AIRBYTE_CONFIGS.CONFIG_ID.eq(configId))
.execute();
if (updateCount != 0 && updateCount != 1) {
LOGGER.warn("{} config {} has been updated; updated record count: {}", configType, configId, updateCount);
Expand All @@ -160,11 +154,11 @@ public <T> void writeConfig(AirbyteConfig configType, String configId, T config)
}

int insertionCount = ctx.insertInto(AIRBYTE_CONFIGS)
.set(CONFIG_ID, configId)
.set(CONFIG_TYPE, configType.name())
.set(CONFIG_BLOB, JSONB.valueOf(Jsons.serialize(config)))
.set(CREATED_AT, timestamp)
.set(UPDATED_AT, timestamp)
.set(AIRBYTE_CONFIGS.CONFIG_ID, configId)
.set(AIRBYTE_CONFIGS.CONFIG_TYPE, configType.name())
.set(AIRBYTE_CONFIGS.CONFIG_BLOB, JSONB.valueOf(Jsons.serialize(config)))
.set(AIRBYTE_CONFIGS.CREATED_AT, timestamp)
.set(AIRBYTE_CONFIGS.UPDATED_AT, timestamp)
.execute();
if (insertionCount != 1) {
LOGGER.warn("{} config {} has been inserted; insertion record count: {}", configType, configId, insertionCount);
Expand All @@ -179,11 +173,11 @@ public void deleteConfig(AirbyteConfig configType, String configId) throws IOExc
database.transaction(ctx -> {
boolean isExistingConfig = ctx.fetchExists(select()
.from(AIRBYTE_CONFIGS)
.where(CONFIG_TYPE.eq(configType.name()), CONFIG_ID.eq(configId)));
.where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(configType.name()), AIRBYTE_CONFIGS.CONFIG_ID.eq(configId)));

if (isExistingConfig) {
ctx.deleteFrom(AIRBYTE_CONFIGS)
.where(CONFIG_TYPE.eq(configType.name()), CONFIG_ID.eq(configId))
.where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(configType.name()), AIRBYTE_CONFIGS.CONFIG_ID.eq(configId))
.execute();
return null;
}
Expand All @@ -199,7 +193,7 @@ public <T> void replaceAllConfigs(Map<AirbyteConfig, Stream<T>> configs, boolean

LOGGER.info("Replacing all configs");

Timestamp timestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
OffsetDateTime timestamp = OffsetDateTime.now();
int insertionCount = database.transaction(ctx -> {
ctx.truncate(AIRBYTE_CONFIGS).restartIdentity().execute();

Expand All @@ -217,18 +211,18 @@ public <T> void replaceAllConfigs(Map<AirbyteConfig, Stream<T>> configs, boolean
/**
* @return the number of inserted records for convenience, which is always 1.
*/
private int insertConfigRecord(DSLContext ctx, Timestamp timestamp, String configType, JsonNode configJson, String idFieldName) {
private int insertConfigRecord(DSLContext ctx, OffsetDateTime timestamp, String configType, JsonNode configJson, String idFieldName) {
String configId = idFieldName == null
? UUID.randomUUID().toString()
: configJson.get(idFieldName).asText();
LOGGER.info("Inserting {} record {}", configType, configId);

ctx.insertInto(AIRBYTE_CONFIGS)
.set(CONFIG_ID, configId)
.set(CONFIG_TYPE, configType)
.set(CONFIG_BLOB, JSONB.valueOf(Jsons.serialize(configJson)))
.set(CREATED_AT, timestamp)
.set(UPDATED_AT, timestamp)
.set(AIRBYTE_CONFIGS.CONFIG_ID, configId)
.set(AIRBYTE_CONFIGS.CONFIG_TYPE, configType)
.set(AIRBYTE_CONFIGS.CONFIG_BLOB, JSONB.valueOf(Jsons.serialize(configJson)))
.set(AIRBYTE_CONFIGS.CREATED_AT, timestamp)
.set(AIRBYTE_CONFIGS.UPDATED_AT, timestamp)
.execute();
return 1;
}
Expand All @@ -239,11 +233,11 @@ public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {

Map<String, Result<Record>> results = database.query(ctx -> ctx.select(asterisk())
.from(AIRBYTE_CONFIGS)
.orderBy(CONFIG_TYPE, CONFIG_ID)
.fetchGroups(CONFIG_TYPE));
.orderBy(AIRBYTE_CONFIGS.CONFIG_TYPE, AIRBYTE_CONFIGS.CONFIG_ID)
.fetchGroups(AIRBYTE_CONFIGS.CONFIG_TYPE));
return results.entrySet().stream().collect(Collectors.toMap(
Entry::getKey,
e -> e.getValue().stream().map(r -> Jsons.deserialize(r.get(CONFIG_BLOB).data()))));
e -> e.getValue().stream().map(r -> Jsons.deserialize(r.get(AIRBYTE_CONFIGS.CONFIG_BLOB).data()))));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@

package io.airbyte.config.persistence;

import static io.airbyte.db.instance.configs.AirbyteConfigsTable.AIRBYTE_CONFIGS;
import static org.jooq.impl.DSL.asterisk;
import static org.jooq.impl.DSL.count;
import static org.jooq.impl.DSL.table;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
Expand All @@ -38,7 +38,6 @@
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.db.Database;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import java.util.List;
Expand Down Expand Up @@ -171,15 +170,10 @@ public void testDumpConfigs() throws Exception {
}

private void assertRecordCount(int expectedCount) throws Exception {
Result<Record1<Integer>> recordCount = database.query(ctx -> ctx.select(count(asterisk())).from(AIRBYTE_CONFIGS).fetch());
Result<Record1<Integer>> recordCount = database.query(ctx -> ctx.select(count(asterisk())).from(table("airbyte_configs")).fetch());
assertEquals(expectedCount, recordCount.get(0).value1());
}

private void assertHasWorkspace(StandardWorkspace workspace) throws Exception {
assertEquals(workspace,
configPersistence.getConfig(ConfigSchema.STANDARD_WORKSPACE, workspace.getWorkspaceId().toString(), StandardWorkspace.class));
}

private void assertHasSource(StandardSourceDefinition source) throws Exception {
assertEquals(source, configPersistence
.getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(),
Expand Down
62 changes: 0 additions & 62 deletions airbyte-db/README.md

This file was deleted.

86 changes: 86 additions & 0 deletions airbyte-db/jooq/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# jOOQ Code Generation

## How to Use
This module generates jOOQ code for the configs and jobs databases. To use the generated code, add the following dependency:

```gradle
dependencies {
implementation project(':airbyte-db:jooq')
}
```

The generated code exists in the package `io.airbyte.db.instance.<db-name>.jooq` under the directory `build/generated/<db-name>Database/src/main/java`.

## Code Generation
Gradle plugin `nu.studer.jooq` is used for jOOQ code generation. See [here](https://github.com/etiennestuder/gradle-jooq-plugin) for details.

This module must be separate from the `lib` module because the code generator uses a custom database (`FlywayMigrationDatabase`) that runs the Flyway migrations before generating code. That implementation needs to be compiled before the generator can use it.

The code will be automatically generated when this module is compiled. To manually update the generated code, run the `compileJava` task:

```sh
SUB_BUILD=PLATFORM ./gradlew :airbyte-db:jooq:compileJava
```

Or run one of the following tasks for an individual database:

```sh
# for configs database
SUB_BUILD=PLATFORM ./gradlew :airbyte-db:jooq:generateConfigsDatabaseJooq

# for jobs database
SUB_BUILD=PLATFORM ./gradlew :airbyte-db:jooq:generateJobsDatabaseJooq
```

## How to Set Up Code Generation for a New Database
- In `build.gradle`, do the following.
- Add a new jOOQ configuration under `jooq.configurations`.
- This step will automatically create a `generate<db-name>DatabaseJooq` task.
- Register the output of the code generation task in the main sourceSet.
- Set up caching for the code generation task.

Template:

```gradle
// add jooq configuration
jooq {
configurations {
<db-name>Database {
generateSchemaSourceOnCompilation = true
generationTool {
generator {
name = 'org.jooq.codegen.DefaultGenerator'
database {
name = 'io.airbyte.db.instance.configs.ConfigsFlywayMigrationDatabase'
inputSchema = 'public'
excludes = 'airbyte_configs_migrations'
}
target {
packageName = 'io.airbyte.db.instance.configs.jooq'
directory = 'build/generated/configsDatabase/src/main/java'
}
}
}
}
}
}
// register output as source set
sourceSets.main.java.srcDirs (
tasks.named('generate<db-name>DatabaseJooq').flatMap { it.outputDir }
)
sourceSets {
main {
java {
srcDirs "$buildDir/generated/<db-name>Database/src/main/java"
}
}
}
// setup caching
tasks.named('generate<db-name>DatabaseJooq').configure {
allInputsDeclared = true
outputs.cacheIf { true }
}
```
Loading

0 comments on commit b9e1997

Please sign in to comment.