diff --git a/.chloggen/standalone-mode.yaml b/.chloggen/standalone-mode.yaml new file mode 100644 index 0000000000..8ca2aa9a32 --- /dev/null +++ b/.chloggen/standalone-mode.yaml @@ -0,0 +1,18 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action) +component: opamp-bridge + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: OpAMP Bridge standalone mode + +# One or more tracking issues related to the change +issues: [4913] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Standalone mode for OpAMP Bridge allows users to manage collector configuration from a remote + OpAMP server without the need to deploy full Otel Operator. diff --git a/Makefile b/Makefile index 6b7efb5e21..ea9b32dad0 100644 --- a/Makefile +++ b/Makefile @@ -333,6 +333,26 @@ deploy: install-gateway-api-crds set-image-controller undeploy: set-image-controller $(KUSTOMIZE) build config/default | kubectl delete --ignore-not-found=$(ignore-not-found) -f - +##@ Standalone OpAMP Bridge (no operator / CRDs required) + +# Deploy the standalone OpAMP bridge into the current Kubernetes context. +# Does not require the operator, CRDs, or cert-manager. +.PHONY: deploy-standalone-bridge +deploy-standalone-bridge: kustomize + cd config/standalone-bridge && $(KUSTOMIZE) edit set image operator-opamp-bridge=${OPERATOROPAMPBRIDGE_IMG} + $(KUSTOMIZE) build config/standalone-bridge | kubectl apply -f - + kubectl rollout status deployment/otel-opamp-bridge-standalone -n opentelemetry-opamp-bridge --timeout=120s + +# Undeploy the standalone OpAMP bridge from the current Kubernetes context. +.PHONY: undeploy-standalone-bridge +undeploy-standalone-bridge: kustomize + $(KUSTOMIZE) build config/standalone-bridge | kubectl delete --ignore-not-found=true -f - + +# Build, load, and deploy the standalone bridge to a kind cluster. +# Assumes a kind cluster is already running (use start-kind first). +.PHONY: deploy-standalone-bridge-kind +deploy-standalone-bridge-kind: load-image-operator-opamp-bridge deploy-standalone-bridge + # Generates the released manifests .PHONY: release-artifacts release-artifacts: set-image-controller @@ -427,7 +447,7 @@ e2e-multi-instrumentation: chainsaw # OpAMPBridge CR end-to-tests .PHONY: e2e-opampbridge e2e-opampbridge: chainsaw - $(CHAINSAW) test --test-dir ./tests/e2e-opampbridge --report-name e2e-opampbridge + OPERATOROPAMPBRIDGE_IMG=$(OPERATOROPAMPBRIDGE_IMG) $(CHAINSAW) test --test-dir ./tests/e2e-opampbridge --report-name e2e-opampbridge # end-to-end-test for testing pdb support .PHONY: e2e-pdb diff --git a/cmd/operator-opamp-bridge/README.md b/cmd/operator-opamp-bridge/README.md index 3da9986a65..4b44c152c4 100644 --- a/cmd/operator-opamp-bridge/README.md +++ b/cmd/operator-opamp-bridge/README.md @@ -20,6 +20,64 @@ There are two main ways to install the OpAMP Bridge: ## Usage +### Standalone mode + +Standalone mode lets the bridge manage Collector configuration stored in Kubernetes `ConfigMap` resources, without creating `OpenTelemetryCollector` CRDs. This is useful when the Collector workload is managed outside the operator, but the config still needs to be reported to and updated from an OpAMP server. + +Start the bridge with `mode: standalone` in its config file, or pass `--mode=standalone`: + +```yaml +endpoint: "" +mode: standalone +capabilities: + AcceptsRemoteConfig: true + ReportsEffectiveConfig: true + ReportsRemoteConfig: true +standalone: + agents: + - workloadName: my-collector + namespace: default + type: otel-collector + workloadType: deployment + config: + collector: + kind: configmap + name: collector-config + key: collector.yaml +``` + +In this mode, the bridge creates one OpAMP client connection for each entry under `standalone.agents`. Each key under an agent's `config` section is the OpAMP config file name reported to the server. The value describes the local Kubernetes resource that backs that file. Config resources are resolved in the workload namespace. In the example above, the OpAMP server sees a config file named `collector`, and the bridge maps it locally to `ConfigMap/default/collector-config`, key `collector.yaml`. + +After applying a config update, the bridge restarts the configured workload by updating the workload pod template's `kubectl.kubernetes.io/restartedAt` annotation. Supported workload types are `deployment`, `daemonset`, and `statefulset`. + +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: collector-config + namespace: default +data: + collector.yaml: | + receivers: + otlp: + protocols: + grpc: + http: + exporters: + otlphttp: + endpoint: http://example-collector:4318 + service: + pipelines: + traces: + receivers: [otlp] + exporters: [otlphttp] +``` + + +The bridge will not create or delete ConfigMaps in standalone mode. Remote config updates are only applied to the configured local resource and key. + +Standalone mode needs RBAC for ConfigMaps and configured workload types. The repository includes a starter manifest at [`config/standalone-bridge/rbac.yaml`](../../config/standalone-bridge/rbac.yaml). + ### OpAMPBridge CRD The [OpAMPBridge](../../docs/api/opampbridges.md) CRD is used to create an OpAMP Bridge instance. diff --git a/cmd/operator-opamp-bridge/internal/agent/agent.go b/cmd/operator-opamp-bridge/internal/agent/agent.go index 1a8dca9a1a..d4b2ae34f2 100644 --- a/cmd/operator-opamp-bridge/internal/agent/agent.go +++ b/cmd/operator-opamp-bridge/internal/agent/agent.go @@ -17,9 +17,7 @@ import ( "github.com/open-telemetry/opamp-go/client/types" "github.com/open-telemetry/opamp-go/protobufs" "k8s.io/utils/clock" - "sigs.k8s.io/yaml" - "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/config" "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/metrics" "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/operator" @@ -29,7 +27,7 @@ import ( type Agent struct { logger logr.Logger - appliedKeys map[kubeResourceKey]bool + appliedKeys map[string]bool clock clock.Clock startTime uint64 lastHash []byte @@ -59,7 +57,7 @@ func NewAgent(logger logr.Logger, applier operator.ConfigApplier, cfg *config.Co applier: applier, proxy: p, logger: logger, - appliedKeys: map[kubeResourceKey]bool{}, + appliedKeys: map[string]bool{}, instanceId: cfg.GetInstanceId(), agentDescription: cfg.GetDescription(), remoteConfigEnabled: cfg.RemoteConfigEnabled(), @@ -117,8 +115,8 @@ func (agent *Agent) generateCollectorPoolHealth() (map[string]*protobufs.Compone healthMap := map[string]*protobufs.ComponentHealth{} proxiesUsed := make(map[uuid.UUID]struct{}, len(agentsByHostName)) for _, col := range cols { - key := newKubeResourceKey(col.GetNamespace(), col.GetName()) - podMap, err := agent.generateCollectorHealth(agent.getCollectorSelector(col), col.GetNamespace()) + key := operator.NewKubeResourceKey(col.GetNamespace(), col.GetName()) + podMap, err := agent.generateCollectorHealth(col.GetSelectorLabels(), col.GetNamespace()) if err != nil { return nil, err } @@ -132,7 +130,7 @@ func (agent *Agent) generateCollectorPoolHealth() (map[string]*protobufs.Compone proxiesUsed[uid] = struct{}{} } } - podStartTime, err := timeToUnixNanoUnsigned(col.ObjectMeta.GetCreationTimestamp().Time) + podStartTime, err := timeToUnixNanoUnsigned(col.GetCreationTimestamp()) if err != nil { return nil, err } @@ -143,7 +141,7 @@ func (agent *Agent) generateCollectorPoolHealth() (map[string]*protobufs.Compone healthMap[key.String()] = &protobufs.ComponentHealth{ StartTimeUnixNano: podStartTime, StatusTimeUnixNano: statusTime, - Status: col.Status.Scale.StatusReplicas, + Status: col.GetStatusReplicas(), ComponentHealthMap: podMap, Healthy: isPoolHealthy, } @@ -157,28 +155,6 @@ func (agent *Agent) generateCollectorPoolHealth() (map[string]*protobufs.Compone return healthMap, nil } -// getCollectorSelector destructures the collectors scale selector if present, it uses the labelmap from the operator. -func (*Agent) getCollectorSelector(col v1beta1.OpenTelemetryCollector) map[string]string { - if col.Status.Scale.Selector != "" { - selMap := map[string]string{} - for kvPair := range strings.SplitSeq(col.Status.Scale.Selector, ",") { - kv := strings.Split(kvPair, "=") - // skip malformed pairs - if len(kv) != 2 { - continue - } - selMap[kv[0]] = kv[1] - } - return selMap - } - return map[string]string{ - "app.kubernetes.io/managed-by": "opentelemetry-operator", - "app.kubernetes.io/instance": fmt.Sprintf("%s.%s", col.GetNamespace(), col.GetName()), - "app.kubernetes.io/part-of": "opentelemetry", - "app.kubernetes.io/component": "opentelemetry-collector", - } -} - func (agent *Agent) generateCollectorHealth(selectorLabels map[string]string, namespace string) (map[string]*protobufs.ComponentHealth, error) { statusTime, err := agent.getCurrentTimeUnixNano() if err != nil { @@ -190,7 +166,7 @@ func (agent *Agent) generateCollectorHealth(selectorLabels map[string]string, na } healthMap := map[string]*protobufs.ComponentHealth{} for _, item := range pods.Items { - key := newKubeResourceKey(item.GetNamespace(), item.GetName()) + key := operator.NewKubeResourceKey(item.GetNamespace(), item.GetName()) healthy := true if item.Status.Phase != "Running" { healthy = false @@ -348,15 +324,14 @@ func (agent *Agent) getEffectiveConfig(context.Context) (*protobufs.EffectiveCon } instanceMap := map[string]*protobufs.AgentConfigFile{} for _, instance := range instances { - col := instance - marshaled, err := yaml.Marshal(&col) - if err != nil { - agent.logger.Error(err, "failed to marhsal config") - return nil, err + body := instance.GetEffectiveConfig() + if body == nil { + agent.logger.Error(errors.New("nil effective config"), "failed to get effective config", + "name", instance.GetName(), "namespace", instance.GetNamespace()) + continue } - mapKey := newKubeResourceKey(instance.GetNamespace(), instance.GetName()) - instanceMap[mapKey.String()] = &protobufs.AgentConfigFile{ - Body: marshaled, + instanceMap[instance.GetConfigMapKey()] = &protobufs.AgentConfigFile{ + Body: body, ContentType: "yaml", } } @@ -390,11 +365,11 @@ func (agent *Agent) initMeter(settings *protobufs.TelemetryConnectionSettings) { // applyRemoteConfig receives a remote configuration from a remote server of the following form: // -// map[name/namespace] -> collector CRD spec +// map[resource key] -> AgentConfigFile body // -// For every key in the received remote configuration, the agent attempts to apply it to the connected -// Kubernetes cluster. If an agent fails to apply a collector CRD, it will continue to the next entry. The agent will -// store the received configuration hash regardless of application status as per the OpAMP spec. +// For every key in the received remote configuration, the agent attempts to apply it via the configured +// applier. If an entry fails to apply, the agent continues to the next entry. The agent stores the +// received configuration hash regardless of application status, as per the OpAMP spec. // // INVARIANT: The caller must verify that config isn't nil _and_ the configuration has changed between calls. func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (*protobufs.RemoteConfigStatus, error) { @@ -404,23 +379,16 @@ func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (*pro if key == "" || len(file.Body) == 0 { continue } - colKey, err := kubeResourceFromKey(key) - if err != nil { + if err := agent.applier.Apply(key, file); err != nil { errs = append(errs, err) continue } - err = agent.applier.Apply(colKey.name, colKey.namespace, file) - if err != nil { - errs = append(errs, err) - continue - } - agent.appliedKeys[colKey] = true + agent.appliedKeys[key] = true } // Check if anything was deleted - for collectorKey := range agent.appliedKeys { - if _, ok := config.Config.GetConfigMap()[collectorKey.String()]; !ok { - err := agent.applier.Delete(collectorKey.name, collectorKey.namespace) - if err != nil { + for key := range agent.appliedKeys { + if _, ok := config.Config.GetConfigMap()[key]; !ok { + if err := agent.applier.Delete(key); err != nil { errs = append(errs, err) } } diff --git a/cmd/operator-opamp-bridge/internal/agent/kube_resource_key.go b/cmd/operator-opamp-bridge/internal/agent/kube_resource_key.go deleted file mode 100644 index 9b2ede4c88..0000000000 --- a/cmd/operator-opamp-bridge/internal/agent/kube_resource_key.go +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package agent - -import ( - "errors" - "fmt" - "strings" -) - -type kubeResourceKey struct { - name string - namespace string -} - -func newKubeResourceKey(namespace, name string) kubeResourceKey { - return kubeResourceKey{name: name, namespace: namespace} -} - -func kubeResourceFromKey(key string) (kubeResourceKey, error) { - s := strings.Split(key, "/") - // We expect map keys to be of the form name/namespace - if len(s) != 2 { - return kubeResourceKey{}, errors.New("invalid key") - } - return newKubeResourceKey(s[0], s[1]), nil -} - -func (k kubeResourceKey) String() string { - return fmt.Sprintf("%s/%s", k.namespace, k.name) -} diff --git a/cmd/operator-opamp-bridge/internal/config/config.go b/cmd/operator-opamp-bridge/internal/config/config.go index 7ddd65fd1b..53fb754f12 100644 --- a/cmd/operator-opamp-bridge/internal/config/config.go +++ b/cmd/operator-opamp-bridge/internal/config/config.go @@ -10,6 +10,7 @@ import ( "net/url" "os" "runtime" + "strings" "time" "github.com/go-logr/logr" @@ -18,6 +19,7 @@ import ( "github.com/open-telemetry/opamp-go/protobufs" "github.com/spf13/pflag" "gopkg.in/yaml.v2" + yamlv3 "gopkg.in/yaml.v3" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" @@ -34,7 +36,9 @@ import ( ) const ( - agentType = "io.opentelemetry.operator-opamp-bridge" + agentType = "io.opentelemetry.operator-opamp-bridge" + operatorMode = "operator" + standaloneMode = "standalone" ) var ( @@ -91,6 +95,11 @@ type Config struct { HeartbeatInterval time.Duration `yaml:"heartbeatInterval,omitempty"` Name string `yaml:"name,omitempty"` AgentDescription AgentDescription `yaml:"description,omitempty"` + Standalone StandaloneConfig `yaml:"standalone,omitempty"` + + // Mode selects the operating mode: "operator" (default) uses OpenTelemetryCollector CRDs, + // "standalone" manages static Kubernetes config sources from this config. + Mode string `yaml:"mode,omitempty"` } // AgentDescription is copied from the OpAMP Extension in the collector. @@ -101,6 +110,24 @@ type AgentDescription struct { NonIdentifyingAttributes map[string]string `yaml:"non_identifying_attributes"` } +type StandaloneConfig struct { + Agents []StandaloneAgentConfig `yaml:"agents,omitempty"` +} + +type StandaloneAgentConfig struct { + WorkloadName string `yaml:"workloadName"` + Namespace string `yaml:"namespace"` + Type string `yaml:"type"` + WorkloadType string `yaml:"workloadType"` + Config map[string]StandaloneConfigEntry `yaml:"config"` +} + +type StandaloneConfigEntry struct { + Kind string `yaml:"kind"` + Name string `yaml:"name"` + Key string `yaml:"key"` +} + func NewConfig(logger logr.Logger) *Config { return &Config{ instanceId: mustGetInstanceId(), @@ -157,7 +184,10 @@ func (c *Config) GetAgentScheme() string { return uri.Scheme } -func (*Config) GetAgentType() string { +func (c *Config) GetAgentType() string { + if c.Name != opampBridgeName && c.Mode == standaloneMode { + return c.Name + } return agentType } @@ -184,6 +214,88 @@ func (c *Config) GetDescription() *protobufs.AgentDescription { } } +func (c *Config) ForStandaloneAgent(agent StandaloneAgentConfig) *Config { + agentConfig := *c + agentConfig.Name = agent.WorkloadName + agentConfig.instanceId = uuid.NewSHA1(uuid.NameSpaceURL, []byte(fmt.Sprintf("%s/%s/%s/%s", agent.Namespace, agent.WorkloadType, agent.WorkloadName, agent.Type))) + agentConfig.AgentDescription.NonIdentifyingAttributes = cloneStringMap(c.AgentDescription.NonIdentifyingAttributes) + if agentConfig.AgentDescription.NonIdentifyingAttributes == nil { + agentConfig.AgentDescription.NonIdentifyingAttributes = map[string]string{} + } + agentConfig.AgentDescription.NonIdentifyingAttributes["k8s.namespace.name"] = agent.Namespace + agentConfig.AgentDescription.NonIdentifyingAttributes["k8s.workload.name"] = agent.WorkloadName + agentConfig.AgentDescription.NonIdentifyingAttributes["k8s.workload.type"] = agent.WorkloadType + agentConfig.AgentDescription.NonIdentifyingAttributes["opentelemetry.io/agent.type"] = agent.Type + return &agentConfig +} + +func cloneStringMap(in map[string]string) map[string]string { + if in == nil { + return nil + } + out := make(map[string]string, len(in)) + for k, v := range in { + out[k] = v + } + return out +} + +func (c *Config) Validate() error { + if !c.IsStandaloneMode() { + return nil + } + if len(c.Standalone.Agents) == 0 { + return errors.New("standalone mode requires at least one configured agent") + } + agents := map[string]struct{}{} + for _, agent := range c.Standalone.Agents { + if strings.TrimSpace(agent.WorkloadName) == "" { + return errors.New("standalone agent workloadName is required") + } + if strings.TrimSpace(agent.Namespace) == "" { + return fmt.Errorf("standalone agent %q namespace is required", agent.WorkloadName) + } + if strings.TrimSpace(agent.Type) == "" { + return fmt.Errorf("standalone agent %q type is required", agent.WorkloadName) + } + if !supportedStandaloneWorkloadType(agent.WorkloadType) { + return fmt.Errorf("standalone agent %q has unsupported workloadType %q", agent.WorkloadName, agent.WorkloadType) + } + agentKey := fmt.Sprintf("%s/%s/%s", agent.Namespace, strings.ToLower(agent.WorkloadType), agent.WorkloadName) + if _, ok := agents[agentKey]; ok { + return fmt.Errorf("duplicate standalone agent workload %q", agentKey) + } + agents[agentKey] = struct{}{} + if len(agent.Config) == 0 { + return fmt.Errorf("standalone agent %q requires at least one config entry", agent.WorkloadName) + } + for remoteName, entry := range agent.Config { + if strings.TrimSpace(remoteName) == "" { + return fmt.Errorf("standalone agent %q config remote name is required", agent.WorkloadName) + } + if strings.ToLower(entry.Kind) != "configmap" { + return fmt.Errorf("standalone agent %q config %q has unsupported kind %q", agent.WorkloadName, remoteName, entry.Kind) + } + if strings.TrimSpace(entry.Name) == "" { + return fmt.Errorf("standalone agent %q config %q name is required", agent.WorkloadName, remoteName) + } + if strings.TrimSpace(entry.Key) == "" { + return fmt.Errorf("standalone agent %q config %q key is required", agent.WorkloadName, remoteName) + } + } + } + return nil +} + +func supportedStandaloneWorkloadType(workloadType string) bool { + switch strings.ToLower(workloadType) { + case "deployment", "daemonset", "statefulset": + return true + default: + return false + } +} + func (ad *AgentDescription) nonIdentifyingAttributes() []*protobufs.KeyValue { toReturn := make([]*protobufs.KeyValue, len(ad.NonIdentifyingAttributes)) i := 0 @@ -225,15 +337,25 @@ func (c *Config) RemoteConfigEnabled() bool { } func (c *Config) GetKubernetesClient() (client.Client, error) { - err := schemeBuilder.AddToScheme(scheme.Scheme) - if err != nil { - return nil, err + if c.Mode != standaloneMode { + err := schemeBuilder.AddToScheme(scheme.Scheme) + if err != nil { + return nil, err + } } return client.New(c.ClusterConfig, client.Options{ Scheme: scheme.Scheme, }) } +func (c *Config) IsStandaloneMode() bool { + return c.Mode == standaloneMode +} + +func (c *Config) GetRestConfig() *rest.Config { + return c.ClusterConfig +} + func Load(logger logr.Logger, args []string) (*Config, error) { flagSet := GetFlagSet(pflag.ExitOnError) err := flagSet.Parse(args) @@ -259,6 +381,9 @@ func Load(logger logr.Logger, args []string) (*Config, error) { if err != nil { return nil, err } + if err = cfg.Validate(); err != nil { + return nil, err + } return cfg, nil } @@ -300,6 +425,11 @@ func LoadFromCLI(target *Config, flagSet *pflag.FlagSet) error { } else if changed { target.Name = name } + if mode, changed, err := getMode(flagSet); err != nil { + return err + } else if changed { + target.Mode = mode + } return nil } @@ -309,8 +439,63 @@ func LoadFromFile(cfg *Config, configFile string) error { return err } envExpandedYaml := []byte(os.ExpandEnv(string(yamlFile))) + if err = validateNoDuplicateStandaloneConfigKeys(envExpandedYaml); err != nil { + return err + } if err = yaml.Unmarshal(envExpandedYaml, cfg); err != nil { return fmt.Errorf("error unmarshaling YAML: %w", err) } return nil } + +func validateNoDuplicateStandaloneConfigKeys(data []byte) error { + var root yamlv3.Node + if err := yamlv3.Unmarshal(data, &root); err != nil { + return fmt.Errorf("error parsing YAML: %w", err) + } + if len(root.Content) == 0 { + return nil + } + standalone := mappingValue(root.Content[0], "standalone") + if standalone == nil { + return nil + } + agents := mappingValue(standalone, "agents") + if agents == nil || agents.Kind != yamlv3.SequenceNode { + return nil + } + for i, agent := range agents.Content { + configs := mappingValue(agent, "config") + if configs == nil || configs.Kind != yamlv3.MappingNode { + continue + } + agentName := fmt.Sprintf("%d", i) + if name := mappingValue(agent, "workloadName"); name != nil && name.Kind == yamlv3.ScalarNode && name.Value != "" { + agentName = name.Value + } + seen := map[string]int{} + for j := 0; j < len(configs.Content); j += 2 { + key := configs.Content[j] + if key.Kind != yamlv3.ScalarNode { + continue + } + if firstLine, ok := seen[key.Value]; ok { + return fmt.Errorf("standalone agent %q config key %q is duplicated at line %d; first defined at line %d", agentName, key.Value, key.Line, firstLine) + } + seen[key.Value] = key.Line + } + } + return nil +} + +func mappingValue(node *yamlv3.Node, key string) *yamlv3.Node { + if node == nil || node.Kind != yamlv3.MappingNode { + return nil + } + for i := 0; i < len(node.Content); i += 2 { + if node.Content[i].Kind == yamlv3.ScalarNode && node.Content[i].Value == key { + return node.Content[i+1] + } + } + return nil +} diff --git a/cmd/operator-opamp-bridge/internal/config/config_test.go b/cmd/operator-opamp-bridge/internal/config/config_test.go index c31353279b..efa75db4c9 100644 --- a/cmd/operator-opamp-bridge/internal/config/config_test.go +++ b/cmd/operator-opamp-bridge/internal/config/config_test.go @@ -5,6 +5,8 @@ package config import ( "fmt" + "os" + "path/filepath" "testing" "time" @@ -290,6 +292,58 @@ func TestLoadFromFile(t *testing.T) { } } +func TestValidateNoDuplicateStandaloneConfigKeys(t *testing.T) { + cfg := []byte(` +mode: standalone +standalone: + agents: + - workloadName: collector + namespace: default + type: otel-collector + workloadType: deployment + config: + collector: + kind: configmap + name: collector-config + key: collector.yaml + collector: + kind: configmap + name: other-config + key: other.yaml +`) + + err := validateNoDuplicateStandaloneConfigKeys(cfg) + require.Error(t, err) + assert.ErrorContains(t, err, `standalone agent "collector" config key "collector" is duplicated`) +} + +func TestLoadFromFileRejectsDuplicateStandaloneConfigKeys(t *testing.T) { + cfg := []byte(` +mode: standalone +standalone: + agents: + - workloadName: collector + namespace: default + type: otel-collector + workloadType: deployment + config: + collector: + kind: configmap + name: collector-config + key: collector.yaml + collector: + kind: configmap + name: other-config + key: other.yaml +`) + configPath := filepath.Join(t.TempDir(), "config.yaml") + require.NoError(t, os.WriteFile(configPath, cfg, 0o600)) + + err := LoadFromFile(NewConfig(logr.Discard()), configPath) + require.Error(t, err) + assert.ErrorContains(t, err, `standalone agent "collector" config key "collector" is duplicated`) +} + func TestGetDescription(t *testing.T) { got := NewConfig(logr.Discard()) instanceId := uuid.New() diff --git a/cmd/operator-opamp-bridge/internal/config/flags.go b/cmd/operator-opamp-bridge/internal/config/flags.go index 14e01b0a16..faf641e546 100644 --- a/cmd/operator-opamp-bridge/internal/config/flags.go +++ b/cmd/operator-opamp-bridge/internal/config/flags.go @@ -24,7 +24,9 @@ const ( kubeConfigPathFlagName = "kubeconfig-path" heartbeatIntervalFlagName = "heartbeat-interval" nameFlagName = "name" + modeFlagName = "mode" defaultHeartbeatInterval = 30 * time.Second + defaultMode = operatorMode ) var defaultKubeConfigPath = filepath.Join(homedir.HomeDir(), ".kube", "config") @@ -39,6 +41,7 @@ func GetFlagSet(errorHandling pflag.ErrorHandling) *pflag.FlagSet { flagSet.String(kubeConfigPathFlagName, defaultKubeConfigPath, "absolute path to the KubeconfigPath file.") flagSet.Duration(heartbeatIntervalFlagName, defaultHeartbeatInterval, "The interval to use for sending a heartbeat. Setting it to 0 disables the heartbeat.") flagSet.String(nameFlagName, opampBridgeName, "The name of the bridge to use for querying managed collectors.") + flagSet.String(modeFlagName, defaultMode, `Operating mode: "operator" (default, uses CRDs) or "standalone" (manages ConfigMaps for Deployments/DaemonSets).`) zapFlagSet := flag.NewFlagSet("", flag.ErrorHandling(errorHandling)) zapCmdLineOpts.BindFlags(zapFlagSet) flagSet.AddGoFlagSet(zapFlagSet) @@ -65,6 +68,10 @@ func getListenAddr(flagSet *pflag.FlagSet) (value string, changed bool, err erro return getFlagValueAndChanged[string](flagSet, listenAddrFlagName) } +func getMode(flagSet *pflag.FlagSet) (value string, changed bool, err error) { + return getFlagValueAndChanged[string](flagSet, modeFlagName) +} + func getFlagValueAndChanged[T any](flagSet *pflag.FlagSet, flagName string) (value T, changed bool, err error) { var zero T if changed = flagSet.Changed(flagName); !changed { diff --git a/cmd/operator-opamp-bridge/internal/operator/client.go b/cmd/operator-opamp-bridge/internal/operator/client.go index 57313d4684..4eb6a34a56 100644 --- a/cmd/operator-opamp-bridge/internal/operator/client.go +++ b/cmd/operator-opamp-bridge/internal/operator/client.go @@ -29,17 +29,14 @@ const ( ) type ConfigApplier interface { - // Apply receives a name and namespace to apply an OpenTelemetryCollector CRD that is contained in the configmap. - Apply(name, namespace string, configmap *protobufs.AgentConfigFile) error + // Apply receives an OpAMP config key and applies the corresponding configuration. + Apply(key string, configmap *protobufs.AgentConfigFile) error - // Delete attempts to delete an OpenTelemetryCollector object given a name and namespace. - Delete(name, namespace string) error + // Delete attempts to delete the resource identified by an OpAMP config key. + Delete(key string) error - // ListInstances retrieves all OpenTelemetryCollector CRDs created by the operator-opamp-bridge agent. - ListInstances() ([]v1beta1.OpenTelemetryCollector, error) - - // GetInstance retrieves an OpenTelemetryCollector CRD given a name and namespace. - GetInstance(name, namespace string) (*v1beta1.OpenTelemetryCollector, error) + // ListInstances retrieves all collector instances managed by the bridge. + ListInstances() ([]CollectorInstance, error) // GetCollectorPods retrieves all pods that match the given collector's selector labels and namespace. GetCollectorPods(selectorLabels map[string]string, namespace string) (*v1.PodList, error) @@ -65,7 +62,12 @@ func NewClient(name string, log logr.Logger, c client.Client, componentsAllowed } } -func (c Client) Apply(name, namespace string, configmap *protobufs.AgentConfigFile) error { +func (c Client) Apply(key string, configmap *protobufs.AgentConfigFile) error { + resource, err := kubeResourceFromKey(key) + if err != nil { + return err + } + name, namespace := resource.name, resource.namespace c.log.Info("Received new config", "name", name, "namespace", namespace) if len(configmap.Body) == 0 { @@ -73,7 +75,7 @@ func (c Client) Apply(name, namespace string, configmap *protobufs.AgentConfigFi } var collector v1beta1.OpenTelemetryCollector - err := yaml.Unmarshal(configmap.Body, &collector) + err = yaml.Unmarshal(configmap.Body, &collector) if err != nil { return errors.NewBadRequest(fmt.Sprintf("failed to unmarshal config into v1beta1 API Version: %v", err)) } @@ -191,12 +193,16 @@ func (c Client) update(ctx context.Context, o, n *v1beta1.OpenTelemetryCollector return c.k8sClient.Update(ctx, n) } -func (c Client) Delete(name, namespace string) error { +func (c Client) Delete(key string) error { + resource, err := kubeResourceFromKey(key) + if err != nil { + return err + } ctx := context.Background() result := v1beta1.OpenTelemetryCollector{} - err := c.k8sClient.Get(ctx, client.ObjectKey{ - Namespace: namespace, - Name: name, + err = c.k8sClient.Get(ctx, client.ObjectKey{ + Namespace: resource.namespace, + Name: resource.name, }, &result) if err != nil { if errors.IsNotFound(err) { @@ -207,7 +213,19 @@ func (c Client) Delete(name, namespace string) error { return c.k8sClient.Delete(ctx, &result) } -func (c Client) ListInstances() ([]v1beta1.OpenTelemetryCollector, error) { +func (c Client) ListInstances() ([]CollectorInstance, error) { + collectors, err := c.listOpenTelemetryCollectors() + if err != nil { + return nil, err + } + result := make([]CollectorInstance, len(collectors)) + for i := range collectors { + result[i] = newCRDInstance(collectors[i]) + } + return result, nil +} + +func (c Client) listOpenTelemetryCollectors() ([]v1beta1.OpenTelemetryCollector, error) { ctx := context.Background() var instances []v1beta1.OpenTelemetryCollector diff --git a/cmd/operator-opamp-bridge/internal/operator/client_test.go b/cmd/operator-opamp-bridge/internal/operator/client_test.go index ca6b1fa51b..a40107c1a2 100644 --- a/cmd/operator-opamp-bridge/internal/operator/client_test.go +++ b/cmd/operator-opamp-bridge/internal/operator/client_test.go @@ -156,7 +156,7 @@ func TestClient_Apply(t *testing.T) { Body: colConfig, ContentType: "yaml", } - applyErr := c.Apply(tt.args.name, tt.args.namespace, configmap) + applyErr := c.Apply(NewKubeResourceKey(tt.args.namespace, tt.args.name).String(), configmap) if tt.wantErr { assert.Error(t, applyErr) assert.ErrorContains(t, applyErr, tt.errContains) @@ -199,7 +199,7 @@ func TestClient_ApplyUpdate(t *testing.T) { ContentType: "yaml", } // Apply a valid initial configuration - err = c.Apply(name, namespace, configmap) + err = c.Apply(NewKubeResourceKey(namespace, name).String(), configmap) require.NoError(t, err, "Should apply base config") // Confirm there are now two collector instances, reporting and managed @@ -220,7 +220,7 @@ func TestClient_ApplyUpdate(t *testing.T) { // Try updating with an invalid configuration configmap.Body = []byte("empty, invalid!") - err = c.Apply(name, namespace, configmap) + err = c.Apply(NewKubeResourceKey(namespace, name).String(), configmap) assert.Error(t, err, "Should be unable to update with invalid config") // Update successfully with a valid configuration @@ -230,7 +230,7 @@ func TestClient_ApplyUpdate(t *testing.T) { Body: newColConfig, ContentType: "yaml", } - err = c.Apply(name, namespace, newConfigMap) + err = c.Apply(NewKubeResourceKey(namespace, name).String(), newConfigMap) require.NoError(t, err, "Should be able to update collector") // Get the updated collector @@ -246,8 +246,12 @@ func TestClient_ApplyUpdate(t *testing.T) { allInstances, err = c.ListInstances() require.NoError(t, err, "Should be able to list all collectors") assert.Len(t, allInstances, 2) - assert.Contains(t, allInstances, reportingCol) - assert.Contains(t, allInstances, *updatedInstance) + instanceNames := make([]string, len(allInstances)) + for i, inst := range allInstances { + instanceNames[i] = inst.GetNamespace() + "/" + inst.GetName() + } + assert.Contains(t, instanceNames, reportingCol.GetNamespace()+"/"+reportingCol.GetName()) + assert.Contains(t, instanceNames, updatedInstance.GetNamespace()+"/"+updatedInstance.GetName()) } func TestClient_Delete(t *testing.T) { @@ -262,7 +266,7 @@ func TestClient_Delete(t *testing.T) { ContentType: "yaml", } // Apply a valid initial configuration - err = c.Apply(name, namespace, configmap) + err = c.Apply(NewKubeResourceKey(namespace, name).String(), configmap) require.NoError(t, err, "Should apply base config") // Get the newly created collector @@ -274,7 +278,7 @@ func TestClient_Delete(t *testing.T) { require.Len(t, instance.Spec.Config.Service.Pipelines, 1, "Should have a pipeline") // Delete it - err = c.Delete(name, namespace) + err = c.Delete(NewKubeResourceKey(namespace, name).String()) require.NoError(t, err, "Should be able to delete a collector") // Check there's nothing left diff --git a/cmd/operator-opamp-bridge/internal/operator/crd_instance.go b/cmd/operator-opamp-bridge/internal/operator/crd_instance.go new file mode 100644 index 0000000000..fe6cb2672e --- /dev/null +++ b/cmd/operator-opamp-bridge/internal/operator/crd_instance.go @@ -0,0 +1,73 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package operator + +import ( + "fmt" + "strings" + "time" + + "sigs.k8s.io/yaml" + + "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" +) + +var _ CollectorInstance = CRDInstance{} + +// CRDInstance wraps an OpenTelemetryCollector CRD to implement CollectorInstance. +type CRDInstance struct { + Col v1beta1.OpenTelemetryCollector +} + +func newCRDInstance(col v1beta1.OpenTelemetryCollector) CRDInstance { + return CRDInstance{Col: col} +} + +func (c CRDInstance) GetName() string { + return c.Col.GetName() +} + +func (c CRDInstance) GetNamespace() string { + return c.Col.GetNamespace() +} + +func (c CRDInstance) GetConfigMapKey() string { + return NewKubeResourceKey(c.GetNamespace(), c.GetName()).String() +} + +func (c CRDInstance) GetCreationTimestamp() time.Time { + return c.Col.GetCreationTimestamp().Time +} + +func (c CRDInstance) GetSelectorLabels() map[string]string { + if c.Col.Status.Scale.Selector != "" { + selMap := map[string]string{} + for kvPair := range strings.SplitSeq(c.Col.Status.Scale.Selector, ",") { + kv := strings.Split(kvPair, "=") + if len(kv) != 2 { + continue + } + selMap[kv[0]] = kv[1] + } + return selMap + } + return map[string]string{ + "app.kubernetes.io/managed-by": "opentelemetry-operator", + "app.kubernetes.io/instance": fmt.Sprintf("%s.%s", c.Col.GetNamespace(), c.Col.GetName()), + "app.kubernetes.io/part-of": "opentelemetry", + "app.kubernetes.io/component": "opentelemetry-collector", + } +} + +func (c CRDInstance) GetStatusReplicas() string { + return c.Col.Status.Scale.StatusReplicas +} + +func (c CRDInstance) GetEffectiveConfig() []byte { + marshaled, err := yaml.Marshal(&c.Col) + if err != nil { + return nil + } + return marshaled +} diff --git a/cmd/operator-opamp-bridge/internal/operator/instance.go b/cmd/operator-opamp-bridge/internal/operator/instance.go new file mode 100644 index 0000000000..9dd0539663 --- /dev/null +++ b/cmd/operator-opamp-bridge/internal/operator/instance.go @@ -0,0 +1,20 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package operator + +import ( + "time" +) + +// CollectorInstance represents a collector managed by the bridge, abstracting +// the underlying Kubernetes resource (CRD or Deployment/DaemonSet). +type CollectorInstance interface { + GetName() string + GetNamespace() string + GetConfigMapKey() string + GetCreationTimestamp() time.Time + GetSelectorLabels() map[string]string + GetStatusReplicas() string + GetEffectiveConfig() []byte +} diff --git a/cmd/operator-opamp-bridge/internal/operator/kube_resource_key.go b/cmd/operator-opamp-bridge/internal/operator/kube_resource_key.go new file mode 100644 index 0000000000..2688d1d495 --- /dev/null +++ b/cmd/operator-opamp-bridge/internal/operator/kube_resource_key.go @@ -0,0 +1,32 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package operator + +import ( + "errors" + "fmt" + "strings" +) + +type KubeResourceKey struct { + name string + namespace string +} + +func NewKubeResourceKey(namespace, name string) KubeResourceKey { + return KubeResourceKey{name: name, namespace: namespace} +} + +func kubeResourceFromKey(key string) (KubeResourceKey, error) { + s := strings.Split(key, "/") + // We expect map keys to be of the form namespace/name. + if len(s) != 2 { + return KubeResourceKey{}, errors.New("invalid key") + } + return NewKubeResourceKey(s[0], s[1]), nil +} + +func (k KubeResourceKey) String() string { + return fmt.Sprintf("%s/%s", k.namespace, k.name) +} diff --git a/cmd/operator-opamp-bridge/internal/agent/kube_resource_key_test.go b/cmd/operator-opamp-bridge/internal/operator/kube_resource_key_test.go similarity index 58% rename from cmd/operator-opamp-bridge/internal/agent/kube_resource_key_test.go rename to cmd/operator-opamp-bridge/internal/operator/kube_resource_key_test.go index 0c31655f22..61d5098f19 100644 --- a/cmd/operator-opamp-bridge/internal/agent/kube_resource_key_test.go +++ b/cmd/operator-opamp-bridge/internal/operator/kube_resource_key_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package agent +package operator import ( "fmt" @@ -10,14 +10,14 @@ import ( "github.com/stretchr/testify/assert" ) -func Test_collectorKeyFromKey(t *testing.T) { +func TestKubeResourceFromKey(t *testing.T) { type args struct { key string } tests := []struct { name string args args - want kubeResourceKey + want KubeResourceKey wantErr assert.ErrorAssertionFunc }{ { @@ -25,7 +25,7 @@ func Test_collectorKeyFromKey(t *testing.T) { args: args{ key: "namespace/good", }, - want: kubeResourceKey{ + want: KubeResourceKey{ name: "good", namespace: "namespace", }, @@ -36,7 +36,7 @@ func Test_collectorKeyFromKey(t *testing.T) { args: args{ key: "badnamespace", }, - want: kubeResourceKey{}, + want: KubeResourceKey{}, wantErr: assert.Error, }, { @@ -44,7 +44,7 @@ func Test_collectorKeyFromKey(t *testing.T) { args: args{ key: "too/many/slashes", }, - want: kubeResourceKey{}, + want: KubeResourceKey{}, wantErr: assert.Error, }, } @@ -59,29 +59,7 @@ func Test_collectorKeyFromKey(t *testing.T) { } } -func Test_collectorKey_String(t *testing.T) { - type fields struct { - name string - namespace string - } - tests := []struct { - name string - fields fields - want string - }{ - { - name: "can make a key", - fields: fields{ - name: "good", - namespace: "namespace", - }, - want: "namespace/good", - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - k := newKubeResourceKey(tt.fields.namespace, tt.fields.name) - assert.Equalf(t, tt.want, k.String(), "String()") - }) - } +func TestKubeResourceKeyString(t *testing.T) { + key := NewKubeResourceKey("namespace", "good") + assert.Equal(t, "namespace/good", key.String()) } diff --git a/cmd/operator-opamp-bridge/internal/standalone/client.go b/cmd/operator-opamp-bridge/internal/standalone/client.go new file mode 100644 index 0000000000..97c4e126d9 --- /dev/null +++ b/cmd/operator-opamp-bridge/internal/standalone/client.go @@ -0,0 +1,251 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package standalone + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + "github.com/go-logr/logr" + "github.com/open-telemetry/opamp-go/protobufs" + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/rest" + toolscache "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/config" + "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/operator" +) + +const ( + standaloneConfigMapKind = "configmap" + restartAnnotation = "kubectl.kubernetes.io/restartedAt" +) + +// Client implements operator.ConfigApplier for standalone mode. +// ConfigMaps are the primary managed objects. +type Client struct { + log logr.Logger + k8sClient client.Client + restCfg *rest.Config + name string + cmCache cache.Cache + onUpdate func() +} + +// NewClient creates a standalone OpAMP bridge Client that works directly on ConfigMaps +// without the need for CRDs or the operator. +func NewClient(name string, log logr.Logger, c client.Client, restCfg *rest.Config, onUpdate func()) *Client { + return &Client{ + log: log, + k8sClient: c, + restCfg: restCfg, + name: name, + onUpdate: onUpdate, + } +} + +// Start creates an informer cache for ConfigMaps so configured agents can +// refresh their effective config when local data changes. +func (c *Client) Start(ctx context.Context) error { + ca, err := cache.New(c.restCfg, cache.Options{ + ByObject: map[client.Object]cache.ByObject{ + &v1.ConfigMap{}: {}, + }, + }) + if err != nil { + return fmt.Errorf("failed to create ConfigMap cache: %w", err) + } + + informer, err := ca.GetInformer(ctx, &v1.ConfigMap{}) + if err != nil { + return fmt.Errorf("failed to get ConfigMap informer: %w", err) + } + + handler := toolscache.ResourceEventHandlerFuncs{ + AddFunc: func(_ any) { c.notifyUpdate() }, + UpdateFunc: func(_, _ any) { c.notifyUpdate() }, + DeleteFunc: func(_ any) { c.notifyUpdate() }, + } + if _, err = informer.AddEventHandler(handler); err != nil { + return fmt.Errorf("failed to add ConfigMap event handler: %w", err) + } + + go func() { + if err := ca.Start(ctx); err != nil { + c.log.Error(err, "ConfigMap cache stopped with error") + } + }() + + if !ca.WaitForCacheSync(ctx) { + return errors.New("timed out waiting for ConfigMap cache to sync") + } + + c.cmCache = ca + c.log.Info("ConfigMap informer cache synced") + return nil +} + +func (c *Client) notifyUpdate() { + if c.onUpdate != nil { + c.onUpdate() + } +} + +func (c *Client) getConfigMapFile(namespace string, entry config.StandaloneConfigEntry) ([]byte, error) { + cm := &v1.ConfigMap{} + if err := c.k8sClient.Get(context.Background(), client.ObjectKey{Name: entry.Name, Namespace: namespace}, cm); err != nil { + return nil, fmt.Errorf("failed to get ConfigMap %s/%s: %w", namespace, entry.Name, err) + } + body, ok := cm.Data[entry.Key] + if !ok { + return nil, fmt.Errorf("ConfigMap %s/%s does not contain key %q", namespace, entry.Name, entry.Key) + } + return []byte(body), nil +} + +func (c *Client) getConfigMapCreationTimestamp(namespace string, entry config.StandaloneConfigEntry) (v1.ConfigMap, error) { + cm := &v1.ConfigMap{} + if err := c.k8sClient.Get(context.Background(), client.ObjectKey{Name: entry.Name, Namespace: namespace}, cm); err != nil { + return v1.ConfigMap{}, fmt.Errorf("failed to get ConfigMap %s/%s: %w", namespace, entry.Name, err) + } + return *cm, nil +} + +func (c *Client) applyConfigMapFile(namespace, workloadType, workloadName string, entry config.StandaloneConfigEntry, configFile *protobufs.AgentConfigFile) error { + if len(configFile.Body) == 0 { + return errors.New("invalid config to apply: config is empty") + } + if err := validateCollectorConfigEntry(string(configFile.Body)); err != nil { + return fmt.Errorf("invalid collector config: %w", err) + } + + existing := &v1.ConfigMap{} + err := c.k8sClient.Get(context.Background(), client.ObjectKey{Name: entry.Name, Namespace: namespace}, existing) + if apierrors.IsNotFound(err) { + return fmt.Errorf("standalone mode does not support creating ConfigMap %s/%s", namespace, entry.Name) + } else if err != nil { + return fmt.Errorf("failed to get ConfigMap %s/%s: %w", namespace, entry.Name, err) + } + + if existing.Data == nil { + existing.Data = map[string]string{} + } + existing.Data[entry.Key] = string(configFile.Body) + if updateErr := c.k8sClient.Update(context.Background(), existing); updateErr != nil { + return fmt.Errorf("failed to update ConfigMap %s/%s: %w", namespace, entry.Name, updateErr) + } + c.log.Info("Updated ConfigMap key", "name", entry.Name, "namespace", namespace, "key", entry.Key) + + if err := c.triggerRollout(context.Background(), namespace, workloadType, workloadName); err != nil { + return err + } + return nil +} + +func (c *Client) triggerRollout(ctx context.Context, namespace, workloadType, workloadName string) error { + restartVal := time.Now().Format(time.RFC3339) + + switch strings.ToLower(workloadType) { + case "deployment": + deploy := &appsv1.Deployment{} + if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: workloadName, Namespace: namespace}, deploy); err != nil { + return fmt.Errorf("failed to get Deployment %s/%s for rollout: %w", namespace, workloadName, err) + } + if deploy.Spec.Template.Annotations == nil { + deploy.Spec.Template.Annotations = map[string]string{} + } + deploy.Spec.Template.Annotations[restartAnnotation] = restartVal + if err := c.k8sClient.Update(ctx, deploy); err != nil { + return fmt.Errorf("failed to trigger rollout for Deployment %s/%s: %w", namespace, workloadName, err) + } + case "daemonset": + ds := &appsv1.DaemonSet{} + if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: workloadName, Namespace: namespace}, ds); err != nil { + return fmt.Errorf("failed to get DaemonSet %s/%s for rollout: %w", namespace, workloadName, err) + } + if ds.Spec.Template.Annotations == nil { + ds.Spec.Template.Annotations = map[string]string{} + } + ds.Spec.Template.Annotations[restartAnnotation] = restartVal + if err := c.k8sClient.Update(ctx, ds); err != nil { + return fmt.Errorf("failed to trigger rollout for DaemonSet %s/%s: %w", namespace, workloadName, err) + } + case "statefulset": + sts := &appsv1.StatefulSet{} + if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: workloadName, Namespace: namespace}, sts); err != nil { + return fmt.Errorf("failed to get StatefulSet %s/%s for rollout: %w", namespace, workloadName, err) + } + if sts.Spec.Template.Annotations == nil { + sts.Spec.Template.Annotations = map[string]string{} + } + sts.Spec.Template.Annotations[restartAnnotation] = restartVal + if err := c.k8sClient.Update(ctx, sts); err != nil { + return fmt.Errorf("failed to trigger rollout for StatefulSet %s/%s: %w", namespace, workloadName, err) + } + default: + return fmt.Errorf("unsupported workload type %q", workloadType) + } + + c.log.Info("Triggered workload rollout", "workloadType", workloadType, "name", workloadName, "namespace", namespace) + return nil +} + +func (c *Client) scopedApplier(agent config.StandaloneAgentConfig) operator.ConfigApplier { + return &scopedApplier{ + client: c, + agent: agent, + } +} + +type scopedApplier struct { + client *Client + agent config.StandaloneAgentConfig +} + +var _ operator.ConfigApplier = &scopedApplier{} + +func (s *scopedApplier) Apply(name string, configFile *protobufs.AgentConfigFile) error { + entry, ok := s.agent.Config[name] + if !ok { + return fmt.Errorf("standalone agent %q does not manage config %q", s.agent.WorkloadName, name) + } + if strings.ToLower(entry.Kind) != standaloneConfigMapKind { + return fmt.Errorf("unsupported standalone config kind %q", entry.Kind) + } + return s.client.applyConfigMapFile(s.agent.Namespace, s.agent.WorkloadType, s.agent.WorkloadName, entry, configFile) +} + +func (*scopedApplier) Delete(name string) error { + return fmt.Errorf("standalone mode does not support deleting config %s", name) +} + +func (s *scopedApplier) ListInstances() ([]operator.CollectorInstance, error) { + result := make([]operator.CollectorInstance, 0, len(s.agent.Config)) + for remoteName, entry := range s.agent.Config { + if strings.ToLower(entry.Kind) != standaloneConfigMapKind { + continue + } + body, err := s.client.getConfigMapFile(s.agent.Namespace, entry) + if err != nil { + return nil, err + } + cm, err := s.client.getConfigMapCreationTimestamp(s.agent.Namespace, entry) + if err != nil { + return nil, err + } + result = append(result, newStandaloneCollectorInstance(remoteName, "", cm.GetCreationTimestamp().Time, body)) + } + return result, nil +} + +func (*scopedApplier) GetCollectorPods(_ map[string]string, _ string) (*v1.PodList, error) { + return &v1.PodList{}, nil +} diff --git a/cmd/operator-opamp-bridge/internal/standalone/client_test.go b/cmd/operator-opamp-bridge/internal/standalone/client_test.go new file mode 100644 index 0000000000..eaba591474 --- /dev/null +++ b/cmd/operator-opamp-bridge/internal/standalone/client_test.go @@ -0,0 +1,161 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package standalone + +import ( + "context" + "testing" + + "github.com/go-logr/logr" + "github.com/open-telemetry/opamp-go/protobufs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/config" +) + +const validCollectorConfig = `receivers: + otlp: + protocols: + grpc: +exporters: + debug: +service: + pipelines: + traces: + receivers: + - otlp + exporters: + - debug +` + +func getFakeK8sClient(t *testing.T, objs ...client.Object) client.Client { + scheme := runtime.NewScheme() + require.NoError(t, v1.AddToScheme(scheme)) + require.NoError(t, appsv1.AddToScheme(scheme)) + builder := fake.NewClientBuilder().WithScheme(scheme) + for _, obj := range objs { + builder = builder.WithObjects(obj) + } + return builder.Build() +} + +func testConfigMap() *v1.ConfigMap { + return &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "collector-config", + Namespace: "default", + }, + Data: map[string]string{ + "collector.yaml": validCollectorConfig, + "extra.yaml": "extra: true", + }, + } +} + +func testAgentConfig() config.StandaloneAgentConfig { + return config.StandaloneAgentConfig{ + WorkloadName: "standalone-collector", + Namespace: "default", + Type: "otel-collector", + WorkloadType: "deployment", + Config: map[string]config.StandaloneConfigEntry{ + "collector": { + Kind: "configmap", + Name: "collector-config", + Key: "collector.yaml", + }, + }, + } +} + +func testDeployment() *appsv1.Deployment { + return &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "standalone-collector", + Namespace: "default", + }, + Spec: appsv1.DeploymentSpec{ + Template: v1.PodTemplateSpec{}, + }, + } +} + +func newTestClient(k8s client.Client) *Client { + return NewClient("test-bridge", logr.Discard(), k8s, nil, nil) +} + +func TestScopedApplierListInstancesReturnsConfiguredConfigMapKey(t *testing.T) { + cm := testConfigMap() + c := newTestClient(getFakeK8sClient(t, cm)) + + instances, err := c.scopedApplier(testAgentConfig()).ListInstances() + require.NoError(t, err) + require.Len(t, instances, 1) + + assert.Equal(t, "collector", instances[0].GetName()) + assert.Equal(t, "collector", instances[0].GetConfigMapKey()) + assert.Equal(t, validCollectorConfig, string(instances[0].GetEffectiveConfig())) +} + +func TestScopedApplierApplyUpdatesConfiguredConfigMapKeyAndRestartsWorkload(t *testing.T) { + cm := testConfigMap() + deploy := testDeployment() + k8s := getFakeK8sClient(t, cm, deploy) + c := newTestClient(k8s) + + updatedConfig := `receivers: + otlp: + protocols: + http: +exporters: + debug: +service: + pipelines: + traces: + receivers: [otlp] + exporters: [debug] +` + err := c.scopedApplier(testAgentConfig()).Apply("collector", &protobufs.AgentConfigFile{Body: []byte(updatedConfig)}) + require.NoError(t, err) + + updated := &v1.ConfigMap{} + require.NoError(t, k8s.Get(context.Background(), client.ObjectKey{Name: "collector-config", Namespace: "default"}, updated)) + assert.Equal(t, updatedConfig, updated.Data["collector.yaml"]) + assert.Equal(t, "extra: true", updated.Data["extra.yaml"]) + + updatedDeploy := &appsv1.Deployment{} + require.NoError(t, k8s.Get(context.Background(), client.ObjectKey{Name: "standalone-collector", Namespace: "default"}, updatedDeploy)) + assert.NotEmpty(t, updatedDeploy.Spec.Template.Annotations[restartAnnotation]) +} + +func TestScopedApplierApplyRejectsUnknownRemoteName(t *testing.T) { + c := newTestClient(getFakeK8sClient(t, testConfigMap())) + + err := c.scopedApplier(testAgentConfig()).Apply("unknown", &protobufs.AgentConfigFile{Body: []byte(validCollectorConfig)}) + require.Error(t, err) + assert.Contains(t, err.Error(), "does not manage config") +} + +func TestScopedApplierApplyDoesNotCreateConfigMap(t *testing.T) { + c := newTestClient(getFakeK8sClient(t)) + + err := c.scopedApplier(testAgentConfig()).Apply("collector", &protobufs.AgentConfigFile{Body: []byte(validCollectorConfig)}) + require.Error(t, err) + assert.Contains(t, err.Error(), "does not support creating ConfigMap") +} + +func TestScopedApplierDeleteUnsupported(t *testing.T) { + c := newTestClient(getFakeK8sClient(t, testConfigMap())) + + err := c.scopedApplier(testAgentConfig()).Delete("collector") + require.Error(t, err) + assert.Contains(t, err.Error(), "does not support deleting") +} diff --git a/cmd/operator-opamp-bridge/internal/standalone/configmap_instance.go b/cmd/operator-opamp-bridge/internal/standalone/configmap_instance.go new file mode 100644 index 0000000000..9d435d2058 --- /dev/null +++ b/cmd/operator-opamp-bridge/internal/standalone/configmap_instance.go @@ -0,0 +1,57 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package standalone + +import ( + "time" + + "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/operator" +) + +var _ operator.CollectorInstance = &standaloneCollectorInstance{} + +// standaloneCollectorInstance represents a bridge-managed ConfigMap as a +// CollectorInstance. name and namespace identify the ConfigMap itself. +type standaloneCollectorInstance struct { + name string + namespace string + createdAt time.Time + configBody []byte +} + +func newStandaloneCollectorInstance(name, namespace string, createdAt time.Time, configBody []byte) *standaloneCollectorInstance { + return &standaloneCollectorInstance{name: name, namespace: namespace, createdAt: createdAt, configBody: configBody} +} + +func (p *standaloneCollectorInstance) GetName() string { + return p.name +} + +func (p *standaloneCollectorInstance) GetNamespace() string { + return p.namespace +} + +func (p *standaloneCollectorInstance) GetConfigMapKey() string { + return p.GetName() +} + +func (p *standaloneCollectorInstance) GetCreationTimestamp() time.Time { + return p.createdAt +} + +// GetSelectorLabels returns an empty map. Standalone mode does not report +// individual pod health, so no pod selector is needed. +func (*standaloneCollectorInstance) GetSelectorLabels() map[string]string { + return map[string]string{} +} + +// GetStatusReplicas returns an empty string. Replica status is not tracked +// at the ConfigMap level in standalone mode. +func (*standaloneCollectorInstance) GetStatusReplicas() string { + return "" +} + +func (p *standaloneCollectorInstance) GetEffectiveConfig() []byte { + return p.configBody +} diff --git a/cmd/operator-opamp-bridge/internal/standalone/manager.go b/cmd/operator-opamp-bridge/internal/standalone/manager.go new file mode 100644 index 0000000000..571b25e5b5 --- /dev/null +++ b/cmd/operator-opamp-bridge/internal/standalone/manager.go @@ -0,0 +1,93 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package standalone + +import ( + "context" + + "github.com/go-logr/logr" + "github.com/google/uuid" + opampclient "github.com/open-telemetry/opamp-go/client" + "github.com/open-telemetry/opamp-go/protobufs" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + + opampagent "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/agent" + "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/config" +) + +type Manager struct { + log logr.Logger + cfg *config.Config + client *Client + runtime []agentRuntime +} + +type agentRuntime struct { + name string + opampAgent *opampagent.Agent + client opampclient.OpAMPClient +} + +func NewManager(log logr.Logger, cfg *config.Config, c client.Client, restCfg *rest.Config) *Manager { + manager := &Manager{ + log: log, + cfg: cfg, + } + manager.client = NewClient(cfg.Name, log.WithName("client"), c, restCfg, manager.updateEffectiveConfig) + return manager +} + +func (m *Manager) Start(ctx context.Context) error { + if err := m.client.Start(ctx); err != nil { + return err + } + for _, configuredAgent := range m.cfg.Standalone.Agents { + agentCfg := m.cfg.ForStandaloneAgent(configuredAgent) + opampClient := agentCfg.CreateClient() + runtime := agentRuntime{ + name: configuredAgent.WorkloadName, + client: opampClient, + opampAgent: opampagent.NewAgent(m.log.WithName(configuredAgent.WorkloadName), m.client.scopedApplier(configuredAgent), agentCfg, opampClient, noopProxy{}), + } + if err := runtime.opampAgent.Start(); err != nil { + m.Shutdown() + return err + } + m.runtime = append(m.runtime, runtime) + } + return nil +} + +func (m *Manager) Shutdown() { + for _, runtime := range m.runtime { + runtime.opampAgent.Shutdown() + } +} + +func (m *Manager) updateEffectiveConfig() { + for _, runtime := range m.runtime { + if err := runtime.client.UpdateEffectiveConfig(context.Background()); err != nil { + m.log.Error(err, "failed to update effective config after ConfigMap change", "agent", runtime.name) + } + } +} + +type noopProxy struct{} + +func (noopProxy) GetAgentsByHostname() map[string]uuid.UUID { + return map[string]uuid.UUID{} +} + +func (noopProxy) GetConfigurations() map[uuid.UUID]*protobufs.EffectiveConfig { + return map[uuid.UUID]*protobufs.EffectiveConfig{} +} + +func (noopProxy) GetHealth() map[uuid.UUID]*protobufs.ComponentHealth { + return map[uuid.UUID]*protobufs.ComponentHealth{} +} + +func (noopProxy) HasUpdates() <-chan struct{} { + return make(chan struct{}) +} diff --git a/cmd/operator-opamp-bridge/internal/standalone/schema.go b/cmd/operator-opamp-bridge/internal/standalone/schema.go new file mode 100644 index 0000000000..dbaccf96ef --- /dev/null +++ b/cmd/operator-opamp-bridge/internal/standalone/schema.go @@ -0,0 +1,100 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package standalone + +import ( + "errors" + "fmt" + "slices" + "strings" + + "sigs.k8s.io/yaml" + + "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" +) + +const standaloneConfigVersion = "opentelemetry.io/opamp-bridge-standalone/v1alpha1" + +type standaloneConfig struct { + Version string `json:"version" yaml:"version"` + Name string `json:"name" yaml:"name"` + Namespace string `json:"namespace" yaml:"namespace"` + Config map[string]string `json:"config" yaml:"config"` +} + +func (c standaloneConfig) validate(name, namespace string) error { + if c.Version != standaloneConfigVersion { + return fmt.Errorf("unsupported standalone config version %q", c.Version) + } + if c.Name == "" { + return errors.New("standalone config name is required") + } + if c.Namespace == "" { + return errors.New("standalone config namespace is required") + } + if c.Name != name { + return fmt.Errorf("standalone config name %q does not match target name %q", c.Name, name) + } + if c.Namespace != namespace { + return fmt.Errorf("standalone config namespace %q does not match target namespace %q", c.Namespace, namespace) + } + if len(c.Config) == 0 { + return errors.New("standalone config data is required") + } + for key := range c.Config { + if strings.TrimSpace(key) == "" { + return errors.New("standalone config contains an empty data key") + } + } + return nil +} + +func (c standaloneConfig) validateCollectorConfig() error { + var validationErrors []string + + keys := make([]string, 0, len(c.Config)) + for key := range c.Config { + keys = append(keys, key) + } + slices.Sort(keys) + + for _, key := range keys { + if err := validateCollectorConfigEntry(c.Config[key]); err != nil { + validationErrors = append(validationErrors, fmt.Sprintf("%s: %v", key, err)) + continue + } + return nil + } + + return fmt.Errorf("no valid OpenTelemetry Collector config found in standalone config data: %s", strings.Join(validationErrors, "; ")) +} + +func validateCollectorConfigEntry(body string) error { + if strings.TrimSpace(body) == "" { + return errors.New("config value is empty") + } + + var cfg v1beta1.Config + if err := yaml.Unmarshal([]byte(body), &cfg); err != nil { + return fmt.Errorf("failed to parse collector config: %w", err) + } + + if len(cfg.Receivers.Object) == 0 { + return errors.New("collector config must define at least one receiver") + } + if len(cfg.Exporters.Object) == 0 { + return errors.New("collector config must define at least one exporter") + } + if len(cfg.Service.Pipelines) == 0 { + return errors.New("collector config must define at least one service pipeline") + } + + for pipelineName, pipeline := range cfg.Service.Pipelines { + if pipeline == nil { + return fmt.Errorf("pipeline %s is empty", pipelineName) + } + } + + return nil +} diff --git a/cmd/operator-opamp-bridge/main.go b/cmd/operator-opamp-bridge/main.go index f3602149bb..f38dccb53f 100644 --- a/cmd/operator-opamp-bridge/main.go +++ b/cmd/operator-opamp-bridge/main.go @@ -12,6 +12,7 @@ import ( "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/config" "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/operator" "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/proxy" + "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/standalone" ) func main() { @@ -22,18 +23,33 @@ func main() { l.Error(configLoadErr, "Unable to load configuration") os.Exit(1) } - l.Info("Starting the Remote Configuration service") + l.Info("Starting the Remote Configuration service", "mode", cfg.Mode) kubeClient, kubeErr := cfg.GetKubernetesClient() if kubeErr != nil { l.Error(kubeErr, "Couldn't create kubernetes client") os.Exit(1) } - operatorClient := operator.NewClient(cfg.Name, l.WithName("operator-client"), kubeClient, cfg.GetComponentsAllowed()) + + // signalCtx is cancelled on interrupt, which stops the informer goroutine. + signalCtx, cancelSignal := signal.NotifyContext(context.Background(), os.Interrupt) + defer cancelSignal() + + if cfg.IsStandaloneMode() { + standaloneManager := standalone.NewManager(l.WithName("standalone"), cfg, kubeClient, cfg.GetRestConfig()) + if err := standaloneManager.Start(signalCtx); err != nil { + l.Error(err, "Cannot start standalone agents") + os.Exit(1) + } + <-signalCtx.Done() + standaloneManager.Shutdown() + return + } opampClient := cfg.CreateClient() + applier := operator.NewClient(cfg.Name, l.WithName("operator-client"), kubeClient, cfg.GetComponentsAllowed()) opampProxy := proxy.NewOpAMPProxy(l.WithName("server"), cfg.ListenAddr) - opampAgent := agent.NewAgent(l.WithName("agent"), operatorClient, cfg, opampClient, opampProxy) + opampAgent := agent.NewAgent(l.WithName("agent"), applier, cfg, opampClient, opampProxy) if err := opampAgent.Start(); err != nil { l.Error(err, "Cannot start OpAMP client") @@ -44,9 +60,7 @@ func main() { os.Exit(1) } - interrupt := make(chan os.Signal, 1) - signal.Notify(interrupt, os.Interrupt) - <-interrupt + <-signalCtx.Done() opampAgent.Shutdown() proxyStopErr := opampProxy.Stop(context.Background()) if proxyStopErr != nil { diff --git a/config/standalone-bridge/configmap.yaml b/config/standalone-bridge/configmap.yaml new file mode 100644 index 0000000000..331c7cdddd --- /dev/null +++ b/config/standalone-bridge/configmap.yaml @@ -0,0 +1,39 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: otel-opamp-bridge-standalone-config + namespace: opentelemetry-opamp-bridge +data: + config.yaml: | + mode: standalone + #endpoint: ws://opamp-server:4320/v1/opamp + endpoint: http://host.docker.internal:8080/v1/opamp + listenAddr: ":8080" + heartbeatInterval: 30s + name: opamp-bridge-standalone + capabilities: + AcceptsRemoteConfig: true + ReportsEffectiveConfig: true + ReportsHealth: true + ReportsOwnMetrics: true + ReportsRemoteConfig: true + standalone: + agents: + - workloadName: otel-collector + workloadType: deployment + namespace: opentelemetry-opamp-bridge + type: otel-collector + config: + collector: + kind: configmap + name: otel-collector-conf + key: otel-collector-config + - workloadName: otel-agent + namespace: opentelemetry-opamp-bridge + type: otel-agent + workloadType: daemonset + config: + agent: + kind: configmap + name: otel-agent-conf + key: otel-agent-config diff --git a/config/standalone-bridge/deployment.yaml b/config/standalone-bridge/deployment.yaml new file mode 100644 index 0000000000..20b2eec230 --- /dev/null +++ b/config/standalone-bridge/deployment.yaml @@ -0,0 +1,51 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: otel-opamp-bridge-standalone + namespace: opentelemetry-opamp-bridge + labels: + app.kubernetes.io/name: otel-opamp-bridge-standalone + app.kubernetes.io/component: opamp-bridge +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: otel-opamp-bridge-standalone + template: + metadata: + labels: + app.kubernetes.io/name: otel-opamp-bridge-standalone + app.kubernetes.io/component: opamp-bridge + spec: + serviceAccountName: otel-opamp-bridge-standalone + containers: + - name: bridge + image: operator-opamp-bridge:latest + args: + - --config-file=/conf/config.yaml + - --mode=standalone + ports: + - name: opamp + containerPort: 8080 + protocol: TCP + volumeMounts: + - name: config + mountPath: /conf + readOnly: true + livenessProbe: + tcpSocket: + port: opamp + initialDelaySeconds: 5 + periodSeconds: 10 + resources: + requests: + cpu: 50m + memory: 64Mi + limits: + cpu: 200m + memory: 128Mi + volumes: + - name: config + configMap: + name: otel-opamp-bridge-standalone-config + diff --git a/config/standalone-bridge/kustomization.yaml b/config/standalone-bridge/kustomization.yaml new file mode 100644 index 0000000000..a5f7cbb91d --- /dev/null +++ b/config/standalone-bridge/kustomization.yaml @@ -0,0 +1,16 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +namespace: opentelemetry-opamp-bridge + +resources: +- namespace.yaml +- serviceaccount.yaml +- rbac.yaml +- configmap.yaml +- deployment.yaml + +images: +- name: operator-opamp-bridge + newName: ghcr.io/open-telemetry/opentelemetry-operator/operator-opamp-bridge + newTag: v0.150.0-28-gb5704446 diff --git a/config/standalone-bridge/namespace.yaml b/config/standalone-bridge/namespace.yaml new file mode 100644 index 0000000000..bc25d9b580 --- /dev/null +++ b/config/standalone-bridge/namespace.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: opentelemetry-opamp-bridge diff --git a/config/standalone-bridge/rbac.yaml b/config/standalone-bridge/rbac.yaml new file mode 100644 index 0000000000..aaadb0ad56 --- /dev/null +++ b/config/standalone-bridge/rbac.yaml @@ -0,0 +1,26 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: otel-opamp-bridge-standalone +rules: + # ConfigMaps: the bridge watches, creates, updates, and deletes these to push collector config + - apiGroups: [""] + resources: ["configmaps"] + verbs: ["get", "list", "watch", "create", "update", "delete"] + # Deployments + DaemonSets + StatefulSets: the bridge updates pod template annotations to trigger rollouts + - apiGroups: ["apps"] + resources: ["deployments", "daemonsets", "statefulsets"] + verbs: ["get", "update"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: otel-opamp-bridge-standalone +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: otel-opamp-bridge-standalone +subjects: + - kind: ServiceAccount + name: otel-opamp-bridge-standalone + namespace: opentelemetry-opamp-bridge diff --git a/config/standalone-bridge/serviceaccount.yaml b/config/standalone-bridge/serviceaccount.yaml new file mode 100644 index 0000000000..0c29ca6580 --- /dev/null +++ b/config/standalone-bridge/serviceaccount.yaml @@ -0,0 +1,5 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: otel-opamp-bridge-standalone + namespace: opentelemetry-opamp-bridge diff --git a/go.mod b/go.mod index 7817e04890..6eca81751a 100644 --- a/go.mod +++ b/go.mod @@ -50,6 +50,7 @@ require ( go.uber.org/zap v1.28.0 google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v2 v2.4.0 + gopkg.in/yaml.v3 v3.0.1 gotest.tools/v3 v3.5.2 k8s.io/api v0.35.4 k8s.io/apiextensions-apiserver v0.35.4 @@ -286,7 +287,6 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.yaml.in/yaml/v2 v2.4.4 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiserver v0.35.4 // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.34.0 // indirect sigs.k8s.io/randfill v1.0.0 // indirect diff --git a/tests/e2e-opampbridge/standalone/00-assert.yaml b/tests/e2e-opampbridge/standalone/00-assert.yaml new file mode 100644 index 0000000000..da9755c5a4 --- /dev/null +++ b/tests/e2e-opampbridge/standalone/00-assert.yaml @@ -0,0 +1,7 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: e2e-test-app-bridge-server +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/e2e-opampbridge/standalone/00-install.yaml b/tests/e2e-opampbridge/standalone/00-install.yaml new file mode 100644 index 0000000000..7ad32726e7 --- /dev/null +++ b/tests/e2e-opampbridge/standalone/00-install.yaml @@ -0,0 +1,44 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: e2e-test-app-bridge-server +spec: + replicas: 1 + selector: + matchLabels: + app: e2e-test-app-bridge-server + template: + metadata: + labels: + app: e2e-test-app-bridge-server + spec: + containers: + - name: e2e-test-app-bridge-server + image: ghcr.io/open-telemetry/opentelemetry-operator/e2e-test-app-bridge-server:ve2e + ports: + - containerPort: 4320 + - containerPort: 4321 + resources: + limits: + memory: "128Mi" + cpu: "250m" + requests: + memory: "64Mi" + cpu: "100m" +--- +apiVersion: v1 +kind: Service +metadata: + name: e2e-test-app-bridge-server +spec: + selector: + app: e2e-test-app-bridge-server + ports: + - protocol: TCP + port: 4320 + targetPort: 4320 + name: "opamp" + - protocol: TCP + port: 4321 + targetPort: 4321 + name: "http" diff --git a/tests/e2e-opampbridge/standalone/01-assert.yaml b/tests/e2e-opampbridge/standalone/01-assert.yaml new file mode 100644 index 0000000000..4e3ed686cb --- /dev/null +++ b/tests/e2e-opampbridge/standalone/01-assert.yaml @@ -0,0 +1,28 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: otel-opamp-bridge-standalone +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: standalone-collector + labels: + opentelemetry.io/managed-by: opamp-bridge-standalone +data: + collector.yaml: | + receivers: + otlp: + protocols: + grpc: {} + http: {} + exporters: + debug: {} + service: + pipelines: + traces: + receivers: [otlp] + exporters: [debug] diff --git a/tests/e2e-opampbridge/standalone/01-install.yaml b/tests/e2e-opampbridge/standalone/01-install.yaml new file mode 100644 index 0000000000..a1566ee8fc --- /dev/null +++ b/tests/e2e-opampbridge/standalone/01-install.yaml @@ -0,0 +1,141 @@ +apiVersion: v1 +kind: ServiceAccount +automountServiceAccountToken: true +metadata: + name: otel-opamp-bridge-standalone +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: otel-opamp-bridge-standalone +rules: + - apiGroups: + - "" + resources: + - configmaps + verbs: + - get + - list + - watch + - create + - update + - delete + - apiGroups: + - apps + resources: + - deployments + - daemonsets + - statefulsets + verbs: + - get + - update +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: otel-opamp-bridge-standalone +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: otel-opamp-bridge-standalone +subjects: + - kind: ServiceAccount + name: otel-opamp-bridge-standalone + namespace: ($namespace) +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: otel-opamp-bridge-standalone-config +data: + config.yaml: | + mode: standalone + endpoint: ws://e2e-test-app-bridge-server:4320/v1/opamp + listenAddr: ":8080" + heartbeatInterval: 1s + name: opamp-bridge-standalone + capabilities: + AcceptsRemoteConfig: true + ReportsEffectiveConfig: true + ReportsRemoteConfig: true + ReportsHealth: true + standalone: + agents: + - workloadName: standalone-collector + namespace: ($namespace) + type: otel-collector + workloadType: deployment + config: + collector: + kind: configmap + name: standalone-collector + key: collector.yaml +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: otel-opamp-bridge-standalone + labels: + app.kubernetes.io/name: otel-opamp-bridge-standalone + app.kubernetes.io/component: opamp-bridge +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: otel-opamp-bridge-standalone + template: + metadata: + labels: + app.kubernetes.io/name: otel-opamp-bridge-standalone + app.kubernetes.io/component: opamp-bridge + spec: + serviceAccountName: otel-opamp-bridge-standalone + containers: + - name: bridge + image: ($operatorOpampBridgeImage) + args: + - --config-file=/conf/config.yaml + - --mode=standalone + ports: + - name: opamp + containerPort: 8080 + protocol: TCP + volumeMounts: + - name: config + mountPath: /conf + readOnly: true + livenessProbe: + tcpSocket: + port: opamp + initialDelaySeconds: 5 + periodSeconds: 10 + resources: + requests: + cpu: 50m + memory: 64Mi + limits: + cpu: 200m + memory: 128Mi + volumes: + - name: config + configMap: + name: otel-opamp-bridge-standalone-config +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: standalone-collector +data: + collector.yaml: | + receivers: + otlp: + protocols: + grpc: {} + http: {} + exporters: + debug: {} + service: + pipelines: + traces: + receivers: [otlp] + exporters: [debug] diff --git a/tests/e2e-opampbridge/standalone/chainsaw-test.yaml b/tests/e2e-opampbridge/standalone/chainsaw-test.yaml new file mode 100644 index 0000000000..c32f4c8b9e --- /dev/null +++ b/tests/e2e-opampbridge/standalone/chainsaw-test.yaml @@ -0,0 +1,112 @@ +apiVersion: chainsaw.kyverno.io/v1alpha1 +kind: Test +metadata: + name: opampbridge-standalone +spec: + steps: + - catch: + - podLogs: + selector: app=e2e-test-app-bridge-server + name: install fake OpAMP server + try: + - apply: + file: 00-install.yaml + - assert: + file: 00-assert.yaml + - catch: + - podLogs: + selector: app.kubernetes.io/name=otel-opamp-bridge-standalone + name: install standalone bridge + try: + - apply: + bindings: + - name: operatorOpampBridgeImage + value: (env('OPERATOROPAMPBRIDGE_IMG')) + file: 01-install.yaml + template: true + - assert: + file: 01-assert.yaml + - name: wait for standalone bridge heartbeat + try: + - sleep: + duration: 5s + - name: verify standalone effective config report + try: + - script: + content: | + #!/bin/bash + kubectl get --raw /api/v1/namespaces/$NAMESPACE/services/e2e-test-app-bridge-server:4321/proxy/agents + outputs: + - name: result + value: (json_parse($stdout)) + - assert: + resource: + (length(values($result))): 1 + (values($result)[0].status.sequence_num >= `2`): true + (values($result)[0].status.health.healthy): true + (contains(keys(values($result)[0].status.health.component_health_map), 'collector')): true + (contains(keys(values($result)[0].effective_config), 'collector')): true + (contains(keys(values($result)[0].status.effective_config.config_map.config_map), 'collector')): true + - name: send remote config to standalone bridge + try: + - script: + content: | + #!/bin/bash + set -eu + + remote_config=$(cat < `0`): true + (contains(keys(values($result)[0].effective_config), 'collector')): true + (contains(keys(values($result)[0].status.effective_config.config_map.config_map), 'collector')): true diff --git a/tests/test-e2e-apps/bridge-server/data/agent.go b/tests/test-e2e-apps/bridge-server/data/agent.go index 40f090fd8d..b5adae76e1 100644 --- a/tests/test-e2e-apps/bridge-server/data/agent.go +++ b/tests/test-e2e-apps/bridge-server/data/agent.go @@ -293,6 +293,13 @@ func (agent *Agent) SetCustomConfig( ) { agent.mux.Lock() + if agent.CustomInstanceConfig == nil { + agent.CustomInstanceConfig = map[string]string{} + } + if agent.EffectiveConfig == nil { + agent.EffectiveConfig = map[string]string{} + } + for key, file := range config.GetConfigMap() { agent.CustomInstanceConfig[key] = string(file.Body) agent.EffectiveConfig[key] = string(file.Body) diff --git a/tests/test-e2e-apps/bridge-server/opampsrv/opampsrv.go b/tests/test-e2e-apps/bridge-server/opampsrv/opampsrv.go index 3a574654ca..c946d9e684 100644 --- a/tests/test-e2e-apps/bridge-server/opampsrv/opampsrv.go +++ b/tests/test-e2e-apps/bridge-server/opampsrv/opampsrv.go @@ -6,10 +6,12 @@ package opampsrv import ( "context" "encoding/json" + "errors" "log" "net/http" "os" "regexp" + "time" "github.com/google/uuid" "github.com/oklog/ulid/v2" @@ -28,6 +30,11 @@ type Server struct { httpServer *http.Server } +type remoteConfigRequest struct { + Config map[string]string `json:"config"` + ContentType string `json:"content_type,omitempty"` +} + func NewServer(agents *data.Agents) *Server { logger := &Logger{ log.New( @@ -68,6 +75,7 @@ func (srv *Server) Start() { mux := http.NewServeMux() mux.HandleFunc("/agents", srv.getAgents) + mux.HandleFunc("/agents/push-config-to-agent", srv.pushConfigToAgent) mux.HandleFunc("/agents/", srv.getAgentById) srv.httpServer = &http.Server{ Addr: "0.0.0.0:4321", @@ -169,3 +177,66 @@ func (srv *Server) getAgentById(writer http.ResponseWriter, request *http.Reques } writer.Write(marshaled) } + +func (srv *Server) pushConfigToAgent(writer http.ResponseWriter, request *http.Request) { + if request.Method != http.MethodPost { + http.Error(writer, "method not allowed", http.StatusMethodNotAllowed) + return + } + + config, err := parseRemoteConfigRequest(request) + if err != nil { + http.Error(writer, err.Error(), http.StatusBadRequest) + return + } + + allAgents := srv.agents.GetAllAgentsReadonlyClone() + if len(allAgents) != 1 { + http.Error(writer, "expected exactly one connected agent", http.StatusConflict) + return + } + + var agentId data.InstanceId + for id := range allAgents { + agentId = id + } + + statusUpdated := make(chan struct{}, 1) + srv.agents.SetCustomConfigForAgent(agentId, config, statusUpdated) + + select { + case <-statusUpdated: + case <-request.Context().Done(): + return + case <-time.After(30 * time.Second): + http.Error(writer, "timed out waiting for agent status update", http.StatusGatewayTimeout) + return + } + + writer.WriteHeader(http.StatusAccepted) +} + +func parseRemoteConfigRequest(request *http.Request) (*protobufs.AgentConfigMap, error) { + var req remoteConfigRequest + if err := json.NewDecoder(request.Body).Decode(&req); err != nil { + return nil, err + } + if len(req.Config) == 0 { + return nil, errors.New("config must contain at least one entry") + } + + configMap := make(map[string]*protobufs.AgentConfigFile, len(req.Config)) + for key, body := range req.Config { + if key == "" { + return nil, errors.New("config keys must be non-empty") + } + configMap[key] = &protobufs.AgentConfigFile{ + Body: []byte(body), + ContentType: req.ContentType, + } + } + + return &protobufs.AgentConfigMap{ + ConfigMap: configMap, + }, nil +}