Skip to content

Commit

Permalink
mix format
Browse files Browse the repository at this point in the history
  • Loading branch information
Martin Norbäck Olivers committed Oct 17, 2020
1 parent 5063b17 commit 481d9bf
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 103 deletions.
16 changes: 8 additions & 8 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use Mix.Config

config :logger,
backends: [LokiLogger]
backends: [LokiLogger]

config :logger, :loki_logger,
level: :debug,
format: "$metadata level=$level $levelpad$message",
metadata: :all,
max_buffer: 300,
loki_labels: %{application: "loki_logger_library", elixir_node: node()},
loki_host: "http://localhost:3100",
loki_scope_org_id: "acme_inc"
level: :debug,
format: "$metadata level=$level $levelpad$message",
metadata: :all,
max_buffer: 300,
loki_labels: %{application: "loki_logger_library", elixir_node: node()},
loki_host: "http://localhost:3100",
loki_scope_org_id: "acme_inc"

config :elixir, :time_zone_database, Tzdata.TimeZoneDatabase
72 changes: 36 additions & 36 deletions lib/logproto/loki.pb.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Logproto.PushRequest do
}
defstruct [:streams]

field :streams, 1, repeated: true, type: Logproto.Stream
field(:streams, 1, repeated: true, type: Logproto.Stream)
end

defmodule Logproto.PushResponse do
Expand All @@ -32,12 +32,12 @@ defmodule Logproto.QueryRequest do
}
defstruct [:query, :limit, :start, :end, :direction, :regex]

field :query, 1, type: :string
field :limit, 2, type: :uint32
field :start, 3, type: Google.Protobuf.Timestamp
field :end, 4, type: Google.Protobuf.Timestamp
field :direction, 5, type: Logproto.Direction, enum: true
field :regex, 6, type: :string
field(:query, 1, type: :string)
field(:limit, 2, type: :uint32)
field(:start, 3, type: Google.Protobuf.Timestamp)
field(:end, 4, type: Google.Protobuf.Timestamp)
field(:direction, 5, type: Logproto.Direction, enum: true)
field(:regex, 6, type: :string)
end

defmodule Logproto.QueryResponse do
Expand All @@ -49,7 +49,7 @@ defmodule Logproto.QueryResponse do
}
defstruct [:streams]

field :streams, 1, repeated: true, type: Logproto.Stream
field(:streams, 1, repeated: true, type: Logproto.Stream)
end

defmodule Logproto.LabelRequest do
Expand All @@ -64,10 +64,10 @@ defmodule Logproto.LabelRequest do
}
defstruct [:name, :values, :start, :end]

field :name, 1, type: :string
field :values, 2, type: :bool
field :start, 3, type: Google.Protobuf.Timestamp
field :end, 4, type: Google.Protobuf.Timestamp
field(:name, 1, type: :string)
field(:values, 2, type: :bool)
field(:start, 3, type: Google.Protobuf.Timestamp)
field(:end, 4, type: Google.Protobuf.Timestamp)
end

defmodule Logproto.LabelResponse do
Expand All @@ -79,7 +79,7 @@ defmodule Logproto.LabelResponse do
}
defstruct [:values]

field :values, 1, repeated: true, type: :string
field(:values, 1, repeated: true, type: :string)
end

defmodule Logproto.Stream do
Expand All @@ -92,8 +92,8 @@ defmodule Logproto.Stream do
}
defstruct [:labels, :entries]

field :labels, 1, type: :string
field :entries, 2, repeated: true, type: Logproto.Entry
field(:labels, 1, type: :string)
field(:entries, 2, repeated: true, type: Logproto.Entry)
end

defmodule Logproto.Entry do
Expand All @@ -106,8 +106,8 @@ defmodule Logproto.Entry do
}
defstruct [:timestamp, :line]

field :timestamp, 1, type: Google.Protobuf.Timestamp
field :line, 2, type: :string
field(:timestamp, 1, type: Google.Protobuf.Timestamp)
field(:line, 2, type: :string)
end

defmodule Logproto.TailRequest do
Expand All @@ -123,11 +123,11 @@ defmodule Logproto.TailRequest do
}
defstruct [:query, :regex, :delayFor, :limit, :start]

field :query, 1, type: :string
field :regex, 2, type: :string
field :delayFor, 3, type: :uint32
field :limit, 4, type: :uint32
field :start, 5, type: Google.Protobuf.Timestamp
field(:query, 1, type: :string)
field(:regex, 2, type: :string)
field(:delayFor, 3, type: :uint32)
field(:limit, 4, type: :uint32)
field(:start, 5, type: Google.Protobuf.Timestamp)
end

defmodule Logproto.TailResponse do
Expand All @@ -140,8 +140,8 @@ defmodule Logproto.TailResponse do
}
defstruct [:stream, :droppedStreams]

field :stream, 1, type: Logproto.Stream
field :droppedStreams, 2, repeated: true, type: Logproto.DroppedStream
field(:stream, 1, type: Logproto.Stream)
field(:droppedStreams, 2, repeated: true, type: Logproto.DroppedStream)
end

defmodule Logproto.DroppedStream do
Expand All @@ -155,9 +155,9 @@ defmodule Logproto.DroppedStream do
}
defstruct [:from, :to, :labels]

field :from, 1, type: Google.Protobuf.Timestamp
field :to, 2, type: Google.Protobuf.Timestamp
field :labels, 3, type: :string
field(:from, 1, type: Google.Protobuf.Timestamp)
field(:to, 2, type: Google.Protobuf.Timestamp)
field(:labels, 3, type: :string)
end

defmodule Logproto.TimeSeriesChunk do
Expand All @@ -172,10 +172,10 @@ defmodule Logproto.TimeSeriesChunk do
}
defstruct [:from_ingester_id, :user_id, :labels, :chunks]

field :from_ingester_id, 1, type: :string
field :user_id, 2, type: :string
field :labels, 3, repeated: true, type: Logproto.LabelPair
field :chunks, 4, repeated: true, type: Logproto.Chunk
field(:from_ingester_id, 1, type: :string)
field(:user_id, 2, type: :string)
field(:labels, 3, repeated: true, type: Logproto.LabelPair)
field(:chunks, 4, repeated: true, type: Logproto.Chunk)
end

defmodule Logproto.LabelPair do
Expand All @@ -188,8 +188,8 @@ defmodule Logproto.LabelPair do
}
defstruct [:name, :value]

field :name, 1, type: :string
field :value, 2, type: :string
field(:name, 1, type: :string)
field(:value, 2, type: :string)
end

defmodule Logproto.Chunk do
Expand All @@ -201,7 +201,7 @@ defmodule Logproto.Chunk do
}
defstruct [:data]

field :data, 1, type: :bytes
field(:data, 1, type: :bytes)
end

defmodule Logproto.TransferChunksResponse do
Expand All @@ -216,6 +216,6 @@ defmodule Logproto.Direction do
@moduledoc false
use Protobuf, enum: true, syntax: :proto3

field :FORWARD, 0
field :BACKWARD, 1
field(:FORWARD, 0)
field(:BACKWARD, 1)
end
134 changes: 77 additions & 57 deletions lib/loki_logger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ defmodule LokiLogger do
:ok
end


## Helpers

defp meet_level?(_lvl, nil), do: true
Expand All @@ -79,24 +78,30 @@ defmodule LokiLogger do

defp init(config, state) do
level = Keyword.get(config, :level, :info)
format = Logger.Formatter.compile(Keyword.get(config, :format, "$metadata level=$level $levelpad$message"))
metadata = Keyword.get(config, :metadata, :all)
|> configure_metadata()

format =
Logger.Formatter.compile(
Keyword.get(config, :format, "$metadata level=$level $levelpad$message")
)

metadata =
Keyword.get(config, :metadata, :all)
|> configure_metadata()

max_buffer = Keyword.get(config, :max_buffer, 32)
loki_labels = Keyword.get(config, :loki_labels, %{application: "loki_logger_library"})
loki_host = Keyword.get(config, :loki_host, "http://localhost:3100")
loki_scope_org_id = Keyword.get(config, :loki_scope_org_id, "fake")

%{
state
|
format: format,
metadata: metadata,
level: level,
max_buffer: max_buffer,
loki_labels: loki_labels,
loki_host: loki_host,
loki_scope_org_id: loki_scope_org_id
| format: format,
metadata: metadata,
level: level,
max_buffer: max_buffer,
loki_labels: loki_labels,
loki_host: loki_host,
loki_scope_org_id: loki_scope_org_id
}
end

Expand All @@ -115,70 +120,84 @@ defmodule LokiLogger do

defp buffer_event(level, msg, ts, md, state) do
%{buffer: buffer, buffer_size: buffer_size} = state
epoch_nano = DateTime.to_unix(Timex.to_datetime(ts, Timex.Timezone.Local.lookup()), :nanosecond)

epoch_nano =
DateTime.to_unix(Timex.to_datetime(ts, Timex.Timezone.Local.lookup()), :nanosecond)

buffer = buffer ++ [{epoch_nano, format_event(level, msg, ts, md, state)}]
%{state | buffer: buffer, buffer_size: buffer_size + 1}
end

defp async_io(loki_host, loki_labels, loki_scope_org_id, output) do
defp async_io(loki_host, loki_labels, loki_scope_org_id, output) do
bin_push_request = generate_bin_push_request(loki_labels, output)

http_headers = [{"Content-Type", "application/x-protobuf"}, {"X-Scope-OrgID", loki_scope_org_id}]
http_headers = [
{"Content-Type", "application/x-protobuf"},
{"X-Scope-OrgID", loki_scope_org_id}
]

# TODO: replace with async http call
case HTTPoison.post "#{loki_host}/api/prom/push", bin_push_request,
http_headers do
case HTTPoison.post("#{loki_host}/api/prom/push", bin_push_request, http_headers) do
{:ok, %HTTPoison.Response{status_code: 204}} ->
#expected
# expected
:noop

{:ok, %HTTPoison.Response{status_code: status_code, body: body}} ->
IO.puts inspect(
output
|> List.keysort(1)
|> Enum.reverse,
pretty: true
)

raise "unexpected status code from loki backend #{status_code}" <> Exception.format_exit(body)
IO.puts(
inspect(
output
|> List.keysort(1)
|> Enum.reverse(),
pretty: true
)
)

raise "unexpected status code from loki backend #{status_code}" <>
Exception.format_exit(body)

{:error, %HTTPoison.Error{reason: reason}} ->
raise "http error from loki backend " <> Exception.format_exit(reason)
end
end

defp generate_bin_push_request(loki_labels, output) do
labels = Enum.map(loki_labels, fn {k, v} -> "#{k}=\"#{v}\"" end)
|> Enum.join(",")
labels =
Enum.map(loki_labels, fn {k, v} -> "#{k}=\"#{v}\"" end)
|> Enum.join(",")

labels = "{" <> labels <> "}"
# sort entries on epoch seconds as first element of tuple, to prevent out-of-order entries
sorted_entries = output
|> List.keysort(0)
|> Enum.map(
fn {ts, line} ->
seconds = Kernel.trunc(ts / 1_000_000_000)
nanos = ts - (seconds * 1_000_000_000)
Logproto.Entry.new(
timestamp: Google.Protobuf.Timestamp.new(seconds: seconds, nanos: nanos),
line: line
)
end
)

request = Logproto.PushRequest.new(
streams: [
Logproto.Stream.new(
labels: labels,
entries: sorted_entries
sorted_entries =
output
|> List.keysort(0)
|> Enum.map(fn {ts, line} ->
seconds = Kernel.trunc(ts / 1_000_000_000)
nanos = ts - seconds * 1_000_000_000

Logproto.Entry.new(
timestamp: Google.Protobuf.Timestamp.new(seconds: seconds, nanos: nanos),
line: line
)
]
)

{:ok, bin_push_request} = Logproto.PushRequest.encode(request)
|> :snappyer.compress
end)

request =
Logproto.PushRequest.new(
streams: [
Logproto.Stream.new(
labels: labels,
entries: sorted_entries
)
]
)

{:ok, bin_push_request} =
Logproto.PushRequest.encode(request)
|> :snappyer.compress()

bin_push_request
end

defp format_event(level, msg, ts, md, %{format: format, metadata: keys} = _state) do
defp format_event(level, msg, ts, md, %{format: format, metadata: keys} = _state) do
List.to_string(Logger.Formatter.format(format, level, msg, ts, take_metadata(md, keys)))
end

Expand All @@ -202,7 +221,12 @@ defmodule LokiLogger do
defp log_buffer(%{buffer_size: 0, buffer: []} = state), do: state

defp log_buffer(
%{loki_host: loki_host, loki_labels: loki_labels, loki_scope_org_id: loki_scope_org_id, buffer: buffer} = state
%{
loki_host: loki_host,
loki_labels: loki_labels,
loki_scope_org_id: loki_scope_org_id,
buffer: buffer
} = state
) do
async_io(loki_host, loki_labels, loki_scope_org_id, buffer)
%{state | buffer: [], buffer_size: 0}
Expand All @@ -212,7 +236,3 @@ defmodule LokiLogger do
log_buffer(state)
end
end




1 change: 0 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,4 @@ defmodule LokiLogger.MixProject do
links: %{"GitHub" => "https://github.com/wardbekker/LokiLogger.git"}
]
end

end
Loading

0 comments on commit 481d9bf

Please sign in to comment.