Skip to content

Commit

Permalink
Import/Export support for spanner NUMERIC.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 329002880
  • Loading branch information
cloud-teleport committed Aug 28, 2020
1 parent 93e1d69 commit 54b3e60
Show file tree
Hide file tree
Showing 27 changed files with 1,059 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.teleport.spanner.common.NumericUtils;
import com.google.cloud.teleport.spanner.ddl.Column;
import com.google.cloud.teleport.spanner.ddl.Table;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -100,6 +101,9 @@ public Mutation apply(GenericRecord record) {
.set(column.name())
.to(readDate(record, avroType, logicalType, fieldName).orElse(null));
break;
case NUMERIC:
builder.set(column.name()).to(readNumeric(record, avroType, fieldName).orElse(null));
break;
case ARRAY:
{
Schema arraySchema = avroFieldSchema.getElementType();
Expand Down Expand Up @@ -149,6 +153,11 @@ public Mutation apply(GenericRecord record) {
.set(column.name())
.toDateArray(readDateArray(record, arrayType, fieldName).orElse(null));
break;
case NUMERIC:
builder
.set(column.name())
.toStringArray(readNumericArray(record, arrayType, fieldName).orElse(null));
break;
default:
throw new IllegalArgumentException(
String.format(
Expand Down Expand Up @@ -222,6 +231,29 @@ private Optional<List<Date>> readDateArray(
}
}

@VisibleForTesting
@SuppressWarnings("unchecked")
static Optional<List<String>> readNumericArray(
GenericRecord record, Schema.Type avroType, String fieldName) {
Object fieldValue = record.get(fieldName);
if (fieldValue == null) {
return Optional.empty();
}
switch (avroType) {
case BYTES:
List<ByteBuffer> values = (List<ByteBuffer>) record.get(fieldName);
if (values == null) {
return Optional.empty();
}
return Optional.of(
values.stream()
.map(x -> x == null ? null : NumericUtils.bytesToString(x.array()))
.collect(Collectors.toList()));
default:
throw new IllegalArgumentException("Cannot interpret " + avroType + " as BYTES");
}
}

@VisibleForTesting
@SuppressWarnings("unchecked")
static Optional<List<Timestamp>> readTimestampArray(
Expand Down Expand Up @@ -421,6 +453,18 @@ private Optional<Date> readDate(
}
}

private Optional<String> readNumeric(
GenericRecord record, Schema.Type avroType, String fieldName) {
switch (avroType) {
case BYTES:
return Optional.ofNullable((ByteBuffer) record.get(fieldName))
.map(ByteBuffer::array)
.map(NumericUtils::bytesToString);
default:
throw new IllegalArgumentException("Cannot interpret " + avroType + " as BYTES");
}
}

private Optional<Timestamp> readTimestamp(
GenericRecord record, Schema.Type avroType, LogicalType logicalType, String fieldName) {
switch (avroType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

import static com.google.cloud.teleport.spanner.AvroUtil.unpackNullable;

import com.google.cloud.spanner.Type;
import com.google.cloud.teleport.spanner.common.NumericUtils;
import com.google.cloud.teleport.spanner.common.Type;
import com.google.cloud.teleport.spanner.ddl.Column;
import com.google.cloud.teleport.spanner.ddl.Ddl;
import com.google.cloud.teleport.spanner.ddl.Table;
Expand Down Expand Up @@ -168,30 +169,33 @@ private static ImmutableList<String> getNumberedPropsWithPrefix(Schema schema, S
return props.build();
}

private com.google.cloud.spanner.Type inferType(Schema f, boolean supportArrays) {
private com.google.cloud.teleport.spanner.common.Type inferType(Schema f, boolean supportArrays) {
Schema.Type type = f.getType();
LogicalType logicalType = LogicalTypes.fromSchema(f);

switch (type) {
case BOOLEAN:
return Type.bool();
return com.google.cloud.teleport.spanner.common.Type.bool();
case INT:
return com.google.cloud.spanner.Type.int64();
return com.google.cloud.teleport.spanner.common.Type.int64();
case LONG:
if (LogicalTypes.timestampMillis().equals(logicalType)) {
return com.google.cloud.spanner.Type.timestamp();
return com.google.cloud.teleport.spanner.common.Type.timestamp();
}
if (LogicalTypes.timestampMicros().equals(logicalType)) {
return com.google.cloud.spanner.Type.timestamp();
return com.google.cloud.teleport.spanner.common.Type.timestamp();
}
return com.google.cloud.spanner.Type.int64();
return com.google.cloud.teleport.spanner.common.Type.int64();
case FLOAT:
case DOUBLE:
return com.google.cloud.spanner.Type.float64();
return com.google.cloud.teleport.spanner.common.Type.float64();
case STRING:
return com.google.cloud.spanner.Type.string();
return com.google.cloud.teleport.spanner.common.Type.string();
case BYTES:
return com.google.cloud.spanner.Type.bytes();
if (LogicalTypes.decimal(NumericUtils.PRECISION, NumericUtils.SCALE).equals(logicalType)) {
return com.google.cloud.teleport.spanner.common.Type.numeric();
}
return com.google.cloud.teleport.spanner.common.Type.bytes();
case ARRAY:
{
if (supportArrays) {
Expand All @@ -204,7 +208,7 @@ private com.google.cloud.spanner.Type inferType(Schema f, boolean supportArrays)
element = unpacked;
}
try {
return com.google.cloud.spanner.Type.array(inferType(element, false));
return com.google.cloud.teleport.spanner.common.Type.array(inferType(element, false));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Cannot infer array type for field " + f);
}
Expand All @@ -216,7 +220,8 @@ private com.google.cloud.spanner.Type inferType(Schema f, boolean supportArrays)
throw new IllegalArgumentException("Cannot infer a type " + f);
}

private String toString(com.google.cloud.spanner.Type spannerType, boolean supportArray) {
private String toString(
com.google.cloud.teleport.spanner.common.Type spannerType, boolean supportArray) {
switch (spannerType.getCode()) {
case BOOL:
return "BOOL";
Expand All @@ -232,10 +237,13 @@ private String toString(com.google.cloud.spanner.Type spannerType, boolean suppo
return "TIMESTAMP";
case DATE:
return "DATE";
case NUMERIC:
return "NUMERIC";
case ARRAY:
{
if (supportArray) {
Type element = spannerType.getArrayElementType();
com.google.cloud.teleport.spanner.common.Type element =
spannerType.getArrayElementType();
String elementStr = toString(element, false);
return "ARRAY<" + elementStr + ">";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.teleport.spanner;

import com.google.cloud.spanner.PartitionOptions;
import com.google.cloud.teleport.spanner.ddl.Column;
import com.google.cloud.teleport.spanner.ddl.Ddl;
import com.google.cloud.teleport.spanner.ddl.Table;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -48,7 +49,7 @@ public void processElement(ProcessContext c) {
String columnsListAsString =
table.columns().stream()
.filter(x -> !x.isGenerated())
.map(x -> "t.`" + x.name() + "`")
.map(x -> createColumnExpression(x))
.collect(Collectors.joining(","));

PartitionOptions partitionOptions =
Expand All @@ -68,4 +69,19 @@ public void processElement(ProcessContext c) {
}
}));
}

private String createColumnExpression(Column col) {
if (col.typeString().equals("NUMERIC")) {
return "CAST(" + "t.`" + col.name() + "`" + " AS STRING) AS " + col.name();
}
if (col.typeString().equals("ARRAY<NUMERIC>")) {
return "(SELECT ARRAY_AGG(CAST(num AS STRING)) FROM UNNEST("
+ "t.`"
+ col.name()
+ "`"
+ ") AS num) AS "
+ col.name();
}
return "t.`" + col.name() + "`";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

package com.google.cloud.teleport.spanner;

import com.google.cloud.spanner.Type;
import com.google.cloud.teleport.spanner.common.NumericUtils;
import com.google.cloud.teleport.spanner.ddl.Column;
import com.google.cloud.teleport.spanner.ddl.Ddl;
import com.google.cloud.teleport.spanner.ddl.IndexColumn;
import com.google.cloud.teleport.spanner.ddl.Table;
import java.util.ArrayList;
import java.util.Collection;
import java.util.stream.Collectors;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

Expand Down Expand Up @@ -98,7 +99,7 @@ public Collection<Schema> convert(Ddl ddl) {
return schemas;
}

private Schema avroType(Type spannerType) {
private Schema avroType(com.google.cloud.teleport.spanner.common.Type spannerType) {
switch (spannerType.getCode()) {
case BOOL:
return SchemaBuilder.builder().booleanType();
Expand All @@ -114,6 +115,9 @@ private Schema avroType(Type spannerType) {
return SchemaBuilder.builder().stringType();
case DATE:
return SchemaBuilder.builder().stringType();
case NUMERIC:
return LogicalTypes.decimal(NumericUtils.PRECISION, NumericUtils.SCALE)
.addToSchema(SchemaBuilder.builder().bytesType());
case ARRAY:
Schema avroItemsType = avroType(spannerType.getArrayElementType());
return SchemaBuilder.builder().array().items().type(wrapAsNullable(avroItemsType));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.teleport.spanner;

import com.google.cloud.spanner.Struct;
import com.google.cloud.teleport.spanner.common.NumericUtils;
import com.google.common.base.Strings;
import java.nio.ByteBuffer;
import java.util.List;
Expand Down Expand Up @@ -71,6 +72,15 @@ public GenericRecord convert(Struct row) {
builder.set(field, nullValue ? null : row.getDouble(fieldName));
break;
case BYTES:
if (spannerType.equals("NUMERIC")) {
// TODO: uses row.getNumeric() once teleport uses new spanner library.
builder.set(
field,
nullValue
? null
: ByteBuffer.wrap(NumericUtils.stringToBytes(row.getString(fieldName))));
break;
}
builder.set(
field, nullValue ? null : ByteBuffer.wrap(row.getBytes(fieldName).toByteArray()));
break;
Expand Down Expand Up @@ -111,6 +121,23 @@ public GenericRecord convert(Struct row) {
}
case BYTES:
{
if (spannerType.equals("ARRAY<NUMERIC>")) {
if (nullValue) {
builder.set(field, null);
break;
}
List<ByteBuffer> numericValues = null;
numericValues =
row.getStringList(fieldName).stream()
.map(
numeric ->
numeric == null
? null
: ByteBuffer.wrap(NumericUtils.stringToBytes(numeric)))
.collect(Collectors.toList());
builder.set(field, numericValues);
break;
}
List<ByteBuffer> value = null;
if (!nullValue) {
value =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package com.google.cloud.teleport.spanner;

import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Type.Code;
import com.google.cloud.teleport.spanner.TextImportProtos.ImportManifest;
import com.google.cloud.teleport.spanner.TextImportProtos.ImportManifest.TableManifest;
import com.google.cloud.teleport.spanner.common.Type.Code;
import com.google.cloud.teleport.spanner.ddl.Column;
import com.google.cloud.teleport.spanner.ddl.Ddl;
import com.google.cloud.teleport.spanner.ddl.Table;
Expand Down Expand Up @@ -183,7 +183,9 @@ public void processElement(ProcessContext c) {
mutations
.apply("Wait for previous depth " + depth, Wait.on(previousComputation))
.apply(
"Write mutations " + depth, LocalSpannerIO.write().withSpannerConfig(spannerConfig)
"Write mutations " + depth,
LocalSpannerIO.write()
.withSpannerConfig(spannerConfig)
.withCommitDeadline(Duration.standardMinutes(1))
.withMaxCumulativeBackoff(Duration.standardHours(2))
.withMaxNumMutations(10000)
Expand Down Expand Up @@ -416,6 +418,8 @@ public static Code parseSpannerDataType(String columnType) {
return Code.TIMESTAMP;
} else if (columnType.equalsIgnoreCase("BYTES")) {
return Code.BYTES;
} else if (columnType.equalsIgnoreCase("NUMERIC")) {
return Code.NUMERIC;
} else {
throw new IllegalArgumentException(
"Unrecognized or unsupported column data type: " + columnType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.google.cloud.ByteArray;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.Value;
import com.google.cloud.teleport.spanner.TextImportProtos.ImportManifest.TableManifest;
import com.google.cloud.teleport.spanner.ddl.Ddl;
Expand Down Expand Up @@ -164,7 +163,7 @@ protected final Mutation parseRow(
manifestColumns != null && manifestColumns.size() > 0
? manifestColumns.get(i).getColumnName()
: table.columns().get(i).name();
Type columnType = table.column(columnName).type();
com.google.cloud.teleport.spanner.common.Type columnType = table.column(columnName).type();
String cellValue = row.get(i);
boolean isNullValue = Strings.isNullOrEmpty(cellValue);
Value columnValue = null;
Expand Down Expand Up @@ -238,6 +237,9 @@ protected final Mutation parseRow(
}
}
break;
case NUMERIC:
columnValue = isNullValue ? Value.string(null) : Value.string(cellValue.trim());
break;
case BYTES:
columnValue = isNullValue ? Value.bytes(null) : Value.bytes(ByteArray.fromBase64(cellValue.trim()));
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (C) 2020 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.teleport.spanner.common;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.MathContext;

/** Utility class for dealing with Numeric values. */
public final class NumericUtils {

public static final int PRECISION = 38;
public static final int SCALE = 9;

// Convert an Avro-encoded NUMERIC byte array to an readable NUMERIC string value.
public static String bytesToString(byte[] byteArray) {
BigInteger unscaledNumeric = new BigInteger(byteArray);
BigDecimal scaledNumeric = new BigDecimal(unscaledNumeric, SCALE);
return scaledNumeric.toPlainString();
}

// Convert a readable NUMERIC string value to an Avro-encoded NUMERIC byte array.
public static byte[] stringToBytes(String numeric) {
BigDecimal scaledNumeric = new BigDecimal(numeric, new MathContext(PRECISION)).setScale(SCALE);
BigInteger unscaledNumeric = scaledNumeric.unscaledValue();
return unscaledNumeric.toByteArray();
}
}
Loading

0 comments on commit 54b3e60

Please sign in to comment.