diff --git a/backend/internal/handler/handler.go b/backend/internal/handler/handler.go index 31cb10b..11ba96a 100644 --- a/backend/internal/handler/handler.go +++ b/backend/internal/handler/handler.go @@ -476,13 +476,15 @@ func broadcastMetricsUpdate(deviceID string, metrics map[string]interface{}) { func GetCPUMetrics(c *gin.Context) { // 获取查询参数 deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备 - startTime := c.DefaultQuery("start_time", "-24h") + startTime := c.DefaultQuery("start_time", "-1h") // 缩短默认查询时间范围到1小时,减少默认数据量 endTime := c.DefaultQuery("end_time", "now()") aggregation := c.DefaultQuery("aggregation", "average") interval := c.DefaultQuery("interval", "10s") // 添加interval参数,默认10秒 + limitStr := c.DefaultQuery("limit", "5000") // 添加limit参数,默认5000条记录 + limit, _ := strconv.Atoi(limitStr) // 查询数据 - points, err := globalStorage.QueryMetrics(context.Background(), deviceID, "cpu", startTime, endTime) + points, err := globalStorage.QueryMetrics(context.Background(), deviceID, "cpu", startTime, endTime, limit) if err != nil { // 只记录警告,返回空数据 log.Printf("Warning: Failed to query CPU metrics: %v", err) @@ -504,13 +506,15 @@ func GetCPUMetrics(c *gin.Context) { func GetMemoryMetrics(c *gin.Context) { // 获取查询参数 deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备 - startTime := c.DefaultQuery("start_time", "-24h") + startTime := c.DefaultQuery("start_time", "-1h") // 缩短默认查询时间范围到1小时,减少默认数据量 endTime := c.DefaultQuery("end_time", "now()") aggregation := c.DefaultQuery("aggregation", "average") interval := c.DefaultQuery("interval", "10s") // 添加interval参数,默认10秒 + limitStr := c.DefaultQuery("limit", "5000") // 添加limit参数,默认5000条记录 + limit, _ := strconv.Atoi(limitStr) // 查询数据 - points, err := globalStorage.QueryMetrics(context.Background(), deviceID, "memory", startTime, endTime) + points, err := globalStorage.QueryMetrics(context.Background(), deviceID, "memory", startTime, endTime, limit) if err != nil { // 只记录警告,返回空数据 log.Printf("Warning: Failed to query memory metrics: %v", err) @@ -532,13 +536,15 @@ func GetMemoryMetrics(c *gin.Context) { func GetDiskMetrics(c *gin.Context) { // 获取查询参数 deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备 - startTime := c.DefaultQuery("start_time", "-24h") + startTime := c.DefaultQuery("start_time", "-1h") // 缩短默认查询时间范围到1小时,减少默认数据量 endTime := c.DefaultQuery("end_time", "now()") aggregation := c.DefaultQuery("aggregation", "average") interval := c.DefaultQuery("interval", "10s") // 添加interval参数,默认10秒 + limitStr := c.DefaultQuery("limit", "5000") // 添加limit参数,默认5000条记录 + limit, _ := strconv.Atoi(limitStr) // 查询数据 - points, err := globalStorage.QueryMetrics(context.Background(), deviceID, "disk", startTime, endTime) + points, err := globalStorage.QueryMetrics(context.Background(), deviceID, "disk", startTime, endTime, limit) if err != nil { // 只记录警告,返回空数据 log.Printf("Warning: Failed to query disk metrics: %v", err) @@ -575,18 +581,20 @@ func GetDiskMetrics(c *gin.Context) { func GetNetworkMetrics(c *gin.Context) { // 获取查询参数 deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备 - startTime := c.DefaultQuery("start_time", "-24h") + startTime := c.DefaultQuery("start_time", "-1h") // 缩短默认查询时间范围到1小时,减少默认数据量 endTime := c.DefaultQuery("end_time", "now()") aggregation := c.DefaultQuery("aggregation", "average") interval := c.DefaultQuery("interval", "10s") // 添加interval参数,默认10秒 + limitStr := c.DefaultQuery("limit", "5000") // 添加limit参数,默认5000条记录 + limit, _ := strconv.Atoi(limitStr) // 查询发送和接收的网络速率指标 - sentPoints, err1 := globalStorage.QueryMetrics(context.Background(), deviceID, "network_sent", startTime, endTime) - receivedPoints, err2 := globalStorage.QueryMetrics(context.Background(), deviceID, "network_received", startTime, endTime) + sentPoints, err1 := globalStorage.QueryMetrics(context.Background(), deviceID, "network_sent", startTime, endTime, limit) + receivedPoints, err2 := globalStorage.QueryMetrics(context.Background(), deviceID, "network_received", startTime, endTime, limit) // 查询发送和接收的累积总流量指标 - txBytesPoints, err3 := globalStorage.QueryMetrics(context.Background(), deviceID, "network_tx_bytes", startTime, endTime) - rxBytesPoints, err4 := globalStorage.QueryMetrics(context.Background(), deviceID, "network_rx_bytes", startTime, endTime) + txBytesPoints, err3 := globalStorage.QueryMetrics(context.Background(), deviceID, "network_tx_bytes", startTime, endTime, limit) + rxBytesPoints, err4 := globalStorage.QueryMetrics(context.Background(), deviceID, "network_rx_bytes", startTime, endTime, limit) // 处理错误 if err1 != nil { diff --git a/backend/internal/handler/processor.go b/backend/internal/handler/processor.go index fa85c16..716f5e8 100644 --- a/backend/internal/handler/processor.go +++ b/backend/internal/handler/processor.go @@ -149,20 +149,7 @@ func ProcessMetricData(points []storage.MetricPoint, aggregation string, interva // 如果没有数据点,生成覆盖整个时间范围的空数据点 if len(points) == 0 { - // 生成空数据点,确保覆盖整个时间范围 - result := make([]MetricData, 0) - currentTime := start - - // 循环生成数据点,直到超过结束时间 - for currentTime.Before(end) { - result = append(result, MetricData{ - Time: currentTime, - Value: 0, // 使用0表示无数据 - }) - currentTime = currentTime.Add(time.Duration(intervalSeconds) * time.Second) - } - - return result + return generateEmptyData(start, end, intervalSeconds) } // 根据时间区段精细程度自动调整聚合策略 @@ -192,6 +179,24 @@ func ProcessMetricData(points []storage.MetricPoint, aggregation string, interva } } +// generateEmptyData 生成覆盖整个时间范围的空数据点 +func generateEmptyData(start, end time.Time, intervalSeconds int) []MetricData { + // 生成空数据点,确保覆盖整个时间范围 + result := make([]MetricData, 0) + currentTime := start.Truncate(time.Duration(intervalSeconds) * time.Second) + + // 循环生成数据点,直到超过结束时间 + for currentTime.Before(end) { + result = append(result, MetricData{ + Time: currentTime, + Value: 0, // 使用0表示无数据 + }) + currentTime = currentTime.Add(time.Duration(intervalSeconds) * time.Second) + } + + return result +} + // processRawData 处理原始数据,添加去重逻辑 func processRawData(points []storage.MetricPoint) []MetricData { // 使用map进行去重,key为时间戳(精确到秒)+值的组合 @@ -224,36 +229,43 @@ func processRawData(points []storage.MetricPoint) []MetricData { func processAverageData(points []storage.MetricPoint, intervalSeconds int, start, end time.Time) []MetricData { // 按指定时间区间聚合的平均值 - // 按指定区间分组,保留UTC时区信息 - intervalData := make(map[string][]float64) - - for _, point := range points { - // 使用UTC时间,按指定区间对齐 - utcTime := point.Time.UTC() - // 格式化时间键 - intervalKey := FormatTimeByInterval(utcTime, intervalSeconds) - value := point.Value - intervalData[intervalKey] = append(intervalData[intervalKey], value) + // 如果没有数据点,生成空数据 + if len(points) == 0 { + return generateEmptyData(start, end, intervalSeconds) } - // 生成覆盖整个请求时间范围的完整时间序列 + // 先对原始数据按时间排序 + sort.Slice(points, func(i, j int) bool { + return points[i].Time.Before(points[j].Time) + }) + + // 初始化结果和指针 result := make([]MetricData, 0) currentTime := start.Truncate(time.Duration(intervalSeconds) * time.Second) + pointIndex := 0 + pointCount := len(points) - // 循环生成每个时间区间的数据点 + // 循环处理每个时间区间 for currentTime.Before(end.Add(time.Duration(intervalSeconds) * time.Second)) { - // 格式化当前时间为区间键 - intervalKey := FormatTimeByInterval(currentTime, intervalSeconds) + intervalStart := currentTime + intervalEnd := intervalStart.Add(time.Duration(intervalSeconds) * time.Second) - // 计算当前区间的平均值 - var value float64 = 0 - if values, exists := intervalData[intervalKey]; exists && len(values) > 0 { - // 计算平均值 - sum := 0.0 - for _, v := range values { - sum += v + // 收集当前区间内的数据点 + var sum float64 = 0 + var count int = 0 + for pointIndex < pointCount && points[pointIndex].Time.Before(intervalEnd) { + // 只处理区间内的数据点 + if points[pointIndex].Time.After(intervalStart) || points[pointIndex].Time.Equal(intervalStart) { + sum += points[pointIndex].Value + count++ } - value = sum / float64(len(values)) + pointIndex++ + } + + // 计算平均值 + var value float64 = 0 + if count > 0 { + value = sum / float64(count) } // 添加到结果 @@ -263,7 +275,7 @@ func processAverageData(points []storage.MetricPoint, intervalSeconds int, start }) // 移动到下一个时间区间 - currentTime = currentTime.Add(time.Duration(intervalSeconds) * time.Second) + currentTime = intervalEnd } return result @@ -273,38 +285,41 @@ func processAverageData(points []storage.MetricPoint, intervalSeconds int, start func processMaxData(points []storage.MetricPoint, intervalSeconds int, start, end time.Time) []MetricData { // 按指定时间区间聚合的最大值 - // 按指定区间分组,保留UTC时区信息 - intervalData := make(map[string][]float64) - - for _, point := range points { - // 使用UTC时间,按指定区间对齐 - utcTime := point.Time.UTC() - // 格式化时间键 - intervalKey := FormatTimeByInterval(utcTime, intervalSeconds) - value := point.Value - intervalData[intervalKey] = append(intervalData[intervalKey], value) + // 如果没有数据点,生成空数据 + if len(points) == 0 { + return generateEmptyData(start, end, intervalSeconds) } - // 生成覆盖整个请求时间范围的完整时间序列 + // 先对原始数据按时间排序 + sort.Slice(points, func(i, j int) bool { + return points[i].Time.Before(points[j].Time) + }) + + // 初始化结果和指针 result := make([]MetricData, 0) currentTime := start.Truncate(time.Duration(intervalSeconds) * time.Second) + pointIndex := 0 + pointCount := len(points) - // 循环生成每个时间区间的数据点 + // 循环处理每个时间区间 for currentTime.Before(end.Add(time.Duration(intervalSeconds) * time.Second)) { - // 格式化当前时间为区间键 - intervalKey := FormatTimeByInterval(currentTime, intervalSeconds) + intervalStart := currentTime + intervalEnd := intervalStart.Add(time.Duration(intervalSeconds) * time.Second) - // 计算当前区间的最大值 + // 查找当前区间内的最大值 var value float64 = 0 - if values, exists := intervalData[intervalKey]; exists && len(values) > 0 { - // 计算最大值 - max := values[0] - for _, v := range values { - if v > max { - max = v + var foundData bool = false + for pointIndex < pointCount && points[pointIndex].Time.Before(intervalEnd) { + // 只处理区间内的数据点 + if points[pointIndex].Time.After(intervalStart) || points[pointIndex].Time.Equal(intervalStart) { + if !foundData { + value = points[pointIndex].Value + foundData = true + } else if points[pointIndex].Value > value { + value = points[pointIndex].Value } } - value = max + pointIndex++ } // 添加到结果 @@ -314,7 +329,7 @@ func processMaxData(points []storage.MetricPoint, intervalSeconds int, start, en }) // 移动到下一个时间区间 - currentTime = currentTime.Add(time.Duration(intervalSeconds) * time.Second) + currentTime = intervalEnd } return result @@ -324,38 +339,41 @@ func processMaxData(points []storage.MetricPoint, intervalSeconds int, start, en func processMinData(points []storage.MetricPoint, intervalSeconds int, start, end time.Time) []MetricData { // 按指定时间区间聚合的最小值 - // 按指定区间分组,保留UTC时区信息 - intervalData := make(map[string][]float64) - - for _, point := range points { - // 使用UTC时间,按指定区间对齐 - utcTime := point.Time.UTC() - // 格式化时间键 - intervalKey := FormatTimeByInterval(utcTime, intervalSeconds) - value := point.Value - intervalData[intervalKey] = append(intervalData[intervalKey], value) + // 如果没有数据点,生成空数据 + if len(points) == 0 { + return generateEmptyData(start, end, intervalSeconds) } - // 生成覆盖整个请求时间范围的完整时间序列 + // 先对原始数据按时间排序 + sort.Slice(points, func(i, j int) bool { + return points[i].Time.Before(points[j].Time) + }) + + // 初始化结果和指针 result := make([]MetricData, 0) currentTime := start.Truncate(time.Duration(intervalSeconds) * time.Second) + pointIndex := 0 + pointCount := len(points) - // 循环生成每个时间区间的数据点 + // 循环处理每个时间区间 for currentTime.Before(end.Add(time.Duration(intervalSeconds) * time.Second)) { - // 格式化当前时间为区间键 - intervalKey := FormatTimeByInterval(currentTime, intervalSeconds) + intervalStart := currentTime + intervalEnd := intervalStart.Add(time.Duration(intervalSeconds) * time.Second) - // 计算当前区间的最小值 + // 查找当前区间内的最小值 var value float64 = 0 - if values, exists := intervalData[intervalKey]; exists && len(values) > 0 { - // 计算最小值 - min := values[0] - for _, v := range values { - if v < min { - min = v + var foundData bool = false + for pointIndex < pointCount && points[pointIndex].Time.Before(intervalEnd) { + // 只处理区间内的数据点 + if points[pointIndex].Time.After(intervalStart) || points[pointIndex].Time.Equal(intervalStart) { + if !foundData { + value = points[pointIndex].Value + foundData = true + } else if points[pointIndex].Value < value { + value = points[pointIndex].Value } } - value = min + pointIndex++ } // 添加到结果 @@ -365,7 +383,7 @@ func processMinData(points []storage.MetricPoint, intervalSeconds int, start, en }) // 移动到下一个时间区间 - currentTime = currentTime.Add(time.Duration(intervalSeconds) * time.Second) + currentTime = intervalEnd } return result @@ -375,40 +393,47 @@ func processMinData(points []storage.MetricPoint, intervalSeconds int, start, en func processSumData(points []storage.MetricPoint, intervalSeconds int, start, end time.Time) []MetricData { // 按指定时间区间聚合的总和 - // 按指定区间分组,保留UTC时区信息 - intervalData := make(map[string][]storage.MetricPoint) - - for _, point := range points { - // 使用UTC时间,按指定区间对齐 - utcTime := point.Time.UTC() - // 格式化时间键 - intervalKey := FormatTimeByInterval(utcTime, intervalSeconds) - intervalData[intervalKey] = append(intervalData[intervalKey], point) + // 如果没有数据点,生成空数据 + if len(points) == 0 { + return generateEmptyData(start, end, intervalSeconds) } - // 生成覆盖整个请求时间范围的完整时间序列 + // 先对原始数据按时间排序 + sort.Slice(points, func(i, j int) bool { + return points[i].Time.Before(points[j].Time) + }) + + // 初始化结果和指针 result := make([]MetricData, 0) currentTime := start.Truncate(time.Duration(intervalSeconds) * time.Second) + pointIndex := 0 + pointCount := len(points) - // 循环生成每个时间区间的数据点 + // 循环处理每个时间区间 for currentTime.Before(end.Add(time.Duration(intervalSeconds) * time.Second)) { - // 格式化当前时间为区间键 - intervalKey := FormatTimeByInterval(currentTime, intervalSeconds) + intervalStart := currentTime + intervalEnd := intervalStart.Add(time.Duration(intervalSeconds) * time.Second) - // 计算当前区间的总和 + // 收集当前区间内的数据点 + intervalPoints := make([]storage.MetricPoint, 0) + for pointIndex < pointCount && points[pointIndex].Time.Before(intervalEnd) { + // 只处理区间内的数据点 + if points[pointIndex].Time.After(intervalStart) || points[pointIndex].Time.Equal(intervalStart) { + intervalPoints = append(intervalPoints, points[pointIndex]) + } + pointIndex++ + } + + // 计算总和 sum := 0.0 - if intervalPoints, exists := intervalData[intervalKey]; exists { - // 如果数据点数量小于2,直接返回第一个数据点的值 - if len(intervalPoints) < 2 { - if len(intervalPoints) > 0 { - sum = intervalPoints[0].Value - } + intervalPointCount := len(intervalPoints) + if intervalPointCount > 0 { + if intervalPointCount < 2 { + // 如果数据点数量小于2,直接返回第一个数据点的值 + sum = intervalPoints[0].Value } else { - // 按时间排序 - sort.Slice(intervalPoints, func(i, j int) bool { - return intervalPoints[i].Time.Before(intervalPoints[j].Time) - }) - for i := 1; i < len(intervalPoints); i++ { + // 计算区间内的流量总和 + for i := 1; i < intervalPointCount; i++ { // 计算时间差(秒) timeDiff := intervalPoints[i].Time.Sub(intervalPoints[i-1].Time).Seconds() if timeDiff > 0 { @@ -430,7 +455,7 @@ func processSumData(points []storage.MetricPoint, intervalSeconds int, start, en }) // 移动到下一个时间区间 - currentTime = currentTime.Add(time.Duration(intervalSeconds) * time.Second) + currentTime = intervalEnd } return result diff --git a/backend/internal/storage/storage.go b/backend/internal/storage/storage.go index 966d3af..2483181 100644 --- a/backend/internal/storage/storage.go +++ b/backend/internal/storage/storage.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "math/rand" + "strconv" "strings" "time" @@ -251,8 +252,8 @@ func (s *Storage) WriteLogMetric(ctx context.Context, deviceID string, sequence return s.writeData(ctx, "logs", allTags, fields, deviceID, "log") } -// QueryMetrics 查询监控指标 -func (s *Storage) QueryMetrics(ctx context.Context, deviceID, metricType, startTime, endTime string) ([]MetricPoint, error) { +// QueryMetrics 查询监控指标,支持采样 +func (s *Storage) QueryMetrics(ctx context.Context, deviceID, metricType, startTime, endTime string, limit ...int) ([]MetricPoint, error) { queryAPI := s.client.QueryAPI(s.org) // 处理时间参数 @@ -324,6 +325,15 @@ func (s *Storage) QueryMetrics(ctx context.Context, deviceID, metricType, startT |> filter(fn: (r) => r["type"] == "` + metricType + `") |> sort(columns: ["_time"])` + // 设置默认限制 + maxLimit := 10000 + if len(limit) > 0 && limit[0] > 0 { + maxLimit = limit[0] + } + + query += ` +|> limit(n: ` + strconv.Itoa(maxLimit) + `)` // 限制返回的数据点数量,防止内存溢出 + // 执行查询 result, err := queryAPI.Query(ctx, query) if err != nil { @@ -591,11 +601,11 @@ func (s *Storage) QueryLogMetrics(ctx context.Context, deviceID string, startTim // 构建日志数据 logData := map[string]interface{}{ - "time": record.Time(), - "device_id": record.ValueByKey("device_id"), - "source": record.ValueByKey("source"), - "sequence": record.ValueByKey("sequence"), - "message": record.ValueByKey("message"), + "time": record.Time(), + "device_id": record.ValueByKey("device_id"), + "source": record.ValueByKey("source"), + "sequence": record.ValueByKey("sequence"), + "message": record.ValueByKey("message"), "agent_name": record.ValueByKey("agent_name"), } diff --git a/backend/monitor-server b/backend/monitor-server index 02631dd..f6b5071 100755 Binary files a/backend/monitor-server and b/backend/monitor-server differ