package storage import ( "context" "log" "strings" "time" influxdb2 "github.com/influxdata/influxdb-client-go" "github.com/monitor/backend/config" ) // MetricPoint 自定义监控指标点 type MetricPoint struct { Time time.Time `json:"time"` Value float64 `json:"value"` Tags map[string]string `json:"tags,omitempty"` } type Storage struct { client influxdb2.Client org string bucket string } // NewStorage 创建新的存储实例 func NewStorage(cfg *config.Config) *Storage { var client influxdb2.Client // InfluxDB 2.x版本使用Token认证 // 检查Token是否为空 if cfg.InfluxDB.Token != "" { // 使用Token认证(适用于InfluxDB 2.x) client = influxdb2.NewClient(cfg.InfluxDB.URL, cfg.InfluxDB.Token) } else { // Token为空,尝试使用默认客户端 // 注意:InfluxDB 2.x需要有效的Token才能写入数据 log.Printf("Warning: InfluxDB Token is empty, write operations may fail") client = influxdb2.NewClient(cfg.InfluxDB.URL, "") } // 禁用InfluxDB客户端的调试日志 client.Options().SetLogLevel(0) return &Storage{ client: client, org: cfg.InfluxDB.Org, bucket: cfg.InfluxDB.Bucket, } } // Close 关闭存储连接 func (s *Storage) Close() { s.client.Close() } // WriteMetric 写入监控指标 func (s *Storage) WriteMetric(ctx context.Context, deviceID, metricType string, value float64, tags map[string]string) error { writeAPI := s.client.WriteAPIBlocking(s.org, s.bucket) // 创建标签映射,合并原有标签和新标签 allTags := make(map[string]string) // 复制原有标签 for k, v := range tags { allTags[k] = v } // 添加设备ID标签 allTags["device_id"] = deviceID // 添加指标类型标签 allTags["type"] = metricType // 创建数据点 point := influxdb2.NewPoint( "metrics", allTags, map[string]interface{}{ "value": value, }, time.Now(), ) // 写入数据点 return writeAPI.WritePoint(ctx, point) } // QueryMetrics 查询监控指标 func (s *Storage) QueryMetrics(ctx context.Context, deviceID, metricType, startTime, endTime string) ([]MetricPoint, error) { queryAPI := s.client.QueryAPI(s.org) // 处理时间参数 processedStartTime := startTime processedEndTime := endTime // 检查并修复时间格式,确保InfluxDB能正确识别 // 对于相对时间(如-24h)或now(),直接使用 // 对于绝对时间,确保格式正确 if len(startTime) > 0 { // 检查是否是相对时间(以-开头或为now()) isRelativeTime := strings.HasPrefix(startTime, "-") || startTime == "now()" if !isRelativeTime { // 是绝对时间,尝试解析为time.Time类型 t, err := time.Parse(time.RFC3339, startTime) if err != nil { // 尝试其他常见格式 t, err = time.Parse("2006-01-02T15:04", startTime) if err != nil { // 尝试YYYY-MM-DD HH:MM格式 t, err = time.Parse("2006-01-02 15:04", startTime) } } if err == nil { // 解析成功,使用RFC3339格式(InfluxDB可以识别) processedStartTime = t.Format(time.RFC3339) } } } if len(endTime) > 0 { // 检查是否是相对时间(以-开头或为now()) isRelativeTime := strings.HasPrefix(endTime, "-") || endTime == "now()" if !isRelativeTime { // 是绝对时间,尝试解析为time.Time类型 t, err := time.Parse(time.RFC3339, endTime) if err != nil { // 尝试其他常见格式 t, err = time.Parse("2006-01-02T15:04", endTime) if err != nil { // 尝试YYYY-MM-DD HH:MM格式 t, err = time.Parse("2006-01-02 15:04", endTime) } } if err == nil { // 解析成功,使用RFC3339格式(InfluxDB可以识别) processedEndTime = t.Format(time.RFC3339) } } } // 构建查询语句,包含标签信息和字段过滤 query := `from(bucket: "` + s.bucket + `") |> range(start: ` + processedStartTime + `, stop: ` + processedEndTime + `) |> filter(fn: (r) => r["_measurement"] == "metrics") |> filter(fn: (r) => r["_field"] == "value")` // 只有当deviceID非空时才添加device_id过滤 if deviceID != "" { query += ` |> filter(fn: (r) => r["device_id"] == "` + deviceID + `")` } query += ` |> filter(fn: (r) => r["type"] == "` + metricType + `") |> sort(columns: ["_time"])` // 执行查询 result, err := queryAPI.Query(ctx, query) if err != nil { return nil, err } defer result.Close() var points []MetricPoint // 处理查询结果 for result.Next() { // 表结构变化也需要处理数据 // 获取记录 record := result.Record() // 获取所有标签 tags := make(map[string]string) for k, v := range record.Values() { // 跳过内置字段,只保留自定义标签 if k != "_time" && k != "_value" && k != "_field" && k != "_measurement" { if strValue, ok := v.(string); ok { tags[k] = strValue } } } // 转换为自定义MetricPoint point := MetricPoint{ Time: record.Time(), Value: record.Value().(float64), Tags: tags, } points = append(points, point) } if result.Err() != nil { return nil, result.Err() } return points, nil } // DeviceInfo 设备信息 func (s *Storage) QueryAllDevices(ctx context.Context) ([]map[string]string, error) { queryAPI := s.client.QueryAPI(s.org) // 构建查询语句,获取所有唯一的device_id和对应的agent_name query := `from(bucket: "` + s.bucket + `") |> range(start: -30d) |> filter(fn: (r) => r["_measurement"] == "metrics") |> group(columns: ["device_id", "agent_name"]) |> last() |> keep(columns: ["device_id", "agent_name"])` // 执行查询 queryResult, err := queryAPI.Query(ctx, query) if err != nil { return []map[string]string{{"id": "default", "name": "Default"}}, nil } defer queryResult.Close() deviceMap := make(map[string]string) // 处理查询结果 for queryResult.Next() { if queryResult.TableChanged() { // 表结构变化,跳过 continue } // 获取记录 record := queryResult.Record() deviceID := record.ValueByKey("device_id").(string) agentName := "" if name, ok := record.ValueByKey("agent_name").(string); ok { agentName = name } if agentName == "" { agentName = deviceID } deviceMap[deviceID] = agentName } if queryResult.Err() != nil { return []map[string]string{{"id": "default", "name": "Default"}}, nil } // 转换为切片 deviceList := make([]map[string]string, 0, len(deviceMap)) for deviceID, agentName := range deviceMap { deviceList = append(deviceList, map[string]string{ "id": deviceID, "name": agentName, }) } // 如果没有设备,返回默认设备 if len(deviceList) == 0 { return []map[string]string{{"id": "default", "name": "Default"}}, nil } return deviceList, nil } // DeviceStatus 设备状态 func (s *Storage) QueryDeviceStatus(ctx context.Context, deviceID string) (string, map[string]float64, error) { queryAPI := s.client.QueryAPI(s.org) // 构建查询语句,获取设备最新的指标数据和agent名称 query := `from(bucket: "` + s.bucket + `") |> range(start: -1h) |> filter(fn: (r) => r["_measurement"] == "metrics") |> filter(fn: (r) => r["device_id"] == "` + deviceID + `") |> last()` // 执行查询 queryResult, err := queryAPI.Query(ctx, query) if err != nil { return "", nil, err } defer queryResult.Close() status := make(map[string]float64) agentName := deviceID hasData := false // 处理查询结果 for queryResult.Next() { if queryResult.TableChanged() { // 表结构变化,跳过 continue } hasData = true // 获取记录 record := queryResult.Record() metricType := record.ValueByKey("type").(string) value := record.Value().(float64) status[metricType] = value // 获取agent名称 if name, ok := record.ValueByKey("agent_name").(string); ok && name != "" { agentName = name } } if queryResult.Err() != nil { return "", nil, queryResult.Err() } if !hasData { // 如果没有数据,返回空状态 return "", map[string]float64{}, nil } return agentName, status, nil }