Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .chloggen/standalone-mode.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
component: opamp-bridge

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: OpAMP Bridge standalone mode

# One or more tracking issues related to the change
issues: [4913]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Standalone mode for OpAMP Bridge allows users to manage collector configuration from a remote
OpAMP server without the need to deploy full Otel Operator.
22 changes: 21 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,26 @@ deploy: install-gateway-api-crds set-image-controller
undeploy: set-image-controller
$(KUSTOMIZE) build config/default | kubectl delete --ignore-not-found=$(ignore-not-found) -f -

##@ Standalone OpAMP Bridge (no operator / CRDs required)

# Deploy the standalone OpAMP bridge into the current Kubernetes context.
# Does not require the operator, CRDs, or cert-manager.
.PHONY: deploy-standalone-bridge
deploy-standalone-bridge: kustomize
cd config/standalone-bridge && $(KUSTOMIZE) edit set image operator-opamp-bridge=${OPERATOROPAMPBRIDGE_IMG}
$(KUSTOMIZE) build config/standalone-bridge | kubectl apply -f -
kubectl rollout status deployment/otel-opamp-bridge-standalone -n opentelemetry-opamp-bridge --timeout=120s

# Undeploy the standalone OpAMP bridge from the current Kubernetes context.
.PHONY: undeploy-standalone-bridge
undeploy-standalone-bridge: kustomize
$(KUSTOMIZE) build config/standalone-bridge | kubectl delete --ignore-not-found=true -f -

# Build, load, and deploy the standalone bridge to a kind cluster.
# Assumes a kind cluster is already running (use start-kind first).
.PHONY: deploy-standalone-bridge-kind
deploy-standalone-bridge-kind: load-image-operator-opamp-bridge deploy-standalone-bridge

# Generates the released manifests
.PHONY: release-artifacts
release-artifacts: set-image-controller
Expand Down Expand Up @@ -427,7 +447,7 @@ e2e-multi-instrumentation: chainsaw
# OpAMPBridge CR end-to-tests
.PHONY: e2e-opampbridge
e2e-opampbridge: chainsaw
$(CHAINSAW) test --test-dir ./tests/e2e-opampbridge --report-name e2e-opampbridge
OPERATOROPAMPBRIDGE_IMG=$(OPERATOROPAMPBRIDGE_IMG) $(CHAINSAW) test --test-dir ./tests/e2e-opampbridge --report-name e2e-opampbridge

# end-to-end-test for testing pdb support
.PHONY: e2e-pdb
Expand Down
56 changes: 56 additions & 0 deletions cmd/operator-opamp-bridge/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,62 @@ There are two main ways to install the OpAMP Bridge:

## Usage

### Standalone mode

Standalone mode lets the bridge manage Collector configuration stored in Kubernetes `ConfigMap` resources, without creating `OpenTelemetryCollector` CRDs. This is useful when the Collector workload is managed outside the operator, but the config still needs to be reported to and updated from an OpAMP server.

Start the bridge with `mode: standalone` in its config file, or pass `--mode=standalone`:

```yaml
endpoint: "<OPAMP_SERVER_ENDPOINT>"
mode: standalone
capabilities:
AcceptsRemoteConfig: true
ReportsEffectiveConfig: true
ReportsRemoteConfig: true
standalone:
agents:
- name: my-collector
namespace: default
type: otel-collector
config:
collector:
kind: configmap
namespace: default
name: collector-config
key: collector.yaml
```

In this mode, the bridge creates one OpAMP client connection for each entry under `standalone.agents`. Each key under an agent's `config` section is the OpAMP config file name reported to the server. The value describes the local Kubernetes resource that backs that file. In the example above, the OpAMP server sees a config file named `collector`, and the bridge maps it locally to `ConfigMap/default/collector-config`, key `collector.yaml`.

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: collector-config
namespace: default
data:
collector.yaml: |
receivers:
otlp:
protocols:
grpc:
http:
exporters:
otlphttp:
endpoint: http://example-collector:4318
service:
pipelines:
traces:
receivers: [otlp]
exporters: [otlphttp]
```


The bridge will not create or delete ConfigMaps in standalone mode. Remote config updates are only applied to the configured local resource and key.

Standalone mode needs RBAC for ConfigMaps. The repository includes a starter manifest at [`config/standalone-bridge/rbac.yaml`](../../config/standalone-bridge/rbac.yaml).

### OpAMPBridge CRD

The [OpAMPBridge](../../docs/api/opampbridges.md) CRD is used to create an OpAMP Bridge instance.
Expand Down
96 changes: 43 additions & 53 deletions cmd/operator-opamp-bridge/internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ import (
"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
"k8s.io/utils/clock"
"sigs.k8s.io/yaml"

"github.com/open-telemetry/opentelemetry-operator/apis/v1beta1"
"github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/config"
"github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/metrics"
"github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/operator"
Expand All @@ -29,7 +27,7 @@ import (
type Agent struct {
logger logr.Logger

appliedKeys map[kubeResourceKey]bool
appliedKeys map[string]bool
clock clock.Clock
startTime uint64
lastHash []byte
Expand Down Expand Up @@ -59,7 +57,7 @@ func NewAgent(logger logr.Logger, applier operator.ConfigApplier, cfg *config.Co
applier: applier,
proxy: p,
logger: logger,
appliedKeys: map[kubeResourceKey]bool{},
appliedKeys: map[string]bool{},
instanceId: cfg.GetInstanceId(),
agentDescription: cfg.GetDescription(),
remoteConfigEnabled: cfg.RemoteConfigEnabled(),
Expand Down Expand Up @@ -118,7 +116,7 @@ func (agent *Agent) generateCollectorPoolHealth() (map[string]*protobufs.Compone
proxiesUsed := make(map[uuid.UUID]struct{}, len(agentsByHostName))
for _, col := range cols {
key := newKubeResourceKey(col.GetNamespace(), col.GetName())
podMap, err := agent.generateCollectorHealth(agent.getCollectorSelector(col), col.GetNamespace())
podMap, err := agent.generateCollectorHealth(col.GetSelectorLabels(), col.GetNamespace())
if err != nil {
return nil, err
}
Expand All @@ -132,7 +130,7 @@ func (agent *Agent) generateCollectorPoolHealth() (map[string]*protobufs.Compone
proxiesUsed[uid] = struct{}{}
}
}
podStartTime, err := timeToUnixNanoUnsigned(col.ObjectMeta.GetCreationTimestamp().Time)
podStartTime, err := timeToUnixNanoUnsigned(col.GetCreationTimestamp())
if err != nil {
return nil, err
}
Expand All @@ -143,7 +141,7 @@ func (agent *Agent) generateCollectorPoolHealth() (map[string]*protobufs.Compone
healthMap[key.String()] = &protobufs.ComponentHealth{
StartTimeUnixNano: podStartTime,
StatusTimeUnixNano: statusTime,
Status: col.Status.Scale.StatusReplicas,
Status: col.GetStatusReplicas(),
ComponentHealthMap: podMap,
Healthy: isPoolHealthy,
}
Expand All @@ -157,28 +155,6 @@ func (agent *Agent) generateCollectorPoolHealth() (map[string]*protobufs.Compone
return healthMap, nil
}

// getCollectorSelector destructures the collectors scale selector if present, it uses the labelmap from the operator.
func (*Agent) getCollectorSelector(col v1beta1.OpenTelemetryCollector) map[string]string {
if col.Status.Scale.Selector != "" {
selMap := map[string]string{}
for kvPair := range strings.SplitSeq(col.Status.Scale.Selector, ",") {
kv := strings.Split(kvPair, "=")
// skip malformed pairs
if len(kv) != 2 {
continue
}
selMap[kv[0]] = kv[1]
}
return selMap
}
return map[string]string{
"app.kubernetes.io/managed-by": "opentelemetry-operator",
"app.kubernetes.io/instance": fmt.Sprintf("%s.%s", col.GetNamespace(), col.GetName()),
"app.kubernetes.io/part-of": "opentelemetry",
"app.kubernetes.io/component": "opentelemetry-collector",
}
}

func (agent *Agent) generateCollectorHealth(selectorLabels map[string]string, namespace string) (map[string]*protobufs.ComponentHealth, error) {
statusTime, err := agent.getCurrentTimeUnixNano()
if err != nil {
Expand Down Expand Up @@ -348,15 +324,14 @@ func (agent *Agent) getEffectiveConfig(context.Context) (*protobufs.EffectiveCon
}
instanceMap := map[string]*protobufs.AgentConfigFile{}
for _, instance := range instances {
col := instance
marshaled, err := yaml.Marshal(&col)
if err != nil {
agent.logger.Error(err, "failed to marhsal config")
return nil, err
body := instance.GetEffectiveConfig()
if body == nil {
agent.logger.Error(errors.New("nil effective config"), "failed to get effective config",
"name", instance.GetName(), "namespace", instance.GetNamespace())
continue
}
mapKey := newKubeResourceKey(instance.GetNamespace(), instance.GetName())
instanceMap[mapKey.String()] = &protobufs.AgentConfigFile{
Body: marshaled,
instanceMap[instance.GetConfigMapKey()] = &protobufs.AgentConfigFile{
Body: body,
ContentType: "yaml",
}
}
Expand Down Expand Up @@ -390,11 +365,11 @@ func (agent *Agent) initMeter(settings *protobufs.TelemetryConnectionSettings) {

// applyRemoteConfig receives a remote configuration from a remote server of the following form:
//
// map[name/namespace] -> collector CRD spec
// map[resource key] -> AgentConfigFile body
//
// For every key in the received remote configuration, the agent attempts to apply it to the connected
// Kubernetes cluster. If an agent fails to apply a collector CRD, it will continue to the next entry. The agent will
// store the received configuration hash regardless of application status as per the OpAMP spec.
// For every key in the received remote configuration, the agent attempts to apply it via the configured
// applier. If an entry fails to apply, the agent continues to the next entry. The agent stores the
// received configuration hash regardless of application status, as per the OpAMP spec.
//
// INVARIANT: The caller must verify that config isn't nil _and_ the configuration has changed between calls.
func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (*protobufs.RemoteConfigStatus, error) {
Expand All @@ -404,23 +379,16 @@ func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (*pro
if key == "" || len(file.Body) == 0 {
continue
}
colKey, err := kubeResourceFromKey(key)
if err != nil {
errs = append(errs, err)
continue
}
err = agent.applier.Apply(colKey.name, colKey.namespace, file)
if err != nil {
if err := agent.applyConfigFile(key, file); err != nil {
errs = append(errs, err)
continue
}
agent.appliedKeys[colKey] = true
agent.appliedKeys[key] = true
}
// Check if anything was deleted
for collectorKey := range agent.appliedKeys {
if _, ok := config.Config.GetConfigMap()[collectorKey.String()]; !ok {
err := agent.applier.Delete(collectorKey.name, collectorKey.namespace)
if err != nil {
for key := range agent.appliedKeys {
if _, ok := config.Config.GetConfigMap()[key]; !ok {
if err := agent.deleteConfigFile(key); err != nil {
errs = append(errs, err)
}
}
Expand All @@ -440,6 +408,28 @@ func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (*pro
}, nil
}

func (agent *Agent) applyConfigFile(key string, file *protobufs.AgentConfigFile) error {
if agent.config.IsStandaloneMode() {
return agent.applier.Apply(key, "", file)
}
colKey, err := kubeResourceFromKey(key)
if err != nil {
return err
}
return agent.applier.Apply(colKey.name, colKey.namespace, file)
}

func (agent *Agent) deleteConfigFile(key string) error {
if agent.config.IsStandaloneMode() {
return agent.applier.Delete(key, "")
}
colKey, err := kubeResourceFromKey(key)
if err != nil {
return err
}
return agent.applier.Delete(colKey.name, colKey.namespace)
}

// Shutdown will stop the OpAMP client gracefully.
func (agent *Agent) Shutdown() {
agent.logger.V(3).Info("Agent shutting down...")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func newKubeResourceKey(namespace, name string) kubeResourceKey {

func kubeResourceFromKey(key string) (kubeResourceKey, error) {
s := strings.Split(key, "/")
// We expect map keys to be of the form name/namespace
// We expect map keys to be of the form namespace/name.
if len(s) != 2 {
return kubeResourceKey{}, errors.New("invalid key")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/stretchr/testify/assert"
)

func Test_collectorKeyFromKey(t *testing.T) {
func TestKubeResourceFromKey(t *testing.T) {
type args struct {
key string
}
Expand Down Expand Up @@ -59,29 +59,7 @@ func Test_collectorKeyFromKey(t *testing.T) {
}
}

func Test_collectorKey_String(t *testing.T) {
type fields struct {
name string
namespace string
}
tests := []struct {
name string
fields fields
want string
}{
{
name: "can make a key",
fields: fields{
name: "good",
namespace: "namespace",
},
want: "namespace/good",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
k := newKubeResourceKey(tt.fields.namespace, tt.fields.name)
assert.Equalf(t, tt.want, k.String(), "String()")
})
}
func TestKubeResourceKeyString(t *testing.T) {
key := newKubeResourceKey("namespace", "good")
assert.Equal(t, "namespace/good", key.String())
}
Loading