Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ go_test(
"//pkg/testkit/testfailpoint",
"//pkg/testkit/testmain",
"//pkg/testkit/testsetup",
"//pkg/testkit/testutil",
"//pkg/types",
"//pkg/util",
"//pkg/util/benchdaily",
Expand Down
3 changes: 3 additions & 0 deletions pkg/executor/adapter_slow_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ func SetSlowLogItems(a *ExecStmt, txnTS uint64, hasMoreResults bool, items *vari
items.CPUUsages = sessVars.SQLCPUUsages.GetCPUUsages()
items.StorageKV = stmtCtx.IsTiKV.Load()
items.StorageMPP = stmtCtx.IsTiFlash.Load()
if sessVars.ConnectionInfo != nil && len(sessVars.ConnectionInfo.Attributes) > 0 {
items.SessionConnectAttrs = sessVars.ConnectionInfo.Attributes
}

if a.retryCount > 0 {
items.ExecRetryTime = items.TimeTotal - sessVars.DurationParse - sessVars.DurationCompile - time.Since(a.retryStartTime)
Expand Down
36 changes: 36 additions & 0 deletions pkg/executor/cluster_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/server"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testutil"
"github.com/pingcap/tidb/pkg/util"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
Expand Down Expand Up @@ -354,3 +355,38 @@ func removeFiles(t *testing.T, fileNames []string) {
require.NoError(t, os.Remove(fileName))
}
}

func TestClusterTableSlowQuerySessionConnectAttrs(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
srv := createRPCServer(t, dom)
defer srv.Stop()

logData := `
# Time: 2024-01-15T10:00:00.000000+08:00
# Txn_start_ts: 123456789
# User@Host: root[root] @ localhost [127.0.0.1]
# Query_time: 0.5
# Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772
# Is_internal: false
# Succ: true
` + testutil.DefaultSessionConnectAttrsSlowLogLine() + `
select * from t;`
fileName := "tidb-slow-query-attrs.log"
prepareLogs(t, []string{logData}, []string{fileName})
defer removeFiles(t, []string{fileName})

defer config.RestoreFunc()()
config.UpdateGlobal(func(conf *config.Config) {
conf.Log.SlowQueryFile = fileName
})

tk := testkit.NewTestKit(t, store)
tk.MustExec("use information_schema")

// Verify Session_connect_attrs column is present in cluster_slow_query as well.
clusterRows := tk.MustQuery("select Session_connect_attrs from information_schema.cluster_slow_query " +
"where time > '2024-01-01 00:00:00' and query = 'select * from t;'").Rows()
require.Len(t, clusterRows, 1)
clusterAttrsStr := clusterRows[0][0].(string)
testutil.RequireContainsDefaultSessionConnectAttrs(t, clusterAttrsStr)
}
2 changes: 1 addition & 1 deletion pkg/executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ func TestColumnTable(t *testing.T) {
testkit.RowsWithSep("|",
"test|tbl1|col_2"))
tk.MustQuery(`select count(*) from information_schema.columns;`).Check(
testkit.RowsWithSep("|", "5017"))
testkit.RowsWithSep("|", "5019"))
}

func TestIndexUsageTable(t *testing.T) {
Expand Down
20 changes: 19 additions & 1 deletion pkg/executor/slow_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.C
startTime := time.Now()
var logs [][]string
var err error
if !e.extractor.Desc {
if e.extractor == nil || !e.extractor.Desc {
logs, err = e.getBatchLog(ctx, reader, &offset, logNum)
} else {
logs, err = e.getBatchLogForReversedScan(ctx, reader, &offset, logNum)
Expand Down Expand Up @@ -703,6 +703,12 @@ func (e *slowQueryRetriever) parseLog(ctx context.Context, sctx sessionctx.Conte
} else if strings.HasPrefix(line, variable.SlowLogWarnings) {
line = line[len(variable.SlowLogWarnings+variable.SlowLogSpaceMarkStr):]
valid = e.setColumnValue(sctx, row, tz, variable.SlowLogWarnings, line, e.checker, fileLine)
} else if strings.HasPrefix(line, variable.SlowLogSessionConnectAttrs+variable.SlowLogSpaceMarkStr) {
line = line[len(variable.SlowLogSessionConnectAttrs+variable.SlowLogSpaceMarkStr):]
valid = e.setColumnValue(sctx, row, tz, variable.SlowLogSessionConnectAttrs, line, e.checker, fileLine)
} else if strings.HasPrefix(line, variable.SlowLogDBStr+variable.SlowLogSpaceMarkStr) {
line = line[len(variable.SlowLogDBStr+variable.SlowLogSpaceMarkStr):]
valid = e.setColumnValue(sctx, row, tz, variable.SlowLogDBStr, line, e.checker, fileLine)
} else {
fields, values := splitByColon(line)
for i := 0; i < len(fields); i++ {
Expand Down Expand Up @@ -885,6 +891,18 @@ func getColumnValueFactoryByName(colName string, columnIdx int) (slowQueryColumn
row[columnIdx] = types.NewDatum(v)
return true, nil
}, nil
case variable.SlowLogSessionConnectAttrs:
return func(row []types.Datum, value string, _ *time.Location, _ *slowLogChecker) (valid bool, err error) {
if len(value) == 0 {
return true, nil
}
bj, err := types.ParseBinaryJSONFromString(value)
if err != nil {
return false, err
}
row[columnIdx] = types.NewDatum(bj)
return true, nil
}, nil
}
return nil, nil
}
Expand Down
96 changes: 96 additions & 0 deletions pkg/executor/slow_query_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/external"
"github.com/pingcap/tidb/pkg/testkit/testdata"
"github.com/pingcap/tidb/pkg/testkit/testutil"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -540,3 +541,98 @@ func TestStorageEnginesInSlowQuery(t *testing.T) {
"where query like 'select%tablesample%;'").
Check(testkit.Rows("1 0"))
}

func TestSessionConnectAttrsInSlowQuery(t *testing.T) {
originCfg := config.GetGlobalConfig()
newCfg := *originCfg
f, err := os.CreateTemp("", "tidb-slow-*.log")
require.NoError(t, err)
_, err = f.WriteString(`# Time: 2024-01-15T10:00:00.000000+08:00
# Txn_start_ts: 123456789
# User@Host: root[root] @ localhost [127.0.0.1]
# Query_time: 0.5
# Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772
# Is_internal: false
# Succ: true
` + testutil.DefaultSessionConnectAttrsSlowLogLine() + `
select * from t;
`)
require.NoError(t, err)
require.NoError(t, f.Close())
newCfg.Log.SlowQueryFile = f.Name()
config.StoreGlobalConfig(&newCfg)
defer func() {
config.StoreGlobalConfig(originCfg)
require.NoError(t, os.Remove(newCfg.Log.SlowQueryFile))
}()
require.NoError(t, logutil.InitLogger(newCfg.Log.ToLogConfig()))
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("set @@time_zone='+08:00'")
tk.MustExec(fmt.Sprintf("set @@tidb_slow_query_file='%v'", f.Name()))

// Verify Session_connect_attrs column is present and returns the correct JSON value.
rows := tk.MustQuery("select Session_connect_attrs from information_schema.slow_query " +
"where query = 'select * from t;'").Rows()
require.Len(t, rows, 1)
attrsStr := rows[0][0].(string)
testutil.RequireContainsDefaultSessionConnectAttrs(t, attrsStr)

// Verify individual keys are accessible via JSON_EXTRACT.
tk.MustQuery("select JSON_EXTRACT(Session_connect_attrs, '$._client_name') from information_schema.slow_query " +
"where query = 'select * from t;'").
Check(testkit.Rows(`"Go-MySQL-Driver"`))
tk.MustQuery("select JSON_EXTRACT(Session_connect_attrs, '$.app_name') from information_schema.slow_query " +
"where query = 'select * from t;'").
Check(testkit.Rows(`"test_app"`))
}

func TestSessionConnectAttrsMissingAndTruncatedInSlowQuery(t *testing.T) {
originCfg := config.GetGlobalConfig()
newCfg := *originCfg
f, err := os.CreateTemp("", "tidb-slow-*.log")
require.NoError(t, err)
_, err = f.WriteString(`# Time: 2024-01-15T10:00:00.000000+08:00
# Txn_start_ts: 123456789
# User@Host: root[root] @ localhost [127.0.0.1]
# Query_time: 0.5
# Digest: 1111111111111111111111111111111111111111111111111111111111111111
# Is_internal: false
# Succ: true
select * from t_no_attrs;
# Time: 2024-01-15T10:00:01.000000+08:00
# Txn_start_ts: 123456790
# User@Host: root[root] @ localhost [127.0.0.1]
# Query_time: 0.6
# Digest: 2222222222222222222222222222222222222222222222222222222222222222
# Is_internal: false
# Succ: true
# Session_connect_attrs: {"_truncated":"4","app_name":"trunc_case"}
select * from t_truncated;
`)
require.NoError(t, err)
require.NoError(t, f.Close())
newCfg.Log.SlowQueryFile = f.Name()
config.StoreGlobalConfig(&newCfg)
defer func() {
config.StoreGlobalConfig(originCfg)
require.NoError(t, os.Remove(newCfg.Log.SlowQueryFile))
}()
require.NoError(t, logutil.InitLogger(newCfg.Log.ToLogConfig()))
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("set @@time_zone='+08:00'")
tk.MustExec(fmt.Sprintf("set @@tidb_slow_query_file='%v'", f.Name()))

// Missing Session_connect_attrs should parse to JSON null-like empty behavior.
tk.MustQuery("select Session_connect_attrs = cast('null' as json), JSON_EXTRACT(Session_connect_attrs, '$._truncated') is null from information_schema.slow_query " +
"where query = 'select * from t_no_attrs;' ").
Check(testkit.Rows("1 1"))

// Truncation metadata key should be preserved and queryable from JSON.
tk.MustQuery("select JSON_UNQUOTE(JSON_EXTRACT(Session_connect_attrs, '$._truncated')), JSON_UNQUOTE(JSON_EXTRACT(Session_connect_attrs, '$.app_name')) from information_schema.slow_query " +
"where query = 'select * from t_truncated;' ").
Check(testkit.Rows("4 trunc_case"))
}
56 changes: 54 additions & 2 deletions pkg/executor/slow_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/testkit/testutil"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/logutil"
Expand Down Expand Up @@ -178,7 +179,7 @@ select * from t;`
`0.1,0.2,0.03,127.0.0.1:20160,0.05,0.6,0.8,0.0.0.0:20160,70724,65536,0,0,0,0,0,,` +
`Cop_backoff_regionMiss_total_times: 200 Cop_backoff_regionMiss_total_time: 0.2 Cop_backoff_regionMiss_max_time: 0.2 Cop_backoff_regionMiss_max_addr: 127.0.0.1 Cop_backoff_regionMiss_avg_time: 0.2 Cop_backoff_regionMiss_p90_time: 0.2 Cop_backoff_rpcPD_total_times: 200 Cop_backoff_rpcPD_total_time: 0.2 Cop_backoff_rpcPD_max_time: 0.2 Cop_backoff_rpcPD_max_addr: 127.0.0.1 Cop_backoff_rpcPD_avg_time: 0.2 Cop_backoff_rpcPD_p90_time: 0.2 Cop_backoff_rpcTiKV_total_times: 200 Cop_backoff_rpcTiKV_total_time: 0.2 Cop_backoff_rpcTiKV_max_time: 0.2 Cop_backoff_rpcTiKV_max_addr: 127.0.0.1 Cop_backoff_rpcTiKV_avg_time: 0.2 Cop_backoff_rpcTiKV_p90_time: 0.2,` +
`0,0,1,0,1,1,0,default,2.158,2.123,0.05,0.01,0.021,1,1,,60e9378c746d9a2be1c791047e008967cf252eb6de9167ad3aa6098fa2d523f4,` +
`,update t set i = 1;,select * from t;`
`,update t set i = 1;,null,select * from t;`
require.Equal(t, expectRecordString, recordString)

// Issue 20928
Expand All @@ -201,7 +202,7 @@ select * from t;`
`0.1,0.2,0.03,127.0.0.1:20160,0.05,0.6,0.8,0.0.0.0:20160,70724,65536,0,0,0,0,0,,` +
`Cop_backoff_regionMiss_total_times: 200 Cop_backoff_regionMiss_total_time: 0.2 Cop_backoff_regionMiss_max_time: 0.2 Cop_backoff_regionMiss_max_addr: 127.0.0.1 Cop_backoff_regionMiss_avg_time: 0.2 Cop_backoff_regionMiss_p90_time: 0.2 Cop_backoff_rpcPD_total_times: 200 Cop_backoff_rpcPD_total_time: 0.2 Cop_backoff_rpcPD_max_time: 0.2 Cop_backoff_rpcPD_max_addr: 127.0.0.1 Cop_backoff_rpcPD_avg_time: 0.2 Cop_backoff_rpcPD_p90_time: 0.2 Cop_backoff_rpcTiKV_total_times: 200 Cop_backoff_rpcTiKV_total_time: 0.2 Cop_backoff_rpcTiKV_max_time: 0.2 Cop_backoff_rpcTiKV_max_addr: 127.0.0.1 Cop_backoff_rpcTiKV_avg_time: 0.2 Cop_backoff_rpcTiKV_p90_time: 0.2,` +
`0,0,1,0,1,1,0,default,2.158,2.123,0.05,0.01,0.021,1,1,,60e9378c746d9a2be1c791047e008967cf252eb6de9167ad3aa6098fa2d523f4,` +
`,update t set i = 1;,select * from t;`
`,update t set i = 1;,null,select * from t;`
require.Equal(t, expectRecordString, recordString)

// fix sql contain '# ' bug
Expand Down Expand Up @@ -254,6 +255,57 @@ select * from t;
require.Equal(t, warnings[0].Err.Error(), "Parse slow log at line 2, failed field is Succ, failed value is abc, error is strconv.ParseBool: parsing \"abc\": invalid syntax")
}

func TestParseSlowLogSessionConnectAttrs(t *testing.T) {
// Slow log entry that includes Session_connect_attrs JSON.
slowLogStr := `# Time: 2019-04-28T15:24:04.309074+08:00
# Txn_start_ts: 405888132465033227
# User@Host: root[root] @ localhost [127.0.0.1]
# Query_time: 0.216905
# Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772
# Is_internal: false
# Succ: true
` + testutil.DefaultSessionConnectAttrsSlowLogLine() + `
# Prev_stmt: begin;
select * from t;
`
loc, err := time.LoadLocation("Asia/Shanghai")
require.NoError(t, err)
ctx := mock.NewContext()
ctx.ResetSessionAndStmtTimeZone(loc)

// Use the retriever directly (without initialize) to avoid reading
// from actual slow log files on disk, which can produce extra rows.
retriever, err := newSlowQueryRetriever()
require.NoError(t, err)
retriever.columnValueFactoryMap = make(map[string]slowQueryColumnValueFactory, len(retriever.outputCols))
for idx, col := range retriever.outputCols {
factory, err := getColumnValueFactoryByName(col.Name.O, idx)
require.NoError(t, err)
require.NotNil(t, factory, "column %s should have a factory", col.Name.O)
retriever.columnValueFactoryMap[col.Name.O] = factory
}

reader := bufio.NewReader(bytes.NewBufferString(slowLogStr))
rows, err := parseLog(retriever, ctx, reader)
require.NoError(t, err)
require.Len(t, rows, 1)

// Find the Session_connect_attrs column.
colIdx := -1
for i, col := range retriever.outputCols {
if col.Name.L == strings.ToLower(variable.SlowLogSessionConnectAttrs) {
colIdx = i
break
}
}
require.NotEqual(t, -1, colIdx, "Session_connect_attrs column should exist")

// Verify the parsed JSON contains the expected keys.
bj := rows[0][colIdx].GetMysqlJSON()
bjStr := bj.String()
testutil.RequireContainsDefaultSessionConnectAttrs(t, bjStr)
}

// It changes variable.MaxOfMaxAllowedPacket, so must be stayed in SerialSuite.
func TestParseSlowLogFileSerial(t *testing.T) {
loc, err := time.LoadLocation("Asia/Shanghai")
Expand Down
1 change: 1 addition & 0 deletions pkg/infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,7 @@ var slowQueryCols = []columnInfo{
{name: variable.SlowLogPlanDigest, tp: mysql.TypeVarchar, size: 128},
{name: variable.SlowLogBinaryPlan, tp: mysql.TypeLongBlob, size: types.UnspecifiedLength},
{name: variable.SlowLogPrevStmt, tp: mysql.TypeLongBlob, size: types.UnspecifiedLength},
{name: variable.SlowLogSessionConnectAttrs, tp: mysql.TypeJSON, size: types.UnspecifiedLength},
{name: variable.SlowLogQuerySQLStr, tp: mysql.TypeLongBlob, size: types.UnspecifiedLength},
}

Expand Down
1 change: 1 addition & 0 deletions pkg/infoschema/test/clustertablestest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ go_test(
"//pkg/testkit",
"//pkg/testkit/external",
"//pkg/testkit/testsetup",
"//pkg/testkit/testutil",
"//pkg/types",
"//pkg/util",
"//pkg/util/dbterror/exeerrors",
Expand Down
50 changes: 50 additions & 0 deletions pkg/infoschema/test/clustertablestest/cluster_tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/pingcap/tidb/pkg/store/mockstore/unistore"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/external"
"github.com/pingcap/tidb/pkg/testkit/testutil"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/logutil"
Expand Down Expand Up @@ -293,6 +294,55 @@ func TestSelectClusterTable(t *testing.T) {
tk.MustQuery("select instance from `CLUSTER_SLOW_QUERY` where time='2019-02-12 19:33:56.571953'").Check(testkit.Rows(instanceAddr))
}

func TestClusterSlowQuerySessionConnectAttrs(t *testing.T) {
// setup suite
s := new(clusterTablesSuite)
s.store, s.dom = testkit.CreateMockStoreAndDomain(t)
s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", nil)
s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer()
s.startTime = time.Now()
defer s.httpServer.Close()
defer s.rpcserver.Stop()

f, err := os.CreateTemp("", "tidb-cluster-slow-*.log")
require.NoError(t, err)
_, err = f.WriteString(`# Time: 2024-01-15T10:00:00.000000+08:00
# Txn_start_ts: 123456789
# User@Host: root[root] @ localhost [127.0.0.1]
# Query_time: 0.5
# Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772
# Is_internal: false
# Succ: true
` + testutil.DefaultSessionConnectAttrsSlowLogLine() + `
select * from t;
`)
require.NoError(t, err)
require.NoError(t, f.Close())
defer func() { require.NoError(t, os.Remove(f.Name())) }()

defer config.RestoreFunc()()
config.UpdateGlobal(func(conf *config.Config) {
conf.Log.SlowQueryFile = f.Name()
})

tk := s.newTestKitWithRoot(t)
tk.MustExec("use information_schema")
tk.MustExec("set time_zone = '+08:00';")

rows := tk.MustQuery("select Session_connect_attrs from information_schema.cluster_slow_query " +
"where query = 'select * from t;'").Rows()
require.Len(t, rows, 1)
attrsStr := rows[0][0].(string)
testutil.RequireContainsDefaultSessionConnectAttrs(t, attrsStr)

tk.MustQuery("select JSON_EXTRACT(Session_connect_attrs, '$._client_name') from information_schema.cluster_slow_query " +
"where query = 'select * from t;'").
Check(testkit.Rows(`"Go-MySQL-Driver"`))
tk.MustQuery("select JSON_EXTRACT(Session_connect_attrs, '$.app_name') from information_schema.cluster_slow_query " +
"where query = 'select * from t;'").
Check(testkit.Rows(`"test_app"`))
}

func TestSelectClusterTablePrivilege(t *testing.T) {
// setup suite
s := new(clusterTablesSuite)
Expand Down
Loading