Files
monitor/backend/internal/storage/storage.go
2025-12-02 23:20:10 +08:00

318 lines
7.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package storage
import (
"context"
"log"
"strings"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go"
"github.com/monitor/backend/config"
)
// MetricPoint 自定义监控指标点
type MetricPoint struct {
Time time.Time `json:"time"`
Value float64 `json:"value"`
Tags map[string]string `json:"tags,omitempty"`
}
type Storage struct {
client influxdb2.Client
org string
bucket string
}
// NewStorage 创建新的存储实例
func NewStorage(cfg *config.Config) *Storage {
var client influxdb2.Client
// InfluxDB 2.x版本使用Token认证
// 检查Token是否为空
if cfg.InfluxDB.Token != "" {
// 使用Token认证适用于InfluxDB 2.x
client = influxdb2.NewClient(cfg.InfluxDB.URL, cfg.InfluxDB.Token)
} else {
// Token为空尝试使用默认客户端
// 注意InfluxDB 2.x需要有效的Token才能写入数据
log.Printf("Warning: InfluxDB Token is empty, write operations may fail")
client = influxdb2.NewClient(cfg.InfluxDB.URL, "")
}
// 禁用InfluxDB客户端的调试日志
client.Options().SetLogLevel(0)
return &Storage{
client: client,
org: cfg.InfluxDB.Org,
bucket: cfg.InfluxDB.Bucket,
}
}
// Close 关闭存储连接
func (s *Storage) Close() {
s.client.Close()
}
// WriteMetric 写入监控指标
func (s *Storage) WriteMetric(ctx context.Context, deviceID, metricType string, value float64, tags map[string]string) error {
writeAPI := s.client.WriteAPIBlocking(s.org, s.bucket)
// 创建标签映射,合并原有标签和新标签
allTags := make(map[string]string)
// 复制原有标签
for k, v := range tags {
allTags[k] = v
}
// 添加设备ID标签
allTags["device_id"] = deviceID
// 添加指标类型标签
allTags["type"] = metricType
// 创建数据点
point := influxdb2.NewPoint(
"metrics",
allTags,
map[string]interface{}{
"value": value,
},
time.Now(),
)
// 写入数据点
return writeAPI.WritePoint(ctx, point)
}
// QueryMetrics 查询监控指标
func (s *Storage) QueryMetrics(ctx context.Context, deviceID, metricType, startTime, endTime string) ([]MetricPoint, error) {
queryAPI := s.client.QueryAPI(s.org)
// 处理时间参数
processedStartTime := startTime
processedEndTime := endTime
// 检查并修复时间格式确保InfluxDB能正确识别
// 对于相对时间(如-24h或now(),直接使用
// 对于绝对时间,确保格式正确
if len(startTime) > 0 {
// 检查是否是相对时间(以-开头或为now()
isRelativeTime := strings.HasPrefix(startTime, "-") || startTime == "now()"
if !isRelativeTime {
// 是绝对时间尝试解析为time.Time类型
t, err := time.Parse(time.RFC3339, startTime)
if err != nil {
// 尝试其他常见格式
t, err = time.Parse("2006-01-02T15:04", startTime)
if err != nil {
// 尝试YYYY-MM-DD HH:MM格式
t, err = time.Parse("2006-01-02 15:04", startTime)
}
}
if err == nil {
// 解析成功使用RFC3339格式InfluxDB可以识别
processedStartTime = t.Format(time.RFC3339)
}
}
}
if len(endTime) > 0 {
// 检查是否是相对时间(以-开头或为now()
isRelativeTime := strings.HasPrefix(endTime, "-") || endTime == "now()"
if !isRelativeTime {
// 是绝对时间尝试解析为time.Time类型
t, err := time.Parse(time.RFC3339, endTime)
if err != nil {
// 尝试其他常见格式
t, err = time.Parse("2006-01-02T15:04", endTime)
if err != nil {
// 尝试YYYY-MM-DD HH:MM格式
t, err = time.Parse("2006-01-02 15:04", endTime)
}
}
if err == nil {
// 解析成功使用RFC3339格式InfluxDB可以识别
processedEndTime = t.Format(time.RFC3339)
}
}
}
// 构建查询语句,包含标签信息和字段过滤
query := `from(bucket: "` + s.bucket + `")
|> range(start: ` + processedStartTime + `, stop: ` + processedEndTime + `)
|> filter(fn: (r) => r["_measurement"] == "metrics")
|> filter(fn: (r) => r["_field"] == "value")`
// 只有当deviceID非空时才添加device_id过滤
if deviceID != "" {
query += `
|> filter(fn: (r) => r["device_id"] == "` + deviceID + `")`
}
query += `
|> filter(fn: (r) => r["type"] == "` + metricType + `")
|> sort(columns: ["_time"])`
// 执行查询
result, err := queryAPI.Query(ctx, query)
if err != nil {
return nil, err
}
defer result.Close()
var points []MetricPoint
// 处理查询结果
for result.Next() {
// 表结构变化也需要处理数据
// 获取记录
record := result.Record()
// 获取所有标签
tags := make(map[string]string)
for k, v := range record.Values() {
// 跳过内置字段,只保留自定义标签
if k != "_time" && k != "_value" && k != "_field" && k != "_measurement" {
if strValue, ok := v.(string); ok {
tags[k] = strValue
}
}
}
// 转换为自定义MetricPoint
point := MetricPoint{
Time: record.Time(),
Value: record.Value().(float64),
Tags: tags,
}
points = append(points, point)
}
if result.Err() != nil {
return nil, result.Err()
}
return points, nil
}
// DeviceInfo 设备信息
func (s *Storage) QueryAllDevices(ctx context.Context) ([]map[string]string, error) {
queryAPI := s.client.QueryAPI(s.org)
// 构建查询语句获取所有唯一的device_id和对应的agent_name
query := `from(bucket: "` + s.bucket + `")
|> range(start: -30d)
|> filter(fn: (r) => r["_measurement"] == "metrics")
|> group(columns: ["device_id", "agent_name"])
|> last()
|> keep(columns: ["device_id", "agent_name"])`
// 执行查询
queryResult, err := queryAPI.Query(ctx, query)
if err != nil {
return []map[string]string{{"id": "default", "name": "Default"}}, nil
}
defer queryResult.Close()
deviceMap := make(map[string]string)
// 处理查询结果
for queryResult.Next() {
if queryResult.TableChanged() {
// 表结构变化,跳过
continue
}
// 获取记录
record := queryResult.Record()
deviceID := record.ValueByKey("device_id").(string)
agentName := ""
if name, ok := record.ValueByKey("agent_name").(string); ok {
agentName = name
}
if agentName == "" {
agentName = deviceID
}
deviceMap[deviceID] = agentName
}
if queryResult.Err() != nil {
return []map[string]string{{"id": "default", "name": "Default"}}, nil
}
// 转换为切片
deviceList := make([]map[string]string, 0, len(deviceMap))
for deviceID, agentName := range deviceMap {
deviceList = append(deviceList, map[string]string{
"id": deviceID,
"name": agentName,
})
}
// 如果没有设备,返回默认设备
if len(deviceList) == 0 {
return []map[string]string{{"id": "default", "name": "Default"}}, nil
}
return deviceList, nil
}
// DeviceStatus 设备状态
func (s *Storage) QueryDeviceStatus(ctx context.Context, deviceID string) (string, map[string]float64, error) {
queryAPI := s.client.QueryAPI(s.org)
// 构建查询语句获取设备最新的指标数据和agent名称
query := `from(bucket: "` + s.bucket + `")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "metrics")
|> filter(fn: (r) => r["device_id"] == "` + deviceID + `")
|> last()`
// 执行查询
queryResult, err := queryAPI.Query(ctx, query)
if err != nil {
return "", nil, err
}
defer queryResult.Close()
status := make(map[string]float64)
agentName := deviceID
hasData := false
// 处理查询结果
for queryResult.Next() {
if queryResult.TableChanged() {
// 表结构变化,跳过
continue
}
hasData = true
// 获取记录
record := queryResult.Record()
metricType := record.ValueByKey("type").(string)
value := record.Value().(float64)
status[metricType] = value
// 获取agent名称
if name, ok := record.ValueByKey("agent_name").(string); ok && name != "" {
agentName = name
}
}
if queryResult.Err() != nil {
return "", nil, queryResult.Err()
}
if !hasData {
// 如果没有数据,返回空状态
return "", map[string]float64{}, nil
}
return agentName, status, nil
}