Skip to content

Commit

Permalink
AVRO-2545: Add Ruby support for aliases (apache#636)
Browse files Browse the repository at this point in the history
  • Loading branch information
tjwp authored May 30, 2020
1 parent 09f8afe commit 5287c79
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 38 deletions.
26 changes: 14 additions & 12 deletions lang/ruby/lib/avro/io.rb
Original file line number Diff line number Diff line change
Expand Up @@ -359,26 +359,28 @@ def read_record(writers_schema, readers_schema, decoder)
readers_fields_hash = readers_schema.fields_hash
read_record = {}
writers_schema.fields.each do |field|
if (readers_field = readers_fields_hash[field.name])
readers_field = readers_fields_hash[field.name]
if readers_field
field_val = read_data(field.type, readers_field.type, decoder)
read_record[field.name] = field_val
elsif readers_schema.fields_by_alias.key?(field.name)
readers_field = readers_schema.fields_by_alias[field.name]
field_val = read_data(field.type, readers_field.type, decoder)
read_record[readers_field.name] = field_val
else
skip_data(field.type, decoder)
end
end

# fill in the default values
if readers_fields_hash.size > read_record.size
writers_fields_hash = writers_schema.fields_hash
readers_fields_hash.each do |field_name, field|
unless writers_fields_hash.has_key? field_name
if field.default?
field_val = read_default_value(field.type, field.default)
read_record[field.name] = field_val
else
raise AvroError, "Missing data for #{field.type} with no default"
end
end
readers_fields_hash.each do |field_name, field|
next if read_record.key?(field_name)

if field.default?
field_val = read_default_value(field.type, field.default)
read_record[field.name] = field_val
else
raise AvroError, "Missing data for #{field.type} with no default"
end
end

Expand Down
83 changes: 66 additions & 17 deletions lang/ruby/lib/avro/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,20 @@ def self.real_parse(json_obj, names=nil, default_namespace=nil)
raise SchemaParseError, "Name #{name} is invalid for type #{type}!"
end
namespace = json_obj.include?('namespace') ? json_obj['namespace'] : default_namespace
aliases = json_obj['aliases']
case type_sym
when :fixed
size = json_obj['size']
return FixedSchema.new(name, namespace, size, names, logical_type)
return FixedSchema.new(name, namespace, size, names, logical_type, aliases)
when :enum
symbols = json_obj['symbols']
doc = json_obj['doc']
default = json_obj['default']
return EnumSchema.new(name, namespace, symbols, names, doc, default)
return EnumSchema.new(name, namespace, symbols, names, doc, default, aliases)
when :record, :error
fields = json_obj['fields']
doc = json_obj['doc']
return RecordSchema.new(name, namespace, fields, names, type_sym, doc)
return RecordSchema.new(name, namespace, fields, names, type_sym, doc, aliases)
else
raise SchemaParseError.new("Unknown named type: #{type}")
end
Expand Down Expand Up @@ -230,13 +231,25 @@ def to_s
MultiJson.dump to_avro
end

# Checks that the aliases value is either absent or an array made up only of
# strings.
#
# Returns nil when the value is acceptable.
# Raises Avro::SchemaParseError otherwise; the message includes the offending
# value together with this object's type and name.
def validate_aliases!
  return if aliases.nil?
  return if aliases.is_a?(Array) && aliases.all? { |entry| entry.is_a?(String) }

  raise Avro::SchemaParseError,
        "Invalid aliases value #{aliases.inspect} for #{type} #{name}. Must be an array of strings."
end
private :validate_aliases!

class NamedSchema < Schema
attr_reader :name, :namespace
attr_reader :name, :namespace, :aliases

def initialize(type, name, namespace=nil, names=nil, doc=nil, logical_type=nil)
def initialize(type, name, namespace=nil, names=nil, doc=nil, logical_type=nil, aliases=nil)
super(type, logical_type)
@name, @namespace = Name.extract_namespace(name, namespace)
@doc = doc
@doc = doc
@aliases = aliases
validate_aliases! if aliases
Name.add_name(names, self)
end

Expand All @@ -247,33 +260,53 @@ def to_avro(names=Set.new)
end
props = {'name' => @name}
props.merge!('namespace' => @namespace) if @namespace
props.merge!('doc' => @doc) if @doc
props['namespace'] = @namespace if @namespace
props['doc'] = @doc if @doc
props['aliases'] = aliases if aliases && aliases.any?
super.merge props
end

# Returns the full name that Name.make_fullname builds from @name and
# @namespace. The result is memoized in @fullname after the first call.
def fullname
@fullname ||= Name.make_fullname(@name, @namespace)
end

# Returns every alias expanded through Name.make_fullname with this schema's
# namespace, or an empty array when no aliases are set. The result is
# memoized in @fullname_aliases.
def fullname_aliases
  @fullname_aliases ||= (aliases || []).map { |alias_name| Name.make_fullname(alias_name, namespace) }
end

# Tells whether +name+ identifies this schema, either as its fullname or as
# one of the entries in fullname_aliases.
def match_fullname?(name)
  return true if name == fullname

  fullname_aliases.include?(name)
end
end

class RecordSchema < NamedSchema
attr_reader :fields, :doc

def self.make_field_objects(field_data, names, namespace=nil)
field_objects, field_names = [], Set.new
field_objects, field_names, alias_names = [], Set.new, Set.new
field_data.each do |field|
if field.respond_to?(:[]) # TODO(jmhodges) wtffffff
type = field['type']
name = field['name']
default = field.key?('default') ? field['default'] : :no_default
order = field['order']
doc = field['doc']
new_field = Field.new(type, name, default, order, names, namespace, doc)
aliases = field['aliases']
new_field = Field.new(type, name, default, order, names, namespace, doc, aliases)
# make sure field name has not been used yet
if field_names.include?(new_field.name)
raise SchemaParseError, "Field name #{new_field.name.inspect} is already in use"
end
field_names << new_field.name
# make sure alias has not been used yet
if new_field.aliases && alias_names.intersect?(new_field.aliases.to_set)
raise SchemaParseError, "Alias #{(alias_names & new_field.aliases).to_a} already in use"
end
alias_names.merge(new_field.aliases) if new_field.aliases
else
raise SchemaParseError, "Not a valid field: #{field}"
end
Expand All @@ -282,14 +315,14 @@ def self.make_field_objects(field_data, names, namespace=nil)
field_objects
end

def initialize(name, namespace, fields, names=nil, schema_type=:record, doc=nil)
def initialize(name, namespace, fields, names=nil, schema_type=:record, doc=nil, aliases=nil)
if schema_type == :request || schema_type == 'request'
@type_sym = schema_type.to_sym
@namespace = namespace
@name = nil
@doc = nil
else
super(schema_type, name, namespace, names, doc)
super(schema_type, name, namespace, names, doc, nil, aliases)
end
@fields = if fields
RecordSchema.make_field_objects(fields, names, self.namespace)
Expand All @@ -302,6 +335,16 @@ def fields_hash
@fields_hash ||= fields.inject({}){|hsh, field| hsh[field.name] = field; hsh }
end

# Returns a hash that maps each alias declared by a field to that field.
# Fields without aliases contribute nothing; when two fields declare the
# same alias, the later field wins. The hash is memoized in @fields_by_alias.
def fields_by_alias
  @fields_by_alias ||= begin
    aliased_fields = fields.select(&:aliases)
    aliased_fields.flat_map { |field| field.aliases.map { |alias_name| [alias_name, field] } }.to_h
  end
end

def to_avro(names=Set.new)
hsh = super
return hsh unless hsh.is_a?(Hash)
Expand Down Expand Up @@ -372,7 +415,7 @@ class EnumSchema < NamedSchema

attr_reader :symbols, :doc, :default

def initialize(name, space, symbols, names=nil, doc=nil, default=nil)
def initialize(name, space, symbols, names=nil, doc=nil, default=nil, aliases=nil)
if symbols.uniq.length < symbols.length
fail_msg = "Duplicate symbol: #{symbols}"
raise Avro::SchemaParseError, fail_msg
Expand All @@ -391,7 +434,7 @@ def initialize(name, space, symbols, names=nil, doc=nil, default=nil)
raise Avro::SchemaParseError, "Default '#{default}' is not a valid symbol for enum #{name}"
end

super(:enum, name, space, names, doc)
super(:enum, name, space, names, doc, nil, aliases)
@default = default
@symbols = symbols
end
Expand Down Expand Up @@ -444,12 +487,12 @@ def to_avro(names=nil)

class FixedSchema < NamedSchema
attr_reader :size
def initialize(name, space, size, names=nil, logical_type=nil)
def initialize(name, space, size, names=nil, logical_type=nil, aliases=nil)
# Ensure valid constructor args
unless size.is_a?(Integer)
raise AvroError, 'Fixed Schema requires a valid integer for size property.'
end
super(:fixed, name, space, names, nil, logical_type)
super(:fixed, name, space, names, nil, logical_type, aliases)
@size = size
end

Expand All @@ -460,14 +503,16 @@ def to_avro(names=Set.new)
end

class Field < Schema
attr_reader :type, :name, :default, :order, :doc
attr_reader :type, :name, :default, :order, :doc, :aliases

def initialize(type, name, default=:no_default, order=nil, names=nil, namespace=nil, doc=nil)
def initialize(type, name, default=:no_default, order=nil, names=nil, namespace=nil, doc=nil, aliases=nil)
@type = subparse(type, names, namespace)
@name = name
@default = default
@order = order
@doc = doc
@aliases = aliases
validate_aliases! if aliases
validate_default! if default? && !Avro.disable_field_default_validation
end

Expand All @@ -483,6 +528,10 @@ def to_avro(names=Set.new)
end
end

# Returns the field's aliases coerced to an array with Kernel#Array, so a
# field without aliases yields an empty array. The result is memoized in
# @alias_names.
def alias_names
  return @alias_names if @alias_names

  @alias_names = Array(aliases)
end

private

def validate_default!
Expand Down
23 changes: 14 additions & 9 deletions lang/ruby/lib/avro/schema_compatibility.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ def self.mutual_read?(writers_schema, readers_schema)
end

# Perform a basic check that a datum written with the writers_schema could
# be read using the readers_schema. This check only includes matching the types,
# including schema promotion, and matching the full name for named types.
# Aliases for named types are not supported here, and the ruby implementation
# of Avro in general does not include support for aliases.
# be read using the readers_schema. This check includes matching the types,
# including schema promotion, and matching the full name (including aliases) for named types.
def self.match_schemas(writers_schema, readers_schema)
w_type = writers_schema.type_sym
r_type = readers_schema.type_sym
Expand All @@ -46,16 +44,16 @@ def self.match_schemas(writers_schema, readers_schema)

case r_type
when :record
return writers_schema.fullname == readers_schema.fullname
return readers_schema.match_fullname?(writers_schema.fullname)
when :error
return writers_schema.fullname == readers_schema.fullname
return readers_schema.match_fullname?(writers_schema.fullname)
when :request
return true
when :fixed
return writers_schema.fullname == readers_schema.fullname &&
return readers_schema.match_fullname?(writers_schema.fullname) &&
writers_schema.size == readers_schema.size
when :enum
return writers_schema.fullname == readers_schema.fullname
return readers_schema.match_fullname?(writers_schema.fullname)
when :map
return match_schemas(writers_schema.values, readers_schema.values)
when :array
Expand Down Expand Up @@ -148,7 +146,14 @@ def match_record_schemas(writers_schema, readers_schema)
if writer_fields_hash.key?(field.name)
return false unless full_match_schemas(writer_fields_hash[field.name].type, field.type)
else
return false unless field.default?
names = writer_fields_hash.keys & field.alias_names
if names.size > 1
return false
elsif names.size == 1
return false unless full_match_schemas(writer_fields_hash[names.first].type, field.type)
else
return false unless field.default?
end
end
end

Expand Down
16 changes: 16 additions & 0 deletions lang/ruby/test/test_io.rb
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,22 @@ def test_map_schema_promotion
assert_equal(datum_read, datum_to_write)
end

# Writes a datum with a writer schema (record Rec1, field field1) and reads
# it back with a reader schema that renames both the record (Rec2) and the
# field (field2) while declaring the writer's names as aliases. The value is
# expected to come back under the reader's field name.
def test_aliased
# Writer schema: record Rec1 with a single int field named field1.
writers_schema = Avro::Schema.parse(<<-SCHEMA)
{"type":"record", "name":"Rec1", "fields":[
{"name":"field1", "type":"int"}
]}
SCHEMA
# Reader schema: record Rec2 aliased to Rec1; field2 aliased to field1.
readers_schema = Avro::Schema.parse(<<-SCHEMA)
{"type":"record", "name":"Rec2", "aliases":["Rec1"], "fields":[
{"name":"field2", "aliases":["field1"], "type":"int"}
]}
SCHEMA
# Only the first value returned by write_datum is used here.
writer, * = write_datum({ 'field1' => 1 }, writers_schema)
datum_read = read_datum(writer, writers_schema, readers_schema)
# The value written as field1 must be keyed by the reader's name, field2.
assert_equal(datum_read, { 'field2' => 1 })
end

def test_snappy_backward_compat
# a snappy-compressed block payload without the checksum
# this has no back-references, just one literal so the last 9
Expand Down
Loading

0 comments on commit 5287c79

Please sign in to comment.