Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 41 additions & 18 deletions apps/opentelemetry_experimental/src/otel_log_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
-include_lib("kernel/include/logger.hrl").
-include_lib("opentelemetry_api/include/opentelemetry.hrl").

-export([start_link/2]).
-export([start_link/2,
flush/1]).

-export([log/2,
adding_handler/1,
Expand All @@ -37,9 +38,18 @@
exporting/3,
handle_event/3]).


-type handler_config() :: #{max_queue_size => non_neg_integer(),
exporting_timeout_ms => non_neg_integer(),
scheduled_delay_ms => non_neg_integer(),
exporter => {module(), term()},
report_cb => fun(),
depth => non_neg_integer(),
chars_limit => non_neg_integer(),
single_line => boolean()}.
-type config() :: #{id => logger:handler_id(),
regname := atom(),
config => term(),
config => handler_config(),
level => logger:level() | all | none,
module => module(),
filter_default => log | stop,
Expand Down Expand Up @@ -69,6 +79,11 @@
start_link(RegName, Config) ->
gen_statem:start_link({local, RegName}, ?MODULE, [RegName, Config], []).

-spec flush(Config) -> ok when
Config :: config().
flush(_Config=#{regname := Id}) ->
gen_statem:call(Id, flush).

-spec adding_handler(Config) -> {ok, Config} | {error, Reason} when
Config :: config(),
Reason :: term().
Expand Down Expand Up @@ -140,17 +155,17 @@ init([_RegName, Config]) ->
process_flag(trap_exit, true),

Resource = otel_resource_detector:get_resource(),
HandlerConfig = maps:get(config, Config, #{}),
SizeLimit = maps:get(max_queue_size, HandlerConfig, ?DEFAULT_MAX_QUEUE_SIZE),
ExportingTimeout = maps:get(exporting_timeout_ms, HandlerConfig, ?DEFAULT_EXPORTER_TIMEOUT_MS),
ScheduledDelay = maps:get(scheduled_delay_ms, HandlerConfig, ?DEFAULT_SCHEDULED_DELAY_MS),

SizeLimit = maps:get(max_queue_size, Config, ?DEFAULT_MAX_QUEUE_SIZE),
ExportingTimeout = maps:get(exporting_timeout_ms, Config, ?DEFAULT_EXPORTER_TIMEOUT_MS),
ScheduledDelay = maps:get(scheduled_delay_ms, Config, ?DEFAULT_SCHEDULED_DELAY_MS),

ExporterConfig = maps:get(exporter, Config, {opentelemetry_exporter, #{protocol => grpc}}),
ExporterConfig = maps:get(exporter, HandlerConfig, {opentelemetry_exporter, #{protocol => grpc}}),

{ok, idle, #data{exporter=undefined,
exporter_config=ExporterConfig,
resource=Resource,
config=Config,
config=HandlerConfig,
max_queue_size=case SizeLimit of
infinity -> infinity;
_ -> SizeLimit div erlang:system_info(wordsize)
Expand Down Expand Up @@ -183,24 +198,23 @@ exporting({timeout, export_logs}, export_logs, _) ->
{keep_state_and_data, [postpone]};
exporting(enter, _OldState, _Data) ->
keep_state_and_data;
exporting(internal, export, Data=#data{exporter=Exporter,
resource=Resource,
config=Config,
batch=Batch}) when map_size(Batch) =/= 0 ->
_ = export(Exporter, Resource, Batch, Config),
exporting(internal, export, Data) ->
ok = maybe_export(Data),
{next_state, idle, Data#data{batch=#{}}};
exporting(EventType, EventContent, Data) ->
handle_event(EventType, EventContent, Data).

handle_event({call, From}, {changing_config, _SetOrUpdate, _OldConfig, NewConfig}, Data) ->
{keep_state, Data#data{config=NewConfig}, [{reply, From, NewConfig}]};
handle_event({call, From}, {removing_handler, Config}, _Data) ->
%% TODO: flush
HandlerConfig = maps:get(config, NewConfig, #{}),
{keep_state, Data#data{config=HandlerConfig}, [{reply, From, NewConfig}]};
handle_event({call, From}, {removing_handler, Config}, Data) ->
ok = maybe_export(Data),
{keep_state_and_data, [{reply, From, Config}]};
handle_event({call, From}, {filter_handler, Config}, Data) ->
{keep_state, Data, [{reply, From, Config}]};
handle_event({call, From}, {filter_config, Config}, Data) ->
{keep_state, Data, [{reply, From, Config}]};
handle_event({call, From}, flush, Data) ->
ok = maybe_export(Data),
{keep_state, Data, [{reply, From, ok}]};
handle_event({call, _From}, _Msg, _Data) ->
keep_state_and_data;
handle_event(cast, {log, Scope, LogEvent}, Data=#data{batch=Logs}) ->
Expand All @@ -220,6 +234,15 @@ init_exporter(ExporterConfig) ->
undefined
end.

maybe_export(Data=#data{exporter=Exporter,
resource=Resource,
config=Config,
batch=Batch}) when map_size(Batch) =/= 0 ->
_ = export(Exporter, Resource, Batch, Config),
ok;
maybe_export(_Data) ->
ok.

export(undefined, _, _, _) ->
true;
export(Exporter, Resource, Batch, Config) ->
Expand Down
63 changes: 34 additions & 29 deletions apps/opentelemetry_experimental/src/otel_otlp_logs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,52 +76,57 @@ log_record(#{level := Level,
trim(
lists:reverse(
trim(S, false)), true)),
%% TODO Do we actually have to strip string of newlines with indentation?
re:replace(T,",?\r?\n\s*",", ",
[{return,list}, global, unicode]);
[{return,binary}, global, unicode]);
M ->
M
end,
Attributes = maps:without([gl, time, report_cb], Metadata),
Attributes = maps:without([gl, time, report_cb, otel_trace_id, otel_span_id, otel_trace_flags], Metadata),
Attributes1 = maps:fold(fun(K, V, Acc) ->
[#{key => otel_otlp_common:to_binary(K),
value => otel_otlp_common:to_any_value(V)} | Acc]
end, [], Attributes),
%% FIXME Always 0?
DroppedAttributesCount = maps:size(Attributes) - length(Attributes1),
Flags = 0,

%% Note: otel_trace_id and otel_span_id from hex_span_ctx are now binaries, not charlists.
LogRecord = case Metadata of
#{otel_trace_id := TraceId,
otel_span_id := SpanId,
otel_trace_flags := TraceFlagsHex} ->
TraceFlags = case TraceFlagsHex of
<<_:0, _/binary>> when byte_size(TraceFlagsHex) == 2 ->
erlang:binary_to_integer(TraceFlagsHex, 16);
_ -> 0
end,
#{trace_id => TraceId,
span_id => SpanId,
trace_flags => TraceFlags};
#{otel_trace_id := TraceId,
otel_span_id := SpanId} ->
#{trace_id => TraceId,
span_id => SpanId};
_ ->
#{}
end,


%% NOTE: Collector wants IDs to be bytes (https://github.com/open-telemetry/opentelemetry-proto/blob/ca839c51f706f5d53bfb46f06c3e90c3af3a52c6/opentelemetry/proto/logs/v1/logs.proto#L199)
LogRecord = try_prepare_tracing_ids(Metadata),

LogRecord#{time_unix_nano => opentelemetry:timestamp_to_nano(Time),
observed_time_unix_nano => erlang:convert_time_unit(ObservedTime, microsecond, nanosecond),
severity_number => SeverityNumber,
severity_text => SeverityText,
body => otel_otlp_common:to_any_value(Body1),
attributes => Attributes1,
severity_number => SeverityNumber,
severity_text => SeverityText,
body => otel_otlp_common:to_any_value(Body1),
attributes => Attributes1,
dropped_attributes_count => DroppedAttributesCount,
flags => Flags
flags => Flags
}.

try_prepare_tracing_ids(#{otel_trace_id := TraceId, otel_span_id := SpanId, otel_trace_flags := TraceFlagsHex}) ->
try
TraceFlags = case TraceFlagsHex of
<<_:0, _/binary>> when byte_size(TraceFlagsHex) == 2 ->
erlang:binary_to_integer(TraceFlagsHex, 16);
_ -> 0
end,
#{trace_id => <<(binary_to_integer(TraceId, 16)):128>>,
span_id => <<(binary_to_integer(SpanId, 16)):64>>,
trace_flags => TraceFlags}
catch
_:_ -> #{}
end;
try_prepare_tracing_ids(#{otel_trace_id := TraceId, otel_span_id := SpanId}) ->
try
#{trace_id => <<(binary_to_integer(TraceId, 16)):128>>,
span_id => <<(binary_to_integer(SpanId, 16)):64>>}
catch
_:_ -> #{}
end;
try_prepare_tracing_ids(_) ->
#{}.

format_msg({string, Chardata}, Meta, Config) ->
format_msg({"~ts", [Chardata]}, Meta, Config);
format_msg({report,_}=Msg, Meta, #{report_cb := Fun}=Config)
Expand Down