Skip to content

Commit

Permalink
Change the scope definition (airbytehq#1485)
Browse files Browse the repository at this point in the history
  • Loading branch information
michel-tricot authored Jan 5, 2021
1 parent 93ba7b3 commit 677c086
Show file tree
Hide file tree
Showing 20 changed files with 351 additions and 341 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ public static <T extends Enum<T>> Optional<T> toEnum(final String value, Class<T
return Optional.ofNullable((T) NORMALIZED_ENUMS.get(enumClass).get(normalizeName(value)));
}

public static <T extends Enum<T>> String toSqlName(final T value) {
return value.name().toLowerCase();
}

private static String normalizeName(final String name) {
return name.toLowerCase().replaceAll("[^a-zA-Z0-9]", "");
}
Expand Down
24 changes: 24 additions & 0 deletions airbyte-commons/src/main/java/io/airbyte/commons/text/Names.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package io.airbyte.commons.text;

import com.google.common.base.Preconditions;
import java.text.Normalizer;

public class Names {
Expand Down Expand Up @@ -58,4 +59,27 @@ public static String concatQuotedNames(String name, String suffix) {
}
}

public static String doubleQuote(String value) {
return internalQuote(value, '"');
}

public static String singleQuote(String value) {
return internalQuote(value, '\'');
}

private static String internalQuote(final String value, final char quoteChar) {
Preconditions.checkNotNull(value);

boolean startsWithChar = value.charAt(0) == quoteChar;
boolean endsWithChar = value.charAt(value.length() - 1) == quoteChar;

Preconditions.checkState(startsWithChar == endsWithChar, "Invalid value: %s", value);

if (startsWithChar) {
return value;
} else {
return String.format("%c%s%c", quoteChar, value, quoteChar);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,28 @@
* SOFTWARE.
*/

package io.airbyte.scheduler;
package io.airbyte.commons.text;

import com.google.common.base.Preconditions;
import io.airbyte.config.JobConfig;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class ScopeHelper {
public class Sqls {

private static final String SCOPE_DELIMITER = ":";

public static String getScopePrefix(JobConfig.ConfigType configType) {
return configType.value();
}

public static String createScope(JobConfig.ConfigType configType, String configId) {
Preconditions.checkNotNull(configType);
Preconditions.checkNotNull(configId);
return getScopePrefix(configType) + SCOPE_DELIMITER + configId;
public static <T extends Enum<T>> String toSqlName(final T value) {
return value.name().toLowerCase();
}

public static String getConfigId(String scope) {
Preconditions.checkNotNull(scope);

final String[] split = scope.split(SCOPE_DELIMITER);
if (split.length <= 1) {
return "";
} else {
return split[1];
}
/**
* Generate a string fragment that can be put in the IN clause of a SQL statement. eg. column IN
* (value1, value2)
*
* @param values to encode
* @param <T> enum type
* @return "'value1', 'value2', 'value3'"
*/
public static <T extends Enum<T>> String toSqlInFragment(final Iterable<T> values) {
return StreamSupport.stream(values.spliterator(), false).map(Sqls::toSqlName).map(Names::singleQuote)
.collect(Collectors.joining(",", "(", ")"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import static io.airbyte.commons.enums.Enums.convertTo;
import static io.airbyte.commons.enums.Enums.isCompatible;
import static io.airbyte.commons.enums.Enums.toEnum;
import static io.airbyte.commons.enums.Enums.toSqlName;

import java.util.Optional;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -111,11 +110,4 @@ void testToEnum() {
Assertions.assertEquals(Optional.empty(), toEnum("VALUE_5", E5.class));
}

@Test
void testToSqlName() {
Assertions.assertEquals("value_1", toSqlName(E5.VALUE_1));
Assertions.assertEquals("value_two", toSqlName(E5.VALUE_TWO));
Assertions.assertEquals("value_three", toSqlName(E5.value_three));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.commons.text;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.junit.jupiter.api.Test;

Expand All @@ -38,4 +39,20 @@ void testToAlphanumericAndUnderscore() {
assertEquals("users_USE_special_____", Names.toAlphanumericAndUnderscore("users USE special !@#$"));
}

@Test
void testDoubleQuote() {
assertEquals("\"abc\"", Names.doubleQuote("abc"));
assertEquals("\"abc\"", Names.doubleQuote("\"abc\""));
assertThrows(IllegalStateException.class, () -> Names.doubleQuote("\"abc"));
assertThrows(IllegalStateException.class, () -> Names.doubleQuote("abc\""));
}

@Test
void testSimpleQuote() {
assertEquals("'abc'", Names.singleQuote("abc"));
assertEquals("'abc'", Names.singleQuote("'abc'"));
assertThrows(IllegalStateException.class, () -> Names.singleQuote("'abc"));
assertThrows(IllegalStateException.class, () -> Names.singleQuote("abc'"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,32 @@
* SOFTWARE.
*/

package io.airbyte.scheduler;
package io.airbyte.commons.text;

import static org.junit.jupiter.api.Assertions.*;
import static io.airbyte.commons.text.Sqls.toSqlName;

import io.airbyte.config.JobConfig;
import java.util.UUID;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class ScopeHelperTest {
class SqlsTest {

enum E1 {
VALUE_1,
VALUE_TWO,
value_three,
}

@Test
public void testCreateScope() {
final String configId = UUID.randomUUID().toString();
final String actual = ScopeHelper.createScope(JobConfig.ConfigType.SYNC, configId);
final String expected = "sync:" + configId;
assertEquals(expected, actual);
void testToSqlName() {
Assertions.assertEquals("value_1", toSqlName(E1.VALUE_1));
Assertions.assertEquals("value_two", toSqlName(E1.VALUE_TWO));
Assertions.assertEquals("value_three", toSqlName(E1.value_three));
}

@Test
public void testGetConfig() {
final String configId = UUID.randomUUID().toString();
assertEquals("", ScopeHelper.getConfigId("sync:"));
assertEquals(configId, ScopeHelper.getConfigId("sync:" + configId));
void testInFragment() {
Assertions.assertEquals("('value_two','value_three')", Sqls.toSqlInFragment(Lists.newArrayList(E1.VALUE_TWO, E1.value_three)));
}

}
11 changes: 8 additions & 3 deletions airbyte-scheduler/src/main/java/io/airbyte/scheduler/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,22 @@

import com.google.common.base.Preconditions;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobOutput;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

public class Job {

public static final Set<ConfigType> REPLICATION_TYPES = EnumSet.of(ConfigType.SYNC, ConfigType.RESET_CONNECTION);

private final long id;
private JobConfig.ConfigType configType;
private final ConfigType configType;
private final String scope;
private final JobConfig config;
private final JobStatus status;
Expand All @@ -46,7 +51,7 @@ public class Job {
private final List<Attempt> attempts;

public Job(final long id,
final JobConfig.ConfigType configType,
final ConfigType configType,
final String scope,
final JobConfig config,
final List<Attempt> attempts,
Expand All @@ -69,7 +74,7 @@ public long getId() {
return id;
}

public JobConfig.ConfigType getConfigType() {
public ConfigType getConfigType() {
return configType;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private void scheduleSyncJobs() throws IOException {
final List<StandardSync> activeConnections = getAllActiveConnections();

for (StandardSync connection : activeConnections) {
final Optional<Job> previousJobOptional = jobPersistence.getLastSyncScope(connection.getConnectionId());
final Optional<Job> previousJobOptional = jobPersistence.getLastReplicationJob(connection.getConnectionId());
final StandardSyncSchedule standardSyncSchedule = getStandardSyncSchedule(connection);

if (scheduleJobPredicate.test(previousJobOptional, standardSyncSchedule)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,14 @@ private ImmutableMap.Builder<String, Object> generateMetadata(Job job) throws Co
switch (job.getConfig().getConfigType()) {
case CHECK_CONNECTION_SOURCE, DISCOVER_SCHEMA -> {
final StandardSourceDefinition sourceDefinition = configRepository
.getSourceDefinitionFromSource(UUID.fromString(ScopeHelper.getConfigId(job.getScope())));
.getSourceDefinitionFromSource(UUID.fromString(job.getScope()));

metadata.put("connector_source", sourceDefinition.getName());
metadata.put("connector_source_definition_id", sourceDefinition.getSourceDefinitionId());
}
case CHECK_CONNECTION_DESTINATION -> {
final StandardDestinationDefinition destinationDefinition = configRepository
.getDestinationDefinitionFromDestination(UUID.fromString(ScopeHelper.getConfigId(job.getScope())));
.getDestinationDefinitionFromDestination(UUID.fromString(job.getScope()));

metadata.put("connector_destination", destinationDefinition.getName());
metadata.put("connector_destination_definition_id", destinationDefinition.getDestinationDefinitionId());
Expand All @@ -197,7 +197,7 @@ private ImmutableMap.Builder<String, Object> generateMetadata(Job job) throws Co
// no op because this will be noisy as heck.
}
case SYNC -> {
final UUID connectionId = UUID.fromString(ScopeHelper.getConfigId(job.getScope()));
final UUID connectionId = UUID.fromString(job.getScope());
final StandardSyncSchedule schedule = configRepository.getStandardSyncSchedule(connectionId);
final StandardSourceDefinition sourceDefinition = configRepository
.getSourceDefinitionFromConnection(connectionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public Job createOrGetActiveSyncJob(SourceConnection source,
destinationDockerImage);

long jobId = jobIdOptional.isEmpty()
? jobPersistence.getLastSyncScope(standardSync.getConnectionId()).orElseThrow(() -> new RuntimeException("No job available")).getId()
? jobPersistence.getLastReplicationJob(standardSync.getConnectionId()).orElseThrow(() -> new RuntimeException("No job available")).getId()
: jobIdOptional.get();

return waitUntilJobIsTerminalOrTimeout(jobId);
Expand All @@ -102,7 +102,7 @@ public Job createOrGetActiveResetConnectionJob(DestinationConnection destination
final Optional<Long> jobIdOptional = jobCreator.createResetConnectionJob(destination, standardSync, destinationDockerImage);

long jobId = jobIdOptional.isEmpty()
? jobPersistence.getLastSyncScope(standardSync.getConnectionId()).orElseThrow(() -> new RuntimeException("No job available")).getId()
? jobPersistence.getLastReplicationJob(standardSync.getConnectionId()).orElseThrow(() -> new RuntimeException("No job available")).getId()
: jobIdOptional.get();

return waitUntilJobIsTerminalOrTimeout(jobId);
Expand Down
Loading

0 comments on commit 677c086

Please sign in to comment.