Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/kubernetes-csi/external-snapshotter/client/v8 v8.0.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.19.1
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.55.0
github.com/prometheus/procfs v0.19.2
github.com/sirupsen/logrus v1.9.4
Expand Down Expand Up @@ -80,7 +81,6 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/spf13/cobra v1.8.1 // indirect
github.com/tjfoc/gmsm v1.4.1 // indirect
github.com/x448/float16 v0.8.4 // indirect
Expand Down
17 changes: 14 additions & 3 deletions pkg/metric/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ func registerCollector(collector string, factory collectorFactoryFunc, relatedDr
// Collector is the interface implemented by the individual metric collectors
// stored in CSICollector.Collectors.
type Collector interface {
// Update gets new metrics and exposes them via the prometheus registry by
// sending them on ch.
Update(ch chan<- prometheus.Metric) error
// Get returns the collector's current metrics as a slice of *Metric,
// without going through a prometheus channel.
Get() []*Metric
}

// CSICollector implements the prometheus.Collector interface.
type CSICollector struct {
// Collectors holds the registered collectors; GetMetrics iterates over
// every entry. Keys appear to be collector names — confirm against
// initCSICollector.
Collectors map[string]Collector
}

// initCSICollector initializes the package-level CSICollector instance; it is a no-op when the instance already exists.
func newCSICollector(driverNames []string, serviceType utils.ServiceType) {
func initCSICollector(driverNames []string, serviceType utils.ServiceType) {
if csiCollectorInstance != nil {
return
}
Expand All @@ -67,7 +67,7 @@ func newCSICollector(driverNames []string, serviceType utils.ServiceType) {
if enabled {
collector, err := reg.Factory()
if err != nil {
klog.ErrorS(err, "Failed to create collector")
klog.ErrorS(err, "Failed to create collector", "name", reg.Name)
} else {
collectors[reg.Name] = collector
}
Expand Down Expand Up @@ -98,6 +98,17 @@ func (csi CSICollector) Collect(ch chan<- prometheus.Metric) {
wg.Wait()
}

// GetMetrics returns the metrics reported by every collector of the shared
// CSICollector instance, concatenated into a single slice.
//
// When the shared instance does not exist yet it is created first by calling
// initCSICollector with driverNames and serviceType. Collectors are visited
// in map iteration order, so the order of the returned metrics is not stable
// between calls. The result is nil when no collector reports any metric.
func GetMetrics(driverNames []string, serviceType utils.ServiceType) []*Metric {
	// Lazily build the shared collector instance on first use.
	if csiCollectorInstance == nil {
		initCSICollector(driverNames, serviceType)
	}

	var collected []*Metric
	for name := range csiCollectorInstance.Collectors {
		collected = append(collected, csiCollectorInstance.Collectors[name].Get()...)
	}
	return collected
}

func execute(name string, c Collector, ch chan<- prometheus.Metric) {
defer func() {
if err := recover(); err != nil {
Expand Down
12 changes: 1 addition & 11 deletions pkg/metric/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ const (
//diskDriverName represents the csi storage type name of Disk
diskDriverName string = "diskplugin.csi.alibabacloud.com"
localDriverName string = "localplugin.csi.alibabacloud.com"
// unknown metric value
UnknownValue string = "unknown"
)

const (
Expand All @@ -41,7 +39,6 @@ const (
diskSectorSize = 512
diskDefaultsLatencyThreshold = 10
diskDefaultsCapacityPercentageThreshold = 85
nfsDefaultsCapacityPercentageThreshold = 85
nfsStatsFileName = "/proc/self/mountstats"
latencyTooHigh = "LatencyTooHigh"
capacityNotEnough = "NotEnoughDiskSpace"
Expand Down Expand Up @@ -69,14 +66,7 @@ var (
)

type typedFactorDesc struct {
desc *prometheus.Desc
*MetaDesc
valueType prometheus.ValueType
factor float64
}

func (d *typedFactorDesc) mustNewConstMetric(value float64, labels ...string) prometheus.Metric {
if d.factor != 0 {
value *= d.factor
}
return prometheus.MustNewConstMetric(d.desc, d.valueType, value, labels...)
}
30 changes: 24 additions & 6 deletions pkg/metric/csi_grpc_exec_time_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,28 @@ type csiGrpcExecTimeCollector struct {
ExecTimeTotalMetric *prometheus.CounterVec
}

// Names, fully-qualified names and help texts of the two CSI gRPC execution
// metrics. The fully-qualified names are built from csiNamespace and
// grpcSubsystem with prometheus.BuildFQName.
var (
// Execution count metric.
execCountName = "execution_count"
execCountFQName = prometheus.BuildFQName(csiNamespace, grpcSubsystem, execCountName)
execCountHelp = "CSI grpc execution count."

// Total execution time metric.
execTimeName = "execution_time_total"
execTimeFQName = prometheus.BuildFQName(csiNamespace, grpcSubsystem, execTimeName)
execTimeHelp = "CSI grpc execution time in total."
)

var CsiGrpcExecTimeCollector = csiGrpcExecTimeCollector{
ExecCountMetric: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: csiNamespace,
Subsystem: grpcSubsystem,
Name: "execution_count",
Help: "CSI grpc execution count.",
Name: execCountName,
Help: execCountHelp,
}, csiGrpcExecTimeLabels),
ExecTimeTotalMetric: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: csiNamespace,
Subsystem: grpcSubsystem,
Name: "execution_time_total",
Help: "CSI grpc execution time in total.",
Name: execTimeName,
Help: execTimeHelp,
}, csiGrpcExecTimeLabels),
}

Expand All @@ -44,9 +54,17 @@ func GetCsiGrpcExecTimeCollector() (Collector, error) {
return &CsiGrpcExecTimeCollector, nil
}

// Get returns the metrics extracted from ExecCountMetric followed by those
// extracted from ExecTimeTotalMetric. Both are extracted as counter values
// via extractMetricsFromMetricVec.
func (c *csiGrpcExecTimeCollector) Get() []*Metric {
	metrics := extractMetricsFromMetricVec(execCountFQName, execCountHelp, c.ExecCountMetric, prometheus.CounterValue)
	return append(
		metrics,
		extractMetricsFromMetricVec(execTimeFQName, execTimeHelp, c.ExecTimeTotalMetric, prometheus.CounterValue)...,
	)
}

func (c *csiGrpcExecTimeCollector) Update(ch chan<- prometheus.Metric) error {
c.ExecCountMetric.Collect(ch)
c.ExecTimeTotalMetric.Collect(ch)
metrics := c.Get()
for _, metric := range metrics {
ch <- prometheus.MustNewConstMetric(metric.Desc, metric.ValueType, metric.Value, convertLabelsToString(metric.VariableLabelPairs)...)
}
return nil
}

Expand Down
104 changes: 56 additions & 48 deletions pkg/metric/disk_stat_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/prometheus/client_golang/prometheus"
promdto "github.com/prometheus/client_model/go"
"github.com/prometheus/procfs/blockdevice"
"golang.org/x/sys/unix"
v1 "k8s.io/api/core/v1"
Expand All @@ -22,7 +23,6 @@ import (
"k8s.io/klog/v2"
"k8s.io/mount-utils"

"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/options"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils"
)

Expand All @@ -32,17 +32,22 @@ var (
scalerPvcMap *sync.Map
)

func diskMetricDesc(name, help string) *prometheus.Desc {
return prometheus.NewDesc(
prometheus.BuildFQName(nodeNamespace, volumeSubsystem, name),
help,
diskStatLabelNames, diskStatConstLabels,
)
func diskMetricDesc(name, help string) *MetaDesc {
return NewMetaDesc(nodeNamespace, volumeSubsystem, name, help, diskStatLabelNames, diskStatConstLabels)
}

// convertLabelsToString returns the value of every label pair, in the same
// order as labelPairs. The label names are dropped; the result is suitable as
// the variadic label-value argument of prometheus.MustNewConstMetric.
//
// A nil pair, or a pair whose Value is unset, yields the empty string rather
// than a nil-pointer panic.
func convertLabelsToString(labelPairs []*promdto.LabelPair) []string {
	result := make([]string, len(labelPairs))
	for i, pair := range labelPairs {
		// LabelPair.Value is an optional protobuf field (*string). The
		// generated getter is nil-safe for both the receiver and the field,
		// unlike a direct dereference.
		result[i] = pair.GetValue()
	}
	return result
}

// stats from /proc/diskstats
var (
diskReadsCompletedDesc = diskMetricDesc("read_completed_total", "The total number of reads completed successfully.")
diskReadsCompletedDesc = diskMetricDesc("read_completed_total", "The total number of reads completed successfully.")

diskReadsMergeDesc = diskMetricDesc("read_merged_total", "The total number of reads merged.")
diskReadBytesDesc = diskMetricDesc("read_bytes_total", "The total number of bytes read successfully.")
diskReadTimeMilliSecondsDesc = diskMetricDesc("read_time_milliseconds_total", "The total number of milliseconds spent by all reads.")
Expand All @@ -57,9 +62,9 @@ var (
)

type capDescs struct {
Available *prometheus.Desc
Total *prometheus.Desc
Used *prometheus.Desc
Available *MetaDesc
Total *MetaDesc
Used *MetaDesc
}

var capStatDescs = map[csi.VolumeUsage_Unit]capDescs{
Expand Down Expand Up @@ -89,7 +94,7 @@ type diskStatCollector struct {
lastPvDiskInfoMap map[string]diskInfo
lastPvStatsMap atomic.Pointer[map[uint64]*blockdevice.Diskstats]
diskStats *ProcDiskStats
clientSet *kubernetes.Clientset
client kubernetes.Interface
recorder record.EventRecorder
mounter mount.Interface
nodeName string
Expand Down Expand Up @@ -120,12 +125,7 @@ func getDiskCapacityThreshold() float64 {
// NewDiskStatCollector returns a new Collector exposing disk stats.
func NewDiskStatCollector() (Collector, error) {
recorder := utils.NewEventRecorder()
config, err := options.GetRestConfig()
if err != nil {
return nil, err
}
// creates the clientset
clientset, err := kubernetes.NewForConfig(config)
client, err := newK8sClient()
if err != nil {
return nil, err
}
Expand All @@ -140,7 +140,7 @@ func NewDiskStatCollector() (Collector, error) {
return &diskStatCollector{
lastPvDiskInfoMap: make(map[string]diskInfo, 0),
diskStats: diskStats,
clientSet: clientset,
client: client,
milliSecondsLatencyThreshold: getDiskLatencyThreshold(),
capacityPercentageThreshold: getDiskCapacityThreshold(),
recorder: recorder,
Expand All @@ -149,17 +149,17 @@ func NewDiskStatCollector() (Collector, error) {
}, nil
}

func (p *diskStatCollector) Update(ch chan<- prometheus.Metric) error {
//startTime := time.Now()
func (p *diskStatCollector) Get() (metrics []*Metric) {
volJSONPaths, err := findVolJSON(podsRootPath)
if err != nil {
return err
return
}
p.updateMap(&p.lastPvDiskInfoMap, volJSONPaths, diskDriverName)

diskStats, err := p.diskStats.GetStats()
if err != nil {
return fmt.Errorf("couldn't get diskstats: %s", err)
klog.Errorf("couldn't get diskstats: %s", err)
return
}
deviceNameStatsMap := make(map[uint64]*blockdevice.Diskstats, len(diskStats))
for i, s := range diskStats {
Expand Down Expand Up @@ -189,8 +189,8 @@ func (p *diskStatCollector) Update(ch chan<- prometheus.Metric) error {
continue
}
devPath := "/dev/" + stats.DeviceName
labels := []string{info.PVCRef.Namespace, info.PVCRef.Name, devPath}
p.sendDiskStats(&stats.IOStats, labels, ch)
labelValues := []string{info.PVCRef.Namespace, info.PVCRef.Name, devPath}
metrics = append(metrics, p.getDiskStatMetrics(&stats.IOStats, labelValues...)...)
if lastStats, ok := lastStatsMap[info.Dev]; ok {
p.latencyEventAlert(&stats.IOStats, &lastStats.IOStats, info.PVCRef)
}
Expand All @@ -199,16 +199,23 @@ func (p *diskStatCollector) Update(ch chan<- prometheus.Metric) error {
pvName, p.nodeName, info.DiskID, devPath)
}

capStats, err := getDiskCapacityMetric(info.DiskID)
capStats, err := getDiskCapacityStats(info.DiskID)
if err != nil {
klog.ErrorS(err, "Get disk capacity failed", "disk", info.DiskID, err)
continue
}
p.sendCapStats(capStats, labels, ch)
metrics = append(metrics, p.getDiskCapacityMetrics(capStats, labelValues)...)
p.capacityEventAlert(capStats, info.PVCRef)
}
//elapsedTime := time.Since(startTime)
//logrus.Info("DiskStat spent time:", elapsedTime)

return metrics
}

// Update collects the current disk metrics through Get and sends each of them
// on ch as a prometheus const metric. It always returns nil;
// prometheus.MustNewConstMetric panics if a metric cannot be built.
func (p *diskStatCollector) Update(ch chan<- prometheus.Metric) error {
	for _, m := range p.Get() {
		// Const metrics take label values only, so strip the label names.
		labelValues := convertLabelsToString(m.VariableLabelPairs)
		ch <- prometheus.MustNewConstMetric(m.Desc, m.ValueType, m.Value, labelValues...)
	}
	return nil
}

Expand Down Expand Up @@ -247,28 +254,29 @@ func (p *diskStatCollector) capacityEventAlert(usage []*csi.VolumeUsage, ref *v1
}
}

func (p *diskStatCollector) sendDiskStats(stats *blockdevice.IOStats, labels []string, ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(diskReadsCompletedDesc, prometheus.CounterValue, float64(stats.ReadIOs), labels...)
ch <- prometheus.MustNewConstMetric(diskReadsMergeDesc, prometheus.CounterValue, float64(stats.ReadMerges), labels...)
ch <- prometheus.MustNewConstMetric(diskReadBytesDesc, prometheus.CounterValue, float64(stats.ReadSectors)*diskSectorSize, labels...)
ch <- prometheus.MustNewConstMetric(diskReadTimeMilliSecondsDesc, prometheus.CounterValue, float64(stats.ReadTicks), labels...)

ch <- prometheus.MustNewConstMetric(diskWritesCompletedDesc, prometheus.CounterValue, float64(stats.WriteIOs), labels...)
ch <- prometheus.MustNewConstMetric(diskWriteMergeDesc, prometheus.CounterValue, float64(stats.WriteMerges), labels...)
ch <- prometheus.MustNewConstMetric(diskWrittenBytesDesc, prometheus.CounterValue, float64(stats.WriteSectors)*diskSectorSize, labels...)
ch <- prometheus.MustNewConstMetric(diskWriteTimeMilliSecondsDesc, prometheus.CounterValue, float64(stats.WriteTicks), labels...)

ch <- prometheus.MustNewConstMetric(diskIONowDesc, prometheus.GaugeValue, float64(stats.IOsInProgress), labels...)
ch <- prometheus.MustNewConstMetric(diskIOTimeSecondsDesc, prometheus.CounterValue, float64(stats.IOsTotalTicks)/1000, labels...)
// getDiskStatMetrics converts one device's block-device I/O statistics into
// ten metrics, all carrying labelValues: four read counters, four write
// counters, the in-progress I/O gauge and the total I/O time counter.
//
// Sector counts are multiplied by diskSectorSize to report bytes, and
// IOsTotalTicks is divided by 1000 for the *_seconds metric.
func (p *diskStatCollector) getDiskStatMetrics(stats *blockdevice.IOStats, labelValues ...string) []*Metric {
	counter, gauge := prometheus.CounterValue, prometheus.GaugeValue

	// Derived values that need unit conversion.
	readBytes := float64(stats.ReadSectors) * diskSectorSize
	writtenBytes := float64(stats.WriteSectors) * diskSectorSize
	ioTime := float64(stats.IOsTotalTicks) / 1000

	metrics := make([]*Metric, 0, 10)
	// Reads.
	metrics = append(metrics,
		MustNewMetricWithMetaDesc(diskReadsCompletedDesc, float64(stats.ReadIOs), counter, labelValues...),
		MustNewMetricWithMetaDesc(diskReadsMergeDesc, float64(stats.ReadMerges), counter, labelValues...),
		MustNewMetricWithMetaDesc(diskReadBytesDesc, readBytes, counter, labelValues...),
		MustNewMetricWithMetaDesc(diskReadTimeMilliSecondsDesc, float64(stats.ReadTicks), counter, labelValues...),
	)
	// Writes.
	metrics = append(metrics,
		MustNewMetricWithMetaDesc(diskWritesCompletedDesc, float64(stats.WriteIOs), counter, labelValues...),
		MustNewMetricWithMetaDesc(diskWriteMergeDesc, float64(stats.WriteMerges), counter, labelValues...),
		MustNewMetricWithMetaDesc(diskWrittenBytesDesc, writtenBytes, counter, labelValues...),
		MustNewMetricWithMetaDesc(diskWriteTimeMilliSecondsDesc, float64(stats.WriteTicks), counter, labelValues...),
	)
	// Overall I/O state.
	metrics = append(metrics,
		MustNewMetricWithMetaDesc(diskIONowDesc, float64(stats.IOsInProgress), gauge, labelValues...),
		MustNewMetricWithMetaDesc(diskIOTimeSecondsDesc, ioTime, counter, labelValues...),
	)
	return metrics
}

func (p *diskStatCollector) sendCapStats(stats []*csi.VolumeUsage, labels []string, ch chan<- prometheus.Metric) {
func (p *diskStatCollector) getDiskCapacityMetrics(stats []*csi.VolumeUsage, labelValues []string) (metrics []*Metric) {
for _, stat := range stats {
descs := capStatDescs[stat.Unit]
ch <- prometheus.MustNewConstMetric(descs.Available, prometheus.GaugeValue, float64(stat.Available), labels...)
ch <- prometheus.MustNewConstMetric(descs.Total, prometheus.GaugeValue, float64(stat.Total), labels...)
ch <- prometheus.MustNewConstMetric(descs.Used, prometheus.GaugeValue, float64(stat.Used), labels...)
metrics = append(metrics, MustNewMetricWithMetaDesc(descs.Available, float64(stat.Available), prometheus.GaugeValue, labelValues...))
metrics = append(metrics, MustNewMetricWithMetaDesc(descs.Total, float64(stat.Total), prometheus.GaugeValue, labelValues...))
metrics = append(metrics, MustNewMetricWithMetaDesc(descs.Used, float64(stat.Used), prometheus.GaugeValue, labelValues...))
}
return
}

func (p *diskStatCollector) updateMap(lastPvDiskInfoMap *map[string]diskInfo, jsonPaths []string, driverName string) {
Expand Down Expand Up @@ -321,7 +329,7 @@ func (p *diskStatCollector) updateDiskInfoMap(thisPvDiskInfoMap map[string]diskI
lastInfo, ok := (*lastPvDiskInfoMap)[pv]
// add and modify
if !ok || thisInfo.VolDataPath != lastInfo.VolDataPath {
pvcRef, err := getDiskPvcByPvName(p.clientSet, pv)
pvcRef, err := getDiskPvcByPvName(p.client, pv, thisInfo.VolDataPath)
if err != nil {
continue
}
Expand All @@ -343,7 +351,7 @@ func (p *diskStatCollector) updateDiskInfoMap(thisPvDiskInfoMap map[string]diskI
}
}

func getDiskCapacityMetric(diskID string) ([]*csi.VolumeUsage, error) {
func getDiskCapacityStats(diskID string) ([]*csi.VolumeUsage, error) {
globalMountPath := getGlobalMountPathByDiskID(diskID)
response, err := utils.GetMetrics(globalMountPath)
if err != nil {
Expand Down
Loading