Skip to content

Commit

Permalink
Fix Confluent Kafka connectivity with SSL support
Browse files Browse the repository at this point in the history
Bump version to 1.13.0

Closes spreedly#88
  • Loading branch information
andyleclair authored and Kevin Lewis committed May 24, 2019
1 parent a6f2f4e commit c16eee9
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 7 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ config :kaffe,
# optional
partition_strategy: :md5,
ssl: true,
sasl: %{
mechanism: :plain,
login: System.get_env("KAFFE_PRODUCER_USER"),
Expand All @@ -305,7 +306,9 @@ The `partition_strategy` setting can be one of:
You can also set any of the Brod producer configuration options in the `producer` section - see [the Brod sources](https://github.com/klarna/brod/blob/master/src/brod_producer.erl#L90) for a list of keys and their meaning.
If kafka broker configured with `SASL_PLAINTEXT` auth, `sasl` option can be added
If the Kafka broker is configured with `SASL_PLAINTEXT` auth, the `sasl` option can be added.
If using Confluent Hosted Kafka, also add `ssl: true` as shown above.
## Heroku Configuration
Expand Down
6 changes: 3 additions & 3 deletions lib/kaffe/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ defmodule Kaffe.Config do
ssl_config(client_cert(), client_cert_key())
end

def ssl_config(_client_cert = nil, _client_cert_key = nil) do
[]
end
def ssl_config(true), do: [ssl: true]
def ssl_config(_), do: []

def ssl_config(_client_cert = nil, _client_cert_key = nil), do: []
def ssl_config(client_cert, client_cert_key) do
[
ssl: [
Expand Down
8 changes: 7 additions & 1 deletion lib/kaffe/config/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ defmodule Kaffe.Config.Consumer do
end

def client_consumer_config do
default_client_consumer_config() ++ maybe_heroku_kafka_ssl() ++ sasl_options()
default_client_consumer_config() ++ maybe_heroku_kafka_ssl() ++ sasl_options() ++ ssl_options()
end

def sasl_options do
Expand All @@ -81,6 +81,12 @@ defmodule Kaffe.Config.Consumer do
|> Kaffe.Config.sasl_config()
end

def ssl_options do
:ssl
|> config_get(false)
|> Kaffe.Config.ssl_config()
end

def default_client_consumer_config do
[
auto_start_producers: false,
Expand Down
8 changes: 7 additions & 1 deletion lib/kaffe/config/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ defmodule Kaffe.Config.Producer do
end

def client_producer_config do
default_client_producer_config() ++ maybe_heroku_kafka_ssl() ++ sasl_options()
default_client_producer_config() ++ maybe_heroku_kafka_ssl() ++ sasl_options() ++ ssl_options()
end

def sasl_options do
Expand All @@ -38,6 +38,12 @@ defmodule Kaffe.Config.Producer do
end
end

def ssl_options do
:ssl
|> config_get(false)
|> Kaffe.Config.ssl_config()
end

def default_client_producer_config do
[
auto_start_producers: true,
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Kaffe.Mixfile do
def project do
[
app: :kaffe,
version: "1.12.0",
version: "1.13.0",
description:
"An opinionated Elixir wrapper around brod, the Erlang Kafka client, that supports encrypted connections to Heroku Kafka out of the box.",
name: "Kaffe",
Expand Down
42 changes: 42 additions & 0 deletions test/kaffe/config/consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Kaffe.Config.ConsumerTest do
consumer_config =
Application.get_env(:kaffe, :consumer)
|> Keyword.delete(:offset_reset_policy)
|> Keyword.delete(:ssl)
|> Keyword.put(:start_with_earliest_message, true)

Application.put_env(:kaffe, :consumer, consumer_config)
Expand Down Expand Up @@ -128,6 +129,47 @@ defmodule Kaffe.Config.ConsumerTest do
assert Kaffe.Config.Consumer.configuration() == expected
end

test "correct settings with ssl are extracted" do
config = Application.get_env(:kaffe, :consumer)
ssl = Keyword.get(config, :ssl)
ssl_config = Keyword.put(config, :ssl, true)

Application.put_env(:kaffe, :consumer, ssl_config)

expected = %{
endpoints: [kafka: 9092],
subscriber_name: :"kaffe-test-group",
consumer_group: "kaffe-test-group",
topics: ["kaffe-test"],
group_config: [
offset_commit_policy: :commit_to_kafka_v2,
offset_commit_interval_seconds: 10
],
consumer_config: [
auto_start_producers: false,
allow_topic_auto_creation: false,
begin_offset: :earliest,
ssl: true
],
message_handler: SilentMessage,
async_message_ack: false,
rebalance_delay_ms: 100,
max_bytes: 10_000,
min_bytes: 0,
max_wait_time: 10_000,
subscriber_retries: 1,
subscriber_retry_delay_ms: 5,
offset_reset_policy: :reset_by_subscriber,
worker_allocation_strategy: :worker_per_partition
}

on_exit(fn ->
Application.put_env(:kaffe, :consumer, Keyword.put(config, :ssl, ssl))
end)

assert Kaffe.Config.Consumer.configuration() == expected
end

describe "offset_reset_policy" do
test "computes correctly from start_with_earliest_message == true" do
consumer_config =
Expand Down
35 changes: 35 additions & 0 deletions test/kaffe/config/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,39 @@ defmodule Kaffe.Config.ProducerTest do

assert Kaffe.Config.Producer.configuration() == expected
end

test "adds ssl when true" do
config = Application.get_env(:kaffe, :producer)
ssl = Keyword.get(config, :ssl)
Application.put_env(:kaffe, :producer, Keyword.put(config, :ssl, true))

expected = %{
endpoints: [kafka: 9092],
producer_config: [
auto_start_producers: true,
allow_topic_auto_creation: false,
default_producer_config: [
required_acks: -1,
ack_timeout: 1000,
partition_buffer_limit: 512,
partition_onwire_limit: 1,
max_batch_size: 1_048_576,
max_retries: 3,
retry_backoff_ms: 500,
compression: :no_compression,
min_compression_batch_size: 1024
],
ssl: true
],
topics: ["kaffe-test"],
client_name: :kaffe_producer_client,
partition_strategy: :md5
}

on_exit(fn ->
Application.put_env(:kaffe, :producer, Keyword.put(config, :ssl, ssl))
end)

assert Kaffe.Config.Producer.configuration() == expected
end
end

0 comments on commit c16eee9

Please sign in to comment.