diff --git a/Makefile b/Makefile index 3862e5525bfd7..2a61b8028c3d9 100644 --- a/Makefile +++ b/Makefile @@ -83,6 +83,9 @@ CGO_ENABLED=0 export CGO_ENABLED BUILDFLAGS=-trimpath -buildvcs=false +ifndef DEBUGGABLE +NODEUP_GCFLAGS?=-gcflags=all=-l +endif # Go exports: LDFLAGS := -ldflags=all= @@ -210,7 +213,7 @@ crossbuild: crossbuild-kops-linux-amd64 crossbuild-kops-linux-arm64 crossbuild-k .PHONY: nodeup-amd64 nodeup-arm64 nodeup-amd64 nodeup-arm64: nodeup-%: mkdir -p ${DIST}/linux/$* - GOOS=linux GOARCH=$* go build ${GCFLAGS} ${BUILDFLAGS} ${EXTRA_BUILDFLAGS} -o ${DIST}/linux/$*/nodeup ${LDFLAGS}"${EXTRA_LDFLAGS} -X k8s.io/kops.Version=${VERSION} -X k8s.io/kops.GitVersion=${GITSHA}" k8s.io/kops/cmd/nodeup + GOOS=linux GOARCH=$* go build ${GCFLAGS} ${NODEUP_GCFLAGS} ${BUILDFLAGS} ${EXTRA_BUILDFLAGS} -o ${DIST}/linux/$*/nodeup ${LDFLAGS}"${EXTRA_LDFLAGS} -X k8s.io/kops.Version=${VERSION} -X k8s.io/kops.GitVersion=${GITSHA}" k8s.io/kops/cmd/nodeup .PHONY: nodeup nodeup: nodeup-amd64 @@ -295,7 +298,7 @@ push-aws-run-amd64 push-aws-run-arm64: push-aws-run-%: push-% .PHONY: ${NODEUP} ${NODEUP}: - go build ${GCFLAGS} ${BUILDFLAGS} ${EXTRA_BUILDFLAGS} ${LDFLAGS}"${EXTRA_LDFLAGS} -X k8s.io/kops.Version=${VERSION} -X k8s.io/kops.GitVersion=${GITSHA}" -o $@ k8s.io/kops/cmd/nodeup + go build ${GCFLAGS} ${NODEUP_GCFLAGS} ${BUILDFLAGS} ${EXTRA_BUILDFLAGS} ${LDFLAGS}"${EXTRA_LDFLAGS} -X k8s.io/kops.Version=${VERSION} -X k8s.io/kops.GitVersion=${GITSHA}" -o $@ k8s.io/kops/cmd/nodeup .PHONY: dns-controller-push dns-controller-push: ko-dns-controller-push diff --git a/cmd/nodeup/main.go b/cmd/nodeup/main.go index 5c6735a9b2176..493acb13bf6a4 100644 --- a/cmd/nodeup/main.go +++ b/cmd/nodeup/main.go @@ -43,7 +43,7 @@ func main() { var flagConf, flagCacheDir, gitVersion string var flagRetries int - var dryrun, installSystemdUnit bool + var dryrun, installSystemdUnit, logMemory bool target := "direct" if kops.GitVersion != "" { @@ -56,6 +56,7 @@ func main() { flag.BoolVar(&dryrun, "dryrun", false, "Don't create cloud resources; just show what would be done") flag.StringVar(&target, "target", target, "Target - direct, dryrun") flag.BoolVar(&installSystemdUnit, "install-systemd-unit", installSystemdUnit, "If true, will install a systemd unit instead of running directly") + flag.BoolVar(&logMemory, "log-memory", true, "Log Go and cgroup memory statistics during nodeup") if dryrun { target = "dryrun" @@ -118,6 +119,7 @@ func main() { ConfigLocation: flagConf, Target: target, CacheDir: flagCacheDir, + LogMemory: logMemory, } err = cmd.Run(os.Stdout) if err == nil { diff --git a/upup/pkg/fi/nodeup/command.go b/upup/pkg/fi/nodeup/command.go index 039f079b79498..f938e8abee04e 100644 --- a/upup/pkg/fi/nodeup/command.go +++ b/upup/pkg/fi/nodeup/command.go @@ -77,12 +77,16 @@ const MaxTaskDuration = 365 * 24 * time.Hour type NodeUpCommand struct { CacheDir string ConfigLocation string + LogMemory bool Target string } // Run is responsible for perform the nodeup process func (c *NodeUpCommand) Run(out io.Writer) error { ctx := context.Background() + memoryLogger := newNodeupMemoryLogger(c.LogMemory) + memoryLogger.Log("start", nil) + defer memoryLogger.Log("exit", nil) var bootConfig nodeup.BootConfig if c.ConfigLocation != "" { @@ -102,6 +106,7 @@ func (c *NodeUpCommand) Run(out io.Writer) error { if c.CacheDir == "" { return fmt.Errorf("CacheDir is required") } + memoryLogger.Log("boot-config-loaded", nil) region, err := getRegion(ctx, &bootConfig) if err != nil { @@ -170,6 +175,7 @@ func (c *NodeUpCommand) Run(out io.Writer) error { if err != nil { return err } + memoryLogger.Log("nodeup-config-loaded", nil) architecture, err := architectures.FindArchitecture() if err != nil { @@ -189,6 +195,7 @@ func (c *NodeUpCommand) Run(out io.Writer) error { return fmt.Errorf("error adding asset %q: %v", asset, err) } } + memoryLogger.Log("assets-loaded", nil) var cloud fi.Cloud @@ -246,6 +253,7 @@ func (c *NodeUpCommand) Run(out io.Writer) error { if err := modelContext.Init(); err != nil { return err } + memoryLogger.Log("model-context-initialized", nil) switch bootConfig.CloudProvider { case api.CloudProviderAWS: @@ -290,10 +298,12 @@ func (c *NodeUpCommand) Run(out io.Writer) error { modelContext.GPUVendor = architectures.GPUVendorNvidia } } + memoryLogger.Log("node-metadata-loaded", nil) if err := loadKernelModules(modelContext, distribution); err != nil { return err } + memoryLogger.Log("kernel-modules-loaded", nil) loader := &Loader{} loader.Builders = append(loader.Builders, &model.DiscoveryService{NodeupModelContext: modelContext}) @@ -350,6 +360,8 @@ func (c *NodeUpCommand) Run(out io.Writer) error { } } // Protokube load image task is in ProtokubeBuilder + taskSummary := summarizeNodeupMemoryTasks(taskMap) + memoryLogger.Log("task-graph-built", &taskSummary) var target fi.NodeupTarget @@ -370,24 +382,33 @@ func (c *NodeUpCommand) Run(out io.Writer) error { if err != nil { klog.Exitf("error building context: %v", err) } + memoryLogger.Log("context-built", &taskSummary) var options fi.RunTasksOptions options.InitDefaults() + memoryLogger.Log("before-run-tasks", &taskSummary) + stopMemoryLogging := memoryLogger.StartPeriodic("run-tasks", &taskSummary) err = context.RunTasks(options) + stopMemoryLogging() if err != nil { + memoryLogger.Log("run-tasks-error", &taskSummary) klog.Exitf("error running tasks: %v", err) } + memoryLogger.Log("after-run-tasks", &taskSummary) err = target.Finish(taskMap) if err != nil { + memoryLogger.Log("target-finish-error", &taskSummary) klog.Exitf("error closing target: %v", err) } + memoryLogger.Log("after-target-finish", &taskSummary) if nodeupConfig.EnableLifecycleHook { if bootConfig.CloudProvider == api.CloudProviderAWS { err := completeWarmingLifecycleAction(ctx, cloud.(awsup.AWSCloud), modelContext) if err != nil { + memoryLogger.Log("lifecycle-hook-error", &taskSummary) return fmt.Errorf("failed to complete lifecylce action: %w", err) } } diff --git a/upup/pkg/fi/nodeup/memory_logging.go b/upup/pkg/fi/nodeup/memory_logging.go new file mode 100644 index 0000000000000..29e65c68d9aac --- /dev/null +++ b/upup/pkg/fi/nodeup/memory_logging.go @@ -0,0 +1,425 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodeup + +import ( + "fmt" + "os" + "path" + "runtime" + "strconv" + "strings" + "time" + + "k8s.io/klog/v2" + "k8s.io/kops/upup/pkg/fi" + "k8s.io/kops/upup/pkg/fi/nodeup/nodetasks" +) + +const ( + nodeupLogMemoryEnv = "NODEUP_LOG_MEMORY" + nodeupLogMemoryIntervalEnv = "NODEUP_LOG_MEMORY_INTERVAL" + nodeupLogMemoryInterval = 30 * time.Second +) + +var nodeupCgroupMemoryStatKeys = []string{ + "anon", + "file", + "kernel", + "kernel_stack", + "pagetables", + "sock", + "shmem", + "slab", + "file_mapped", + "file_dirty", + "file_writeback", + "inactive_file", + "active_file", + "rss", + "cache", + "mapped_file", + "swap", + "total_rss", + "total_cache", + "total_mapped_file", +} + +type nodeupMemoryLogger struct { + enabled bool + interval time.Duration + start time.Time +} + +func newNodeupMemoryLogger(enabled bool) *nodeupMemoryLogger { + enabled = enabled || nodeupMemoryLoggingEnabledFromEnv() + interval := nodeupLogMemoryInterval + if enabled { + interval = nodeupMemoryLoggingIntervalFromEnv() + } + + return &nodeupMemoryLogger{ + enabled: enabled, + interval: interval, + start: time.Now(), + } +} + +func nodeupMemoryLoggingEnabledFromEnv() bool { + value := strings.TrimSpace(os.Getenv(nodeupLogMemoryEnv)) + if value == "" { + return false + } + + enabled, err := strconv.ParseBool(value) + if err != nil { + klog.Warningf("ignoring invalid %s=%q: %v", nodeupLogMemoryEnv, value, err) + return false + } + return enabled +} + +func nodeupMemoryLoggingIntervalFromEnv() time.Duration { + value := strings.TrimSpace(os.Getenv(nodeupLogMemoryIntervalEnv)) + if value == "" { + return nodeupLogMemoryInterval + } + + interval, err := time.ParseDuration(value) + if err != nil { + klog.Warningf("ignoring invalid %s=%q: %v", nodeupLogMemoryIntervalEnv, value, err) + return nodeupLogMemoryInterval + } + if interval < 0 { + klog.Warningf("ignoring negative %s=%q", nodeupLogMemoryIntervalEnv, value) + return nodeupLogMemoryInterval + } + return interval +} + +func (l *nodeupMemoryLogger) Log(phase string, taskSummary *nodeupMemoryTaskSummary) { + if l == nil || !l.enabled { + return + } + + var mem runtime.MemStats + runtime.ReadMemStats(&mem) + cgroupMemory := readNodeupCgroupMemory() + + fields := []string{ + fmt.Sprintf("phase=%q", phase), + fmt.Sprintf("elapsed=%s", time.Since(l.start).Round(time.Millisecond)), + fmt.Sprintf("go_heap_alloc_bytes=%d", mem.HeapAlloc), + fmt.Sprintf("go_heap_inuse_bytes=%d", mem.HeapInuse), + fmt.Sprintf("go_heap_sys_bytes=%d", mem.HeapSys), + fmt.Sprintf("go_stack_inuse_bytes=%d", mem.StackInuse), + fmt.Sprintf("go_sys_bytes=%d", mem.Sys), + fmt.Sprintf("go_mallocs=%d", mem.Mallocs), + fmt.Sprintf("go_frees=%d", mem.Frees), + fmt.Sprintf("go_num_gc=%d", mem.NumGC), + fmt.Sprintf("cgroup_version=%q", cgroupMemory.version), + fmt.Sprintf("cgroup_path=%q", cgroupMemory.path), + fmt.Sprintf("cgroup_current_bytes=%d", cgroupMemory.current), + fmt.Sprintf("cgroup_peak_bytes=%d", cgroupMemory.peak), + fmt.Sprintf("cgroup_stat=%q", formatNodeupCgroupMemoryStats(cgroupMemory.stat)), + } + if taskSummary != nil { + fields = append(fields, taskSummary.logFields()...) + } + + klog.Infof("nodeup memory stats %s", strings.Join(fields, " ")) +} + +func (l *nodeupMemoryLogger) StartPeriodic(phase string, taskSummary *nodeupMemoryTaskSummary) func() { + if l == nil || !l.enabled || l.interval == 0 { + return func() {} + } + + stop := make(chan struct{}) + done := make(chan struct{}) + go func() { + defer close(done) + ticker := time.NewTicker(l.interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + l.Log(phase, taskSummary) + case <-stop: + return + } + } + }() + + return func() { + close(stop) + <-done + } +} + +type nodeupMemoryTaskSummary struct { + tasks int + files int + fileContents int + directories int + services int + packages int + issueCerts int + kubeconfigs int + loadImages int + bootstrapTasks int +} + +func summarizeNodeupMemoryTasks(tasks map[string]fi.NodeupTask) nodeupMemoryTaskSummary { + summary := nodeupMemoryTaskSummary{ + tasks: len(tasks), + } + + for _, task := range tasks { + switch task := task.(type) { + case *nodetasks.File: + summary.files++ + if task.Type == nodetasks.FileType_Directory { + summary.directories++ + } + if task.Contents != nil { + summary.fileContents++ + } + case *nodetasks.Service: + summary.services++ + case *nodetasks.Package: + summary.packages++ + case *nodetasks.IssueCert: + summary.issueCerts++ + case *nodetasks.KubeConfig: + summary.kubeconfigs++ + case *nodetasks.LoadImageTask: + summary.loadImages++ + case *nodetasks.BootstrapClientTask: + summary.bootstrapTasks++ + } + } + + return summary +} + +func (s *nodeupMemoryTaskSummary) logFields() []string { + return []string{ + fmt.Sprintf("tasks=%d", s.tasks), + fmt.Sprintf("file_tasks=%d", s.files), + fmt.Sprintf("file_content_tasks=%d", s.fileContents), + fmt.Sprintf("directory_tasks=%d", s.directories), + fmt.Sprintf("service_tasks=%d", s.services), + fmt.Sprintf("package_tasks=%d", s.packages), + fmt.Sprintf("issue_cert_tasks=%d", s.issueCerts), + fmt.Sprintf("kubeconfig_tasks=%d", s.kubeconfigs), + fmt.Sprintf("load_image_tasks=%d", s.loadImages), + fmt.Sprintf("bootstrap_client_tasks=%d", s.bootstrapTasks), + } +} + +type nodeupCgroupMemory struct { + version string + path string + current int64 + peak int64 + stat map[string]int64 +} + +type nodeupCgroupMemoryCandidate struct { + version string + path string +} + +func readNodeupCgroupMemory() nodeupCgroupMemory { + result := nodeupCgroupMemory{ + version: "unavailable", + current: -1, + peak: -1, + } + + candidates := nodeupCgroupMemoryCandidates() + for _, candidate := range candidates { + current, currentOK := readNodeupInt64File(path.Join(candidate.path, nodeupCgroupMemoryCurrentFile(candidate.version))) + peak, peakOK := readNodeupInt64File(path.Join(candidate.path, nodeupCgroupMemoryPeakFile(candidate.version))) + stat, statOK := readNodeupCgroupMemoryStatFile(path.Join(candidate.path, "memory.stat")) + if !currentOK && !peakOK && !statOK { + continue + } + + result.version = candidate.version + result.path = candidate.path + if currentOK { + result.current = current + } + if peakOK { + result.peak = peak + } + if statOK { + result.stat = stat + } + return result + } + + return result +} + +func nodeupCgroupMemoryCurrentFile(version string) string { + if version == "v1" { + return "memory.usage_in_bytes" + } + return "memory.current" +} + +func nodeupCgroupMemoryPeakFile(version string) string { + if version == "v1" { + return "memory.max_usage_in_bytes" + } + return "memory.peak" +} + +func nodeupCgroupMemoryCandidates() []nodeupCgroupMemoryCandidate { + data, err := os.ReadFile("/proc/self/cgroup") + if err != nil { + return fallbackNodeupCgroupMemoryCandidates() + } + + candidates := nodeupCgroupMemoryCandidatesFromProcCgroup(string(data)) + if len(candidates) == 0 { + return fallbackNodeupCgroupMemoryCandidates() + } + return candidates +} + +func nodeupCgroupMemoryCandidatesFromProcCgroup(data string) []nodeupCgroupMemoryCandidate { + var candidates []nodeupCgroupMemoryCandidate + for _, line := range strings.Split(data, "\n") { + if line == "" { + continue + } + tokens := strings.SplitN(line, ":", 3) + if len(tokens) != 3 { + continue + } + + hierarchyID, controllers, cgroupPath := tokens[0], tokens[1], tokens[2] + switch { + case hierarchyID == "0" && controllers == "": + candidates = append(candidates, nodeupCgroupMemoryCandidate{ + version: "v2", + path: path.Join("/sys/fs/cgroup", cleanNodeupCgroupPath(cgroupPath)), + }) + case nodeupCgroupControllersIncludeMemory(controllers): + candidates = append(candidates, nodeupCgroupMemoryCandidate{ + version: "v1", + path: path.Join("/sys/fs/cgroup/memory", cleanNodeupCgroupPath(cgroupPath)), + }) + } + } + + candidates = append(candidates, fallbackNodeupCgroupMemoryCandidates()...) + return dedupeNodeupCgroupMemoryCandidates(candidates) +} + +func nodeupCgroupControllersIncludeMemory(controllers string) bool { + for _, controller := range strings.Split(controllers, ",") { + if controller == "memory" { + return true + } + } + return false +} + +func cleanNodeupCgroupPath(cgroupPath string) string { + cleaned := path.Clean("/" + cgroupPath) + if cleaned == "/" { + return "" + } + return strings.TrimPrefix(cleaned, "/") +} + +func fallbackNodeupCgroupMemoryCandidates() []nodeupCgroupMemoryCandidate { + return []nodeupCgroupMemoryCandidate{ + {version: "v2", path: "/sys/fs/cgroup"}, + {version: "v1", path: "/sys/fs/cgroup/memory"}, + } +} + +func dedupeNodeupCgroupMemoryCandidates(candidates []nodeupCgroupMemoryCandidate) []nodeupCgroupMemoryCandidate { + var deduped []nodeupCgroupMemoryCandidate + seen := make(map[nodeupCgroupMemoryCandidate]bool) + for _, candidate := range candidates { + if seen[candidate] { + continue + } + seen[candidate] = true + deduped = append(deduped, candidate) + } + return deduped +} + +func readNodeupInt64File(filePath string) (int64, bool) { + data, err := os.ReadFile(filePath) + if err != nil { + return 0, false + } + value := strings.TrimSpace(string(data)) + if value == "" || value == "max" { + return 0, false + } + n, err := strconv.ParseInt(value, 10, 64) + if err != nil { + return 0, false + } + return n, true +} + +func readNodeupCgroupMemoryStatFile(filePath string) (map[string]int64, bool) { + data, err := os.ReadFile(filePath) + if err != nil { + return nil, false + } + return parseNodeupCgroupMemoryStat(string(data)), true +} + +func parseNodeupCgroupMemoryStat(data string) map[string]int64 { + stats := make(map[string]int64) + for _, line := range strings.Split(data, "\n") { + fields := strings.Fields(line) + if len(fields) != 2 { + continue + } + value, err := strconv.ParseInt(fields[1], 10, 64) + if err != nil { + continue + } + stats[fields[0]] = value + } + return stats +} + +func formatNodeupCgroupMemoryStats(stats map[string]int64) string { + var parts []string + for _, key := range nodeupCgroupMemoryStatKeys { + value, ok := stats[key] + if !ok { + continue + } + parts = append(parts, fmt.Sprintf("%s=%d", key, value)) + } + return strings.Join(parts, ",") +} diff --git a/upup/pkg/fi/nodeup/memory_logging_test.go b/upup/pkg/fi/nodeup/memory_logging_test.go new file mode 100644 index 0000000000000..3809de9112676 --- /dev/null +++ b/upup/pkg/fi/nodeup/memory_logging_test.go @@ -0,0 +1,79 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodeup + +import ( + "reflect" + "testing" +) + +func TestNodeupCgroupMemoryCandidatesFromProcCgroupV2(t *testing.T) { + candidates := nodeupCgroupMemoryCandidatesFromProcCgroup("0::/system.slice/kops-configuration.service\n") + + expected := []nodeupCgroupMemoryCandidate{ + {version: "v2", path: "/sys/fs/cgroup/system.slice/kops-configuration.service"}, + {version: "v2", path: "/sys/fs/cgroup"}, + {version: "v1", path: "/sys/fs/cgroup/memory"}, + } + if !reflect.DeepEqual(candidates, expected) { + t.Fatalf("unexpected candidates:\nexpected: %#v\nactual: %#v", expected, candidates) + } +} + +func TestNodeupCgroupMemoryCandidatesFromProcCgroupV1(t *testing.T) { + candidates := nodeupCgroupMemoryCandidatesFromProcCgroup("9:cpu,cpuacct:/ignored\n8:memory:/system.slice/kops-configuration.service\n") + + expected := []nodeupCgroupMemoryCandidate{ + {version: "v1", path: "/sys/fs/cgroup/memory/system.slice/kops-configuration.service"}, + {version: "v2", path: "/sys/fs/cgroup"}, + {version: "v1", path: "/sys/fs/cgroup/memory"}, + } + if !reflect.DeepEqual(candidates, expected) { + t.Fatalf("unexpected candidates:\nexpected: %#v\nactual: %#v", expected, candidates) + } +} + +func TestParseNodeupCgroupMemoryStat(t *testing.T) { + stats := parseNodeupCgroupMemoryStat(`anon 123 +file 456 +malformed +kernel_stack not-a-number +slab 789 +`) + + expected := map[string]int64{ + "anon": 123, + "file": 456, + "slab": 789, + } + if !reflect.DeepEqual(stats, expected) { + t.Fatalf("unexpected stats:\nexpected: %#v\nactual: %#v", expected, stats) + } +} + +func TestFormatNodeupCgroupMemoryStats(t *testing.T) { + stats := map[string]int64{ + "file": 456, + "anon": 123, + "slab": 789, + } + + expected := "anon=123,file=456,slab=789" + if actual := formatNodeupCgroupMemoryStats(stats); actual != expected { + t.Fatalf("unexpected formatted stats: expected %q, got %q", expected, actual) + } +}