diff --git a/cmd/tidb-server/BUILD.bazel b/cmd/tidb-server/BUILD.bazel index 4ff8cdac2f33f..d163d7a3981cf 100644 --- a/cmd/tidb-server/BUILD.bazel +++ b/cmd/tidb-server/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "//pkg/keyspace", "//pkg/kv", "//pkg/metrics", + "//pkg/metrics/common", "//pkg/parser/mysql", "//pkg/parser/terror", "//pkg/parser/types", @@ -67,11 +68,16 @@ go_library( "@com_github_opentracing_opentracing_go//:opentracing-go", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/keyspacepb", "@com_github_pingcap_log//:log", "@com_github_prometheus_client_golang//prometheus", "@com_github_prometheus_client_golang//prometheus/push", + "@com_github_tikv_client_go_v2//config", "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//txnkv/transaction", + "@com_github_tikv_pd_client//:client", + "@com_github_tikv_pd_client//opt", + "@com_github_tikv_pd_client//pkg/caller", "@org_uber_go_automaxprocs//maxprocs", "@org_uber_go_zap//:zap", ], @@ -107,7 +113,7 @@ go_test( srcs = ["main_test.go"], embed = [":tidb-server_lib"], flaky = True, - shard_count = 6, + shard_count = 9, deps = [ "//pkg/config", "//pkg/config/deploymode", @@ -116,6 +122,7 @@ go_test( "//pkg/sessionctx/vardef", "//pkg/sessionctx/variable", "//pkg/testkit/testsetup", + "@com_github_pingcap_kvproto//pkg/keyspacepb", "@com_github_stretchr_testify//require", "@io_opencensus_go//stats/view", "@org_uber_go_goleak//:goleak", diff --git a/cmd/tidb-server/main.go b/cmd/tidb-server/main.go index 19593cc379599..2ed4a719d6174 100644 --- a/cmd/tidb-server/main.go +++ b/cmd/tidb-server/main.go @@ -19,6 +19,7 @@ import ( "flag" "fmt" "io/fs" + "maps" "os" "runtime" "strconv" @@ -30,6 +31,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/bindinfo" "github.com/pingcap/tidb/pkg/config" @@ -44,6 +46,7 @@ import ( "github.com/pingcap/tidb/pkg/keyspace" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/metrics" + metricscommon "github.com/pingcap/tidb/pkg/metrics/common" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" parsertypes "github.com/pingcap/tidb/pkg/parser/types" @@ -90,8 +93,12 @@ import ( repository "github.com/pingcap/tidb/pkg/util/workloadrepo" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/push" + tikvconfig "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/txnkv/transaction" + pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" + "github.com/tikv/pd/client/pkg/caller" "go.uber.org/automaxprocs/maxprocs" "go.uber.org/zap" ) @@ -335,6 +342,8 @@ func main() { signal.SetupUSR1Handler() err = registerStores() terror.MustNil(err) + err = prepareKeyspaceObservability() + terror.MustNil(err) err = metricsutil.RegisterMetrics() terror.MustNil(err) @@ -1146,6 +1155,87 @@ func closeStmtSummary() { } } +var keyspaceMetaComponentName = caller.Component("tidb-keyspace-meta") + +const ( + keyspaceIDMetricLabel = "keyspace_id" + keyspaceNameMetricLabel = "keyspace_name" +) + +func prepareKeyspaceObservability() error { + cfg := config.GetGlobalConfig() + if !kerneltype.IsNextGen() { + return nil + } + if keyspace.IsKeyspaceNameEmpty(cfg.KeyspaceName) || cfg.Store != config.StoreTypeTiKV { + return nil + } + metricscommon.SetConstLabels(keyspaceNameMetricLabel, cfg.KeyspaceName) + pdAddrs, _, _, err := tikvconfig.ParsePath("tikv://" + cfg.Path) + if err != nil { + return err + } + timeoutSec := time.Duration(cfg.PDClient.PDServerTimeout) * time.Second + pdCli, err := pd.NewClient(keyspaceMetaComponentName, pdAddrs, pd.SecurityOption{ + CAPath: cfg.Security.ClusterSSLCA, + CertPath: cfg.Security.ClusterSSLCert, + KeyPath: cfg.Security.ClusterSSLKey, + }, opt.WithCustomTimeoutOption(timeoutSec), opt.WithInitMetricsOption(false)) + if err != nil { + return err + } + defer pdCli.Close() + + keyspaceMeta, err := getKeyspaceMeta(pdCli, cfg.KeyspaceName) + if err != nil { + return err + } + keyspace.SetKeyspaceMeta(keyspaceMeta) + return prepareKeyspaceObservabilityWithKeyspaceMeta(keyspaceMeta, cfg.KeyspaceName, deploymode.IsStarter()) +} + +func getKeyspaceMeta(pdCli pd.Client, keyspaceName string) (*keyspacepb.KeyspaceMeta, error) { + var keyspaceMeta *keyspacepb.KeyspaceMeta + err := util.RunWithRetry(util.DefaultMaxRetries, util.RetryInterval, func() (bool, error) { + var errInner error + keyspaceMeta, errInner = pdCli.LoadKeyspace(context.TODO(), keyspaceName) + if kvstore.IsNotBootstrappedError(errInner) || kvstore.IsKeyspaceNotExistError(errInner) { + return true, errInner + } + return false, errInner + }) + if err != nil { + return nil, err + } + return keyspaceMeta, nil +} + +func prepareKeyspaceObservabilityWithKeyspaceMeta(keyspaceMeta *keyspacepb.KeyspaceMeta, keyspaceName string, includeConfiguredFields bool) error { + if keyspaceMeta == nil { + return nil + } + resolvedValues := config.KeyspaceObservabilityValues{ + MetricLabels: map[string]string{ + keyspaceIDMetricLabel: fmt.Sprint(keyspaceMeta.GetId()), + keyspaceNameMetricLabel: keyspaceName, + }, + } + if includeConfiguredFields { + copiedConfig := *config.GetGlobalConfig() + if err := copiedConfig.ResolveKeyspaceObservability(keyspaceMeta.GetConfig()); err != nil { + return err + } + configuredValues := copiedConfig.KeyspaceObservabilityValues.Clone() + maps.Copy(resolvedValues.MetricLabels, configuredValues.MetricLabels) + resolvedValues.SlowLogFields = configuredValues.SlowLogFields + resolvedValues.StmtLogFields = configuredValues.StmtLogFields + } + config.UpdateGlobal(func(conf *config.Config) { + conf.KeyspaceObservabilityValues = resolvedValues + }) + return nil +} + func enablePyroscope() { if os.Getenv("PYROSCOPE_SERVER_ADDRESS") != "" { runtime.SetMutexProfileFraction(5) diff --git a/cmd/tidb-server/main_test.go b/cmd/tidb-server/main_test.go index 06aeafea45487..1d0a072d2ed6b 100644 --- a/cmd/tidb-server/main_test.go +++ b/cmd/tidb-server/main_test.go @@ -18,6 +18,7 @@ import ( "os" "testing" + "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/config/deploymode" "github.com/pingcap/tidb/pkg/config/kerneltype" @@ -154,3 +155,63 @@ func TestSetVersionByConfigNormalizeLegacyPlaceholderForNextGen(t *testing.T) { require.Equal(t, "v26.3.0", mysql.TiDBReleaseVersion) require.Equal(t, "8.0.11-TiDB-CLOUD.202603.0", mysql.ServerVersion) } + +func TestSetupKeyspaceObservabilityForStarter(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.KeyspaceObservability = config.KeyspaceObservability{ + Fields: []config.KeyspaceObservabilityField{{ + Source: "meta_a", + MetricLabel: "keyspace_meta_label_a", + SlowLogField: "Slow_meta_a", + StmtLogField: "stmt_meta_a", + Required: true, + }}, + } + }) + + err := prepareKeyspaceObservabilityWithKeyspaceMeta(&keyspacepb.KeyspaceMeta{ + Id: 42, + Config: map[string]string{"meta_a": "value_a"}, + }, "ks", true) + require.NoError(t, err) + + cfg := config.GetGlobalConfig() + require.Equal(t, map[string]string{"keyspace_id": "42", "keyspace_name": "ks", "keyspace_meta_label_a": "value_a"}, cfg.GetKeyspaceObservabilityMetricLabels()) + require.Equal(t, []config.KeyspaceObservabilityFieldPair{{Key: "Slow_meta_a", Value: "value_a"}}, cfg.GetKeyspaceObservabilitySlowLogFields()) + require.Equal(t, []config.KeyspaceObservabilityFieldPair{{Key: "stmt_meta_a", Value: "value_a"}}, cfg.GetKeyspaceObservabilityStmtLogFields()) +} + +func TestSetupKeyspaceObservabilityForNonStarter(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + + err := prepareKeyspaceObservabilityWithKeyspaceMeta(&keyspacepb.KeyspaceMeta{ + Id: 42, + Config: map[string]string{"meta_a": "value_a"}, + }, "ks", false) + require.NoError(t, err) + + cfg := config.GetGlobalConfig() + require.Equal(t, map[string]string{"keyspace_id": "42", "keyspace_name": "ks"}, cfg.GetKeyspaceObservabilityMetricLabels()) + require.Empty(t, cfg.GetKeyspaceObservabilitySlowLogFields()) + require.Empty(t, cfg.GetKeyspaceObservabilityStmtLogFields()) +} + +func TestSetupKeyspaceObservabilityForStartSkipsClassic(t *testing.T) { + if !kerneltype.IsClassic() { + t.Skip("only verifies the classic-mode short-circuit path") + } + + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.Store = config.StoreTypeTiKV + conf.Path = "invalid-pd-path" + conf.KeyspaceName = "test_keyspace" + }) + + require.NoError(t, prepareKeyspaceObservability()) + require.Empty(t, config.GetGlobalConfig().GetKeyspaceObservabilityMetricLabels()) +} diff --git a/pkg/config/BUILD.bazel b/pkg/config/BUILD.bazel index 57b2c3b89a90c..5ab743a7d063b 100644 --- a/pkg/config/BUILD.bazel +++ b/pkg/config/BUILD.bazel @@ -42,7 +42,7 @@ go_test( data = glob(["**"]), embed = [":config"], flaky = True, - shard_count = 32, + shard_count = 34, deps = [ "//pkg/config/deploymode", "//pkg/config/kerneltype", diff --git a/pkg/config/config.go b/pkg/config/config.go index a069b87e82ef2..bc244dd632e8c 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -262,6 +262,9 @@ type Config struct { // key will be the default value of the session variable `txn_scope` for this tidb-server. Labels map[string]string `toml:"labels" json:"labels"` + KeyspaceObservability KeyspaceObservability `toml:"keyspace-observability" json:"keyspace-observability"` + KeyspaceObservabilityValues KeyspaceObservabilityValues `toml:"-" json:"-"` + // EnableGlobalIndex is deprecated. EnableGlobalIndex bool `toml:"enable-global-index" json:"enable-global-index"` @@ -433,6 +436,423 @@ func encodeDefTempStorageDir(tempDir string, host, statusHost string, port, stat return filepath.Join(tempDir, osUID+"_tidb", dirName, "tmp-storage") } +// KeyspaceObservability maps metadata entries to observability outputs. +type KeyspaceObservability struct { + Fields []KeyspaceObservabilityField `toml:"fields" json:"fields"` +} + +// KeyspaceObservabilityField describes one metadata entry mapping. +type KeyspaceObservabilityField struct { + Source string `toml:"source" json:"source"` + MetricLabel string `toml:"metric-label" json:"metric-label,omitempty"` + SlowLogField string `toml:"slow-log-field" json:"slow-log-field,omitempty"` + StmtLogField string `toml:"stmt-log-field" json:"stmt-log-field,omitempty"` + Required bool `toml:"required" json:"required"` +} + +// KeyspaceObservabilityValues stores resolved metadata values. +type KeyspaceObservabilityValues struct { + MetricLabels map[string]string `toml:"-" json:"-"` + SlowLogFields []KeyspaceObservabilityFieldPair `toml:"-" json:"-"` + StmtLogFields []KeyspaceObservabilityFieldPair `toml:"-" json:"-"` +} + +// KeyspaceObservabilityFieldPair stores one resolved output field. +type KeyspaceObservabilityFieldPair struct { + Key string + Value string +} + +const keyspaceObservabilityMetricLabelPrefix = "keyspace_meta_" + +var reservedKeyspaceObservabilitySlowLogFields = map[string]struct{}{ + "backoff_detail": {}, + "backoff_time": {}, + "backoff_total": {}, + "backoff_types": {}, + "binary_plan": {}, + "commit_backoff_time": {}, + "commit_primary_rpc_detail": {}, + "commit_time": {}, + "compile_time": {}, + "conn_id": {}, + "cop_backoff_": {}, + "cop_mvcc_read_amplification": {}, + "cop_proc_addr": {}, + "cop_proc_avg": {}, + "cop_proc_max": {}, + "cop_proc_p90": {}, + "cop_time": {}, + "cop_wait_addr": {}, + "cop_wait_avg": {}, + "cop_wait_max": {}, + "cop_wait_p90": {}, + "db": {}, + "digest": {}, + "disk_max": {}, + "exec_retry_count": {}, + "exec_retry_time": {}, + "get_commit_ts_time": {}, + "get_latest_ts_time": {}, + "get_snapshot_time": {}, + "has_more_results": {}, + "host": {}, + "index_names": {}, + "is_internal": {}, + "isexplicittxn": {}, + "issyncstatsfailed": {}, + "iswritecachetable": {}, + "keyspace_id": {}, + "keyspace_name": {}, + "kv_total": {}, + "local_latch_wait_time": {}, + "lockkeys_time": {}, + "mem_arbitration": {}, + "mem_max": {}, + "num_cop_tasks": {}, + "opt_binding_match": {}, + "opt_logical": {}, + "opt_physical": {}, + "opt_stats_derive": {}, + "opt_stats_sync_wait": {}, + "optimize_time": {}, + "parse_time": {}, + "pd_total": {}, + "plan": {}, + "plan_digest": {}, + "plan_from_binding": {}, + "plan_from_cache": {}, + "preproc_subqueries": {}, + "preproc_subqueries_time": {}, + "prepared": {}, + "prewrite_backoff_types": {}, + "prewrite_region": {}, + "prewrite_time": {}, + "prev_stmt": {}, + "process_keys": {}, + "process_time": {}, + "query": {}, + "query_time": {}, + "request_count": {}, + "request_unit_read": {}, + "request_unit_v2": {}, + "request_unit_v2_detail": {}, + "request_unit_write": {}, + "resolve_lock_time": {}, + "resource_group": {}, + "result_rows": {}, + "rewrite_time": {}, + "rocksdb_block_cache_hit_count": {}, + "rocksdb_block_read_byte": {}, + "rocksdb_block_read_count": {}, + "rocksdb_block_read_time": {}, + "rocksdb_delete_skipped_count": {}, + "rocksdb_key_skipped_count": {}, + "session_alias": {}, + "session_connect_attrs": {}, + "slowest_prewrite_rpc_detail": {}, + "stats": {}, + "storage_from_kv": {}, + "storage_from_mpp": {}, + "succ": {}, + "tidb_cpu_time": {}, + "tikv_cpu_time": {}, + "time": {}, + "time_queued_by_rc": {}, + "total_keys": {}, + "txn_retry": {}, + "txn_start_ts": {}, + "unpacked_bytes_received_tiflash_cross_zone": {}, + "unpacked_bytes_received_tiflash_total": {}, + "unpacked_bytes_received_tikv_cross_zone": {}, + "unpacked_bytes_received_tikv_total": {}, + "unpacked_bytes_sent_tiflash_cross_zone": {}, + "unpacked_bytes_sent_tiflash_total": {}, + "unpacked_bytes_sent_tikv_cross_zone": {}, + "unpacked_bytes_sent_tikv_total": {}, + "user": {}, + "user@host": {}, + "wait_prewrite_binlog_time": {}, + "wait_time": {}, + "wait_ts": {}, + "warnings": {}, + "write_keys": {}, + "write_size": {}, + "write_sql_response_total": {}, +} + +var reservedKeyspaceObservabilitySlowLogFieldPrefixes = []string{ + "cop_backoff_", +} + +var reservedKeyspaceObservabilityStmtLogFields = map[string]struct{}{ + "auth_users": {}, + "backoff_types": {}, + "begin": {}, + "binding_digest": {}, + "binding_sql": {}, + "charset": {}, + "collation": {}, + "commit_count": {}, + "digest": {}, + "end": {}, + "exec_count": {}, + "exec_retry_count": {}, + "exec_retry_time": {}, + "first_seen": {}, + "index_names": {}, + "is_internal": {}, + "keyspace_id": {}, + "keyspace_name": {}, + "last_seen": {}, + "max_backoff_time": {}, + "max_commit_backoff_time": {}, + "max_commit_time": {}, + "max_compile_latency": {}, + "max_cop_process_address": {}, + "max_cop_process_time": {}, + "max_cop_wait_address": {}, + "max_cop_wait_time": {}, + "max_disk": {}, + "max_get_commit_ts_time": {}, + "max_latency": {}, + "max_local_latch_time": {}, + "max_mem": {}, + "max_mem_arbitration": {}, + "max_parse_latency": {}, + "max_prewrite_region_num": {}, + "max_prewrite_time": {}, + "max_process_time": {}, + "max_processed_keys": {}, + "max_resolve_lock_time": {}, + "max_result_rows": {}, + "max_rocksdb_block_cache_hit_count": {}, + "max_rocksdb_block_read_byte": {}, + "max_rocksdb_block_read_count": {}, + "max_rocksdb_delete_skipped_count": {}, + "max_rocksdb_key_skipped_count": {}, + "max_rru": {}, + "max_ru_wait_duration": {}, + "max_ruv2": {}, + "max_total_keys": {}, + "max_txn_retry": {}, + "max_wait_time": {}, + "max_write_keys": {}, + "max_write_size": {}, + "max_wru": {}, + "min_latency": {}, + "min_result_rows": {}, + "normalized_sql": {}, + "plan_cache_hits": {}, + "plan_cache_unqualified_count": {}, + "plan_cache_unqualified_last_reason": {}, + "plan_digest": {}, + "plan_hint": {}, + "plan_in_binding": {}, + "plan_in_cache": {}, + "prepared": {}, + "prev_sql": {}, + "resource_group_name": {}, + "sample_binary_plan": {}, + "sample_plan": {}, + "sample_sql": {}, + "schema_name": {}, + "stmt_type": {}, + "storage_kv": {}, + "storage_mpp": {}, + "sum_affected_rows": {}, + "sum_backoff_time": {}, + "sum_backoff_times": {}, + "sum_backoff_total": {}, + "sum_commit_backoff_time": {}, + "sum_commit_time": {}, + "sum_compile_latency": {}, + "sum_disk": {}, + "sum_errors": {}, + "sum_get_commit_ts_time": {}, + "sum_kv_total": {}, + "sum_latency": {}, + "sum_local_latch_time": {}, + "sum_mem": {}, + "sum_mem_arbitration": {}, + "sum_num_cop_tasks": {}, + "sum_parse_latency": {}, + "sum_pd_total": {}, + "sum_prewrite_region_num": {}, + "sum_prewrite_time": {}, + "sum_process_time": {}, + "sum_processed_keys": {}, + "sum_resolve_lock_time": {}, + "sum_result_rows": {}, + "sum_rocksdb_block_cache_hit_count": {}, + "sum_rocksdb_block_read_byte": {}, + "sum_rocksdb_block_read_count": {}, + "sum_rocksdb_delete_skipped_count": {}, + "sum_rocksdb_key_skipped_count": {}, + "sum_rru": {}, + "sum_ru_wait_duration": {}, + "sum_ruv2": {}, + "sum_tidb_cpu": {}, + "sum_tikv_cpu": {}, + "sum_total_keys": {}, + "sum_txn_retry": {}, + "sum_wait_time": {}, + "sum_warnings": {}, + "sum_write_keys": {}, + "sum_write_size": {}, + "sum_write_sql_resp_total": {}, + "sum_wru": {}, + "table_names": {}, + "unpacked_bytes_received_tiflash_cross_zone": {}, + "unpacked_bytes_received_tiflash_total": {}, + "unpacked_bytes_received_tikv_cross_zone": {}, + "unpacked_bytes_received_tikv_total": {}, + "unpacked_bytes_send_tiflash_cross_zone": {}, + "unpacked_bytes_send_tiflash_total": {}, + "unpacked_bytes_send_tikv_cross_zone": {}, + "unpacked_bytes_send_tikv_total": {}, +} + +// Valid validates metadata observability mappings. +func (o KeyspaceObservability) Valid() error { + metricLabels := make(map[string]struct{}, len(o.Fields)) + slowLogFields := make(map[string]struct{}, len(o.Fields)) + stmtLogFields := make(map[string]struct{}, len(o.Fields)) + for i, field := range o.Fields { + if field.Source == "" { + return fmt.Errorf("[keyspace-observability.fields.%d] source cannot be empty", i) + } + if field.MetricLabel == "" && field.SlowLogField == "" && field.StmtLogField == "" { + return fmt.Errorf("[keyspace-observability.fields.%d] at least one output must be set", i) + } + if field.MetricLabel != "" { + if !validPrometheusLabelName(field.MetricLabel) { + return fmt.Errorf("[keyspace-observability.fields.%d] invalid metric-label %q", i, field.MetricLabel) + } + key := strings.ToLower(field.MetricLabel) + if !strings.HasPrefix(key, keyspaceObservabilityMetricLabelPrefix) { + return fmt.Errorf("[keyspace-observability.fields.%d] metric-label %q must start with %q", i, field.MetricLabel, keyspaceObservabilityMetricLabelPrefix) + } + if _, ok := metricLabels[key]; ok { + return fmt.Errorf("[keyspace-observability.fields.%d] duplicated metric-label %q", i, field.MetricLabel) + } + metricLabels[key] = struct{}{} + } + if field.SlowLogField != "" { + if !validKeyspaceObservabilityLogFieldName(field.SlowLogField) { + return fmt.Errorf("[keyspace-observability.fields.%d] invalid slow-log-field %q", i, field.SlowLogField) + } + key := strings.ToLower(field.SlowLogField) + if isReservedKeyspaceObservabilitySlowLogField(key) { + return fmt.Errorf("[keyspace-observability.fields.%d] reserved slow-log-field %q", i, field.SlowLogField) + } + if _, ok := slowLogFields[key]; ok { + return fmt.Errorf("[keyspace-observability.fields.%d] duplicated slow-log-field %q", i, field.SlowLogField) + } + slowLogFields[key] = struct{}{} + } + if field.StmtLogField != "" { + key := strings.ToLower(field.StmtLogField) + if _, ok := reservedKeyspaceObservabilityStmtLogFields[key]; ok { + return fmt.Errorf("[keyspace-observability.fields.%d] reserved stmt-log-field %q", i, field.StmtLogField) + } + if _, ok := stmtLogFields[key]; ok { + return fmt.Errorf("[keyspace-observability.fields.%d] duplicated stmt-log-field %q", i, field.StmtLogField) + } + stmtLogFields[key] = struct{}{} + } + } + return nil +} + +func isReservedKeyspaceObservabilitySlowLogField(field string) bool { + if _, ok := reservedKeyspaceObservabilitySlowLogFields[field]; ok { + return true + } + for _, prefix := range reservedKeyspaceObservabilitySlowLogFieldPrefixes { + if strings.HasPrefix(field, prefix) { + return true + } + } + return false +} + +func validKeyspaceObservabilityLogFieldName(field string) bool { + return validPrometheusLabelName(field) +} + +func validPrometheusLabelName(label string) bool { + for i, r := range label { + if i == 0 { + if r == '_' || r >= 'A' && r <= 'Z' || r >= 'a' && r <= 'z' { + continue + } + return false + } + if r == '_' || r >= 'A' && r <= 'Z' || r >= 'a' && r <= 'z' || r >= '0' && r <= '9' { + continue + } + return false + } + return label != "" +} + +// ResolveKeyspaceObservability resolves configured output values from metadata. +func (c *Config) ResolveKeyspaceObservability(values map[string]string) error { + resolved := KeyspaceObservabilityValues{ + MetricLabels: make(map[string]string), + } + for _, field := range c.KeyspaceObservability.Fields { + value, ok := values[field.Source] + if !ok { + if field.Required { + return fmt.Errorf("missing required keyspace metadata entry %q", field.Source) + } + continue + } + if field.MetricLabel != "" { + resolved.MetricLabels[field.MetricLabel] = value + } + if field.SlowLogField != "" { + resolved.SlowLogFields = append(resolved.SlowLogFields, KeyspaceObservabilityFieldPair{Key: field.SlowLogField, Value: value}) + } + if field.StmtLogField != "" { + resolved.StmtLogFields = append(resolved.StmtLogFields, KeyspaceObservabilityFieldPair{Key: field.StmtLogField, Value: value}) + } + } + c.KeyspaceObservabilityValues = resolved.Clone() + return nil +} + +// Clone returns a deep copy of resolved metadata observability values. +func (v KeyspaceObservabilityValues) Clone() KeyspaceObservabilityValues { + res := KeyspaceObservabilityValues{} + if len(v.MetricLabels) > 0 { + res.MetricLabels = make(map[string]string, len(v.MetricLabels)) + for k, value := range v.MetricLabels { + res.MetricLabels[k] = value + } + } + res.SlowLogFields = append([]KeyspaceObservabilityFieldPair(nil), v.SlowLogFields...) + res.StmtLogFields = append([]KeyspaceObservabilityFieldPair(nil), v.StmtLogFields...) + return res +} + +// GetKeyspaceObservabilityMetricLabels returns resolved metric labels. +func (c *Config) GetKeyspaceObservabilityMetricLabels() map[string]string { + return c.KeyspaceObservabilityValues.Clone().MetricLabels +} + +// GetKeyspaceObservabilitySlowLogFields returns resolved slow log fields. +func (c *Config) GetKeyspaceObservabilitySlowLogFields() []KeyspaceObservabilityFieldPair { + return c.KeyspaceObservabilityValues.Clone().SlowLogFields +} + +// GetKeyspaceObservabilityStmtLogFields returns resolved statement log fields. +func (c *Config) GetKeyspaceObservabilityStmtLogFields() []KeyspaceObservabilityFieldPair { + return c.KeyspaceObservabilityValues.Clone().StmtLogFields +} + // nullableBool defaults unset bool options to unset instead of false, which enables us to know if the user has set 2 // conflict options at the same time. type nullableBool struct { @@ -1473,12 +1893,18 @@ func (c *Config) Valid() error { if !kerneltype.IsNextGen() && c.DeployMode != deploymode.Premium { return fmt.Errorf("deploy-mode can only be configured for nextgen TiDB") } + if len(c.KeyspaceObservability.Fields) > 0 && c.DeployMode != deploymode.Starter { + return fmt.Errorf("keyspace-observability.fields can only be configured when deploy-mode is starter") + } if c.DXFResourceLimit < MinDXFResourceLimit || c.DXFResourceLimit > MaxDXFResourceLimit { return fmt.Errorf("dxf-resource-limit should be between %d and %d", MinDXFResourceLimit, MaxDXFResourceLimit) } if c.DXFResourceLimit != DefDXFResourceLimit && c.DeployMode != deploymode.PremiumReserved { return fmt.Errorf("dxf-resource-limit can only be configured when deploy-mode is premium_reserved") } + if err := c.KeyspaceObservability.Valid(); err != nil { + return err + } if c.Store == StoreTypeMockTiKV && !c.Instance.TiDBEnableDDL.Load() { return fmt.Errorf("can't disable DDL on mocktikv") } diff --git a/pkg/config/config.toml.example b/pkg/config/config.toml.example index 70d5f1ad6916d..64b0d551ddde3 100644 --- a/pkg/config/config.toml.example +++ b/pkg/config/config.toml.example @@ -480,6 +480,15 @@ tikv-raftstore-store-write-trigger-wb-bytes = 0.00006100 tikv-storage-processed-keys-batch-get = 0.00266791 tikv-storage-processed-keys-get = 0.01416829 +# Map selected keyspace metadata entries to observability outputs. +# Only valid when deploy-mode is starter. +# [[keyspace-observability.fields]] +# source = "meta_key" +# metric-label = "metric_label" +# slow-log-field = "Slow_log_field" +# stmt-log-field = "stmt_log_field" +# required = false + # instance scope variables # These options are also available as a system variable for online configuration # changes to the system variable do not persist to the cluster. You must make changes diff --git a/pkg/config/config.toml.nextgen.example b/pkg/config/config.toml.nextgen.example index f8ad031a58d92..a04d6a19c0c39 100644 --- a/pkg/config/config.toml.nextgen.example +++ b/pkg/config/config.toml.nextgen.example @@ -446,6 +446,15 @@ allow-expression-index = false # engines means allow the tidb server read data from which types of engines. options: "tikv", "tiflash", "tidb". engines = ["tikv", "tiflash", "tidb"] +# Map selected keyspace metadata entries to observability outputs. +# Only valid when deploy-mode is starter. +# [[keyspace-observability.fields]] +# source = "meta_key" +# metric-label = "metric_label" +# slow-log-field = "Slow_log_field" +# stmt-log-field = "stmt_log_field" +# required = false + # instance scope variables # These options are also available as a system variable for online configuration # changes to the system variable do not persist to the cluster. You must make changes diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 5628ff4a6242d..16adb20ccbc0c 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -170,6 +170,201 @@ disable-error-stack = false `, nbFalse, nbUnset, nbUnset, nbUnset, false, true) } +func TestKeyspaceObservability(t *testing.T) { + conf := NewConfig() + content := ` +[[keyspace-observability.fields]] +source = "meta_a" +metric-label = "keyspace_meta_label_a" +slow-log-field = "Slow_meta_a" +stmt-log-field = "stmt_meta_a" +required = true + +[[keyspace-observability.fields]] +source = "meta_b" +metric-label = "keyspace_meta_label_b" +` + _, err := toml.Decode(content, conf) + require.NoError(t, err) + require.NoError(t, conf.KeyspaceObservability.Valid()) + require.NoError(t, conf.ResolveKeyspaceObservability(map[string]string{ + "meta_a": "value_a", + "meta_b": "value_b", + })) + require.Equal(t, map[string]string{"keyspace_meta_label_a": "value_a", "keyspace_meta_label_b": "value_b"}, conf.GetKeyspaceObservabilityMetricLabels()) + require.Equal(t, []KeyspaceObservabilityFieldPair{{Key: "Slow_meta_a", Value: "value_a"}}, conf.GetKeyspaceObservabilitySlowLogFields()) + require.Equal(t, []KeyspaceObservabilityFieldPair{{Key: "stmt_meta_a", Value: "value_a"}}, conf.GetKeyspaceObservabilityStmtLogFields()) + + metricLabels := conf.GetKeyspaceObservabilityMetricLabels() + metricLabels["keyspace_meta_label_a"] = "changed" + require.Equal(t, "value_a", conf.GetKeyspaceObservabilityMetricLabels()["keyspace_meta_label_a"]) + + require.ErrorContains(t, conf.ResolveKeyspaceObservability(map[string]string{"meta_b": "value_b"}), `missing required keyspace metadata entry "meta_a"`) +} + +func TestKeyspaceObservabilityInvalid(t *testing.T) { + tests := []struct { + name string + content string + err string + }{ + { + name: "empty source", + content: ` +[[keyspace-observability.fields]] +source = "" +metric-label = "keyspace_meta_label_a" +`, + err: "source cannot be empty", + }, + { + name: "empty output", + content: ` +[[keyspace-observability.fields]] +source = "meta_a" +`, + err: "at least one output must be set", + }, + { + name: "invalid label", + content: ` +[[keyspace-observability.fields]] +source = "meta_a" +metric-label = "1_label" +`, + err: `invalid metric-label "1_label"`, + }, + { + name: "duplicate label", + content: ` +[[keyspace-observability.fields]] +source = "meta_a" +metric-label = "keyspace_meta_label_a" + +[[keyspace-observability.fields]] +source = "meta_b" +metric-label = "KEYSPACE_META_LABEL_A" +`, + err: `duplicated metric-label "KEYSPACE_META_LABEL_A"`, + }, + { + name: "reserved label without prefix", + content: ` +[[keyspace-observability.fields]] +source = "meta_a" +metric-label = "KEYSPACE_ID" +`, + err: `metric-label "KEYSPACE_ID" must start with "keyspace_meta_"`, + }, + { + name: "metric variable label without prefix", + content: ` +[[keyspace-observability.fields]] +source = "meta_a" +metric-label = "TYPE" +`, + err: `metric-label "TYPE" must start with "keyspace_meta_"`, + }, + { + name: "api label without prefix", + content: ` +[[keyspace-observability.fields]] +source = "meta_a" +metric-label = "api" +`, + err: `metric-label "api" must start with "keyspace_meta_"`, + }, + { + name: "service scope label without prefix", + content: ` +[[keyspace-observability.fields]] +source = "meta_a" +metric-label = "service_scope" +`, + err: `metric-label "service_scope" must start with "keyspace_meta_"`, + }, + { + name: "task id label without prefix", + content: ` +[[keyspace-observability.fields]] +source = "meta_a" +metric-label = "task_id" +`, + err: `metric-label "task_id" must start with "keyspace_meta_"`, + }, + { + name: "reserved slow log field", + content: ` +[[keyspace-observability.fields]] +source = "meta_a" +slow-log-field = "Digest" +`, + err: `reserved slow-log-field "Digest"`, + }, + { + name: "invalid slow log field", + content: ` +[[keyspace-observability.fields]] +source = "meta_a" +slow-log-field = "Bad Field" +`, + err: `invalid slow-log-field "Bad Field"`, + }, + { + name: "duplicate slow log field", + content: ` +[[keyspace-observability.fields]] +source = "meta_a" +slow-log-field = "Slow_meta" + +[[keyspace-observability.fields]] +source = "meta_b" +slow-log-field = "Slow_meta" +`, + err: `duplicated slow-log-field "Slow_meta"`, + }, + { + name: "reserved stmt log field", + content: ` +[[keyspace-observability.fields]] +source = "meta_a" +stmt-log-field = "digest" +`, + err: `reserved stmt-log-field "digest"`, + }, + { + name: "duplicate stmt log field", + content: ` +[[keyspace-observability.fields]] +source = "meta_a" +stmt-log-field = "stmt_meta" + +[[keyspace-observability.fields]] +source = "meta_b" +stmt-log-field = "stmt_meta" +`, + err: `duplicated stmt-log-field "stmt_meta"`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + conf := NewConfig() + _, err := toml.Decode(tt.content, conf) + require.NoError(t, err) + require.ErrorContains(t, conf.KeyspaceObservability.Valid(), tt.err) + }) + } + + conf := NewConfig() + _, err := toml.Decode(` +[[keyspace-observability.fields]] +source = "meta_a" +metric-label = "keyspace_meta_label_a" +`, conf) + require.NoError(t, err) + require.ErrorContains(t, conf.Valid(), "keyspace-observability.fields can only be configured when deploy-mode is starter") +} + func TestRemovedVariableCheck(t *testing.T) { configTest := []struct { options string @@ -1112,6 +1307,17 @@ dxf-resource-limit = 101`), 0644)) require.Equal(t, deploymode.Starter, conf.DeployMode) require.NoError(t, conf.Valid()) + require.NoError(t, os.WriteFile(configFile, []byte(`deploy-mode = "starter" + +[[keyspace-observability.fields]] +source = "meta_a" +metric-label = "keyspace_meta_label_a" +`), 0644)) + conf = NewConfig() + require.NoError(t, conf.Load(configFile)) + require.Equal(t, deploymode.Starter, conf.DeployMode) + require.NoError(t, conf.Valid()) + require.NoError(t, os.WriteFile(configFile, []byte(`deploy-mode = "unknown"`), 0644)) conf = NewConfig() require.ErrorContains(t, conf.Load(configFile), `invalid deploy mode "unknown"`) diff --git a/pkg/keyspace/BUILD.bazel b/pkg/keyspace/BUILD.bazel index 72ad3ccff1a6c..65243270ef1c1 100644 --- a/pkg/keyspace/BUILD.bazel +++ b/pkg/keyspace/BUILD.bazel @@ -11,6 +11,7 @@ go_library( deps = [ "//pkg/config", "//pkg/config/kerneltype", + "@com_github_pingcap_kvproto//pkg/keyspacepb", "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_tikv_client_go_v2//tikv", "@org_uber_go_zap//:zap", @@ -24,10 +25,13 @@ go_test( srcs = ["keyspace_test.go"], embed = [":keyspace"], flaky = True, - shard_count = 2, + shard_count = 3, deps = [ "//pkg/config", "//pkg/config/kerneltype", + "@com_github_pingcap_kvproto//pkg/keyspacepb", "@com_github_stretchr_testify//require", + "@org_uber_go_zap//:zap", + "@org_uber_go_zap//zaptest/observer", ], ) diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index 86eba95040998..1207bd30c6fab 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -18,6 +18,7 @@ import ( "fmt" "sync" + "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/config/kerneltype" @@ -61,6 +62,7 @@ func GetKeyspaceNameBySettings() (keyspaceName string) { var keyspaceNameBytes []byte var genKeyspaceNameOnce sync.Once +var keyspaceMeta sync.Map // GetKeyspaceNameBytesBySettings is used to get keyspace name setting as a byte slice. func GetKeyspaceNameBytesBySettings() []byte { @@ -80,12 +82,33 @@ func IsKeyspaceNameEmpty(keyspaceName string) bool { return keyspaceName == "" } +// SetKeyspaceMeta stores keyspace metadata loaded at startup for later reuse. +func SetKeyspaceMeta(meta *keyspacepb.KeyspaceMeta) { + if meta == nil || IsKeyspaceNameEmpty(meta.GetName()) { + return + } + keyspaceMeta.Store(meta.GetName(), meta) +} + +// GetKeyspaceMeta returns keyspace metadata loaded at startup. +func GetKeyspaceMeta(keyspaceName string) (*keyspacepb.KeyspaceMeta, bool) { + meta, ok := keyspaceMeta.Load(keyspaceName) + if !ok { + return nil, false + } + return meta.(*keyspacepb.KeyspaceMeta), true +} + // WrapZapcoreWithKeyspace is used to wrap zapcore.Core. func WrapZapcoreWithKeyspace() zap.Option { return zap.WrapCore(func(core zapcore.Core) zapcore.Core { keyspaceName := GetKeyspaceNameBySettings() if !IsKeyspaceNameEmpty(keyspaceName) { - core = core.With([]zap.Field{zap.String("keyspaceName", keyspaceName)}) + fields := []zap.Field{zap.String("keyspaceName", keyspaceName)} + if meta, ok := GetKeyspaceMeta(keyspaceName); ok { + fields = append(fields, zap.Uint32("keyspaceID", meta.GetId())) + } + core = core.With(fields) } return core }) diff --git a/pkg/keyspace/keyspace_test.go b/pkg/keyspace/keyspace_test.go index 50e64bd4b032d..b6a205b6cd45d 100644 --- a/pkg/keyspace/keyspace_test.go +++ b/pkg/keyspace/keyspace_test.go @@ -18,9 +18,12 @@ import ( "sync" "testing" + "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/config/kerneltype" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" ) func TestSetKeyspaceNameInConf(t *testing.T) { @@ -71,6 +74,41 @@ func TestNoKeyspaceNameSet(t *testing.T) { } } +func TestSetKeyspaceMeta(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + t.Cleanup(func() { + keyspaceMeta.Delete("test_keyspace") + }) + + config.UpdateGlobal(func(conf *config.Config) { + conf.KeyspaceName = "" + }) + + meta := &keyspacepb.KeyspaceMeta{Id: 42, Name: "test_keyspace"} + SetKeyspaceMeta(meta) + + got, ok := GetKeyspaceMeta("test_keyspace") + require.True(t, ok) + require.Equal(t, uint32(42), got.GetId()) + require.Equal(t, "test_keyspace", got.GetName()) + + SetKeyspaceMeta(nil) + _, ok = GetKeyspaceMeta("") + require.False(t, ok) + + config.UpdateGlobal(func(conf *config.Config) { + conf.KeyspaceName = "test_keyspace" + }) + core, logs := observer.New(zap.InfoLevel) + logger := zap.New(core, WrapZapcoreWithKeyspace()) + logger.Info("test") + entries := logs.All() + require.Len(t, entries, 1) + require.Equal(t, "test_keyspace", entries[0].ContextMap()["keyspaceName"]) + require.Equal(t, uint32(42), entries[0].ContextMap()["keyspaceID"]) +} + func BenchmarkGetKeyspaceNameBytesBySettings(b *testing.B) { if !kerneltype.IsNextGen() { b.Skip("NextGen is not enabled, skipping benchmark") diff --git a/pkg/metrics/common/wrapper.go b/pkg/metrics/common/wrapper.go index f668ceb7c7458..d9ab537bffb83 100644 --- a/pkg/metrics/common/wrapper.go +++ b/pkg/metrics/common/wrapper.go @@ -53,6 +53,14 @@ func SetConstLabels(kv ...string) { } } +// SetConstLabelsFromMap sets constant labels for metrics from a map. +func SetConstLabelsFromMap(labels map[string]string) { + constLabels = make(prometheus.Labels, len(labels)) + for k, v := range labels { + constLabels[strings.ToLower(k)] = v + } +} + // NewCounter wraps a prometheus.NewCounter. func NewCounter(opts prometheus.CounterOpts) prometheus.Counter { opts.ConstLabels = constLabels diff --git a/pkg/sessionctx/variable/slow_log.go b/pkg/sessionctx/variable/slow_log.go index 7db2b4b2340b9..c4391885c6857 100644 --- a/pkg/sessionctx/variable/slow_log.go +++ b/pkg/sessionctx/variable/slow_log.go @@ -29,6 +29,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/sessionctx/slowlogrule" @@ -589,6 +590,9 @@ func (s *SessionVars) SlowLogFormat(logItems *SlowQueryLogItems) string { if logItems.PrevStmt != "" { writeSlowLogItem(&buf, SlowLogPrevStmt, logItems.PrevStmt) } + for _, field := range config.GetGlobalConfig().GetKeyspaceObservabilitySlowLogFields() { + writeSlowLogItem(&buf, field.Key, field.Value) + } if s.CurrentDBChanged { fmt.Fprintf(&buf, "use %s;\n", strings.ToLower(s.CurrentDB)) diff --git a/pkg/sessionctx/variable/tests/session_test.go b/pkg/sessionctx/variable/tests/session_test.go index ffa0562724f8e..a77c4ed466e15 100644 --- a/pkg/sessionctx/variable/tests/session_test.go +++ b/pkg/sessionctx/variable/tests/session_test.go @@ -385,6 +385,20 @@ func TestSlowLogFormat(t *testing.T) { // Restore for subsequent assertions. logItems.SessionConnectAttrs = nil + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.KeyspaceObservability = config.KeyspaceObservability{ + Fields: []config.KeyspaceObservabilityField{{ + Source: "meta_a", + SlowLogField: "Slow_meta_a", + }}, + } + require.NoError(t, conf.ResolveKeyspaceObservability(map[string]string{"meta_a": "value_a"})) + }) + logString = seVar.SlowLogFormat(logItems) + require.Equal(t, resultFields+"\n"+"# Slow_meta_a: value_a\n"+sql, logString) + // test PrepareSlowLogItemsForRules and CompleteSlowLogItemsForRules seVar.SlowLogRules = slowlogrule.NewSessionSlowLogRules(&slowlogrule.SlowLogRules{ Fields: map[string]struct{}{ diff --git a/pkg/store/driver/BUILD.bazel b/pkg/store/driver/BUILD.bazel index 13a574bf66695..d7dd84b06e416 100644 --- a/pkg/store/driver/BUILD.bazel +++ b/pkg/store/driver/BUILD.bazel @@ -8,6 +8,7 @@ go_library( deps = [ "//pkg/kv", "//pkg/metrics", + "//pkg/metrics/common", "//pkg/sessionctx/variable", "//pkg/store/copr", "//pkg/store/driver/error", @@ -49,6 +50,7 @@ go_test( shard_count = 8, deps = [ "//pkg/kv", + "//pkg/metrics/common", "//pkg/session", "//pkg/store/mockstore", "//pkg/testkit", @@ -64,6 +66,7 @@ go_test( "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//tikvrpc", + "@com_github_tikv_pd_client//opt", "@org_uber_go_goleak//:goleak", ], ) diff --git a/pkg/store/driver/config_test.go b/pkg/store/driver/config_test.go index ab78573ee8a03..07fa0fdbc6633 100644 --- a/pkg/store/driver/config_test.go +++ b/pkg/store/driver/config_test.go @@ -17,8 +17,10 @@ package driver import ( "testing" + metricscommon "github.com/pingcap/tidb/pkg/metrics/common" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/config" + "github.com/tikv/pd/client/opt" ) func TestSetDefaultAndOptions(t *testing.T) { @@ -34,4 +36,14 @@ func TestSetDefaultAndOptions(t *testing.T) { require.Equal(t, globalConfig.TxnLocalLatches, d.txnLocalLatches) require.Equal(t, globalConfig.PDClient, d.pdConfig) require.Equal(t, origSecurity, config.GetGlobalConfig().Security) + + metricscommon.SetConstLabels("keyspace_id", "42", "keyspace_name", "ks") + t.Cleanup(func() { + metricscommon.SetConstLabels() + }) + pdOpt := opt.NewOption() + for _, apply := range d.pdClientOptions() { + apply(pdOpt) + } + require.Equal(t, metricscommon.GetConstLabels(), pdOpt.MetricsLabels) } diff --git a/pkg/store/driver/tikv_driver.go b/pkg/store/driver/tikv_driver.go index 9ca2772fca115..d61fa2c2d2200 100644 --- a/pkg/store/driver/tikv_driver.go +++ b/pkg/store/driver/tikv_driver.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/metrics" + metricscommon "github.com/pingcap/tidb/pkg/metrics/common" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/store/copr" derr "github.com/pingcap/tidb/pkg/store/driver/error" @@ -164,17 +165,7 @@ func (d *TiKVDriver) OpenWithOptions(path string, options ...Option) (resStore k CertPath: d.security.ClusterSSLCert, KeyPath: d.security.ClusterSSLKey, }, - opt.WithGRPCDialOptions( - // keep the same with etcd, see - // https://github.com/etcd-io/etcd/blob/5704c6148d798ea444db26a966394406d8c10526/server/etcdserver/api/v3rpc/grpc.go#L34 - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: time.Duration(d.tikvConfig.GrpcKeepAliveTime) * time.Second, - Timeout: time.Duration(d.tikvConfig.GrpcKeepAliveTimeout) * time.Second, - }), - ), - opt.WithCustomTimeoutOption(time.Duration(d.pdConfig.PDServerTimeout)*time.Second), - opt.WithForwardingOption(config.GetGlobalConfig().EnableForwarding)) + d.pdClientOptions()...) if err != nil { return nil, errors.Trace(err) } @@ -253,6 +244,26 @@ func (d *TiKVDriver) OpenWithOptions(path string, options ...Option) (resStore k return store, nil } +func (d *TiKVDriver) pdClientOptions() []opt.ClientOption { + opts := []opt.ClientOption{ + opt.WithGRPCDialOptions( + // keep the same with etcd, see + // https://github.com/etcd-io/etcd/blob/5704c6148d798ea444db26a966394406d8c10526/server/etcdserver/api/v3rpc/grpc.go#L34 + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: time.Duration(d.tikvConfig.GrpcKeepAliveTime) * time.Second, + Timeout: time.Duration(d.tikvConfig.GrpcKeepAliveTimeout) * time.Second, + }), + ), + opt.WithCustomTimeoutOption(time.Duration(d.pdConfig.PDServerTimeout) * time.Second), + opt.WithForwardingOption(config.GetGlobalConfig().EnableForwarding), + } + if labels := metricscommon.GetConstLabels(); len(labels) > 0 { + opts = append(opts, opt.WithMetricsLabels(labels)) + } + return opts +} + type tikvStore struct { *tikv.KVStore etcdAddrs []string diff --git a/pkg/util/metricsutil/BUILD.bazel b/pkg/util/metricsutil/BUILD.bazel index ac57d68fa038b..1640bb09e51e4 100644 --- a/pkg/util/metricsutil/BUILD.bazel +++ b/pkg/util/metricsutil/BUILD.bazel @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "metricsutil", @@ -29,9 +29,21 @@ go_library( "//pkg/util", "//pkg/util/topsql/reporter/metrics", "@com_github_pingcap_kvproto//pkg/keyspacepb", - "@com_github_tikv_client_go_v2//config", "@com_github_tikv_pd_client//:client", "@com_github_tikv_pd_client//opt", "@com_github_tikv_pd_client//pkg/caller", ], ) + +go_test( + name = "metricsutil_test", + timeout = "short", + srcs = ["common_test.go"], + embed = [":metricsutil"], + flaky = True, + deps = [ + "//pkg/config", + "//pkg/metrics/common", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/util/metricsutil/common.go b/pkg/util/metricsutil/common.go index 42e38e31cfff3..76c0cb2f0f091 100644 --- a/pkg/util/metricsutil/common.go +++ b/pkg/util/metricsutil/common.go @@ -17,6 +17,7 @@ package metricsutil import ( "context" "fmt" + "maps" "time" "github.com/pingcap/kvproto/pkg/keyspacepb" @@ -42,7 +43,6 @@ import ( ttlmetrics "github.com/pingcap/tidb/pkg/ttl/metrics" "github.com/pingcap/tidb/pkg/util" topsqlreporter_metrics "github.com/pingcap/tidb/pkg/util/topsql/reporter/metrics" - tikvconfig "github.com/tikv/client-go/v2/config" pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/pkg/caller" @@ -50,52 +50,21 @@ import ( var componentName = caller.Component("tidb-metrics-util") -// RegisterMetrics register metrics with const label 'keyspace_id' if keyspaceName set. +const defaultKeyspaceLabel = "keyspace_id" + +// RegisterMetrics registers metrics with keyspace metadata labels when available. func RegisterMetrics() error { cfg := config.GetGlobalConfig() - if keyspace.IsKeyspaceNameEmpty(cfg.KeyspaceName) || cfg.Store != config.StoreTypeTiKV { - registerMetrics(nil) // register metrics without label 'keyspace_id'. - return nil - } - - if kerneltype.IsNextGen() { + if !keyspace.IsKeyspaceNameEmpty(cfg.KeyspaceName) && kerneltype.IsNextGen() { metricscommon.SetConstLabels("keyspace_name", cfg.KeyspaceName) } - - pdAddrs, _, _, err := tikvconfig.ParsePath("tikv://" + cfg.Path) - if err != nil { - return err - } - - timeoutSec := time.Duration(cfg.PDClient.PDServerTimeout) * time.Second - // Note: for NextGen, we need to use the side effect of `NewClient` to init the metrics' builtin const labels - pdCli, err := pd.NewClient(componentName, pdAddrs, pd.SecurityOption{ - CAPath: cfg.Security.ClusterSSLCA, - CertPath: cfg.Security.ClusterSSLCert, - KeyPath: cfg.Security.ClusterSSLKey, - }, opt.WithCustomTimeoutOption(timeoutSec), opt.WithMetricsLabels(metricscommon.GetConstLabels())) - if err != nil { - return err - } - defer pdCli.Close() - - if kerneltype.IsNextGen() { - registerMetrics(nil) // metrics' const label already set - } else { - keyspaceMeta, err := getKeyspaceMeta(pdCli, cfg.KeyspaceName) - if err != nil { - return err - } - registerMetrics(keyspaceMeta) - } - return nil + return registerMetrics() } -// RegisterMetricsForBR register metrics with const label keyspace_id for BR. +// RegisterMetricsForBR registers metrics with keyspace metadata labels for BR. func RegisterMetricsForBR(pdAddrs []string, tls task.TLSConfig, keyspaceName string) error { if keyspace.IsKeyspaceNameEmpty(keyspaceName) { - registerMetrics(nil) // register metrics without label 'keyspace_id'. - return nil + return registerMetrics() } if kerneltype.IsNextGen() { @@ -107,24 +76,19 @@ func RegisterMetricsForBR(pdAddrs []string, tls task.TLSConfig, keyspaceName str if tls.IsEnabled() { securityOpt = tls.ToPDSecurityOption() } - // Note: for NextGen, pdCli is created to init the metrics' const labels pdCli, err := pd.NewClient(componentName, pdAddrs, securityOpt, - opt.WithCustomTimeoutOption(timeoutSec), opt.WithMetricsLabels(metricscommon.GetConstLabels())) + opt.WithCustomTimeoutOption(timeoutSec), opt.WithInitMetricsOption(false)) if err != nil { return err } defer pdCli.Close() - if kerneltype.IsNextGen() { - registerMetrics(nil) // metrics' const label already set - } else { - keyspaceMeta, err := getKeyspaceMeta(pdCli, keyspaceName) - if err != nil { - return err - } - registerMetrics(keyspaceMeta) + keyspaceMeta, err := getKeyspaceMeta(pdCli, keyspaceName) + if err != nil { + return err } - return nil + setKeyspaceIDConstLabel(keyspaceMeta.GetId()) + return registerMetrics() } func initMetrics() { @@ -150,11 +114,28 @@ func initMetrics() { } } -func registerMetrics(keyspaceMeta *keyspacepb.KeyspaceMeta) { - if keyspaceMeta != nil { - metricscommon.SetConstLabels("keyspace_id", fmt.Sprint(keyspaceMeta.GetId())) +func registerMetrics() error { + labels := cloneConstLabels() + maps.Copy(labels, config.GetGlobalConfig().GetKeyspaceObservabilityMetricLabels()) + if len(labels) > 0 { + metricscommon.SetConstLabelsFromMap(labels) } initMetrics() + return nil +} + +func cloneConstLabels() map[string]string { + labels := maps.Clone(metricscommon.GetConstLabels()) + if labels == nil { + labels = make(map[string]string) + } + return labels +} + +func setKeyspaceIDConstLabel(keyspaceID uint32) { + labels := cloneConstLabels() + labels[defaultKeyspaceLabel] = fmt.Sprint(keyspaceID) + metricscommon.SetConstLabelsFromMap(labels) } func getKeyspaceMeta(pdCli pd.Client, keyspaceName string) (*keyspacepb.KeyspaceMeta, error) { diff --git a/pkg/util/metricsutil/common_test.go b/pkg/util/metricsutil/common_test.go new file mode 100644 index 0000000000000..a25ae826eeeb3 --- /dev/null +++ b/pkg/util/metricsutil/common_test.go @@ -0,0 +1,53 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metricsutil + +import ( + "testing" + + "github.com/pingcap/tidb/pkg/config" + metricscommon "github.com/pingcap/tidb/pkg/metrics/common" + "github.com/stretchr/testify/require" +) + +func TestRegisterMetricsWithKeyspaceObservabilityValues(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + t.Cleanup(func() { + metricscommon.SetConstLabels() + }) + + labels := cloneConstLabels() + labels["label_a"] = "value_a" + require.Equal(t, "value_a", labels["label_a"]) + + metricscommon.SetConstLabels("base_label", "base_value") + config.UpdateGlobal(func(conf *config.Config) { + conf.KeyspaceObservabilityValues = config.KeyspaceObservabilityValues{ + MetricLabels: map[string]string{"label_a": "value_a"}, + } + }) + + require.NoError(t, registerMetrics()) + labels = metricscommon.GetConstLabels() + require.Equal(t, "base_value", labels["base_label"]) + require.Equal(t, "value_a", labels["label_a"]) + + metricscommon.SetConstLabels("keyspace_name", "ks") + setKeyspaceIDConstLabel(42) + labels = metricscommon.GetConstLabels() + require.Equal(t, "ks", labels["keyspace_name"]) + require.Equal(t, "42", labels["keyspace_id"]) +} diff --git a/pkg/util/stmtsummary/v2/BUILD.bazel b/pkg/util/stmtsummary/v2/BUILD.bazel index 1abfaf8d31663..db4ef2262fcd0 100644 --- a/pkg/util/stmtsummary/v2/BUILD.bazel +++ b/pkg/util/stmtsummary/v2/BUILD.bazel @@ -51,6 +51,7 @@ go_test( flaky = True, shard_count = 15, deps = [ + "//pkg/config", "//pkg/meta/model", "//pkg/metrics", "//pkg/parser/ast", diff --git a/pkg/util/stmtsummary/v2/logger.go b/pkg/util/stmtsummary/v2/logger.go index 64c3499c6ba28..f3610d06c411b 100644 --- a/pkg/util/stmtsummary/v2/logger.go +++ b/pkg/util/stmtsummary/v2/logger.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/util/logutil" "go.uber.org/zap" "go.uber.org/zap/buffer" @@ -71,7 +72,7 @@ func (s *stmtLogStorage) sync() error { } func (s *stmtLogStorage) log(r *StmtRecord) { - b, err := json.Marshal(r) + b, err := marshalStmtRecord(r) if err != nil { logutil.BgLogger().Warn("failed to marshal statement summary", zap.Error(err)) return @@ -79,6 +80,32 @@ func (s *stmtLogStorage) log(r *StmtRecord) { s.logger.Info(string(b)) } +func marshalStmtRecord(r *StmtRecord) ([]byte, error) { + fields := config.GetGlobalConfig().GetKeyspaceObservabilityStmtLogFields() + if len(fields) == 0 { + return json.Marshal(r) + } + b, err := json.Marshal(r) + if err != nil { + return nil, err + } + if !json.Valid(b) || len(b) < 2 || b[0] != '{' || b[len(b)-1] != '}' { + return b, nil + } + items := make(map[string]json.RawMessage) + if err := json.Unmarshal(b, &items); err != nil { + return nil, err + } + for _, field := range fields { + value, err := json.Marshal(field.Value) + if err != nil { + return nil, err + } + items[field.Key] = value + } + return json.Marshal(items) +} + type stmtLogEncoder struct{} func (*stmtLogEncoder) EncodeEntry(entry zapcore.Entry, _ []zapcore.Field) (*buffer.Buffer, error) { diff --git a/pkg/util/stmtsummary/v2/record_test.go b/pkg/util/stmtsummary/v2/record_test.go index fbb65feb5468c..9b3700497b4f9 100644 --- a/pkg/util/stmtsummary/v2/record_test.go +++ b/pkg/util/stmtsummary/v2/record_test.go @@ -15,8 +15,10 @@ package stmtsummary import ( + "encoding/json" "testing" + "github.com/pingcap/tidb/pkg/config" "github.com/stretchr/testify/require" ) @@ -83,4 +85,22 @@ func TestStmtRecord(t *testing.T) { require.Equal(t, info.TotalRUV2*2, record2.SumRUV2) require.Equal(t, info.CPUUsages.TidbCPUTime*2, record2.SumTidbCPU) require.Equal(t, info.CPUUsages.TikvCPUTime*2, record2.SumTikvCPU) + + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.KeyspaceObservability = config.KeyspaceObservability{ + Fields: []config.KeyspaceObservabilityField{{ + Source: "meta_a", + StmtLogField: "stmt_meta_a", + }}, + } + require.NoError(t, conf.ResolveKeyspaceObservability(map[string]string{"meta_a": "value_a"})) + }) + b, err := marshalStmtRecord(record2) + require.NoError(t, err) + items := make(map[string]any) + require.NoError(t, json.Unmarshal(b, &items)) + require.Equal(t, "value_a", items["stmt_meta_a"]) + require.Equal(t, record2.Digest, items["digest"]) }