解决日志api的message字段为空的问题
This commit is contained in:
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -82,7 +83,7 @@ func (s *Storage) Close() {
|
||||
}
|
||||
|
||||
// 写入数据到InfluxDB,带重试机制
|
||||
func (s *Storage) writeData(ctx context.Context, measurement string, tags map[string]string, fields map[string]interface{}, deviceID, metricType string) error {
|
||||
func (s *Storage) writeData(ctx context.Context, measurement string, tags map[string]string, fields map[string]interface{}, deviceID, metricType string, timestamp ...time.Time) error {
|
||||
// 重试配置 - 减少重试次数和延迟,确保在超时时间内完成
|
||||
maxRetries := 2
|
||||
baseDelay := 200 * time.Millisecond
|
||||
@@ -113,7 +114,14 @@ func (s *Storage) writeData(ctx context.Context, measurement string, tags map[st
|
||||
}
|
||||
fieldList = append(fieldList, fieldStr)
|
||||
}
|
||||
line := fmt.Sprintf("%s,%s %s %d", measurement, formatTags(tags), strings.Join(fieldList, ","), time.Now().UnixNano())
|
||||
// 确定时间戳
|
||||
var ts int64
|
||||
if len(timestamp) > 0 {
|
||||
ts = timestamp[0].UnixNano()
|
||||
} else {
|
||||
ts = time.Now().UnixNano()
|
||||
}
|
||||
line := fmt.Sprintf("%s,%s %s %d", measurement, formatTags(tags), strings.Join(fieldList, ","), ts)
|
||||
err := writeAPI.WriteRecord(ctx, line)
|
||||
|
||||
if err == nil {
|
||||
@@ -248,8 +256,8 @@ func (s *Storage) WriteLogMetric(ctx context.Context, deviceID string, sequence
|
||||
"message": message,
|
||||
}
|
||||
|
||||
// 使用新的writeData方法
|
||||
return s.writeData(ctx, "logs", allTags, fields, deviceID, "log")
|
||||
// 使用新的writeData方法,传入日志的实际时间
|
||||
return s.writeData(ctx, "logs", allTags, fields, deviceID, "log", time)
|
||||
}
|
||||
|
||||
// QueryMetrics 查询监控指标,支持采样
|
||||
@@ -577,7 +585,7 @@ func (s *Storage) QueryLogMetrics(ctx context.Context, deviceID string, startTim
|
||||
// 按时间倒序排列,获取最新的日志
|
||||
query += `
|
||||
|> sort(columns: ["_time"], desc: true)
|
||||
|> limit(n: 100)` // 限制返回100条最新日志
|
||||
|> limit(n: 200)` // 限制返回200条记录(因为message和sequence是分开存储的)
|
||||
|
||||
// 执行查询
|
||||
queryResult, err := queryAPI.Query(ctx, query)
|
||||
@@ -586,36 +594,65 @@ func (s *Storage) QueryLogMetrics(ctx context.Context, deviceID string, startTim
|
||||
}
|
||||
defer queryResult.Close()
|
||||
|
||||
// 存储日志数据
|
||||
logs := make([]map[string]interface{}, 0)
|
||||
// 使用map存储日志数据,key是时间戳和source的组合
|
||||
logMap := make(map[string]map[string]interface{})
|
||||
|
||||
// 处理查询结果
|
||||
for queryResult.Next() {
|
||||
if queryResult.TableChanged() {
|
||||
// 表结构变化,跳过
|
||||
continue
|
||||
}
|
||||
|
||||
// 获取记录
|
||||
record := queryResult.Record()
|
||||
|
||||
// 构建日志数据
|
||||
logData := map[string]interface{}{
|
||||
"time": record.Time(),
|
||||
"device_id": record.ValueByKey("device_id"),
|
||||
"source": record.ValueByKey("source"),
|
||||
"sequence": record.ValueByKey("sequence"),
|
||||
"message": record.ValueByKey("message"),
|
||||
"agent_name": record.ValueByKey("agent_name"),
|
||||
|
||||
// 获取时间和source作为唯一键
|
||||
timeStr := record.Time().Format(time.RFC3339Nano)
|
||||
source := record.ValueByKey("source").(string)
|
||||
key := timeStr + "-" + source
|
||||
|
||||
// 检查是否已经有这个日志条目的基础信息
|
||||
logData, exists := logMap[key]
|
||||
if !exists {
|
||||
// 创建新的日志条目
|
||||
logData = map[string]interface{}{
|
||||
"time": record.Time(),
|
||||
"device_id": record.ValueByKey("device_id"),
|
||||
"source": source,
|
||||
"sequence": nil,
|
||||
"message": nil,
|
||||
"agent_name": record.ValueByKey("agent_name"),
|
||||
}
|
||||
logMap[key] = logData
|
||||
}
|
||||
|
||||
// 根据字段类型更新相应的值
|
||||
field := record.Field()
|
||||
switch field {
|
||||
case "message":
|
||||
logData["message"] = record.Value()
|
||||
case "sequence":
|
||||
logData["sequence"] = record.Value()
|
||||
}
|
||||
|
||||
// 添加到日志列表
|
||||
logs = append(logs, logData)
|
||||
}
|
||||
|
||||
if queryResult.Err() != nil {
|
||||
return nil, queryResult.Err()
|
||||
}
|
||||
|
||||
// 将map转换为切片
|
||||
logs := make([]map[string]interface{}, 0, len(logMap))
|
||||
for _, logData := range logMap {
|
||||
logs = append(logs, logData)
|
||||
}
|
||||
|
||||
// 按时间倒序排序
|
||||
sort.Slice(logs, func(i, j int) bool {
|
||||
timeI := logs[i]["time"].(time.Time)
|
||||
timeJ := logs[j]["time"].(time.Time)
|
||||
return timeI.After(timeJ)
|
||||
})
|
||||
|
||||
// 限制返回100条最新日志
|
||||
if len(logs) > 100 {
|
||||
logs = logs[:100]
|
||||
}
|
||||
|
||||
return logs, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user