diff --git a/.github/workflows/rust-ci.yml b/.github/workflows/rust-ci.yml
index 8932b39cc7..3c04cbfccc 100644
--- a/.github/workflows/rust-ci.yml
+++ b/.github/workflows/rust-ci.yml
@@ -707,6 +707,275 @@ jobs:
           emitted_phase1_metric_shapes_match_weaver_semconv --lib -- --ignored
         working-directory: ./rust/otap-dataflow
 
+  # Alternative semconv drift check: runs the host_metrics_receiver end-to-end
+  # against an OTLP `weaver registry live-check` listener and fails if Weaver
+  # reports any `violation` finding. This validates the real OTLP wire output
+  # rather than the in-process projection.
+  #
+  # Intentionally NOT in `rust-required-status-check.needs` for now: it runs in
+  # parallel with `host-metrics-semconv` so the two approaches can be compared
+  # before deciding which one to keep.
+  host-metrics-weaver-live-check:
+    runs-on: ubuntu-latest
+    env:
+      WEAVER_VERSION: v0.23.0
+      # Semantic-conventions tag to validate against. Keep in sync with
+      # the `VERSION` constant in
+      # rust/otap-dataflow/crates/core-nodes/src/receivers/host_metrics_receiver/semconv.rs
+      SEMCONV_REF: v1.41.0
+    steps:
+      - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+        with:
+          submodules: true
+      - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+        with:
+          repository: open-telemetry/semantic-conventions
+          ref: ${{ env.SEMCONV_REF }}
+          path: semantic-conventions
+      - uses: dtolnay/rust-toolchain@3c5f7ea28cd621ae0bf5283f0e981fb97b8a7af9
+        with:
+          toolchain: stable
+      - uses: Swatinem/rust-cache@c19371144df3bb44fab255c43d04cbc2ab54d1c4 # v2.9.1
+        with:
+          workspaces: ./rust/otap-dataflow
+          cache-bin: false
+      - name: Setup Weaver
+        uses: open-telemetry/weaver/.github/actions/setup-weaver@3a3b7cc98e08f84bccda45c0f20522b51d64a88c # v0.23.0
+        with:
+          version: ${{ env.WEAVER_VERSION }}
+      - name: Build df_engine
+        run: cargo build --release --bin df_engine
+        working-directory: ./rust/otap-dataflow
+      - name: Run host_metrics_receiver against weaver live-check
+        timeout-minutes: 5
+        working-directory: ${{ github.workspace }}
+        run: |
+          # GitHub Actions runs this script with errexit (`bash -e`) enabled and
+          # `set -uo pipefail` does not clear it, so commands that may fail are guarded.
+          set -uo pipefail
+          mkdir -p live-check-report
+
+          echo "Weaver version: ${WEAVER_VERSION}"
+          echo "Semantic conventions: ${SEMCONV_REF}"
+          weaver --version || true
+
+          echo "::group::Start weaver live-check listener"
+          weaver registry live-check \
+            --registry semantic-conventions/model \
+            --input-source otlp \
+            --otlp-grpc-port 4317 \
+            --admin-port 4320 \
+            --inactivity-timeout 60 \
+            --format json \
+            --output live-check-report \
+            > live-check-report/weaver.log 2>&1 &
+          WEAVER_PID=$!
+          echo "weaver pid: $WEAVER_PID"
+          echo "::endgroup::"
+
+          # Wait for the OTLP listener to come up.
+          LISTENER_UP=0
+          for i in $(seq 1 30); do
+            if (echo > /dev/tcp/127.0.0.1/4317) >/dev/null 2>&1; then
+              LISTENER_UP=1
+              break
+            fi
+            sleep 1
+          done
+          if [ "$LISTENER_UP" -ne 1 ]; then
+            echo "::error::weaver OTLP listener did not come up on 127.0.0.1:4317 within 30s"
+            tail -n 200 live-check-report/weaver.log || true
+            kill -KILL "$WEAVER_PID" 2>/dev/null || true
+            exit 1
+          fi
+
+          echo "::group::Start df_engine"
+          ./rust/otap-dataflow/target/release/df_engine \
+            --config rust/otap-dataflow/configs/host-metrics-otlp.yaml \
+            --num-cores 1 \
+            > live-check-report/df_engine.log 2>&1 &
+          ENGINE_PID=$!
+          echo "df_engine pid: $ENGINE_PID"
+          echo "::endgroup::"
+
+          # Allow several scrape iterations (collection_interval = 2s in the
+          # config), then enough time for the OTLP exporter to flush.
+          sleep 12
+
+          echo "::group::Stop services"
+          # A df_engine that already exited sent little or no telemetry, so a
+          # clean weaver report would prove nothing. Record that here and fail
+          # the step only after the logs below have been printed.
+          ENGINE_ALIVE=1
+          if ! kill -0 "$ENGINE_PID" 2>/dev/null; then
+            echo "::error::df_engine exited before the scrape window ended"
+            ENGINE_ALIVE=0
+          fi
+          # df_engine does not implement SIGINT shutdown today. Hard-kill it
+          # and do NOT `wait` on it (would block indefinitely).
+          kill -KILL "$ENGINE_PID" 2>/dev/null || true
+          # Tell weaver to stop accepting and finalize the report.
+          curl -fsS -X POST "http://127.0.0.1:4320/stop" || true
+          # Capture weaver's exit code without tripping errexit, so the log
+          # tails below still run when weaver reports violations.
+          WEAVER_EXIT=0
+          wait "$WEAVER_PID" || WEAVER_EXIT=$?
+          echo "weaver exit code: $WEAVER_EXIT"
+          echo "::endgroup::"
+
+          echo "::group::Weaver log (tail)"
+          tail -n 200 live-check-report/weaver.log || true
+          echo "::endgroup::"
+
+          echo "::group::df_engine log (tail)"
+          tail -n 100 live-check-report/df_engine.log || true
+          echo "::endgroup::"
+
+          # A dead engine must not be reported as a passing live-check.
+          if [ "$ENGINE_ALIVE" -ne 1 ] && [ "$WEAVER_EXIT" -eq 0 ]; then
+            exit 1
+          fi
+          exit $WEAVER_EXIT
+      - name: Summarize live-check findings
+        if: always()
+        working-directory: ${{ github.workspace }}
+        run: |
+          set -euo pipefail
+          REPORT=live-check-report/live_check.json
+          if [ ! -s "$REPORT" ]; then
+            echo "No live-check report found at $REPORT" | tee -a "$GITHUB_STEP_SUMMARY"
+            exit 0
+          fi
+
+          # Compact summary -> GitHub step summary.
+          SEMCONV_REF="$SEMCONV_REF" WEAVER_VERSION="$WEAVER_VERSION" \
+          python3 - "$REPORT" <<'PY' | tee -a "$GITHUB_STEP_SUMMARY"
+          import json, os, sys
+          d = json.load(open(sys.argv[1]))
+          s = d.get("statistics", {}) or {}
+          levels = s.get("advice_level_counts", {}) or {}
+          violations = levels.get("violation", 0)
+          improvements = levels.get("improvement", 0)
+          informations = levels.get("information", 0)
+          status = "FAIL" if violations else "PASS"
+
+          print(f"## Weaver live-check (host_metrics_receiver): {status}")
+          print()
+          print(f"- Weaver: `{os.environ.get('WEAVER_VERSION', 'unknown')}`")
+          print(f"- Semantic conventions: `{os.environ.get('SEMCONV_REF', 'unknown')}`")
+          print(f"- Samples received: **{len(d.get('samples', []))}**")
+          by_type = s.get("total_entities_by_type", {}) or {}
+          if by_type:
+              parts = ", ".join(f"{v} {k}" for k, v in sorted(by_type.items()))
+              print(f"- Entities checked: **{s.get('total_entities', 0)}** ({parts})")
+          print()
+          print("| Level | Count |")
+          print("| --- | ---: |")
+          print(f"| violation | {violations} |")
+          print(f"| improvement | {improvements} |")
+          print(f"| information | {informations} |")
+          print()
+
+          types = s.get("advice_type_counts", {}) or {}
+          if types:
+              print("### Findings by type")
+              print()
+              print("| Count | Type |")
+              print("| ---: | --- |")
+              for t, c in sorted(types.items(), key=lambda x: -x[1]):
+                  print(f"| {c} | {t} |")
+              print()
+
+          if violations:
+              print("> Violations were reported. See the job log and the `weaver-live-check-host-metrics` artifact for full detail.")
+          else:
+              print("> No semantic-convention violations. Improvements/informations are advisory. See the job log for example findings.")
+          PY
+
+          # Detailed findings -> job log only (too verbose for step summary).
+          # Walks every sample's `live_check_result.all_advice` and groups
+          # findings by (level, id), printing a few example messages per group.
+          echo "::group::Detailed findings (sampled)"
+          python3 - "$REPORT" <<'PY'
+          import json, sys
+          from collections import defaultdict
+
+          d = json.load(open(sys.argv[1]))
+
+          # (level, id) -> list of dicts {message, signal_type, signal_name, sample_path}
+          groups = defaultdict(list)
+
+          def walk(node, path):
+              """Recursively collect findings from a nested sample tree."""
+              if isinstance(node, dict):
+                  result = node.get("live_check_result") or {}
+                  for adv in result.get("all_advice", []) or []:
+                      groups[(adv.get("level", "?"), adv.get("id", "?"))].append({
+                          "message": adv.get("message", ""),
+                          "signal_type": adv.get("signal_type", ""),
+                          "signal_name": adv.get("signal_name", ""),
+                          "sample_path": " / ".join(path) if path else "",
+                      })
+                  # Recurse into the sample subtree to catch nested findings
+                  # (e.g. attribute findings inside number_data_point).
+                  # Label the subtree with this node's own name, falling back
+                  # to a nested `attribute` object's name. The fallback is
+                  # parenthesized so a missing `attribute` cannot discard `name`.
+                  attribute = node.get("attribute")
+                  sample_name = node.get("name") or (
+                      attribute.get("name") if isinstance(attribute, dict) else None
+                  )
+                  sub_path = path + ([str(sample_name)] if sample_name else [])
+                  for k, v in node.items():
+                      if k in ("live_check_result",):
+                          continue
+                      walk(v, sub_path)
+              elif isinstance(node, list):
+                  for item in node:
+                      walk(item, path)
+
+          for sample in d.get("samples", []) or []:
+              walk(sample, [])
+
+          if not groups:
+              print("(no findings emitted by weaver)")
+          else:
+              LEVEL_ORDER = {"violation": 0, "improvement": 1, "information": 2}
+              ordered = sorted(
+                  groups.items(),
+                  key=lambda kv: (LEVEL_ORDER.get(kv[0][0], 99), -len(kv[1]), kv[0][1]),
+              )
+              MAX_EXAMPLES = 5
+              for (level, fid), items in ordered:
+                  print(f"[{level}] {fid} ({len(items)} occurrences)")
+                  # Deduplicate by message + signal_name so we show distinct cases.
+                  seen = set()
+                  shown = 0
+                  for it in items:
+                      key = (it["message"], it["signal_type"], it["signal_name"])
+                      if key in seen:
+                          continue
+                      seen.add(key)
+                      sig = f"{it['signal_type']}:{it['signal_name']}" if it['signal_name'] else it['signal_type']
+                      ctx = f" [{it['sample_path']}]" if it['sample_path'] else ""
+                      print(f" - ({sig}{ctx}) {it['message']}")
+                      shown += 1
+                      if shown >= MAX_EXAMPLES:
+                          break
+                  remaining = len(items) - shown
+                  if remaining > 0:
+                      print(f" ... and {remaining} more occurrence(s)")
+                  print()
+          PY
+          echo "::endgroup::"
+      - name: Upload live-check report
+        if: always()
+        uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1
+        with:
+          name: weaver-live-check-host-metrics
+          path: live-check-report
+          if-no-files-found: warn
+
   # Required matrix combinations for deny: otap-dataflow only
   deny_required:
     runs-on: ubuntu-latest
diff --git a/rust/otap-dataflow/configs/host-metrics-otlp.yaml b/rust/otap-dataflow/configs/host-metrics-otlp.yaml
new file mode 100644
index 0000000000..5a72a90832
--- /dev/null
+++ b/rust/otap-dataflow/configs/host-metrics-otlp.yaml
@@ -0,0 +1,44 @@
+version: otel_dataflow/v1
+engine: { }
+groups:
+  default:
+    pipelines:
+      main:
+        # host_metrics_receiver -> OTLP/gRPC exporter pipeline.
+        #
+        # Used by the `host-metrics-weaver-live-check` CI job to feed real
+        # OTLP traffic from the host metrics receiver into a `weaver registry
+        # live-check` listener. The downstream listener then asserts that
+        # what's emitted matches the OpenTelemetry semantic conventions.
+        #
+        # The host_metrics_receiver requires a single-core source pipeline,
+        # so this config is meant to be run as:
+        #
+        #   df_engine --config configs/host-metrics-otlp.yaml --num-cores 1
+        #
+        # Linux-only. The downstream OTLP/gRPC endpoint defaults to the
+        # Weaver live-check default port (4317).
+
+        policies:
+          channel_capacity:
+            control:
+              node: 100
+              pipeline: 100
+            pdata: 128
+
+        nodes:
+          receiver:
+            type: urn:otel:receiver:host_metrics
+            config:
+              # Short interval so a brief CI run produces several scrapes.
+              collection_interval: 2s
+              initial_delay: 0s
+
+          exporter:
+            type: exporter:otlp_grpc
+            config:
+              grpc_endpoint: "http://127.0.0.1:4317"
+
+        connections:
+          - from: receiver
+            to: exporter