Skip to content

Commit

Permalink
Adding incremental to the catalog data model (airbytehq#998)
Browse files Browse the repository at this point in the history
* Add ConfiguredAirbyteCatalog and ConfiguredAirbyteStream
  • Loading branch information
cgardens authored Nov 18, 2020
1 parent d1fd379 commit e7edb2c
Show file tree
Hide file tree
Showing 86 changed files with 615 additions and 342 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ jobs:
- name: Checkout Airbyte
uses: actions/checkout@v2

- name: Check images exist
run: ./tools/bin/check_images_exist.sh

- name: Cache java deps
uses: actions/cache@v2
with:
Expand Down Expand Up @@ -65,9 +68,6 @@ jobs:
- name: Ensure no file change
run: git status --porcelain && test -z "$(git status --porcelain)"

- name: Check images exist
run: ./tools/bin/check_images_exist.sh

- name: Check documentation
if: success() && github.ref == 'refs/heads/master'
run: ./tools/site/link_checker.sh check_docs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133",
"name": "BigQuery",
"dockerRepository": "airbyte/destination-bigquery",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-bigquery-destination"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "25c5221d-dce2-4163-ade9-739ef790f503",
"name": "Postgres",
"dockerRepository": "airbyte/destination-postgres",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-postgres-destination"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
"name": "Snowflake",
"dockerRepository": "airbyte/destination-snowflake",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "8be1cf83-fde1-477f-a4ad-318d23c9f3c6",
"name": "Local CSV",
"dockerRepository": "airbyte/destination-csv",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-csv-destination"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "2470e835-feaf-4db6-96f3-70fd645acc77",
"name": "Salesforce",
"dockerRepository": "airbyte/source-salesforce-singer",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-salesforce-singer"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "39f092a6-8c87-4f6f-a8d9-5cef45b7dbe1",
"name": "Google Analytics",
"dockerRepository": "airbyte/source-googleanalytics-singer",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-googleanalytics-singer"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad",
"name": "MySQL",
"dockerRepository": "airbyte/source-mysql",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "57eb1576-8f52-463d-beb6-2e107cdf571d",
"name": "Hubspot",
"dockerRepository": "airbyte/source-hubspot-singer",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://https://docs.airbyte.io/integrations/sources/hubspot"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "71607ba1-c0ac-4799-8049-7f4b90dd50f7",
"name": "Google Sheets",
"dockerRepository": "airbyte/source-google-sheets",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://hub.docker.com/repository/docker/airbyte/source-google-sheets"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "74d47f79-8d01-44ac-9755-f5eb0d7caacb",
"name": "Facebook Marketing APIs",
"dockerRepository": "airbyte/source-facebook-marketing-api-singer",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-facebook-marketing-api-singer"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "778daa7c-feaf-4db6-96f3-70fd645acc77",
"name": "File",
"dockerRepository": "airbyte/source-file",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-file"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "9e0556f4-69df-4522-a3fb-03264d36b348",
"name": "Marketo",
"dockerRepository": "airbyte/source-marketo-singer",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-marketo-singer"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "9fed261d-d107-47fd-8c8b-323023db6e20",
"name": "exchangeratesapi.io",
"dockerRepository": "airbyte/source-exchangeratesapi-singer",
"dockerImageTag": "0.1.4",
"dockerImageTag": "0.1.6",
"documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-exchangeratesapi_io-source"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "b1892b11-788d-44bd-b9ec-3a436f7b54ce",
"name": "Shopify",
"dockerRepository": "airbyte/source-shopify-singer",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-shopify-singer"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "b5ea17b1-f170-46dc-bc31-cc744ca984c1",
"name": "Microsoft SQL Server (MSSQL)",
"dockerRepository": "airbyte/source-mssql",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-mssql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
"name": "Postgres",
"dockerRepository": "airbyte/source-postgres",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "e094cb9a-26de-4645-8761-65c0c425d1de",
"name": "Stripe",
"dockerRepository": "airbyte/source-stripe-singer",
"dockerImageTag": "0.1.4",
"dockerImageTag": "0.1.5",
"documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-stripe-source"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "ef69ef6e-aa7f-4af1-a01d-ef775033524e",
"name": "Github",
"dockerRepository": "airbyte/source-github-singer",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-github-singer"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "fdc8b827-3257-4b33-83cc-106d234c34d4",
"name": "Google Adwords",
"dockerRepository": "airbyte/source-google-adwords-singer",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-google-adwords"
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand All @@ -40,15 +41,15 @@
// todo (cgardens) - hack, remove after we've gotten rid of Schema object.
public class AirbyteProtocolConverters {

public static AirbyteCatalog toCatalog(Schema schema) {
List<AirbyteStream> airbyteStreams = schema.getStreams().stream()
.map(s -> new AirbyteStream()
public static ConfiguredAirbyteCatalog toConfiguredCatalog(Schema schema) {
List<ConfiguredAirbyteStream> airbyteStreams = schema.getStreams().stream()
.map(s -> new ConfiguredAirbyteStream()
.withName(s.getName())
.withJsonSchema(toJson(s.getFields())))
// perform selection based on the output of toJson, which keeps properties if selected=true
.filter(s -> !s.getJsonSchema().get("properties").isEmpty())
.collect(Collectors.toList());
return new AirbyteCatalog().withStreams(airbyteStreams);
return new ConfiguredAirbyteCatalog().withStreams(airbyteStreams);
}

// todo (cgardens) - this will only work with table / column schemas. it's hack to get us through
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ properties:
"$ref": SyncMode.yaml
catalog:
type: object
existingJavaType: io.airbyte.protocol.models.AirbyteCatalog
existingJavaType: io.airbyte.protocol.models.ConfiguredAirbyteCatalog
connectionId:
type: string
format: uuid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ properties:
existingJavaType: com.fasterxml.jackson.databind.JsonNode
catalog:
type: object
existingJavaType: io.airbyte.protocol.models.AirbyteCatalog
existingJavaType: io.airbyte.protocol.models.ConfiguredAirbyteCatalog
syncMode:
"$ref": SyncMode.yaml
connectionId:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ properties:
"$ref": SyncMode.yaml
catalog:
type: object
existingJavaType: io.airbyte.protocol.models.AirbyteCatalog
existingJavaType: io.airbyte.protocol.models.ConfiguredAirbyteCatalog
connectionId:
type: string
format: uuid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import org.junit.jupiter.api.Test;
Expand All @@ -48,6 +50,12 @@ class AirbyteProtocolConvertersTest {
.withJsonSchema(CatalogHelpers.fieldsToJsonSchema(
Field.of(COLUMN_NAME, JsonSchemaPrimitive.STRING),
Field.of(COLUMN_AGE, JsonSchemaPrimitive.NUMBER)))));
private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = new ConfiguredAirbyteCatalog()
.withStreams(Lists.newArrayList(new ConfiguredAirbyteStream()
.withName(STREAM)
.withJsonSchema(CatalogHelpers.fieldsToJsonSchema(
Field.of(COLUMN_NAME, JsonSchemaPrimitive.STRING),
Field.of(COLUMN_AGE, JsonSchemaPrimitive.NUMBER)))));

private static final Schema SCHEMA = new Schema()
.withStreams(Lists.newArrayList(new Stream()
Expand Down Expand Up @@ -87,13 +95,13 @@ class AirbyteProtocolConvertersTest {
.withSelected(false)))));

@Test
void testToCatalog() {
assertEquals(CATALOG, AirbyteProtocolConverters.toCatalog(SCHEMA));
void testToConfiguredCatalog() {
assertEquals(CONFIGURED_CATALOG, AirbyteProtocolConverters.toConfiguredCatalog(SCHEMA));
}

@Test
void testToCatalogWithUnselected() {
assertEquals(CATALOG, AirbyteProtocolConverters.toCatalog(SCHEMA_WITH_UNSELECTED));
void testToConfiguredCatalogWithUnselected() {
assertEquals(CONFIGURED_CATALOG, AirbyteProtocolConverters.toConfiguredCatalog(SCHEMA_WITH_UNSELECTED));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
AirbyteRecordMessage,
AirbyteStateMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
ConnectorSpecification,
Status,
Type,
Expand All @@ -43,6 +45,8 @@
"AirbyteRecordMessage",
"AirbyteStateMessage",
"AirbyteStream",
"ConfiguredAirbyteCatalog",
"ConfiguredAirbyteStream",
"ConnectorSpecification",
"Status",
"Type",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@
"""

# generated by generate-protocol-files
from .airbyte_message import *
from .airbyte_protocol import *
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"""

# generated by datamodel-codegen:
# filename: airbyte_message.yaml
# filename: airbyte_protocol.yaml

from __future__ import annotations

Expand Down Expand Up @@ -79,9 +79,9 @@ class AirbyteConnectionStatus(BaseModel):
message: Optional[str] = None


class AirbyteStream(BaseModel):
name: str = Field(..., description="Stream's name.")
json_schema: Dict[str, Any] = Field(..., description="Stream schema using Json Schema specs.")
class SyncMode(Enum):
full_refresh = "full_refresh"
incremental = "incremental"


class ConnectorSpecification(BaseModel):
Expand All @@ -93,10 +93,34 @@ class ConnectorSpecification(BaseModel):
)


class AirbyteStream(BaseModel):
name: str = Field(..., description="Stream's name.")
json_schema: Dict[str, Any] = Field(..., description="Stream schema using Json Schema specs.")
supported_sync_modes: Optional[List[SyncMode]] = None
default_cursor_field: Optional[List[str]] = Field(
None,
description="Path to the field that will be used to determine if a record is new or modified since the last sync. If not provided by the source, the end user will have to specify the comparable themselves.",
)


class ConfiguredAirbyteStream(BaseModel):
name: str = Field(..., description="Stream's name.")
json_schema: Dict[str, Any] = Field(..., description="Stream schema using Json Schema specs.")
sync_mode: Optional[SyncMode] = "full_refresh"
cursor_field: Optional[List[str]] = Field(
None,
description="Path to the field that will be used to determine if a record is new or modified since the last sync. This field is REQUIRED if `sync_mode` is `incremental`. Otherwise it is ignored.",
)


class AirbyteCatalog(BaseModel):
streams: List[AirbyteStream]


class ConfiguredAirbyteCatalog(BaseModel):
streams: List[ConfiguredAirbyteStream]


class AirbyteMessage(BaseModel):
type: Type = Field(..., description="Message type")
log: Optional[AirbyteLogMessage] = Field(
Expand All @@ -114,3 +138,8 @@ class AirbyteMessage(BaseModel):
None,
description="schema message: the state. Must be the last message produced. The platform uses this information",
)


class AirbyteProtocol(BaseModel):
airbyte_message: Optional[AirbyteMessage] = None
configured_airbyte_catalog: Optional[ConfiguredAirbyteCatalog] = None
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
package io.airbyte.integrations.base;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;

public interface Destination extends Integration {

Expand All @@ -41,6 +41,6 @@ public interface Destination extends Integration {
* will always be called once regardless of success or failure.
* @throws Exception - any exception.
*/
DestinationConsumer<AirbyteMessage> write(JsonNode config, AirbyteCatalog catalog) throws Exception;
DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirbyteCatalog catalog) throws Exception;

}
Loading

0 comments on commit e7edb2c

Please sign in to comment.