Fix OOM in migration (airbytehq#2520)
michel-tricot authored Mar 19, 2021
1 parent cb2e169 commit 1475fe3
Showing 7 changed files with 122 additions and 37 deletions.
8 changes: 8 additions & 0 deletions airbyte-commons/src/main/java/io/airbyte/commons/io/IOs.java
@@ -91,6 +91,14 @@ public static List<String> getTail(int numLines, Path path) throws IOException {
}
}

public static InputStream inputStream(final Path path) {
try {
return Files.newInputStream(path);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public static void silentClose(final Closeable closeable) {
try {
closeable.close();
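
The new helper opens a file lazily instead of buffering its contents the way IOs.readFile does, and rethrows any IOException as a RuntimeException. A minimal usage sketch (the class name and path are hypothetical):

import io.airbyte.commons.io.IOs;
import java.io.InputStream;
import java.nio.file.Path;

public class InputStreamSketch {

  public static void main(String[] args) throws Exception {
    // Hypothetical records file; in the migration this can be arbitrarily large.
    Path recordsFile = Path.of("config/STANDARD_SYNC.yaml");

    // The file is opened but not read; callers consume it incrementally and close it when done.
    try (InputStream in = IOs.inputStream(recordsFile)) {
      System.out.println("first byte: " + in.read());
    }
  }

}
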
42 changes: 41 additions & 1 deletion airbyte-commons/src/main/java/io/airbyte/commons/yaml/Yamls.java
@@ -25,19 +25,27 @@
package io.airbyte.commons.yaml;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SequenceWriter;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLParser;
import com.google.common.collect.AbstractIterator;
import io.airbyte.commons.lang.CloseableConsumer;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import java.io.IOException;
import java.io.InputStream;
import java.io.Writer;
import java.util.Iterator;

public class Yamls {

public static final YAMLFactory YAML_FACTORY = new YAMLFactory();
// Object Mapper is thread-safe
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new YAMLFactory());
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(YAML_FACTORY);

public static <T> String serialize(T object) {
try {
@@ -63,6 +71,38 @@ public static JsonNode deserialize(final String yamlString) {
}
}

public static AutoCloseableIterator<JsonNode> deserializeArray(final InputStream stream) {
try {
YAMLParser parser = YAML_FACTORY.createParser(stream);

// Check the first token
if (parser.nextToken() != JsonToken.START_ARRAY) {
throw new IllegalStateException("Expected content to be an array");
}

Iterator<JsonNode> iterator = new AbstractIterator<>() {

@Override
protected JsonNode computeNext() {
try {
while (parser.nextToken() != JsonToken.END_ARRAY) {
return parser.readValueAsTree();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return endOfData();
}

};

return AutoCloseableIterators.fromIterator(iterator, parser::close);

} catch (IOException e) {
throw new RuntimeException(e);
}
}

// todo (cgardens) - share this with Jsons if ever needed.

/**
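
The new deserializeArray parses a YAML array element by element rather than materializing the whole document, which is the basis of the OOM fix in Migrate.java. A minimal usage sketch, assuming AutoCloseableIterator behaves as a plain Iterator that also closes the underlying parser (the sample YAML is hypothetical):

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.yaml.Yamls;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class YamlStreamingSketch {

  public static void main(String[] args) throws Exception {
    // Small in-memory example; in the migration the stream comes from a file on disk.
    InputStream input = new ByteArrayInputStream(
        "- {id: 1}\n- {id: 2}\n- {id: 3}\n".getBytes(StandardCharsets.UTF_8));

    // Elements are parsed on demand; closing the iterator closes the parser and the stream,
    // as exercised by the verify(input).close() assertion in the new test below.
    try (AutoCloseableIterator<JsonNode> it = Yamls.deserializeArray(input)) {
      while (it.hasNext()) {
        System.out.println(it.next().get("id").asInt());
      }
    }
  }

}
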
@@ -89,6 +89,13 @@ public void testGetTailExists() throws IOException {
assertEquals(expectedTail, tail);
}

@Test
void testInputStream() {
assertThrows(RuntimeException.class, () -> {
IOs.inputStream(Path.of("idontexist"));
});
}

@Test
void testSilentClose() throws IOException {
Closeable closeable = Mockito.mock(Closeable.class);
@@ -24,18 +24,26 @@

package io.airbyte.commons.yaml;

import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.CloseableConsumer;
import io.airbyte.commons.stream.MoreStreams;
import io.airbyte.commons.util.AutoCloseableIterator;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;

class YamlsTest {
@@ -121,6 +129,27 @@ void testListWriter() throws Exception {
assertEquals(values, deserialize);
}

@Test
void testStreamRead() throws IOException {
List<ToClass> classes = Lists.newArrayList(
new ToClass("1", 1, 1),
new ToClass("2", 2, 2),
new ToClass("3", 3, 3));
ByteArrayInputStream input = spy(new ByteArrayInputStream(Yamls.serialize(classes).getBytes(StandardCharsets.UTF_8)));

try (AutoCloseableIterator<JsonNode> iterator = Yamls.deserializeArray(input)) {
assertEquals(
classes,
MoreStreams.toStream(iterator)
.map(e -> Jsons.object(e, ToClass.class))
.collect(Collectors.toList()));
} catch (Exception e) {
fail();
}

verify(input).close();
}

private static class ToClass {

@JsonProperty("str")
42 changes: 22 additions & 20 deletions airbyte-migration/src/main/java/io/airbyte/migrate/Migrate.java
@@ -32,6 +32,7 @@
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.commons.set.MoreSets;
import io.airbyte.commons.stream.MoreStreams;
+ import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.yaml.Yamls;
import io.airbyte.validation.json.JsonSchemaValidator;
@@ -46,7 +47,6 @@
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
- import java.util.stream.BaseStream;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
@@ -120,27 +120,38 @@ private Path runMigration(Migration migration, Path migrationInputRoot) throws I
final Path tmpOutputDir = Files.createDirectories(migrateRoot.resolve(migration.getVersion()));

// create a map of each input resource path to the input stream.
- final Map<ResourceId, Stream<JsonNode>> inputData = createInputStreams(migration, migrationInputRoot);
+ final Map<ResourceId, AutoCloseableIterator<JsonNode>> inputData = createInputStreams(migration, migrationInputRoot);
+ final Map<ResourceId, Stream<JsonNode>> inputDataStreams = inputData.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> MoreStreams.toStream(entry.getValue())
+ .peek(r -> {
+ try {
+ jsonSchemaValidator.ensure(migration.getInputSchema().get(entry.getKey()), r);
+ } catch (JsonValidationException e) {
+ throw new IllegalArgumentException("Input data schema does not match declared input schema.", e);
+ }
+ })));

final Map<ResourceId, RecordConsumer> outputStreams = createOutputStreams(migration, tmpOutputDir);
// make the java compiler happy (it can't resolve that RecordConsumer is, in fact, a
// Consumer<JsonNode>).
final Map<ResourceId, Consumer<JsonNode>> outputDataWithGenericType = MigrationUtils.mapRecordConsumerToConsumer(outputStreams);

// do the migration.
- new MigrateWithMetadata(migration).migrate(inputData, outputDataWithGenericType);
+ new MigrateWithMetadata(migration).migrate(inputDataStreams, outputDataWithGenericType);

// clean up.
- inputData.values().forEach(BaseStream::close);
+ inputData.values().forEach(v -> Exceptions.toRuntime(v::close));
outputStreams.values().forEach(v -> Exceptions.toRuntime(v::close));

return tmpOutputDir;
}

- private Map<ResourceId, Stream<JsonNode>> createInputStreams(Migration migration, Path migrationInputRoot) {
- final Map<ResourceId, Stream<JsonNode>> resourceIdToInputStreams = MoreMaps.merge(
- createInputStreamsForResourceType(migration, migrationInputRoot, ResourceType.CONFIG),
- createInputStreamsForResourceType(migration, migrationInputRoot, ResourceType.JOB));
+ private Map<ResourceId, AutoCloseableIterator<JsonNode>> createInputStreams(Migration migration, Path migrationInputRoot) {
+ final Map<ResourceId, AutoCloseableIterator<JsonNode>> resourceIdToInputStreams = MoreMaps.merge(
+ createInputStreamsForResourceType(migrationInputRoot, ResourceType.CONFIG),
+ createInputStreamsForResourceType(migrationInputRoot, ResourceType.JOB));

try {
MoreSets.assertEqualsVerbose(migration.getInputSchema().keySet(), resourceIdToInputStreams.keySet());
Expand All @@ -150,25 +161,16 @@ private Map<ResourceId, Stream<JsonNode>> createInputStreams(Migration migration
return resourceIdToInputStreams;
}

- private Map<ResourceId, Stream<JsonNode>> createInputStreamsForResourceType(Migration migration,
- Path migrationInputRoot,
- ResourceType resourceType) {
+ private Map<ResourceId, AutoCloseableIterator<JsonNode>> createInputStreamsForResourceType(Path migrationInputRoot, ResourceType resourceType) {
final List<Path> inputFilePaths = FileUtils.listFiles(migrationInputRoot.resolve(resourceType.getDirectoryName()).toFile(), null, false)
.stream()
.map(File::toPath)
.collect(Collectors.toList());

- final Map<ResourceId, Stream<JsonNode>> inputData = new HashMap<>();
+ final Map<ResourceId, AutoCloseableIterator<JsonNode>> inputData = new HashMap<>();
for (final Path absolutePath : inputFilePaths) {
final ResourceId resourceId = ResourceId.fromRecordFilePath(resourceType, absolutePath);
- final Stream<JsonNode> recordInputStream = MoreStreams.toStream(Yamls.deserialize(IOs.readFile(absolutePath)).elements())
- .peek(r -> {
- try {
- jsonSchemaValidator.ensure(migration.getInputSchema().get(resourceId), r);
- } catch (JsonValidationException e) {
- throw new IllegalArgumentException("Input data schema does not match declared input schema.", e);
- }
- });
+ AutoCloseableIterator<JsonNode> recordInputStream = Yamls.deserializeArray(IOs.inputStream(absolutePath));
inputData.put(resourceId, recordInputStream);
}

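
Taken together, the change replaces Yamls.deserialize(IOs.readFile(path)), which loaded each records file fully into memory, with a lazy iterator, and moves schema validation into a peek on the resulting stream. A condensed sketch of that pattern, with a hypothetical check standing in for the JsonSchemaValidator call:

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.stream.MoreStreams;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.yaml.Yamls;
import java.nio.file.Path;
import java.util.function.Consumer;

public class StreamingMigrationSketch {

  // Hypothetical stand-in for jsonSchemaValidator.ensure(schema, record).
  private static void validate(JsonNode record) {
    if (record == null || record.isNull()) {
      throw new IllegalArgumentException("Input data schema does not match declared input schema.");
    }
  }

  public static void migrateFile(Path recordsFile, Consumer<JsonNode> output) throws Exception {
    // Records are read one at a time, validated, handed to the output consumer, then discarded.
    try (AutoCloseableIterator<JsonNode> records = Yamls.deserializeArray(IOs.inputStream(recordsFile))) {
      MoreStreams.toStream(records)
          .peek(StreamingMigrationSketch::validate)
          .forEach(output);
    }
  }

}
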
@@ -46,7 +46,7 @@ public static void run(String[] args) throws IOException {

MigrateConfig migrateConfig = parse(args);

if (migrateConfig.getInputPath().toString().endsWith("gz") || migrateConfig.getInputPath().toString().endsWith(".tar.gz")) {
if (migrateConfig.getInputPath().toString().endsWith(".gz")) {
LOGGER.info("Unpacking tarball");
final Path uncompressedInputPath = Files.createDirectories(workspaceRoot.resolve("uncompressed"));
Archives.extractArchive(migrateConfig.getInputPath(), uncompressedInputPath);
@@ -27,6 +27,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
+ import com.google.common.collect.Maps;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.util.MoreIterators;
@@ -101,21 +102,19 @@ private static JsonNode toConfiguredCatalog(JsonNode schema) {
// sync mode enum is identical in Schema and ConfiguredCatalog.
.map(JsonNode::asText)
.collect(Collectors.toList());
- final Map<String, JsonNode> airbyteStream = ImmutableMap.<String, JsonNode>builder()
- // catalog fields
- .put("name", stream.get("name"))
- .put("supported_sync_modes", Jsons.jsonNode(supportedSyncModes))
- .put("json_schema", fieldsToJsonSchema(stream.get("fields")))
- .put("source_defined_cursor", stream.get("sourceDefinedCursor"))
- .put("default_cursor_field", stream.get("defaultCursorField"))
- .build();
+ // catalog fields
+ final Map<String, JsonNode> airbyteStream = Maps.newHashMap();
+ airbyteStream.put("name", stream.get("name"));
+ airbyteStream.put("supported_sync_modes", Jsons.jsonNode(supportedSyncModes));
+ airbyteStream.put("json_schema", fieldsToJsonSchema(stream.get("fields")));
+ airbyteStream.put("source_defined_cursor", stream.get("sourceDefinedCursor"));
+ airbyteStream.put("default_cursor_field", stream.get("defaultCursorField"));
// configured catalog fields
- return (Map<String, JsonNode>) ImmutableMap.<String, JsonNode>builder()
- .put("stream", Jsons.jsonNode(airbyteStream))
- // sync mode enum is identical in Schema and ConfiguredCatalog.
- .put("sync_mode", Jsons.jsonNode(stream.get("syncMode").asText()))
- .put("cursor_field", stream.get("cursorField"))
- .build();
+ final Map<String, JsonNode> catalog = Maps.newHashMap();
+ catalog.put("stream", Jsons.jsonNode(airbyteStream));
+ catalog.put("sync_mode", Jsons.jsonNode(stream.get("syncMode").asText()));
+ catalog.put("cursor_field", stream.get("cursorField"));
+ return catalog;
})
.collect(Collectors.toList());

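
The last file swaps the ImmutableMap builders for plain HashMaps, presumably because ImmutableMap rejects null values while JsonNode.get returns null for absent fields such as sourceDefinedCursor. A small illustration of that difference (the key is only an example):

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.Map;

public class NullValueSketch {

  public static void main(String[] args) {
    // A HashMap tolerates a null value for a missing field.
    Map<String, String> tolerant = Maps.newHashMap();
    tolerant.put("source_defined_cursor", null);
    System.out.println("hash map entry: " + tolerant);

    // ImmutableMap.Builder throws as soon as it sees a null value.
    try {
      ImmutableMap.<String, String>builder().put("source_defined_cursor", null).build();
    } catch (NullPointerException e) {
      System.out.println("immutable map rejected the null value");
    }
  }

}
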
