diff --git a/.chloggen/config.yaml b/.chloggen/config.yaml index 0c87c8f876875..b3c98ea2ce5f1 100644 --- a/.chloggen/config.yaml +++ b/.chloggen/config.yaml @@ -227,6 +227,7 @@ components: - processor/tail_sampling - processor/tencentcvmdetector - processor/transform + - processor/transformprocessor/internal/logparsingfuncs - processor/unroll - processor/upclouddetector - processor/vultrdetector diff --git a/.chloggen/feat_transform-leef-parser.yaml b/.chloggen/feat_transform-leef-parser.yaml new file mode 100644 index 0000000000000..b6bfdabef0ef9 --- /dev/null +++ b/.chloggen/feat_transform-leef-parser.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: processor/transform + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add `parse_leef` function to parse Log Event Extended Format (LEEF) messages. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [44908] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/.chloggen/transformprocessor-parse-clf.yaml b/.chloggen/transformprocessor-parse-clf.yaml new file mode 100644 index 0000000000000..f381a28128f88 --- /dev/null +++ b/.chloggen/transformprocessor-parse-clf.yaml @@ -0,0 +1,30 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: processor/transform + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add `ParseCLF` function for parsing Common Log Format (CLF) HTTP access log entries. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [48349] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + `ParseCLF` is available in log statements and returns a map with the parsed + `remote_host`, `rfc931`, `authuser`, `timestamp`, `request`, `method`, + `request_uri`, `protocol`, `status`, and `bytes` fields. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index e5bfbbb996a90..a60fe28f3a918 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -243,6 +243,7 @@ processor/spanpruningprocessor/ @open-telemetry processor/sumologicprocessor/ @open-telemetry/collector-contrib-approvers @rnishtala-sumo @pankaj101A @jagan2221 processor/tailsamplingprocessor/ @open-telemetry/collector-contrib-approvers @portertech @jmacd @csmarchbanks @carsonip processor/transformprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth @evan-bradley @edmocosta @bogdandrutu +processor/transformprocessor/internal/logparsingfuncs/ @open-telemetry/collector-contrib-approvers @Caleb-Hurshman @Dylan-M processor/unrollprocessor/ @open-telemetry/collector-contrib-approvers @axw @schmikei @rnishtala-sumo receiver/activedirectorydsreceiver/ @open-telemetry/collector-contrib-approvers @pjanotti receiver/aerospikereceiver/ @open-telemetry/collector-contrib-approvers @antonblock diff --git a/.github/ISSUE_TEMPLATE/beta_stability.yaml b/.github/ISSUE_TEMPLATE/beta_stability.yaml index dc992eea44a6a..a8cf88c7bd67a 100644 --- a/.github/ISSUE_TEMPLATE/beta_stability.yaml +++ b/.github/ISSUE_TEMPLATE/beta_stability.yaml @@ -240,6 +240,7 @@ body: - processor/sumologic - processor/tailsampling - processor/transform + - processor/transform/internal/logparsingfuncs - processor/unroll - receiver/activedirectoryds - receiver/aerospike diff --git a/.github/ISSUE_TEMPLATE/bug_report.yaml b/.github/ISSUE_TEMPLATE/bug_report.yaml index 4fc1232d8ec6b..c5fa9794b4d9d 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.yaml +++ b/.github/ISSUE_TEMPLATE/bug_report.yaml @@ -243,6 +243,7 @@ body: - processor/sumologic - processor/tailsampling - processor/transform + - processor/transform/internal/logparsingfuncs - processor/unroll - receiver/activedirectoryds - receiver/aerospike diff --git a/.github/ISSUE_TEMPLATE/feature_request.yaml b/.github/ISSUE_TEMPLATE/feature_request.yaml index 99fb70a74129d..1aaf918dbed00 100644 --- a/.github/ISSUE_TEMPLATE/feature_request.yaml +++ b/.github/ISSUE_TEMPLATE/feature_request.yaml @@ -237,6 +237,7 @@ body: - processor/sumologic - processor/tailsampling - processor/transform + - processor/transform/internal/logparsingfuncs - processor/unroll - receiver/activedirectoryds - receiver/aerospike diff --git a/.github/ISSUE_TEMPLATE/other.yaml b/.github/ISSUE_TEMPLATE/other.yaml index 34727aeebc205..03e78dc8c1973 100644 --- a/.github/ISSUE_TEMPLATE/other.yaml +++ b/.github/ISSUE_TEMPLATE/other.yaml @@ -237,6 +237,7 @@ body: - processor/sumologic - processor/tailsampling - processor/transform + - processor/transform/internal/logparsingfuncs - processor/unroll - receiver/activedirectoryds - receiver/aerospike diff --git a/.github/ISSUE_TEMPLATE/unmaintained.yaml b/.github/ISSUE_TEMPLATE/unmaintained.yaml index 8f7ce35cd6105..f6ddad95c23e8 100644 --- a/.github/ISSUE_TEMPLATE/unmaintained.yaml +++ b/.github/ISSUE_TEMPLATE/unmaintained.yaml @@ -242,6 +242,7 @@ body: - processor/sumologic - processor/tailsampling - processor/transform + - processor/transform/internal/logparsingfuncs - processor/unroll - receiver/activedirectoryds - receiver/aerospike diff --git a/.github/component_labels.txt b/.github/component_labels.txt index f377c5c37426e..b4ceaa3ac6344 100644 --- a/.github/component_labels.txt +++ b/.github/component_labels.txt @@ -224,6 +224,7 @@ processor/spanpruningprocessor processor/spanpruning processor/sumologicprocessor processor/sumologic processor/tailsamplingprocessor processor/tailsampling processor/transformprocessor processor/transform +processor/transformprocessor/internal/logparsingfuncs processor/transform/internal/logparsingfuncs processor/unrollprocessor processor/unroll receiver/activedirectorydsreceiver receiver/activedirectoryds receiver/aerospikereceiver receiver/aerospike diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index 03877206a1316..4bd87a6a06618 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -273,6 +273,11 @@ In addition to the common OTTL functions, the processor defines its own function - [aggregate_on_attribute_value](#aggregate_on_attribute_value) - [merge_histogram_buckets](#merge_histogram_buckets) +**Logs only functions** + +- [ParseCLF](#parseclf) +- [ParseLEEF](#parseleef) + **Traces only functions** - [set_semconv_span_name](#set_semconv_span_name) @@ -691,6 +696,58 @@ Examples: # counts: [5, 11, 1] ``` +### ParseCLF + +`ParseCLF(target)` + +The `ParseCLF` function returns a `pcommon.Map` that is the result of parsing the `target` string as a [Common Log Format (CLF)](https://www.w3.org/Daemon/User/Config/Logging.html#common-logfile-format) HTTP access log entry. + +`target` is a Getter that returns a string. If the returned string is empty, or cannot be parsed as CLF, an error will be returned. + +The CLF entry is expected to have the form: + +``` +remotehost rfc931 authuser [date] "request" status bytes +``` + +The returned map has the following fields: + +- `remote_host` — the client's DNS name or IP address. +- `rfc931` — the remote logname of the user (CLF uses `-` when unknown). +- `authuser` — the authenticated user (CLF uses `-` when unknown). +- `timestamp` — the contents of the bracketed date field, preserved as a string. +- `request` — the raw request line as sent by the client. +- `method`, `request_uri`, `protocol` — the parsed components of the request line, only set when the request line is well-formed. +- `status` — the HTTP status code as an integer. +- `bytes` — the content-length of the response as an integer. Omitted when CLF reports `-` (e.g. on a 304 response). + +Examples: + +- `ParseCLF(body)` +- `ParseCLF("127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \"GET /apache_pb.gif HTTP/1.0\" 200 2326")` + +### ParseLEEF + +`ParseLEEF(target)` + +The `ParseLEEF` function returns a `pcommon.Map` that is the result of parsing the `target` string as a [Log Event Extended Format (LEEF)](https://www.ibm.com/docs/en/dsm?topic=overview-leef-event-components) message. + +`target` is a Getter that returns a string. If the returned string is empty, or cannot be parsed as LEEF, an error will be returned. + +`ParseLEEF` can parse both LEEF 1.0 and LEEF 2.0 messages. The function is tolerant of an optional syslog header preceding the `LEEF:` token. The returned map has the following top-level fields: + +- `version` — the LEEF version (`"1.0"` or `"2.0"`). +- `vendor`, `product_name`, `product_version`, `event_id` — the LEEF header fields. +- `attributes` — a map of the parsed key/value attribute pairs. + +For LEEF 1.0 the attribute delimiter is always a tab. For LEEF 2.0 the delimiter is taken from the header and may be specified as a single character or as a hex value (e.g. `0x09`). + +Examples: + +- `ParseLEEF(body)` +- `ParseLEEF("LEEF:1.0|Microsoft|MSExchange|4.0 SP1|15345|src=10.50.1.1\tdst=2.10.20.20\tsev=5")` +- `ParseLEEF("LEEF:2.0|Lancope|StealthWatch|1.0|41|^|src=10.0.1.8^dst=10.0.0.5^sev=5")` + ### set_semconv_span_name `set_semconv_span_name(semconvVersion, Optional[originalSpanNameAttribute])` diff --git a/processor/transformprocessor/internal/logparsingfuncs/func_parse_clf.go b/processor/transformprocessor/internal/logparsingfuncs/func_parse_clf.go new file mode 100644 index 0000000000000..2b55c6a6f6b75 --- /dev/null +++ b/processor/transformprocessor/internal/logparsingfuncs/func_parse_clf.go @@ -0,0 +1,97 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package logparsingfuncs // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logparsingfuncs" + +import ( + "context" + "errors" + "fmt" + "regexp" + "strconv" + "strings" + + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" +) + +type parseCLFArguments struct { + Target ottl.StringGetter[*ottllog.TransformContext] +} + +func NewParseCLFFactory() ottl.Factory[*ottllog.TransformContext] { + return ottl.NewFactory("ParseCLF", &parseCLFArguments{}, createParseCLFFunction) +} + +func createParseCLFFunction(_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.ExprFunc[*ottllog.TransformContext], error) { + args, ok := oArgs.(*parseCLFArguments) + if !ok { + return nil, errors.New("parseCLFFactory args must be of type *parseCLFArguments") + } + + return parseCLF(args.Target), nil +} + +func parseCLF(target ottl.StringGetter[*ottllog.TransformContext]) ottl.ExprFunc[*ottllog.TransformContext] { + return func(ctx context.Context, tCtx *ottllog.TransformContext) (any, error) { + source, err := target.Get(ctx, tCtx) + if err != nil { + return nil, err + } + + if source == "" { + return nil, errors.New("cannot parse empty CLF message") + } + + return parseCLFMessage(source) + } +} + +// clfRegex matches the Common Log Format: +// +// remotehost rfc931 authuser [date] "request" status bytes +// +// See https://www.w3.org/Daemon/User/Config/Logging.html#common-logfile-format +var clfRegex = regexp.MustCompile(`^(\S+) (\S+) (\S+) \[([^\]]+)\] "([^"]*)" (\S+) (\S+)$`) + +func parseCLFMessage(message string) (pcommon.Map, error) { + matches := clfRegex.FindStringSubmatch(strings.TrimSpace(message)) + if matches == nil { + return pcommon.NewMap(), errors.New("invalid CLF message: does not match expected format") + } + + result := pcommon.NewMap() + result.PutStr("remote_host", matches[1]) + result.PutStr("rfc931", matches[2]) + result.PutStr("authuser", matches[3]) + result.PutStr("timestamp", matches[4]) + + request := matches[5] + result.PutStr("request", request) + + if requestParts := strings.SplitN(request, " ", 3); len(requestParts) == 3 { + result.PutStr("method", requestParts[0]) + result.PutStr("request_uri", requestParts[1]) + result.PutStr("protocol", requestParts[2]) + } + + status := matches[6] + statusInt, err := strconv.ParseInt(status, 10, 64) + if err != nil { + return pcommon.NewMap(), fmt.Errorf("invalid status code %q: %w", status, err) + } + result.PutInt("status", statusInt) + + bytesStr := matches[7] + if bytesStr != "-" { + bytesInt, err := strconv.ParseInt(bytesStr, 10, 64) + if err != nil { + return pcommon.NewMap(), fmt.Errorf("invalid bytes value %q: %w", bytesStr, err) + } + result.PutInt("bytes", bytesInt) + } + + return result, nil +} diff --git a/processor/transformprocessor/internal/logparsingfuncs/func_parse_clf_test.go b/processor/transformprocessor/internal/logparsingfuncs/func_parse_clf_test.go new file mode 100644 index 0000000000000..8ddd2a38ce5ad --- /dev/null +++ b/processor/transformprocessor/internal/logparsingfuncs/func_parse_clf_test.go @@ -0,0 +1,327 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package logparsingfuncs + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" +) + +func Test_parseCLF(t *testing.T) { + tests := []struct { + name string + input string + expected map[string]any + absent []string + }{ + { + name: "canonical example from the W3C spec", + input: `127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326`, + expected: map[string]any{ + "remote_host": "127.0.0.1", + "rfc931": "-", + "authuser": "frank", + "timestamp": "10/Oct/2000:13:55:36 -0700", + "request": "GET /apache_pb.gif HTTP/1.0", + "method": "GET", + "request_uri": "/apache_pb.gif", + "protocol": "HTTP/1.0", + "status": int64(200), + "bytes": int64(2326), + }, + }, + { + name: "all dashes for unknown fields", + input: `- - - [10/Oct/2000:13:55:36 -0700] "GET / HTTP/1.1" 200 0`, + expected: map[string]any{ + "remote_host": "-", + "rfc931": "-", + "authuser": "-", + "timestamp": "10/Oct/2000:13:55:36 -0700", + "request": "GET / HTTP/1.1", + "method": "GET", + "request_uri": "/", + "protocol": "HTTP/1.1", + "status": int64(200), + "bytes": int64(0), + }, + }, + { + name: "bytes is dash (no content)", + input: `192.168.1.1 - - [10/Oct/2000:13:55:36 -0700] "GET /redirect HTTP/1.1" 304 -`, + expected: map[string]any{ + "remote_host": "192.168.1.1", + "rfc931": "-", + "authuser": "-", + "timestamp": "10/Oct/2000:13:55:36 -0700", + "request": "GET /redirect HTTP/1.1", + "method": "GET", + "request_uri": "/redirect", + "protocol": "HTTP/1.1", + "status": int64(304), + }, + absent: []string{"bytes"}, + }, + { + name: "IPv6 remote host", + input: `2001:db8::1 - - [10/Oct/2000:13:55:36 -0700] "POST /api/v1/users HTTP/1.1" 201 512`, + expected: map[string]any{ + "remote_host": "2001:db8::1", + "rfc931": "-", + "authuser": "-", + "timestamp": "10/Oct/2000:13:55:36 -0700", + "request": "POST /api/v1/users HTTP/1.1", + "method": "POST", + "request_uri": "/api/v1/users", + "protocol": "HTTP/1.1", + "status": int64(201), + "bytes": int64(512), + }, + }, + { + name: "hostname remote host with rfc931 user", + input: `client.example.com bob alice [25/Dec/2023:00:00:00 +0000] "DELETE /resource HTTP/2.0" 204 0`, + expected: map[string]any{ + "remote_host": "client.example.com", + "rfc931": "bob", + "authuser": "alice", + "timestamp": "25/Dec/2023:00:00:00 +0000", + "request": "DELETE /resource HTTP/2.0", + "method": "DELETE", + "request_uri": "/resource", + "protocol": "HTTP/2.0", + "status": int64(204), + "bytes": int64(0), + }, + }, + { + name: "request_uri with query string", + input: `10.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET /search?q=hello+world&page=2 HTTP/1.1" 200 1024`, + expected: map[string]any{ + "remote_host": "10.0.0.1", + "rfc931": "-", + "authuser": "-", + "timestamp": "10/Oct/2000:13:55:36 -0700", + "request": "GET /search?q=hello+world&page=2 HTTP/1.1", + "method": "GET", + "request_uri": "/search?q=hello+world&page=2", + "protocol": "HTTP/1.1", + "status": int64(200), + "bytes": int64(1024), + }, + }, + { + name: "leading and trailing whitespace tolerated", + input: " 127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] \"GET / HTTP/1.1\" 200 42 ", + expected: map[string]any{ + "remote_host": "127.0.0.1", + "rfc931": "-", + "authuser": "-", + "timestamp": "10/Oct/2000:13:55:36 -0700", + "request": "GET / HTTP/1.1", + "method": "GET", + "request_uri": "/", + "protocol": "HTTP/1.1", + "status": int64(200), + "bytes": int64(42), + }, + }, + { + name: "malformed request line is preserved but not split", + input: `127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "MALFORMED" 400 0`, + expected: map[string]any{ + "remote_host": "127.0.0.1", + "rfc931": "-", + "authuser": "-", + "timestamp": "10/Oct/2000:13:55:36 -0700", + "request": "MALFORMED", + "status": int64(400), + "bytes": int64(0), + }, + absent: []string{"method", "request_uri", "protocol"}, + }, + { + name: "empty request line", + input: `127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "" 408 0`, + expected: map[string]any{ + "remote_host": "127.0.0.1", + "rfc931": "-", + "authuser": "-", + "timestamp": "10/Oct/2000:13:55:36 -0700", + "request": "", + "status": int64(408), + "bytes": int64(0), + }, + absent: []string{"method", "request_uri", "protocol"}, + }, + { + name: "large byte counts", + input: `127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET /download HTTP/1.1" 200 4294967296`, + expected: map[string]any{ + "remote_host": "127.0.0.1", + "rfc931": "-", + "authuser": "-", + "timestamp": "10/Oct/2000:13:55:36 -0700", + "request": "GET /download HTTP/1.1", + "method": "GET", + "request_uri": "/download", + "protocol": "HTTP/1.1", + "status": int64(200), + "bytes": int64(4294967296), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + target := ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return tt.input, nil + }, + } + exprFunc := parseCLF(target) + result, err := exprFunc(t.Context(), nil) + require.NoError(t, err) + + resultMap, ok := result.(pcommon.Map) + require.True(t, ok, "result should be pcommon.Map") + + assertCLFMap(t, resultMap, tt.expected) + for _, k := range tt.absent { + _, ok := resultMap.Get(k) + assert.False(t, ok, "key %q should be absent", k) + } + }) + } +} + +func Test_parseCLF_errors(t *testing.T) { + tests := []struct { + name string + input string + expectedError string + }{ + { + name: "plain text", + input: "this is not a CLF message", + expectedError: "does not match expected format", + }, + { + name: "missing brackets around date", + input: `127.0.0.1 - - 10/Oct/2000:13:55:36 -0700 "GET / HTTP/1.1" 200 42`, + expectedError: "does not match expected format", + }, + { + name: "missing quotes around request", + input: `127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] GET / HTTP/1.1 200 42`, + expectedError: "does not match expected format", + }, + { + name: "non-numeric status", + input: `127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET / HTTP/1.1" OK 42`, + expectedError: "invalid status code", + }, + { + name: "non-numeric bytes", + input: `127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET / HTTP/1.1" 200 lots`, + expectedError: "invalid bytes value", + }, + { + name: "too few fields", + input: `127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET / HTTP/1.1" 200`, + expectedError: "does not match expected format", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + target := ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return tt.input, nil + }, + } + exprFunc := parseCLF(target) + _, err := exprFunc(t.Context(), nil) + require.Error(t, err) + assert.Contains(t, err.Error(), tt.expectedError) + }) + } +} + +func Test_parseCLF_empty(t *testing.T) { + target := ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "", nil + }, + } + exprFunc := parseCLF(target) + _, err := exprFunc(t.Context(), nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "cannot parse empty CLF message") +} + +func Test_parseCLF_target_error(t *testing.T) { + target := ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return nil, assert.AnError + }, + } + exprFunc := parseCLF(target) + _, err := exprFunc(t.Context(), nil) + require.Error(t, err) +} + +func Test_createParseCLFFunction(t *testing.T) { + factory := NewParseCLFFactory() + assert.Equal(t, "ParseCLF", factory.Name()) + + args := &parseCLFArguments{ + Target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return `127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET / HTTP/1.1" 200 42`, nil + }, + }, + } + + exprFunc, err := factory.CreateFunction(ottl.FunctionContext{}, args) + require.NoError(t, err) + + result, err := exprFunc(t.Context(), nil) + require.NoError(t, err) + assert.NotNil(t, result) +} + +func Test_createParseCLFFunction_wrongArgs(t *testing.T) { + factory := NewParseCLFFactory() + + _, err := factory.CreateFunction(ottl.FunctionContext{}, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "parseCLFFactory args must be of type *parseCLFArguments") +} + +func assertCLFMap(t *testing.T, m pcommon.Map, expected map[string]any) { + t.Helper() + + assert.Equal(t, len(expected), m.Len(), "field count mismatch; got map %v", m.AsRaw()) + + for k, v := range expected { + val, ok := m.Get(k) + require.True(t, ok, "key %q should exist", k) + switch want := v.(type) { + case string: + assert.Equal(t, want, val.Str(), "value for key %q mismatch", k) + case int64: + assert.Equal(t, want, val.Int(), "value for key %q mismatch", k) + default: + t.Fatalf("unexpected expected-value type for key %q: %T", k, v) + } + } +} diff --git a/processor/transformprocessor/internal/logparsingfuncs/func_parse_leef.go b/processor/transformprocessor/internal/logparsingfuncs/func_parse_leef.go new file mode 100644 index 0000000000000..49684d5702316 --- /dev/null +++ b/processor/transformprocessor/internal/logparsingfuncs/func_parse_leef.go @@ -0,0 +1,243 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package logparsingfuncs // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logparsingfuncs" + +import ( + "context" + "encoding/hex" + "errors" + "fmt" + "strings" + + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" +) + +type parseLEEFArguments struct { + Target ottl.StringGetter[*ottllog.TransformContext] +} + +func NewParseLEEFFactory() ottl.Factory[*ottllog.TransformContext] { + return ottl.NewFactory("parse_leef", &parseLEEFArguments{}, createParseLEEFFunction) +} + +func createParseLEEFFunction(_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.ExprFunc[*ottllog.TransformContext], error) { + args, ok := oArgs.(*parseLEEFArguments) + if !ok { + return nil, errors.New("parseLEEFFactory args must be of type *parseLEEFArguments") + } + + return parseLEEF(args.Target), nil +} + +func parseLEEF(target ottl.StringGetter[*ottllog.TransformContext]) ottl.ExprFunc[*ottllog.TransformContext] { + return func(ctx context.Context, tCtx *ottllog.TransformContext) (any, error) { + source, err := target.Get(ctx, tCtx) + if err != nil { + return nil, err + } + + if source == "" { + return nil, errors.New("cannot parse empty LEEF message") + } + + return parseLEEFMessage(source) + } +} + +func parseLEEFMessage(message string) (pcommon.Map, error) { + leefStart := strings.Index(message, "LEEF:") + if leefStart == -1 { + return pcommon.Map{}, errors.New("invalid LEEF message: 'LEEF:' not found") + } + + leefMessage := message[leefStart:] + + firstPipe := strings.Index(leefMessage, "|") + if firstPipe == -1 { + return pcommon.Map{}, errors.New("invalid LEEF message: missing pipe delimiter in header") + } + + versionField := leefMessage[:firstPipe] + version, err := parseLEEFVersion(versionField) + if err != nil { + return pcommon.Map{}, err + } + + remainder := leefMessage[firstPipe+1:] + + var header leefHeader + var attributes string + + switch version { + case "1.0": + header, attributes, err = parseLEEF1Header(remainder) + case "2.0": + header, attributes, err = parseLEEF2Header(remainder) + default: + return pcommon.Map{}, fmt.Errorf("unsupported LEEF version: %s", version) + } + + if err != nil { + return pcommon.Map{}, err + } + + header.version = version + + var parsedAttrs map[string]any + if attributes != "" { + parsedAttrs = parseLEEFAttributes(attributes, header.delimiter) + } + + return buildLEEFResult(header, parsedAttrs) +} + +type leefHeader struct { + version string + vendor string + productName string + productVersion string + eventID string + delimiter string +} + +func parseLEEFVersion(field string) (string, error) { + if !strings.HasPrefix(field, "LEEF:") { + return "", fmt.Errorf("invalid LEEF message: must start with 'LEEF:', got %q", field) + } + + version := strings.TrimPrefix(field, "LEEF:") + if version != "1.0" && version != "2.0" { + return "", fmt.Errorf("unsupported LEEF version: %s (supported: 1.0, 2.0)", version) + } + + return version, nil +} + +func parseLEEF1Header(remainder string) (leefHeader, string, error) { + parts := strings.SplitN(remainder, "|", 5) + if len(parts) < 4 { + return leefHeader{}, "", fmt.Errorf("invalid LEEF 1.0 header: expected at least 4 fields (vendor, product, version, eventID), got %d", len(parts)) + } + + header := leefHeader{ + vendor: parts[0], + productName: parts[1], + productVersion: parts[2], + eventID: parts[3], + delimiter: "\t", + } + + var attributes string + if len(parts) == 5 { + attributes = parts[4] + } + + return header, attributes, nil +} + +func parseLEEF2Header(remainder string) (leefHeader, string, error) { + parts := strings.SplitN(remainder, "|", 6) + if len(parts) < 5 { + return leefHeader{}, "", fmt.Errorf("invalid LEEF 2.0 header: expected at least 5 fields (vendor, product, version, eventID, delimiter), got %d", len(parts)) + } + + delimiterSpec := parts[4] + delimiter, err := parseDelimiter(delimiterSpec) + if err != nil { + return leefHeader{}, "", fmt.Errorf("invalid LEEF 2.0 delimiter: %w", err) + } + + header := leefHeader{ + vendor: parts[0], + productName: parts[1], + productVersion: parts[2], + eventID: parts[3], + delimiter: delimiter, + } + + var attributes string + if len(parts) == 6 { + attributes = parts[5] + } + + return header, attributes, nil +} + +func parseDelimiter(spec string) (string, error) { + if spec == "" { + return "\t", nil + } + + if strings.HasPrefix(spec, "0x") || strings.HasPrefix(spec, "0X") { + hexStr := spec[2:] + if hexStr == "" { + return "", errors.New("empty hex value") + } + decoded, err := hex.DecodeString(hexStr) + if err != nil { + return "", fmt.Errorf("invalid hex delimiter %q: %w", spec, err) + } + if len(decoded) != 1 { + return "", fmt.Errorf("hex delimiter must decode to a single byte, got %d bytes", len(decoded)) + } + return string(decoded), nil + } + + if len(spec) == 1 { + return spec, nil + } + + return spec, nil +} + +func parseLEEFAttributes(attributes, delimiter string) map[string]any { + if attributes == "" { + return make(map[string]any) + } + + result := make(map[string]any) + + for pair := range strings.SplitSeq(attributes, delimiter) { + pair = strings.TrimSpace(pair) + if pair == "" { + continue + } + + eqIndex := strings.Index(pair, "=") + if eqIndex == -1 { + continue + } + + key := pair[:eqIndex] + value := pair[eqIndex+1:] + + if key == "" { + continue + } + + result[key] = value + } + + return result +} + +func buildLEEFResult(header leefHeader, attributes map[string]any) (pcommon.Map, error) { + result := pcommon.NewMap() + + result.PutStr("version", header.version) + result.PutStr("vendor", header.vendor) + result.PutStr("product_name", header.productName) + result.PutStr("product_version", header.productVersion) + result.PutStr("event_id", header.eventID) + + attrsMap := result.PutEmptyMap("attributes") + if err := attrsMap.FromRaw(attributes); err != nil { + return pcommon.Map{}, fmt.Errorf("failed to convert attributes: %w", err) + } + + return result, nil +} diff --git a/processor/transformprocessor/internal/logparsingfuncs/func_parse_leef_test.go b/processor/transformprocessor/internal/logparsingfuncs/func_parse_leef_test.go new file mode 100644 index 0000000000000..7f81d6359c5a1 --- /dev/null +++ b/processor/transformprocessor/internal/logparsingfuncs/func_parse_leef_test.go @@ -0,0 +1,917 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package logparsingfuncs + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" +) + +func Test_parseLEEF(t *testing.T) { + tests := []struct { + name string + target ottl.StringGetter[*ottllog.TransformContext] + expected map[string]any + }{ + { + name: "LEEF 1.0 simple", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:1.0|Microsoft|MSExchange|4.0 SP1|15345|src=10.50.1.1\tdst=2.10.20.20\tsev=5", nil + }, + }, + expected: map[string]any{ + "version": "1.0", + "vendor": "Microsoft", + "product_name": "MSExchange", + "product_version": "4.0 SP1", + "event_id": "15345", + "attributes": map[string]any{ + "src": "10.50.1.1", + "dst": "2.10.20.20", + "sev": "5", + }, + }, + }, + { + name: "LEEF 1.0 with many attributes", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:1.0|QRadar|QRM|1.0|NEW_PORT_DISCOVERED|src=7.5.6.6\tdst=172.50.123.1\tsev=5\tcat=anomaly\tsrcPort=3881\tdstPort=21\tusrName=joe.black", nil + }, + }, + expected: map[string]any{ + "version": "1.0", + "vendor": "QRadar", + "product_name": "QRM", + "product_version": "1.0", + "event_id": "NEW_PORT_DISCOVERED", + "attributes": map[string]any{ + "src": "7.5.6.6", + "dst": "172.50.123.1", + "sev": "5", + "cat": "anomaly", + "srcPort": "3881", + "dstPort": "21", + "usrName": "joe.black", + }, + }, + }, + { + name: "LEEF 1.0 header only no attributes", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:1.0|Vendor|Product|1.0|EventID|", nil + }, + }, + expected: map[string]any{ + "version": "1.0", + "vendor": "Vendor", + "product_name": "Product", + "product_version": "1.0", + "event_id": "EventID", + "attributes": map[string]any{}, + }, + }, + { + name: "LEEF 1.0 no trailing pipe", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:1.0|Vendor|Product|1.0|EventID", nil + }, + }, + expected: map[string]any{ + "version": "1.0", + "vendor": "Vendor", + "product_name": "Product", + "product_version": "1.0", + "event_id": "EventID", + "attributes": map[string]any{}, + }, + }, + { + name: "LEEF 2.0 with caret delimiter", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:2.0|Lancope|StealthWatch|1.0|41|^|src=10.0.1.8^dst=10.0.0.5^sev=5", nil + }, + }, + expected: map[string]any{ + "version": "2.0", + "vendor": "Lancope", + "product_name": "StealthWatch", + "product_version": "1.0", + "event_id": "41", + "attributes": map[string]any{ + "src": "10.0.1.8", + "dst": "10.0.0.5", + "sev": "5", + }, + }, + }, + { + name: "LEEF 2.0 with hex tab delimiter", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:2.0|Vendor|Product|1.0|100|0x09|key1=val1\tkey2=val2", nil + }, + }, + expected: map[string]any{ + "version": "2.0", + "vendor": "Vendor", + "product_name": "Product", + "product_version": "1.0", + "event_id": "100", + "attributes": map[string]any{ + "key1": "val1", + "key2": "val2", + }, + }, + }, + { + name: "LEEF 2.0 with hex caret delimiter", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:2.0|Vendor|Product|1.0|100|0x5e|key1=val1^key2=val2", nil + }, + }, + expected: map[string]any{ + "version": "2.0", + "vendor": "Vendor", + "product_name": "Product", + "product_version": "1.0", + "event_id": "100", + "attributes": map[string]any{ + "key1": "val1", + "key2": "val2", + }, + }, + }, + { + name: "LEEF 2.0 with empty delimiter defaults to tab", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:2.0|Vendor|Product|1.0|100||key1=val1\tkey2=val2", nil + }, + }, + expected: map[string]any{ + "version": "2.0", + "vendor": "Vendor", + "product_name": "Product", + "product_version": "1.0", + "event_id": "100", + "attributes": map[string]any{ + "key1": "val1", + "key2": "val2", + }, + }, + }, + { + name: "LEEF 2.0 header only", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:2.0|Vendor|Product|1.0|EventID|^|", nil + }, + }, + expected: map[string]any{ + "version": "2.0", + "vendor": "Vendor", + "product_name": "Product", + "product_version": "1.0", + "event_id": "EventID", + "attributes": map[string]any{}, + }, + }, + { + name: "LEEF 2.0 no trailing pipe after delimiter", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:2.0|Vendor|Product|1.0|EventID|^", nil + }, + }, + expected: map[string]any{ + "version": "2.0", + "vendor": "Vendor", + "product_name": "Product", + "product_version": "1.0", + "event_id": "EventID", + "attributes": map[string]any{}, + }, + }, + { + name: "attribute value with spaces", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:1.0|Vendor|Product|1.0|Event|msg=This is a message with spaces\tsrc=1.2.3.4", nil + }, + }, + expected: map[string]any{ + "version": "1.0", + "vendor": "Vendor", + "product_name": "Product", + "product_version": "1.0", + "event_id": "Event", + "attributes": map[string]any{ + "msg": "This is a message with spaces", + "src": "1.2.3.4", + }, + }, + }, + { + name: "attribute value with equals sign", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:1.0|Vendor|Product|1.0|Event|url=http://example.com?foo=bar\tsrc=1.2.3.4", nil + }, + }, + expected: map[string]any{ + "version": "1.0", + "vendor": "Vendor", + "product_name": "Product", + "product_version": "1.0", + "event_id": "Event", + "attributes": map[string]any{ + "url": "http://example.com?foo=bar", + "src": "1.2.3.4", + }, + }, + }, + { + name: "attribute with empty value", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:1.0|Vendor|Product|1.0|Event|key1=\tkey2=value2", nil + }, + }, + expected: map[string]any{ + "version": "1.0", + "vendor": "Vendor", + "product_name": "Product", + "product_version": "1.0", + "event_id": "Event", + "attributes": map[string]any{ + "key1": "", + "key2": "value2", + }, + }, + }, + { + name: "LEEF 2.0 uppercase hex", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:2.0|Vendor|Product|1.0|100|0X5E|key1=val1^key2=val2", nil + }, + }, + expected: map[string]any{ + "version": "2.0", + "vendor": "Vendor", + "product_name": "Product", + "product_version": "1.0", + "event_id": "100", + "attributes": map[string]any{ + "key1": "val1", + "key2": "val2", + }, + }, + }, + { + name: "header fields with special characters", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:1.0|Vendor-Name_123|Product.Name|1.0-beta|Event_ID_123|key=value", nil + }, + }, + expected: map[string]any{ + "version": "1.0", + "vendor": "Vendor-Name_123", + "product_name": "Product.Name", + "product_version": "1.0-beta", + "event_id": "Event_ID_123", + "attributes": map[string]any{ + "key": "value", + }, + }, + }, + { + name: "real world QRadar example", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:2.0|IBM|QRadar|7.3.2|Authentication|^|src=192.168.1.100^dst=10.0.0.1^usrName=admin^cat=auth^sev=3^devTime=Jan 15 2024 10:30:45^devTimeFormat=MMM dd yyyy HH:mm:ss", nil + }, + }, + expected: map[string]any{ + "version": "2.0", + "vendor": "IBM", + "product_name": "QRadar", + "product_version": "7.3.2", + "event_id": "Authentication", + "attributes": map[string]any{ + "src": "192.168.1.100", + "dst": "10.0.0.1", + "usrName": "admin", + "cat": "auth", + "sev": "3", + "devTime": "Jan 15 2024 10:30:45", + "devTimeFormat": "MMM dd yyyy HH:mm:ss", + }, + }, + }, + { + name: "network security event", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:1.0|Cisco|ASA|9.8|FirewallDeny|src=10.1.1.1\tdst=192.168.1.1\tsrcPort=12345\tdstPort=443\tproto=TCP\tsev=7", nil + }, + }, + expected: map[string]any{ + "version": "1.0", + "vendor": "Cisco", + "product_name": "ASA", + "product_version": "9.8", + "event_id": "FirewallDeny", + "attributes": map[string]any{ + "src": "10.1.1.1", + "dst": "192.168.1.1", + "srcPort": "12345", + "dstPort": "443", + "proto": "TCP", + "sev": "7", + }, + }, + }, + { + name: "duplicate delimiter in attributes section", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:2.0|Vendor|Product|1.0|Event|^|key1=val1^^key2=val2", nil + }, + }, + expected: map[string]any{ + "version": "2.0", + "vendor": "Vendor", + "product_name": "Product", + "product_version": "1.0", + "event_id": "Event", + "attributes": map[string]any{ + "key1": "val1", + "key2": "val2", + }, + }, + }, + { + name: "trailing delimiter in attributes", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:1.0|Vendor|Product|1.0|Event|key1=val1\tkey2=val2\t", nil + }, + }, + expected: map[string]any{ + "version": "1.0", + "vendor": "Vendor", + "product_name": "Product", + "product_version": "1.0", + "event_id": "Event", + "attributes": map[string]any{ + "key1": "val1", + "key2": "val2", + }, + }, + }, + { + name: "leading delimiter in attributes", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:1.0|Vendor|Product|1.0|Event|\tkey1=val1\tkey2=val2", nil + }, + }, + expected: map[string]any{ + "version": "1.0", + "vendor": "Vendor", + "product_name": "Product", + "product_version": "1.0", + "event_id": "Event", + "attributes": map[string]any{ + "key1": "val1", + "key2": "val2", + }, + }, + }, + { + name: "IBM Guardium login failure event with syslog header", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + // Full sample from https://www.ibm.com/docs/en/dsm?topic=guardium-sample-event-messages + // Includes syslog header (RFC 3164 format) + // Note: LEEF 1.0 uses tab delimiter for attributes per spec at + // https://www.ibm.com/docs/en/dsm?topic=overview-leef-event-components + return "<30>Aug 19 12:33:31 ibm.guardium.test guard_sender[4486]: LEEF:1.0|IBM|Guardium|8.0|Login failures|ruleID=20026\truleDesc=Login failures\tseverity=INFO\tdevTime=2013-8-19 6:34:41\tserverType=DB2\tclassification=\tcategory=\tdbProtocolVersion=3.0\tusrName=\tsourceProgram=DB2JCC_APPLICATION\tstart=1376908481000\tdbUser=user\tdst=10.30.2.124\tdstPort=50000\tsrc=10.30.5.152\tsrcPort=38754\tprotocol=TCP\ttype=LOGIN_FAILED\tviolationID=15\tsql=\terror=08001-XXXX:30082-01", nil + }, + }, + expected: map[string]any{ + "version": "1.0", + "vendor": "IBM", + "product_name": "Guardium", + "product_version": "8.0", + "event_id": "Login failures", + "attributes": map[string]any{ + "ruleID": "20026", + "ruleDesc": "Login failures", + "severity": "INFO", + "devTime": "2013-8-19 6:34:41", + "serverType": "DB2", + "classification": "", + "category": "", + "dbProtocolVersion": "3.0", + "usrName": "", + "sourceProgram": "DB2JCC_APPLICATION", + "start": "1376908481000", + "dbUser": "user", + "dst": "10.30.2.124", + "dstPort": "50000", + "src": "10.30.5.152", + "srcPort": "38754", + "protocol": "TCP", + "type": "LOGIN_FAILED", + "violationID": "15", + "sql": "", + "error": "08001-XXXX:30082-01", + }, + }, + }, + { + name: "IBM Guardium unauthorized access event with syslog header", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + // Full sample from https://www.ibm.com/docs/en/dsm?topic=guardium-sample-event-messages + // Includes syslog header (RFC 3164 format) + // Note: LEEF 1.0 uses tab delimiter for attributes per spec at + // https://www.ibm.com/docs/en/dsm?topic=overview-leef-event-components + return "<25>Jun 11 13:47:19 ibm.guardium.test guard_sender[3432]: LEEF:1.0|IBM|Guardium|8.0|Unauthorized Users on Cardholder Objects - Alert|ruleID=159\truleDesc=Unauthorized Users on Cardholder Objects - Alert\tseverity=MED\tdevTime=2013-6-11 12:46:21\tserverType=MS SQL SERVER\tclassification=Violation\tcategory=PCI\tdbProtocolVersion=8.0\tusrName=\tsourceProgram=ABCDEF.EXE\tstart=1370965581000\tdbUser=SYSTEM\tdst=172.16.107.92\tdstPort=1433\tsrc=172.16.107.92\tsrcPort=60621\tprotocol=TCP\ttype=SQL_LANG\tviolationID=0\tsql=SELECT * FROM EPOAgentHandlerAssignment INNER JOIN EPOAgentHandlerAssignmentPriority ON (EPOAgentHandlerAssignment.AutoID = EPOAgentHandlerAssignmentPriority.AssignmentID) ORDER BY EPOAgentHandlerAssignmentPriority.Priority ASC\terror=TDS_MS-", nil + }, + }, + expected: map[string]any{ + "version": "1.0", + "vendor": "IBM", + "product_name": "Guardium", + "product_version": "8.0", + "event_id": "Unauthorized Users on Cardholder Objects - Alert", + "attributes": map[string]any{ + "ruleID": "159", + "ruleDesc": "Unauthorized Users on Cardholder Objects - Alert", + "severity": "MED", + "devTime": "2013-6-11 12:46:21", + "serverType": "MS SQL SERVER", + "classification": "Violation", + "category": "PCI", + "dbProtocolVersion": "8.0", + "usrName": "", + "sourceProgram": "ABCDEF.EXE", + "start": "1370965581000", + "dbUser": "SYSTEM", + "dst": "172.16.107.92", + "dstPort": "1433", + "src": "172.16.107.92", + "srcPort": "60621", + "protocol": "TCP", + "type": "SQL_LANG", + "violationID": "0", + "sql": "SELECT * FROM EPOAgentHandlerAssignment INNER JOIN EPOAgentHandlerAssignmentPriority ON (EPOAgentHandlerAssignment.AutoID = EPOAgentHandlerAssignmentPriority.AssignmentID) ORDER BY EPOAgentHandlerAssignmentPriority.Priority ASC", + "error": "TDS_MS-", + }, + }, + }, + { + name: "syslog header RFC 5424 format", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + // RFC 5424 syslog format with structured data + return "<113>1 2019-01-18T11:07:53.520+07:00 hostname LEEF:2.0|Lancope|StealthWatch|1.0|41|^|src=10.0.1.8^dst=10.0.0.5^sev=5", nil + }, + }, + expected: map[string]any{ + "version": "2.0", + "vendor": "Lancope", + "product_name": "StealthWatch", + "product_version": "1.0", + "event_id": "41", + "attributes": map[string]any{ + "src": "10.0.1.8", + "dst": "10.0.0.5", + "sev": "5", + }, + }, + }, + { + name: "syslog header simple", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "<13>Jan 18 11:07:53 192.168.1.1 LEEF:1.0|Microsoft|MSExchange|4.0 SP1|15345|src=192.0.2.0\tdst=172.50.123.1", nil + }, + }, + expected: map[string]any{ + "version": "1.0", + "vendor": "Microsoft", + "product_name": "MSExchange", + "product_version": "4.0 SP1", + "event_id": "15345", + "attributes": map[string]any{ + "src": "192.0.2.0", + "dst": "172.50.123.1", + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + exprFunc := parseLEEF(tt.target) + result, err := exprFunc(t.Context(), nil) + require.NoError(t, err) + + resultMap, ok := result.(pcommon.Map) + require.True(t, ok, "result should be pcommon.Map") + + // Check top-level fields + assertMapValue(t, resultMap, "version", tt.expected["version"]) + assertMapValue(t, resultMap, "vendor", tt.expected["vendor"]) + assertMapValue(t, resultMap, "product_name", tt.expected["product_name"]) + assertMapValue(t, resultMap, "product_version", tt.expected["product_version"]) + assertMapValue(t, resultMap, "event_id", tt.expected["event_id"]) + + // Check attributes + expectedAttrs := tt.expected["attributes"].(map[string]any) + attrsVal, ok := resultMap.Get("attributes") + require.True(t, ok, "attributes field should exist") + attrsMap := attrsVal.Map() + assert.Equal(t, len(expectedAttrs), attrsMap.Len(), "attributes count mismatch") + + for k, v := range expectedAttrs { + attrVal, ok := attrsMap.Get(k) + assert.True(t, ok, "attribute %q should exist", k) + assert.Equal(t, v, attrVal.Str(), "attribute %q value mismatch", k) + } + }) + } +} + +func Test_parseLEEF_error(t *testing.T) { + tests := []struct { + name string + target ottl.StringGetter[*ottllog.TransformContext] + expectedError string + }{ + { + name: "empty input", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "", nil + }, + }, + expectedError: "cannot parse empty LEEF message", + }, + { + name: "not a LEEF message", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "CEF:0|Vendor|Product|1.0|100|Event Name|5|src=1.2.3.4", nil + }, + }, + expectedError: "'LEEF:' not found", + }, + { + name: "unsupported LEEF version", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:3.0|Vendor|Product|1.0|EventID|key=value", nil + }, + }, + expectedError: "unsupported LEEF version: 3.0", + }, + { + name: "invalid LEEF version format", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:abc|Vendor|Product|1.0|EventID|key=value", nil + }, + }, + expectedError: "unsupported LEEF version: abc", + }, + { + name: "missing pipes in header", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:1.0|OnlyVendor", nil + }, + }, + expectedError: "invalid LEEF 1.0 header: expected at least 4 fields", + }, + { + name: "LEEF 1.0 too few header fields", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:1.0|Vendor|Product", nil + }, + }, + expectedError: "invalid LEEF 1.0 header: expected at least 4 fields", + }, + { + name: "LEEF 2.0 too few header fields", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:2.0|Vendor|Product|1.0", nil + }, + }, + expectedError: "invalid LEEF 2.0 header: expected at least 5 fields", + }, + { + name: "no pipe delimiter at all", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:1.0", nil + }, + }, + expectedError: "missing pipe delimiter in header", + }, + { + name: "invalid hex delimiter - odd length", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:2.0|Vendor|Product|1.0|EventID|0x9|key=value", nil + }, + }, + expectedError: "invalid hex delimiter", + }, + { + name: "invalid hex delimiter - not hex", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:2.0|Vendor|Product|1.0|EventID|0xGG|key=value", nil + }, + }, + expectedError: "invalid hex delimiter", + }, + { + name: "invalid hex delimiter - too many bytes", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:2.0|Vendor|Product|1.0|EventID|0x0909|key=value", nil + }, + }, + expectedError: "hex delimiter must decode to a single byte", + }, + { + name: "invalid hex delimiter - empty hex", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:2.0|Vendor|Product|1.0|EventID|0x|key=value", nil + }, + }, + expectedError: "empty hex value", + }, + { + name: "plain text not LEEF", + target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "This is just plain text log message", nil + }, + }, + expectedError: "'LEEF:' not found", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + exprFunc := parseLEEF(tt.target) + _, err := exprFunc(t.Context(), nil) + require.Error(t, err) + assert.Contains(t, err.Error(), tt.expectedError) + }) + } +} + +func Test_parseLEEF_target_error(t *testing.T) { + target := ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return nil, assert.AnError + }, + } + exprFunc := parseLEEF(target) + _, err := exprFunc(t.Context(), nil) + require.Error(t, err) +} + +func Test_createParseLEEFFunction(t *testing.T) { + factory := NewParseLEEFFactory() + assert.Equal(t, "parse_leef", factory.Name()) + + args := &parseLEEFArguments{ + Target: ottl.StandardStringGetter[*ottllog.TransformContext]{ + Getter: func(context.Context, *ottllog.TransformContext) (any, error) { + return "LEEF:1.0|Vendor|Product|1.0|Event|key=value", nil + }, + }, + } + + exprFunc, err := factory.CreateFunction(ottl.FunctionContext{}, args) + require.NoError(t, err) + + result, err := exprFunc(t.Context(), nil) + require.NoError(t, err) + assert.NotNil(t, result) +} + +func Test_createParseLEEFFunction_wrongArgs(t *testing.T) { + factory := NewParseLEEFFactory() + + _, err := factory.CreateFunction(ottl.FunctionContext{}, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "parseLEEFFactory args must be of type *parseLEEFArguments") +} + +func Test_parseDelimiter(t *testing.T) { + tests := []struct { + name string + input string + expected string + hasError bool + }{ + { + name: "empty defaults to tab", + input: "", + expected: "\t", + }, + { + name: "single character", + input: "^", + expected: "^", + }, + { + name: "pipe character", + input: "|", + expected: "|", + }, + { + name: "hex tab", + input: "0x09", + expected: "\t", + }, + { + name: "hex caret lowercase", + input: "0x5e", + expected: "^", + }, + { + name: "hex caret uppercase", + input: "0x5E", + expected: "^", + }, + { + name: "hex with uppercase prefix", + input: "0X5e", + expected: "^", + }, + { + name: "hex space", + input: "0x20", + expected: " ", + }, + { + name: "multi-character delimiter", + input: "||", + expected: "||", + }, + { + name: "invalid hex - odd length", + input: "0x9", + hasError: true, + }, + { + name: "invalid hex - not hex chars", + input: "0xZZ", + hasError: true, + }, + { + name: "invalid hex - too many bytes", + input: "0x0909", + hasError: true, + }, + { + name: "invalid hex - empty", + input: "0x", + hasError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := parseDelimiter(tt.input) + if tt.hasError { + require.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, tt.expected, result) + } + }) + } +} + +func Test_parseLEEFAttributes(t *testing.T) { + tests := []struct { + name string + input string + delimiter string + expected map[string]any + }{ + { + name: "simple tab delimited", + input: "key1=val1\tkey2=val2", + delimiter: "\t", + expected: map[string]any{ + "key1": "val1", + "key2": "val2", + }, + }, + { + name: "caret delimited", + input: "key1=val1^key2=val2^key3=val3", + delimiter: "^", + expected: map[string]any{ + "key1": "val1", + "key2": "val2", + "key3": "val3", + }, + }, + { + name: "empty attributes", + input: "", + delimiter: "\t", + expected: map[string]any{}, + }, + { + name: "value with equals", + input: "url=http://example.com?a=b", + delimiter: "\t", + expected: map[string]any{ + "url": "http://example.com?a=b", + }, + }, + { + name: "key without value skipped", + input: "key1=val1\tkeyonly\tkey2=val2", + delimiter: "\t", + expected: map[string]any{ + "key1": "val1", + "key2": "val2", + }, + }, + { + name: "empty value", + input: "key1=\tkey2=val2", + delimiter: "\t", + expected: map[string]any{ + "key1": "", + "key2": "val2", + }, + }, + { + name: "whitespace handling", + input: " key1=val1 \t key2=val2 ", + delimiter: "\t", + expected: map[string]any{ + "key1": "val1", + "key2": "val2", + }, + }, + { + name: "duplicate delimiters", + input: "key1=val1^^key2=val2", + delimiter: "^", + expected: map[string]any{ + "key1": "val1", + "key2": "val2", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := parseLEEFAttributes(tt.input, tt.delimiter) + assert.Equal(t, tt.expected, result) + }) + } +} + +func assertMapValue(t *testing.T, m pcommon.Map, key string, expected any) { + t.Helper() + val, ok := m.Get(key) + require.True(t, ok, "key %q should exist", key) + assert.Equal(t, expected, val.Str(), "value for key %q mismatch", key) +} diff --git a/processor/transformprocessor/internal/logparsingfuncs/metadata.yaml b/processor/transformprocessor/internal/logparsingfuncs/metadata.yaml new file mode 100644 index 0000000000000..d0b3692f4a695 --- /dev/null +++ b/processor/transformprocessor/internal/logparsingfuncs/metadata.yaml @@ -0,0 +1,3 @@ +status: + codeowners: + active: [Caleb-Hurshman, Dylan-M] diff --git a/processor/transformprocessor/internal/logs/functions.go b/processor/transformprocessor/internal/logs/functions.go index c536f7662985b..2bcbe3c1b0ca9 100644 --- a/processor/transformprocessor/internal/logs/functions.go +++ b/processor/transformprocessor/internal/logs/functions.go @@ -4,12 +4,23 @@ package logs // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs" import ( + "maps" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottlfuncs" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logparsingfuncs" ) func LogFunctions() map[string]ottl.Factory[*ottllog.TransformContext] { - // No logs-only functions yet. - return ottlfuncs.StandardFuncs[*ottllog.TransformContext]() + functions := ottlfuncs.StandardFuncs[*ottllog.TransformContext]() + + logFunctions := ottl.CreateFactoryMap( + logparsingfuncs.NewParseCLFFactory(), + logparsingfuncs.NewParseLEEFFactory(), + ) + + maps.Copy(functions, logFunctions) + + return functions } diff --git a/processor/transformprocessor/internal/logs/functions_test.go b/processor/transformprocessor/internal/logs/functions_test.go index c801f6f85eff2..48fb03ccede64 100644 --- a/processor/transformprocessor/internal/logs/functions_test.go +++ b/processor/transformprocessor/internal/logs/functions_test.go @@ -11,10 +11,13 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottlfuncs" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logparsingfuncs" ) func Test_LogFunctions(t *testing.T) { expected := ottlfuncs.StandardFuncs[*ottllog.TransformContext]() + expected["ParseCLF"] = logparsingfuncs.NewParseCLFFactory() + expected["ParseLEEF"] = logparsingfuncs.NewParseLEEFFactory() actual := LogFunctions() require.Len(t, actual, len(expected)) for k := range actual {