Skip to content

Commit

Permalink
Destination Oracle: change table limit and format record data (airbyt…
Browse files Browse the repository at this point in the history
…ehq#5542)

* change oracle dest connector

* remove container to run tests add depency to secrets/config.json

* correct functions in tests

* gradlew format
  • Loading branch information
marcosmarxm authored Aug 24, 2021
1 parent 93032a8 commit 29d65e9
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "3986776d-2319-4de9-8af8-db14c0996e72",
"name": "Oracle (Alpha)",
"dockerRepository": "airbyte/destination-oracle",
"dockerImageTag": "0.1.5",
"dockerImageTag": "0.1.6",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/oracle"
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
- destinationDefinitionId: 3986776d-2319-4de9-8af8-db14c0996e72
name: Oracle (Alpha)
dockerRepository: airbyte/destination-oracle
dockerImageTag: 0.1.5
dockerImageTag: 0.1.6
documentationUrl: https://docs.airbyte.io/integrations/destinations/oracle
- destinationDefinitionId: 9f760101-60ae-462f-9ee6-b7a9dafd454d
name: Kafka
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.name=airbyte/destination-oracle
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ dependencies {
implementation "com.oracle.database.jdbc:ojdbc8-production:19.7.0.0"

testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation 'org.testcontainers:oracle-xe:1.15.2'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-oracle')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public String convertStreamName(String input) {
if (!result.isEmpty() && result.charAt(0) == '_') {
result = result.substring(1);
}
return maxStringLength(result, 30);
return maxStringLength(result, 128);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@

package io.airbyte.integrations.destination.oracle;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.sql.PreparedStatement;
Expand Down Expand Up @@ -150,8 +152,9 @@ private static void insertRawRecordsInSingleQuery(String tableName,
int i = 1;
for (final AirbyteRecordMessage message : records) {
// 1-indexed
final JsonNode formattedData = StandardNameTransformer.formatJsonPath(message.getData());
statement.setString(i, uuidSupplier.get().toString());
statement.setString(i + 1, Jsons.serialize(message.getData()));
statement.setString(i + 1, Jsons.serialize(formattedData));
statement.setTimestamp(i + 2, Timestamp.from(Instant.ofEpochMilli(message.getEmittedAt())));
i += 3;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,72 +25,46 @@
package io.airbyte.integrations.destination.oracle;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.OracleContainer;

public class OracleIntegrationTest extends DestinationAcceptanceTest {

private static final Logger LOGGER = LoggerFactory.getLogger(OracleIntegrationTest.class);
private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private static OracleContainer db;
private static JsonNode baseConfig;
private ExtendedNameTransformer namingResolver = new OracleNameTransformer();
private JsonNode config;
private static JsonNode config;

@BeforeAll
protected static void init() {
db = new OracleContainer("epiclabs/docker-oracle-xe-11g");
db.start();
public JsonNode getStaticConfig() {
return Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json")));
}

@Override
protected String getImageName() {
return "airbyte/destination-oracle:dev";
}

private JsonNode getConfig(OracleContainer db) {
return Jsons.jsonNode(ImmutableMap.builder()
.put("host", db.getHost())
.put("port", db.getFirstMappedPort())
.put("username", db.getUsername())
.put("password", db.getPassword())
.put("schema", "testSchema")
.put("sid", db.getSid())
.build());
}

@Override
protected JsonNode getConfig() {
return config;
}

@Override
protected JsonNode getFailCheckConfig() {
return Jsons.jsonNode(ImmutableMap.builder()
.put("host", db.getHost())
.put("username", db.getUsername())
.put("password", "wrong password")
.put("schema", "public")
.put("port", db.getFirstMappedPort())
.put("sid", db.getSid())
.build());
}

@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv env, String streamName, String namespace, JsonNode streamSchema) throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace)
Expand All @@ -116,6 +90,13 @@ protected List<JsonNode> retrieveNormalizedRecords(TestDestinationEnv env, Strin
return retrieveRecordsFromTable(tableName, namespace);
}

@Override
protected JsonNode getFailCheckConfig() {
final JsonNode invalidConfig = Jsons.clone(config);
((ObjectNode) invalidConfig).put("password", "wrong password");
return invalidConfig;
}

@Override
protected List<String> resolveIdentifier(String identifier) {
final List<String> result = new ArrayList<>();
Expand All @@ -130,28 +111,27 @@ protected List<String> resolveIdentifier(String identifier) {
}

private List<JsonNode> retrieveRecordsFromTable(String tableName, String schemaName) throws SQLException {
List<org.jooq.Record> result = Databases.createOracleDatabase(db.getUsername(), db.getPassword(), db.getJdbcUrl())
.query(ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, OracleDestination.COLUMN_NAME_EMITTED_AT))
.stream()
.collect(Collectors.toList()));
List<org.jooq.Record> result = getDatabase().query(ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, OracleDestination.COLUMN_NAME_EMITTED_AT))
.stream()
.collect(Collectors.toList()));
return result
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(Jsons::deserialize)
.collect(Collectors.toList());
}

private static Database getDatabase(JsonNode config) {
private static Database getDatabase() {
// todo (cgardens) - rework this abstraction so that we do not have to pass a null into the
// constructor. at least explicitly handle it, even if the impl doesn't change.
return Databases.createDatabase(
config.get("username").asText(),
config.get("password").asText(),
baseConfig.get("username").asText(),
baseConfig.get("password").asText(),
String.format("jdbc:oracle:thin:@//%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("sid").asText()),
baseConfig.get("host").asText(),
baseConfig.get("port").asText(),
baseConfig.get("sid").asText()),
"oracle.jdbc.driver.OracleDriver",
null);
}
Expand All @@ -172,21 +152,18 @@ private List<String> getAllTables(Database db) {

@Override
protected void setup(TestDestinationEnv testEnv) throws SQLException {
config = getConfig(db);

final Database database = getDatabase(config);
database.query(ctx -> {
ctx.execute("alter database default tablespace users");
return null;
});
// config = getConfig(db);
baseConfig = getStaticConfig();
config = Jsons.clone(baseConfig);
final Database database = getDatabase();
allTables = getAllTables(database);
}

@Override
protected void tearDown(TestDestinationEnv testEnv) {
config = getConfig(db);
config = getStaticConfig();

final Database database = getDatabase(config);
final Database database = getDatabase();
var tables = getAllTables(database);
tables.removeAll(allTables);
try {
Expand All @@ -201,10 +178,4 @@ protected void tearDown(TestDestinationEnv testEnv) {
}
}

@AfterAll
static void cleanUp() {
db.stop();
db.close();
}

}
1 change: 1 addition & 0 deletions docs/integrations/destinations/oracle.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ You should now have all the requirements needed to configure Oracle as a destina
## Changelog
| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.6 | 2021-08-23 | [#5542](https://github.com/airbytehq/airbyte/pull/5542) | Remove support for Oracle 11g to allow normalization |
| 0.1.5 | 2021-08-10 | [#5307](https://github.com/airbytehq/airbyte/pull/5307) | 🐛 Destination Oracle: Fix destination check for users without dba role |
| 0.1.4 | 2021-07-30 | [#5125](https://github.com/airbytehq/airbyte/pull/5125) | Enable `additionalPropertities` in spec.json |
| 0.1.3 | 2021-07-21 | [#3555](https://github.com/airbytehq/airbyte/pull/3555) | Partial Success in BufferedStreamConsumer |
Expand Down

0 comments on commit 29d65e9

Please sign in to comment.