Skip to content

Commit

Permalink
add Oracle support to blob -> string (airbytehq#2830)
Browse files Browse the repository at this point in the history
* add support to blob -> string

* run gradlew format
  • Loading branch information
marcosmarxm authored Apr 12, 2021
1 parent 6e9d6fc commit 5e3ba1d
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 3 deletions.
6 changes: 3 additions & 3 deletions airbyte-db/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private static void setJsonField(ResultSet r, int i, ObjectNode o) throws SQLExc
java.util.Date d = new java.util.Date(t.getTime() + (t.getNanos() / 1000000));
o.put(columnName, toISO8601String(d));
}
case BINARY, VARBINARY, LONGVARBINARY -> o.put(columnName, r.getBytes(i));
case BLOB, BINARY, VARBINARY, LONGVARBINARY -> o.put(columnName, r.getBytes(i));
default -> o.put(columnName, r.getString(i));
}
}
Expand Down Expand Up @@ -189,7 +189,7 @@ public static void setStatementField(PreparedStatement preparedStatement,
case REAL -> preparedStatement.setFloat(parameterIndex, Float.parseFloat(value));
case NUMERIC, DECIMAL -> preparedStatement.setBigDecimal(parameterIndex, new BigDecimal(value));
case CHAR, NCHAR, NVARCHAR, VARCHAR, LONGVARCHAR -> preparedStatement.setString(parameterIndex, value);
case BINARY -> preparedStatement.setBytes(parameterIndex, DatatypeConverter.parseHexBinary(value));
case BINARY, BLOB -> preparedStatement.setBytes(parameterIndex, DatatypeConverter.parseHexBinary(value));
// since cursor are expected to be comparable, handle cursor typing strictly and error on
// unrecognized types
default -> throw new IllegalArgumentException(String.format("%s is not supported.", cursorFieldType));
Expand All @@ -213,7 +213,7 @@ public static JsonSchemaPrimitive getType(JDBCType jdbcType) {
case DATE -> JsonSchemaPrimitive.STRING;
case TIME -> JsonSchemaPrimitive.STRING;
case TIMESTAMP -> JsonSchemaPrimitive.STRING;
case BINARY, VARBINARY, LONGVARBINARY -> JsonSchemaPrimitive.STRING;
case BLOB, BINARY, VARBINARY, LONGVARBINARY -> JsonSchemaPrimitive.STRING;
// since column types aren't necessarily meaningful to Airbyte, liberally convert all unrecgonised
// types to String
default -> JsonSchemaPrimitive.STRING;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.source.oracle;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.OracleContainer;

class OracleSourceTest {

private static final String STREAM_NAME = "TEST.ID_AND_NAME";
private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of(
CatalogHelpers.createAirbyteStream(
STREAM_NAME,
Field.of("ID", JsonSchemaPrimitive.NUMBER),
Field.of("NAME", JsonSchemaPrimitive.STRING),
Field.of("IMAGE", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))));
private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG);
private static final Set<AirbyteMessage> ASCII_MESSAGES = Sets.newHashSet(
createRecord(STREAM_NAME, map("ID", new BigDecimal("1.0"), "NAME", "user", "IMAGE", "last_summer.png".getBytes())));

private static OracleContainer ORACLE_DB;

private static OracleContainer container;
private static JsonNode config;

@BeforeAll
static void init() {
ORACLE_DB = new OracleContainer("epiclabs/docker-oracle-xe-11g");
ORACLE_DB.start();
}

@BeforeEach
void setup() throws Exception {
config = Jsons.jsonNode(ImmutableMap.builder()
.put("host", ORACLE_DB.getHost())
.put("port", ORACLE_DB.getFirstMappedPort())
.put("sid", ORACLE_DB.getSid())
.put("username", ORACLE_DB.getUsername())
.put("password", ORACLE_DB.getPassword())
.build());

JdbcDatabase database = Databases.createJdbcDatabase(config.get("username").asText(),
config.get("password").asText(),
String.format("jdbc:oracle:thin:@//%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("sid").asText()),
"oracle.jdbc.driver.OracleDriver");

database.execute(connection -> {
connection.createStatement().execute("CREATE USER TEST IDENTIFIED BY TEST DEFAULT TABLESPACE USERS QUOTA UNLIMITED ON USERS");
connection.createStatement().execute("CREATE TABLE TEST.id_and_name(id NUMERIC(4, 0), name VARCHAR(200), image BLOB)");
connection.createStatement()
.execute("INSERT INTO TEST.id_and_name(id, name, image) VALUES (1, 'user', utl_raw.cast_to_raw('last_summer.png'))");
});

database.close();
}

private JdbcDatabase getDatabaseFromConfig(JsonNode config) {
return Databases.createJdbcDatabase(config.get("username").asText(),
config.get("password").asText(),
String.format("jdbc:oracle:thin:@//%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("sid").asText()),
"oracle.jdbc.driver.OracleDriver");
}

private JsonNode getConfig(OracleContainer oracleDb) {
return Jsons.jsonNode(ImmutableMap.builder()
.put("host", oracleDb.getHost())
.put("port", oracleDb.getFirstMappedPort())
.put("sid", oracleDb.getSid())
.put("username", oracleDb.getUsername())
.put("password", oracleDb.getPassword())
.build());
}

@AfterAll
static void cleanUp() {
ORACLE_DB.close();
}

private static void setEmittedAtToNull(Iterable<AirbyteMessage> messages) {
for (AirbyteMessage actualMessage : messages) {
if (actualMessage.getRecord() != null) {
actualMessage.getRecord().setEmittedAt(null);
}
}
}

@Test
void testReadSuccess() throws Exception {
final Set<AirbyteMessage> actualMessages = MoreIterators.toSet(new OracleSource().read(getConfig(ORACLE_DB), CONFIGURED_CATALOG, null));
setEmittedAtToNull(actualMessages);

assertEquals(ASCII_MESSAGES, actualMessages);
}

private static AirbyteMessage createRecord(String stream, Map<Object, Object> data) {
return new AirbyteMessage().withType(Type.RECORD).withRecord(new AirbyteRecordMessage().withData(Jsons.jsonNode(data)).withStream(stream));
}

private static Map<Object, Object> map(Object... entries) {
if (entries.length % 2 != 0) {
throw new IllegalArgumentException("Entries must have even length");
}

return new HashMap<>() {

{
for (int i = 0; i < entries.length; i++) {
put(entries[i++], entries[i]);
}
}

};
}

}

0 comments on commit 5e3ba1d

Please sign in to comment.