[BUG][Flink] Delta Flink Connector: Incorrect Mapping of BYTES to BINARY(1) in Sink Table Schema #3977

Open
enriquecatala opened this issue Dec 16, 2024 · 2 comments
Labels
bug Something isn't working

Comments

@enriquecatala

Bug

Which Delta project/connector is this regarding?

  • [x] Flink

Describe the problem

The Delta Flink connector maps a BYTES column to BINARY(1) in the sink schema, causing a schema mismatch error during INSERT. The behavior persists even when the sink schema is defined explicitly or the binary_data column is cast in the query. I first noticed this while trying to read the raw payload from a Kafka topic; after some debugging I found that the Delta Flink connector introduces the incorrect mapping while building the execution plan, so I created this sample to demonstrate the problem.

I tried both the DataStream API and the Table API, with the same results.

Steps to reproduce

from pyflink.table import EnvironmentSettings, TableEnvironment

# Initialize the streaming table environment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Create a source table using the DataGen connector
table_env.execute_sql("""
    CREATE TABLE random_source (
        id BIGINT,
        binary_data BYTES
    ) WITH (
        'connector' = 'datagen',
        'fields.id.kind' = 'sequence',
        'fields.id.start' = '1',
        'fields.id.end' = '8',
        'fields.binary_data.kind' = 'random',
        'fields.binary_data.length' = '16'
    )
""")

# Create a sink table using the Delta connector
table_env.execute_sql("""
    CREATE TABLE print_sink (
        id BIGINT,
        binary_data BYTES
    ) WITH (
        'connector' = 'delta',
        'table-path' = '/path/to/delta/table'
    )
""")

# Insert data from the source into the sink
table_env.execute_sql("""
    INSERT INTO print_sink
    SELECT id, binary_data
    FROM random_source
""").wait()

Observed results

The schema for the print_sink table is created incorrectly as:

<Row('id', 'BIGINT', True, None, None, None)>
<Row('binary_data', 'BINARY(1)', True, None, None, None)>

This causes a schema mismatch error during the INSERT operation:

org.apache.flink.table.api.ValidationException: Column types of query result and sink for 'print_sink' do not match.
Cause: Incompatible types for sink column 'binary_data' at position 1.
Query schema: [id: BIGINT, binary_data: BYTES]
Sink schema: [id: BIGINT, binary_data: BINARY(1)]

Expected results

The binary_data column should be mapped correctly to a type compatible with BYTES (e.g., VARBINARY), avoiding schema mismatch errors.

Further details

The issue persists even after:

  • Explicitly casting binary_data as BYTES in the query.
  • Manually defining the sink schema with binary_data VARBINARY.
  • Using the configuration option 'delta.ignore-schema-check' = 'true'.

When using a temporary table (e.g., memory or blackhole), the schema for binary_data is correctly identified as BYTES. The issue seems to be specific to the Delta connector.
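For comparison, here is a minimal sketch (reusing table_env from the repro script above; the table name is illustrative) that swaps the Delta sink for Flink's built-in blackhole connector. With it, DESCRIBE reports binary_data as BYTES:

# Same schema, but with the blackhole connector instead of Delta.
table_env.execute_sql("""
    CREATE TABLE blackhole_sink (
        id BIGINT,
        binary_data BYTES
    ) WITH (
        'connector' = 'blackhole'
    )
""")

# DESCRIBE shows binary_data as BYTES here, not BINARY(1).
for row in table_env.execute_sql("DESCRIBE blackhole_sink").collect():
    print(row)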

Environment information

Tested with

  • Flink 1.19 and 1.20

      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka</artifactId>
          <version>3.4.0-1.20</version>
      </dependency>
      <!-- https://mvnrepository.com/artifact/io.delta/delta-flink -->
      <dependency>
          <groupId>io.delta</groupId>
          <artifactId>delta-flink</artifactId>
          <version>3.2.1</version>
      </dependency>
    
      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka -->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-sql-connector-kafka</artifactId>
          <version>3.4.0-1.20</version>            
      </dependency>
      
      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-parquet -->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-sql-parquet</artifactId>
          <version>1.20.0</version>
      </dependency>
    
      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner_2.12</artifactId>
          <version>1.20.0</version>
      </dependency>
    
      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients</artifactId>
          <version>1.20.0</version>
      </dependency>
    
    
    
      <!-- https://mvnrepository.com/artifact/io.delta/delta-standalone 
           Required for delta-->
      <dependency>
          <groupId>io.delta</groupId>
          <artifactId>delta-standalone_2.12</artifactId>
          <version>3.2.1</version>
      </dependency>
    

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • No. I cannot contribute a bug fix at this time.
@enriquecatala added the bug label on Dec 16, 2024
@scottsand-db
Collaborator

Hi there! Thanks for making this issue!

Q1: can you please show me the schema of the sink table created here?

# Create a sink table using the Delta connector
table_env.execute_sql("""
    CREATE TABLE print_sink (
        id BIGINT,
        binary_data BYTES
    ) WITH (
        'connector' = 'delta',
        'table-path' = '/path/to/delta/table'
    )
""")

I believe the root of the issue is this:

  • The sink maps both Flink types BINARY and VARBINARY to the Delta type BinaryType.
  • The source maps the Delta type BinaryType back to the Flink type BINARY (not VARBINARY).
  • i.e. the mapping is non-injective (not one-to-one).

So this might be fixed by mapping Delta BinaryType to Flink VARBINARY? But I'm not sure; I'll have to debug this more.
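To make the suspected asymmetry concrete, here is a hypothetical sketch (plain Python with illustrative dicts, not the connector's actual code):

# Hypothetical illustration of the suspected mapping asymmetry
# (illustrative dicts, not the connector's real implementation).
FLINK_TO_DELTA = {"BINARY": "binary", "VARBINARY": "binary"}  # write path: many-to-one
DELTA_TO_FLINK = {"binary": "BINARY(1)"}                      # read path: one fixed preimage

# Round trip: a BYTES/VARBINARY column is written as Delta "binary",
# then resolved back as BINARY(1), so the planner sees mismatched schemas.
assert DELTA_TO_FLINK[FLINK_TO_DELTA["VARBINARY"]] == "BINARY(1)"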

@enriquecatala
Author

Sure.

It doesn't matter whether the table already exists or is created from Flink. Here is the transaction log file for the table being created by the job itself:

{"commitInfo":{"timestamp":1734609157868,"operation":"CREATE TABLE","operationParameters":{"partitionBy":"[]","description":null,"properties":"{\"delta.column.types\":\"binary_data BYTES\",\"delta.ignore-schema-check\":\"true\"}","isManaged":false},"isolationLevel":"SnapshotIsolation","isBlindAppend":true,"operationMetrics":{},"engineInfo":"flink-engine/1.16.1-flink-delta-connector/3.2.1 Delta-Standalone/3.2.1"}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"85d80af0-1c81-4ef2-ae6c-78cd45266642","name":"print_sink","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"binary_data\",\"type\":\"binary\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.column.types":"binary_data BYTES","delta.ignore-schema-check":"true"},"createdTime":1734609157540}}

Just in case, I'm running it with the in-memory catalog. Here is the full code:

import os
from pyflink.table import EnvironmentSettings, TableEnvironment

#
# BUG: https://github.com/delta-io/delta/issues/3977
#

# Initialize the streaming table environment.
# This enables the execution of continuous queries on unbounded and bounded data.
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

def is_running_locally():
    """
    Check for the presence of a local environment marker.
    """
    return os.path.exists("../.local_env")


if is_running_locally():
    print("is_running_locally")

    # Automatically add all JAR files in the ../flink-connectors/ folder to the classpath.
    flink_connectors_path = os.path.abspath("../flink-connectors")
    jars = [
        f"file://{os.path.join(flink_connectors_path, jar)}"
        for jar in os.listdir(flink_connectors_path)
        if jar.endswith(".jar")
    ]
    table_env.get_config().get_configuration().set_string(
        "pipeline.jars", ";".join(jars)
    )

    # Print all configuration settings
    config = table_env.get_config().get_configuration()
    print("Configurations:")
    for key in config.key_set():
        print(f"{key} = {config.get_string(key, '')}")

    delta_sink_path  = '../data/output/2_datagen_sink_delta_local'
    base_path = '../data/output'
else:
    delta_sink_path  = '/data/output/2_datagen_sink_delta_docker'
    base_path = '/data/output'

  
# 
# https://github.com/delta-io/delta/tree/master/connectors/flink#sql-support
#
# Configure the Delta Catalog
table_env.execute_sql(f"""
    CREATE CATALOG my_delta_catalog WITH (
        'type' = 'delta-catalog',
        'catalog-type' = 'in-memory',
        'default-database' = 'default_database'
    )
""")

# Use the Delta Catalog
table_env.use_catalog("my_delta_catalog")
table_env.use_database("default_database")

table_env.execute_sql("""
    CREATE TABLE random_source (
        id BIGINT,         
        binary_data BYTES
    ) WITH (
        'connector' = 'datagen',
        'fields.id.kind' = 'sequence',
        'fields.id.start' = '1',
        'fields.id.end' = '8',        
        'fields.binary_data.kind' = 'random',
        'fields.binary_data.length' = '16'
    )
""")

table_env.execute_sql(f"""
    CREATE TABLE IF NOT EXISTS print_sink (
        id BIGINT,
        binary_data BYTES
    ) WITH (
        'connector' = 'delta',
        'table-path' = '{delta_sink_path}'
    )
""")

result = table_env.execute_sql("DESCRIBE print_sink")
for row in result.collect():
    print(row)

table_env.execute_sql("""
    INSERT INTO print_sink
        SELECT id,               
               CAST(binary_data AS BYTES)  -- kind of forcing it...not helping
        FROM  random_source                
""").wait()  

The problem here is that the Delta format's binary type is in fact a variable-length binary, while Flink's BINARY appears to be a single byte.

I tried using BINARY, VARBINARY, BYTES(1000), etc. too, with the same results; see the sketch of the variants below.
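For completeness, here is a minimal sketch of those variants (reusing table_env and base_path from the script above; table names and paths are illustrative):

# Try several column types for the sink; every variant still surfaces as BINARY(1).
for col_type in ("BYTES", "BINARY", "VARBINARY", "VARBINARY(1000)"):
    name = "sink_" + col_type.replace("(", "_").replace(")", "")
    table_env.execute_sql(f"""
        CREATE TABLE {name} (
            id BIGINT,
            binary_data {col_type}
        ) WITH (
            'connector' = 'delta',
            'table-path' = '{base_path}/{name}'
        )
    """)
    for row in table_env.execute_sql(f"DESCRIBE {name}").collect():
        print(col_type, row)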

Thank you in advance.
