增加更多采集指标

This commit is contained in:
Alex Yang
2025-12-06 21:12:56 +08:00
parent c87071390d
commit 8a5ca62793
22 changed files with 3541058 additions and 63 deletions

View File

@@ -84,15 +84,14 @@ func (s *Storage) Close() {
// 写入数据到InfluxDB带重试机制
func (s *Storage) writeData(ctx context.Context, measurement string, tags map[string]string, fields map[string]interface{}, deviceID, metricType string, timestamp ...time.Time) error {
// 重试配置 - 减少重试次数和延迟,确保在超时时间内完成
maxRetries := 2
baseDelay := 200 * time.Millisecond
// 重试配置 - 调整重试策略,减少重试次数,增加单次写入超时
maxRetries := 1
baseDelay := 100 * time.Millisecond
for i := 0; i <= maxRetries; i++ {
// 如果上下文已取消,直接返回
if ctx.Err() != nil {
return ctx.Err()
}
// 创建独立的写入上下文,设置较短的超时时间
writeCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
// 写入数据点
writeAPI := s.client.WriteAPIBlocking(s.org, s.bucket)
@@ -122,7 +121,7 @@ func (s *Storage) writeData(ctx context.Context, measurement string, tags map[st
ts = time.Now().UnixNano()
}
line := fmt.Sprintf("%s,%s %s %d", measurement, formatTags(tags), strings.Join(fieldList, ","), ts)
err := writeAPI.WriteRecord(ctx, line)
err := writeAPI.WriteRecord(writeCtx, line)
if err == nil {
// 写入成功,直接返回
@@ -211,7 +210,7 @@ func (s *Storage) WriteProcessMetric(ctx context.Context, deviceID string, proce
}
// WriteDiskDetailMetric 写入磁盘详细信息
func (s *Storage) WriteDiskDetailMetric(ctx context.Context, deviceID, diskDeviceID, status, diskType string, sizeGB float64, model, interfaceType, description string, tags map[string]string) error {
func (s *Storage) WriteDiskDetailMetric(ctx context.Context, deviceID, diskPath, status, diskType string, sizeGB float64, model, interfaceType, description string, tags map[string]string) error {
// 创建标签映射,合并原有标签和新标签
allTags := make(map[string]string)
// 复制原有标签
@@ -221,7 +220,7 @@ func (s *Storage) WriteDiskDetailMetric(ctx context.Context, deviceID, diskDevic
// 添加设备ID标签
allTags["device_id"] = deviceID
// 添加磁盘相关标签
allTags["disk_id"] = diskDeviceID
allTags["disk_path"] = diskPath
allTags["status"] = status
allTags["type"] = diskType
allTags["model"] = model
@@ -260,6 +259,23 @@ func (s *Storage) WriteLogMetric(ctx context.Context, deviceID string, sequence
return s.writeData(ctx, "logs", allTags, fields, deviceID, "log", time)
}
// WriteHardwareMetric 写入硬件信息指标
func (s *Storage) WriteHardwareMetric(ctx context.Context, deviceID string, hardwareType string, hardwareData map[string]interface{}, tags map[string]string) error {
// 创建标签映射,合并原有标签和新标签
allTags := make(map[string]string)
// 复制原有标签
for k, v := range tags {
allTags[k] = v
}
// 添加设备ID标签
allTags["device_id"] = deviceID
// 添加硬件类型标签
allTags["type"] = hardwareType
// 使用新的writeData方法
return s.writeData(ctx, "hardware", allTags, hardwareData, deviceID, hardwareType)
}
// QueryMetrics 查询监控指标,支持采样
func (s *Storage) QueryMetrics(ctx context.Context, deviceID, metricType, startTime, endTime string, limit ...int) ([]MetricPoint, error) {
queryAPI := s.client.QueryAPI(s.org)
@@ -448,6 +464,112 @@ func (s *Storage) QueryAllDevices(ctx context.Context) ([]map[string]string, err
return deviceList, nil
}
// QueryHardwareMetrics 查询硬件信息指标
func (s *Storage) QueryHardwareMetrics(ctx context.Context, deviceID string) (map[string]interface{}, error) {
queryAPI := s.client.QueryAPI(s.org)
// 初始化硬件信息结果
hardwareInfo := make(map[string]interface{})
// 定义要查询的硬件类型列表
hardwareTypes := []string{"os", "cpu", "memory", "disk", "network"}
// 为每个硬件类型单独查询,避免类型冲突
for _, hardwareType := range hardwareTypes {
// 构建查询语句,获取设备最新的特定类型硬件信息
query := `from(bucket: "` + s.bucket + `")
|> range(start: -24h)
|> filter(fn: (r) => r["_measurement"] == "hardware")
|> filter(fn: (r) => r["device_id"] == "` + deviceID + `")
|> filter(fn: (r) => r["type"] == "` + hardwareType + `")
|> last()`
// 执行查询
queryResult, err := queryAPI.Query(ctx, query)
if err != nil {
// 查询单个硬件类型失败,继续查询其他类型
log.Printf("Warning: Failed to query %s hardware metrics: %v", hardwareType, err)
continue
}
defer queryResult.Close()
// 对于os、cpu、memory类型我们只需要一个结果
if hardwareType == "os" || hardwareType == "cpu" || hardwareType == "memory" {
// 获取字段值映射
fieldMap := make(map[string]interface{})
// 遍历所有结果行,收集所有字段
for queryResult.Next() {
record := queryResult.Record()
fieldName := record.Field()
fieldValue := record.Value()
fieldMap[fieldName] = fieldValue
}
// 如果有字段数据,添加到硬件信息结果中
if len(fieldMap) > 0 {
hardwareInfo[hardwareType] = fieldMap
}
} else {
// 对于disk和network类型我们可能有多个结果
// 我们需要收集所有结果并按index排序
// 使用map来去重
resultMap := make(map[string]map[string]interface{})
// 遍历所有结果行,收集所有结果
for queryResult.Next() {
// 获取记录
record := queryResult.Record()
// 获取字段名和值
fieldName := record.Field()
fieldValue := record.Value()
// 获取硬件ID用于去重
hardwareID := "unknown"
if hardwareType == "disk" {
// 对于磁盘使用device_id作为唯一标识
if devID, ok := record.ValueByKey("device_id").(string); ok {
hardwareID = devID
}
} else {
// 对于网卡使用name作为唯一标识
if name, ok := record.ValueByKey("name").(string); ok {
hardwareID = name
}
}
// 获取或创建硬件数据映射
if _, ok := resultMap[hardwareID]; !ok {
resultMap[hardwareID] = make(map[string]interface{})
}
// 添加字段值
resultMap[hardwareID][fieldName] = fieldValue
}
// 将去重后的结果转换为切片
var allResults []map[string]interface{}
for _, result := range resultMap {
allResults = append(allResults, result)
}
// 如果有结果,添加到硬件信息结果中
if len(allResults) > 0 {
hardwareInfo[hardwareType] = allResults
}
}
// 检查查询结果是否有错误
if queryResult.Err() != nil {
// 查询单个硬件类型失败,继续查询其他类型
log.Printf("Warning: Failed to process %s hardware query result: %v", hardwareType, queryResult.Err())
}
}
return hardwareInfo, nil
}
// DeviceStatus 设备状态
func (s *Storage) QueryDeviceStatus(ctx context.Context, deviceID string) (string, map[string]float64, error) {
queryAPI := s.client.QueryAPI(s.org)
@@ -601,12 +723,12 @@ func (s *Storage) QueryLogMetrics(ctx context.Context, deviceID string, startTim
for queryResult.Next() {
// 获取记录
record := queryResult.Record()
// 获取时间和source作为唯一键
timeStr := record.Time().Format(time.RFC3339Nano)
source := record.ValueByKey("source").(string)
key := timeStr + "-" + source
// 检查是否已经有这个日志条目的基础信息
logData, exists := logMap[key]
if !exists {
@@ -621,7 +743,7 @@ func (s *Storage) QueryLogMetrics(ctx context.Context, deviceID string, startTim
}
logMap[key] = logData
}
// 根据字段类型更新相应的值
field := record.Field()
switch field {
@@ -635,20 +757,20 @@ func (s *Storage) QueryLogMetrics(ctx context.Context, deviceID string, startTim
if queryResult.Err() != nil {
return nil, queryResult.Err()
}
// 将map转换为切片
logs := make([]map[string]interface{}, 0, len(logMap))
for _, logData := range logMap {
logs = append(logs, logData)
}
// 按时间倒序排序
sort.Slice(logs, func(i, j int) bool {
timeI := logs[i]["time"].(time.Time)
timeJ := logs[j]["time"].(time.Time)
return timeI.After(timeJ)
})
// 限制返回100条最新日志
if len(logs) > 100 {
logs = logs[:100]