Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 242 additions & 0 deletions .github/workflows/rust-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,248 @@ jobs:
emitted_phase1_metric_shapes_match_weaver_semconv --lib -- --ignored
working-directory: ./rust/otap-dataflow

# Alternative semconv drift check: runs the host_metrics_receiver end-to-end
# against an OTLP `weaver registry live-check` listener and fails if Weaver
# reports any `violation` finding. This validates the real OTLP wire output
# rather than the in-process projection.
#
# Intentionally NOT in `rust-required-status-check.needs` for now: it runs in
# parallel with `host-metrics-semconv` so the two approaches can be compared
# before deciding which one to keep.
host-metrics-weaver-live-check:
  # Runs df_engine with the host_metrics receiver against a local
  # `weaver registry live-check` OTLP listener, fails on a non-zero weaver
  # exit, then summarizes and uploads the JSON report.
  runs-on: ubuntu-latest
  env:
    # Weaver release to install. The setup-weaver action below is pinned to
    # the commit annotated with this same tag; bump both together.
    WEAVER_VERSION: v0.23.0
    # Semantic-conventions tag to validate against. Keep in sync with
    # the `VERSION` constant in
    # rust/otap-dataflow/crates/core-nodes/src/receivers/host_metrics_receiver/semconv.rs
    SEMCONV_REF: v1.41.0
  steps:
    - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd  # v6.0.2
      with:
        submodules: true
    # Second checkout: the semantic-conventions registry at SEMCONV_REF,
    # placed in ./semantic-conventions for `--registry` below.
    - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd  # v6.0.2
      with:
        repository: open-telemetry/semantic-conventions
        ref: ${{ env.SEMCONV_REF }}
        path: semantic-conventions
    - uses: dtolnay/rust-toolchain@3c5f7ea28cd621ae0bf5283f0e981fb97b8a7af9
      with:
        toolchain: stable
    - uses: Swatinem/rust-cache@c19371144df3bb44fab255c43d04cbc2ab54d1c4  # v2.9.1
      with:
        workspaces: ./rust/otap-dataflow
        cache-bin: false
    - name: Setup Weaver
      uses: open-telemetry/weaver/.github/actions/setup-weaver@3a3b7cc98e08f84bccda45c0f20522b51d64a88c  # v0.23.0
      with:
        version: ${{ env.WEAVER_VERSION }}
    - name: Build df_engine
      run: cargo build --release --bin df_engine
      working-directory: ./rust/otap-dataflow
    - name: Run host_metrics_receiver against weaver live-check
      timeout-minutes: 5
      working-directory: ${{ github.workspace }}
      run: |
        # NOTE: GitHub runs this script with `bash -e`, and the line below
        # does not clear errexit. Every command that may legitimately fail
        # is therefore guarded with `||` so that the exit code can be
        # captured and the diagnostic log tails are always printed.
        set -uo pipefail
        mkdir -p live-check-report

        echo "Weaver version: ${WEAVER_VERSION}"
        echo "Semantic conventions: ${SEMCONV_REF}"
        weaver --version || true

        echo "::group::Start weaver live-check listener"
        weaver registry live-check \
          --registry semantic-conventions/model \
          --input-source otlp \
          --otlp-grpc-port 4317 \
          --admin-port 4320 \
          --inactivity-timeout 60 \
          --format json \
          --output live-check-report \
          > live-check-report/weaver.log 2>&1 &
        WEAVER_PID=$!
        echo "weaver pid: $WEAVER_PID"
        echo "::endgroup::"

        # Wait (up to ~30s) for the OTLP listener to come up. Stop polling
        # early if weaver has already exited, and fail loudly instead of
        # starting the engine against a port nobody is listening on.
        LISTENER_READY=0
        for i in $(seq 1 30); do
          if (echo > /dev/tcp/127.0.0.1/4317) >/dev/null 2>&1; then
            LISTENER_READY=1
            break
          fi
          if ! kill -0 "$WEAVER_PID" 2>/dev/null; then
            break
          fi
          sleep 1
        done
        if [ "$LISTENER_READY" -ne 1 ]; then
          echo "::error::weaver live-check listener never opened 127.0.0.1:4317"
          echo "::group::Weaver log (tail)"
          tail -n 200 live-check-report/weaver.log || true
          echo "::endgroup::"
          kill "$WEAVER_PID" 2>/dev/null || true
          exit 1
        fi

        echo "::group::Start df_engine"
        ./rust/otap-dataflow/target/release/df_engine \
          --config rust/otap-dataflow/configs/host-metrics-otlp.yaml \
          --num-cores 1 \
          > live-check-report/df_engine.log 2>&1 &
        ENGINE_PID=$!
        echo "df_engine pid: $ENGINE_PID"
        echo "::endgroup::"

        # Allow several scrape iterations (collection_interval = 2s in the
        # config), then enough time for the OTLP exporter to flush.
        sleep 12

        # df_engine is expected to still be running here (it is hard-killed
        # below). If it is already gone it crashed, weaver may have seen no
        # telemetry at all, and a "no violations" result would be a false
        # pass.
        ENGINE_ALIVE=1
        if ! kill -0 "$ENGINE_PID" 2>/dev/null; then
          ENGINE_ALIVE=0
          echo "::error::df_engine exited before the collection window elapsed"
        fi

        echo "::group::Stop services"
        # df_engine does not implement SIGINT shutdown today. Hard-kill it
        # and do NOT `wait` on it (would block indefinitely).
        kill -KILL "$ENGINE_PID" 2>/dev/null || true
        # Tell weaver to stop accepting and finalize the report.
        curl -fsS -X POST "http://127.0.0.1:4320/stop" || true
        # `|| WEAVER_EXIT=$?` keeps errexit from aborting the script on a
        # non-zero weaver exit, so the logs below are still printed.
        WEAVER_EXIT=0
        wait "$WEAVER_PID" || WEAVER_EXIT=$?
        echo "weaver exit code: $WEAVER_EXIT"
        echo "::endgroup::"

        echo "::group::Weaver log (tail)"
        tail -n 200 live-check-report/weaver.log || true
        echo "::endgroup::"

        echo "::group::df_engine log (tail)"
        tail -n 100 live-check-report/df_engine.log || true
        echo "::endgroup::"

        # A crashed engine fails the step even when weaver itself exited 0.
        if [ "$WEAVER_EXIT" -eq 0 ] && [ "$ENGINE_ALIVE" -ne 1 ]; then
          exit 1
        fi
        exit "$WEAVER_EXIT"
    - name: Summarize live-check findings
      if: always()
      working-directory: ${{ github.workspace }}
      run: |
        set -euo pipefail
        REPORT=live-check-report/live_check.json
        # Purely informational step: a missing report is noted, not failed.
        if [ ! -s "$REPORT" ]; then
          echo "No live-check report found at $REPORT" | tee -a "$GITHUB_STEP_SUMMARY"
          exit 0
        fi

        # Compact summary -> GitHub step summary.
        SEMCONV_REF="$SEMCONV_REF" WEAVER_VERSION="$WEAVER_VERSION" \
          python3 - "$REPORT" <<'PY' | tee -a "$GITHUB_STEP_SUMMARY"
        import json, os, sys

        with open(sys.argv[1]) as fh:
            d = json.load(fh)
        # `or {}` / `or []` also cover keys that are present but null.
        s = d.get("statistics", {}) or {}
        samples = d.get("samples", []) or []
        levels = s.get("advice_level_counts", {}) or {}
        violations = levels.get("violation", 0)
        improvements = levels.get("improvement", 0)
        informations = levels.get("information", 0)
        status = "FAIL" if violations else "PASS"

        print(f"## Weaver live-check (host_metrics_receiver): {status}")
        print()
        print(f"- Weaver: `{os.environ.get('WEAVER_VERSION', 'unknown')}`")
        print(f"- Semantic conventions: `{os.environ.get('SEMCONV_REF', 'unknown')}`")
        print(f"- Samples received: **{len(samples)}**")
        by_type = s.get("total_entities_by_type", {}) or {}
        if by_type:
            parts = ", ".join(f"{v} {k}" for k, v in sorted(by_type.items()))
            print(f"- Entities checked: **{s.get('total_entities', 0)}** ({parts})")
        print()
        if not samples:
            # Zero samples means nothing was validated; make that visible.
            print("> Warning: the report contains no samples, so no telemetry was actually checked.")
            print()
        print("| Level | Count |")
        print("| --- | ---: |")
        print(f"| violation | {violations} |")
        print(f"| improvement | {improvements} |")
        print(f"| information | {informations} |")
        print()

        types = s.get("advice_type_counts", {}) or {}
        if types:
            print("### Findings by type")
            print()
            print("| Count | Type |")
            print("| ---: | --- |")
            # Most frequent finding types first.
            for t, c in sorted(types.items(), key=lambda x: -x[1]):
                print(f"| {c} | {t} |")
            print()

        if violations:
            print("> Violations were reported. See the job log and the `weaver-live-check-host-metrics` artifact for full detail.")
        else:
            print("> No semantic-convention violations. Improvements/informations are advisory. See the job log for example findings.")
        PY

        # Detailed findings -> job log only (too verbose for step summary).
        # Walks every sample's `live_check_result.all_advice` and groups
        # findings by (level, id), printing a few example messages per group.
        echo "::group::Detailed findings (sampled)"
        python3 - "$REPORT" <<'PY'
        import json, sys
        from collections import defaultdict

        with open(sys.argv[1]) as fh:
            d = json.load(fh)

        # (level, id) -> list of dicts {message, signal_type, signal_name, sample_path}
        groups = defaultdict(list)

        def walk(node, path):
            """Recursively collect findings from a nested sample tree.

            `path` is the list of enclosing sample names, used only to give
            each printed finding some context.
            """
            if isinstance(node, dict):
                result = node.get("live_check_result") or {}
                for adv in result.get("all_advice", []) or []:
                    groups[(adv.get("level", "?"), adv.get("id", "?"))].append({
                        "message": adv.get("message", ""),
                        "signal_type": adv.get("signal_type", ""),
                        "signal_name": adv.get("signal_name", ""),
                        "sample_path": " / ".join(path) if path else "",
                    })
                # Recurse into the sample subtree to catch nested findings
                # (e.g. attribute findings inside number_data_point).
                # Prefer the node's own name and fall back to a nested
                # attribute name. This is written as two statements because
                # `a or b if cond else None` parses as
                # `(a or b) if cond else None`, which would discard `name`
                # whenever `attribute` is not a dict.
                sample_name = node.get("name")
                attribute = node.get("attribute")
                if not sample_name and isinstance(attribute, dict):
                    sample_name = attribute.get("name")
                # Only strings can be joined into the path.
                if isinstance(sample_name, str) and sample_name:
                    sub_path = path + [sample_name]
                else:
                    sub_path = path
                for k, v in node.items():
                    if k in ("live_check_result",):
                        continue
                    walk(v, sub_path)
            elif isinstance(node, list):
                for item in node:
                    walk(item, path)

        for sample in d.get("samples", []) or []:
            walk(sample, [])

        if not groups:
            print("(no findings emitted by weaver)")
        else:
            LEVEL_ORDER = {"violation": 0, "improvement": 1, "information": 2}
            # Severity first, then most frequent, then id for a stable order.
            ordered = sorted(
                groups.items(),
                key=lambda kv: (LEVEL_ORDER.get(kv[0][0], 99), -len(kv[1]), kv[0][1]),
            )
            MAX_EXAMPLES = 5
            for (level, fid), items in ordered:
                print(f"[{level}] {fid} ({len(items)} occurrences)")
                # Deduplicate by message + signal_name so we show distinct cases.
                seen = set()
                shown = 0
                for it in items:
                    key = (it["message"], it["signal_type"], it["signal_name"])
                    if key in seen:
                        continue
                    seen.add(key)
                    sig = f"{it['signal_type']}:{it['signal_name']}" if it['signal_name'] else it['signal_type']
                    ctx = f" [{it['sample_path']}]" if it['sample_path'] else ""
                    print(f"  - ({sig}{ctx}) {it['message']}")
                    shown += 1
                    if shown >= MAX_EXAMPLES:
                        break
                remaining = len(items) - shown
                if remaining > 0:
                    print(f"  ... and {remaining} more occurrence(s)")
                print()
        PY
        echo "::endgroup::"
    # Always publish the report directory (weaver/df_engine logs plus any
    # JSON output) so failed runs can be inspected.
    - name: Upload live-check report
      if: always()
      uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a  # v7.0.1
      with:
        name: weaver-live-check-host-metrics
        path: live-check-report
        if-no-files-found: warn

# Required matrix combinations for deny: otap-dataflow only
deny_required:
runs-on: ubuntu-latest
Expand Down
44 changes: 44 additions & 0 deletions rust/otap-dataflow/configs/host-metrics-otlp.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
version: otel_dataflow/v1
engine: {}
groups:
  default:
    pipelines:
      main:
        # Pipeline: host_metrics receiver -> OTLP/gRPC exporter.
        #
        # The `host-metrics-weaver-live-check` CI job runs this pipeline so
        # that real OTLP traffic from the host metrics receiver reaches a
        # `weaver registry live-check` listener, which then checks the
        # emitted telemetry against the OpenTelemetry semantic conventions.
        #
        # The host_metrics receiver needs a single-core source pipeline, so
        # launch it as:
        #
        #   df_engine --config configs/host-metrics-otlp.yaml --num-cores 1
        #
        # Linux-only. The exporter below targets port 4317, the Weaver
        # live-check default OTLP/gRPC port.

        policies:
          channel_capacity:
            control:
              node: 100
              pipeline: 100
            pdata: 128

        nodes:
          receiver:
            type: "urn:otel:receiver:host_metrics"
            config:
              # Scrape every 2s, starting immediately, so that even a brief
              # CI run yields several collections.
              collection_interval: 2s
              initial_delay: 0s

          exporter:
            type: "exporter:otlp_grpc"
            config:
              grpc_endpoint: "http://127.0.0.1:4317"

        connections:
          - from: receiver
            to: exporter
Loading