From d32826198949809f864d27ebe26cf69cc3f581f8 Mon Sep 17 00:00:00 2001 From: Shamil Ishraq Date: Fri, 6 Oct 2017 11:24:10 -0400 Subject: [PATCH] Update Kaffe to Brod 3.0 This exposes the Kafka timestamp properties of the message. There are no internal changes to Kaffe. Closes #45 --- mix.exs | 4 ++-- mix.lock | 12 ++++++------ .../group_member/subscriber/subscriber_test.exs | 16 ++++++++++++---- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/mix.exs b/mix.exs index 0668bd4..27622a7 100644 --- a/mix.exs +++ b/mix.exs @@ -3,7 +3,7 @@ defmodule Kaffe.Mixfile do def project do [app: :kaffe, - version: "1.3.1", + version: "1.4.0", description: "An opinionated Elixir wrapper around brod, the Erlang Kafka client, that supports encrypted connections to Heroku Kafka out of the box.", name: "Kaffe", source_url: "https://github.com/spreedly/kaffe", @@ -25,7 +25,7 @@ defmodule Kaffe.Mixfile do defp deps do [ - {:brod, "~> 2.2"}, + {:brod, "~> 3.0"}, {:ex_doc, "~> 0.14", only: :dev, runtime: false}, ] end diff --git a/mix.lock b/mix.lock index b42388b..354d013 100644 --- a/mix.lock +++ b/mix.lock @@ -1,8 +1,8 @@ -%{"brod": {:hex, :brod, "2.3.1", "c80036a92fff8a6be1182293c2cf353fa13e7deb11747d09fe0ae3fcd8d10f54", [:make, :rebar, :rebar3], [{:kafka_protocol, "0.9.1", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.5", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm"}, - "earmark": {:hex, :earmark, "1.2.2", "f718159d6b65068e8daeef709ccddae5f7fdc770707d82e7d126f584cd925b74", [:mix], [], "hexpm"}, - "ex_doc": {:hex, :ex_doc, "0.15.1", "d5f9d588fd802152516fccfdb96d6073753f77314fcfee892b15b6724ca0d596", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"}, - "kafka_protocol": {:hex, :kafka_protocol, "0.9.1", "8e943f26590a1ae034422fb8305f64a1aa21ff6dea4fb51e36b702c410861205", [:make, :rebar, :rebar3], [{:snappyer, "1.2.0", [hex: :snappyer, repo: "hexpm", optional: false]}], "hexpm"}, +%{"brod": {:hex, :brod, "3.2.0", "64f0778a7a32ec0a39cec9a564f4686bdfe72b147b48076e114a156fd0a30222", [:make, :rebar, :rebar3], [{:kafka_protocol, "1.1.0", [repo: "hexpm", hex: :kafka_protocol, optional: false]}, {:supervisor3, "1.1.5", [repo: "hexpm", hex: :supervisor3, optional: false]}], "hexpm"}, + "earmark": {:hex, :earmark, "1.2.2", "f718159d6b65068e8daeef709ccddae5f7fdc770707d82e7d126f584cd925b74", [:mix], []}, + "ex_doc": {:hex, :ex_doc, "0.15.1", "d5f9d588fd802152516fccfdb96d6073753f77314fcfee892b15b6724ca0d596", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, optional: false]}]}, + "kafka_protocol": {:hex, :kafka_protocol, "1.1.0", "817c07a6339cbfb32d1f20a588353bf8d9a8944df296eb2e930360b83760c171", [:rebar, :rebar3], [{:snappyer, "1.2.1", [repo: "hexpm", hex: :snappyer, optional: false]}], "hexpm"}, "logfmt": {:hex, :logfmt, "3.2.0", "887a091adad28acc6e4d8b3d3bce177b934e7c61e7655c86946410f44aca6d84", [:mix], []}, "metrix": {:git, "https://github.com/rwdaigle/metrix.git", "a6738df9346da0412ca68f82a24a67d2a32b066e", [branch: "master"]}, - "snappyer": {:hex, :snappyer, "1.2.0", "4761f475c53fec4a027dc90ea0626a735e3d276bfcfaf9a8dc4c7bbda4d1e058", [:make, :rebar, :rebar3], [], "hexpm"}, - "supervisor3": {:hex, :supervisor3, "1.1.5", "5f3c487a6eba23de0e64c06e93efa0eca06f40324a6412c1318c77aca6da8424", [:make, :rebar, :rebar3], [], "hexpm"}} + "snappyer": {:hex, :snappyer, "1.2.1", "06c5f5c8afe80ba38e94e1ca1bd9253de95d8f2c85b08783e8d0f63815580556", [:make, :rebar, :rebar3], [], "hexpm"}, + "supervisor3": {:hex, :supervisor3, "1.1.5", "5f3c487a6eba23de0e64c06e93efa0eca06f40324a6412c1318c77aca6da8424", [:make, :rebar, :rebar3], []}} diff --git a/test/kaffe/group_member/subscriber/subscriber_test.exs b/test/kaffe/group_member/subscriber/subscriber_test.exs index b4b8a57..bcf5306 100644 --- a/test/kaffe/group_member/subscriber/subscriber_test.exs +++ b/test/kaffe/group_member/subscriber/subscriber_test.exs @@ -2,6 +2,7 @@ defmodule Kaffe.SubscriberTest do use ExUnit.Case + import Kaffe.Subscriber alias Kaffe.Subscriber defmodule TestKafka do @@ -37,7 +38,7 @@ defmodule Kaffe.SubscriberTest do end test "handle message set" do - + Process.register(self(), :test_case) {:ok, kafka_pid} = TestKafka.start_link(0) @@ -49,7 +50,7 @@ defmodule Kaffe.SubscriberTest do end test "handle kafka_fetch_error" do - + Process.register(self(), :test_case) {:ok, kafka_pid} = TestKafka.start_link(0) @@ -66,7 +67,7 @@ defmodule Kaffe.SubscriberTest do end test "handle consumer down" do - + Process.register(self(), :test_case) {:ok, kafka_pid} = TestKafka.start_link(0) @@ -113,7 +114,14 @@ defmodule Kaffe.SubscriberTest do defp build_message_list do Enum.map(1..10, fn (n) -> - {:kafka_message, n, 0, 0, "key-#{n}", "#{n}", -1} + Subscriber.kafka_message( + offset: n, + magic_byte: 0, + attributes: 0, + key: "key-#{n}", + value: "#{n}", + crc: -1 + ) end) end end