// Package storage persists monitoring data in InfluxDB 2.x and reads it back
// with Flux queries.
package storage

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go"
	"github.com/monitor/backend/config"
)

// tagEscaper escapes the characters that are significant inside line-protocol
// tag keys and tag values: comma, space and equals sign.
var tagEscaper = strings.NewReplacer(",", `\,`, " ", `\ `, "=", `\=`)

// fluxStringEscaper escapes the characters that would terminate a Flux string
// literal or start an interpolation inside it.
var fluxStringEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, `${`, `\${`)

var (
	// fluxDurationRe matches a Flux duration literal such as -24h or 1h30m.
	fluxDurationRe = regexp.MustCompile(`^-?(\d+(ns|us|µs|ms|s|m|h|d|w|mo|y))+$`)
	// unixSecondsRe matches a bare integer timestamp.
	unixSecondsRe = regexp.MustCompile(`^\d+$`)
)

// absoluteTimeLayouts lists the absolute time formats accepted for query
// bounds; every match is re-emitted as RFC 3339.
var absoluteTimeLayouts = []string{
	time.RFC3339,
	"2006-01-02T15:04",
	"2006-01-02 15:04",
	"2006-01-02",
}

// pivotFields turns the one-row-per-field layout returned by Flux into one row
// per point, with every field available as its own column.
const pivotFields = ` |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")`

// formatTags renders a tag map as a line-protocol tag set ("k1=v1,k2=v2").
// Tags with an empty value are dropped because InfluxDB rejects them. Keys are
// emitted in sorted order so the same input always produces the same line.
func formatTags(tags map[string]string) string {
	keys := make([]string, 0, len(tags))
	for k, v := range tags {
		if v == "" {
			continue
		}
		keys = append(keys, k)
	}
	sort.Strings(keys)

	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, escapeTagValue(k)+"="+escapeTagValue(tags[k]))
	}
	return strings.Join(pairs, ",")
}

// escapeTagValue backslash-escapes commas, spaces and equals signs in value.
func escapeTagValue(value string) string {
	return tagEscaper.Replace(value)
}

// formatField renders one field as "key=value" for the line protocol.
// Strings (and values of any unlisted type) are written as quoted strings;
// numbers are written bare, so integers carry no "i" suffix.
func formatField(key string, value interface{}) string {
	switch v := value.(type) {
	case string:
		return fmt.Sprintf("%s=%q", key, v)
	case float64, int, int32, int64:
		return fmt.Sprintf("%s=%v", key, v)
	case bool:
		return fmt.Sprintf("%s=%t", key, v)
	default:
		return fmt.Sprintf("%s=%q", key, fmt.Sprintf("%v", v))
	}
}

// buildLine assembles a single line-protocol record. The comma after the
// measurement is emitted only when at least one tag survives formatting;
// "measurement, field=1" is not a valid record.
func buildLine(measurement string, tags map[string]string, fields map[string]interface{}, ts time.Time) string {
	keys := make([]string, 0, len(fields))
	for k := range fields {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	fieldList := make([]string, 0, len(keys))
	for _, k := range keys {
		fieldList = append(fieldList, formatField(k, fields[k]))
	}

	var b strings.Builder
	b.WriteString(measurement)
	if tagSet := formatTags(tags); tagSet != "" {
		b.WriteByte(',')
		b.WriteString(tagSet)
	}
	b.WriteByte(' ')
	b.WriteString(strings.Join(fieldList, ","))
	b.WriteByte(' ')
	b.WriteString(strconv.FormatInt(ts.UnixNano(), 10))
	return b.String()
}

// mergeTags returns a new map holding base overlaid with extra; entries in
// extra win on key collisions. Neither input is modified.
func mergeTags(base, extra map[string]string) map[string]string {
	merged := make(map[string]string, len(base)+len(extra))
	for k, v := range base {
		merged[k] = v
	}
	for k, v := range extra {
		merged[k] = v
	}
	return merged
}

// fluxString renders s as a quoted Flux string literal, so caller-supplied
// values cannot break out of the literal and alter the query.
func fluxString(s string) string {
	return `"` + fluxStringEscaper.Replace(s) + `"`
}

// fluxTime validates a caller-supplied time bound and returns it in a form
// that can be embedded in a Flux range() call. Accepted inputs are now(), a
// duration literal (for example -24h), a bare integer timestamp, or an
// absolute time in one of absoluteTimeLayouts.
func fluxTime(value string) (string, error) {
	v := strings.TrimSpace(value)
	switch {
	case v == "":
		return "", fmt.Errorf("empty time bound")
	case v == "now()", fluxDurationRe.MatchString(v), unixSecondsRe.MatchString(v):
		return v, nil
	}
	for _, layout := range absoluteTimeLayouts {
		if t, err := time.Parse(layout, v); err == nil {
			return t.Format(time.RFC3339), nil
		}
	}
	return "", fmt.Errorf("invalid time bound %q", value)
}

// fluxRange builds the " |> range(...)" pipeline stage from validated bounds.
// An empty endTime produces a range without a stop argument.
func fluxRange(startTime, endTime string) (string, error) {
	start, err := fluxTime(startTime)
	if err != nil {
		return "", fmt.Errorf("start time: %w", err)
	}
	if strings.TrimSpace(endTime) == "" {
		return ` |> range(start: ` + start + `)`, nil
	}
	stop, err := fluxTime(endTime)
	if err != nil {
		return "", fmt.Errorf("end time: %w", err)
	}
	return ` |> range(start: ` + start + `, stop: ` + stop + `)`, nil
}

// deviceFilter returns a pipeline stage restricting results to deviceID, or
// an empty string when no device was requested.
func deviceFilter(deviceID string) string {
	if deviceID == "" {
		return ""
	}
	return ` |> filter(fn: (r) => r["device_id"] == ` + fluxString(deviceID) + `)`
}

// toFloat64 converts a numeric record value to float64 and reports whether
// the conversion was possible.
func toFloat64(v interface{}) (float64, bool) {
	switch n := v.(type) {
	case float64:
		return n, true
	case int64:
		return float64(n), true
	case uint64:
		return float64(n), true
	default:
		return 0, false
	}
}

// defaultDevices is the placeholder list returned when no device is known.
func defaultDevices() []map[string]string {
	return []map[string]string{{"id": "default", "name": "Default"}}
}

// MetricPoint is a single sample returned by QueryMetrics.
type MetricPoint struct {
	Time  time.Time         `json:"time"`
	Value float64           `json:"value"`
	Tags  map[string]string `json:"tags,omitempty"`
}

// Storage wraps an InfluxDB client bound to one organization and bucket.
type Storage struct {
	client influxdb2.Client
	org    string
	bucket string
}

// NewStorage creates a Storage from cfg. An empty token is allowed but logged
// as a warning.
func NewStorage(cfg *config.Config) *Storage {
	if cfg.InfluxDB.Token == "" {
		log.Printf("Warning: InfluxDB Token is empty, write operations may fail")
	}
	client := influxdb2.NewClient(cfg.InfluxDB.URL, cfg.InfluxDB.Token)

	// Set the client library's log level to 0.
	client.Options().SetLogLevel(0)

	return &Storage{
		client: client,
		org:    cfg.InfluxDB.Org,
		bucket: cfg.InfluxDB.Bucket,
	}
}

// Close releases the underlying InfluxDB client.
func (s *Storage) Close() {
	s.client.Close()
}

// writeData writes one point to measurement, retrying failed writes up to
// maxRetries times. The record (including its timestamp) is built once, so
// every retry sends the same line. deviceID and metricType are accepted for
// call-site compatibility and are not used here.
//
// NOTE(review): the original tail of the retry loop was lost (see the note
// below this function). The wait between attempts, exponential backoff plus
// random jitter, is a reconstruction; confirm against version control.
func (s *Storage) writeData(ctx context.Context, measurement string, tags map[string]string, fields map[string]interface{}, deviceID, metricType string) error {
	const (
		maxRetries = 2
		baseDelay  = 200 * time.Millisecond
	)

	line := buildLine(measurement, tags, fields, time.Now())
	writeAPI := s.client.WriteAPIBlocking(s.org, s.bucket)

	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err := ctx.Err(); err != nil {
			return err
		}
		if lastErr = writeAPI.WriteRecord(ctx, line); lastErr == nil {
			return nil
		}
		if attempt == maxRetries {
			break
		}

		delay := baseDelay * time.Duration(1<<uint(attempt))
		delay += time.Duration(rand.Int63n(int64(baseDelay / 2)))

		timer := time.NewTimer(delay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
	}
	return lastErr
}

// NOTE(review): this file was damaged between the retry-delay computation in
// writeData and the tail of the process-metric writer; the damaged spot read
// `time.Duration(1<= 5 { break }`. Any definitions that lived in that gap are
// absent from this file. Of the process-metric writer only the fragment below
// survived. Its signature and leading statements could not be recovered and
// must be restored from version control rather than guessed:
//
//		portsStr = append(portsStr, fmt.Sprintf("%d", port))
//	}
//	allTags["ports"] = strings.Join(portsStr, ",")
//	fields := map[string]interface{}{
//		"cpu_usage":    cpu,
//		"memory_usage": memory,
//		"path":         path,
//		"cmdline":      cmdline,
//	}
//	return s.writeData(ctx, "processes", allTags, fields, deviceID, "process")

// WriteDiskDetailMetric writes one disk description to the "disk_details"
// measurement. The identifying attributes are stored as tags on top of the
// caller's tags; the size and description are stored as fields.
func (s *Storage) WriteDiskDetailMetric(ctx context.Context, deviceID, diskDeviceID, status, diskType string, sizeGB float64, model, interfaceType, description string, tags map[string]string) error {
	allTags := mergeTags(tags, map[string]string{
		"device_id":      deviceID,
		"disk_id":        diskDeviceID,
		"status":         status,
		"type":           diskType,
		"model":          model,
		"interface_type": interfaceType,
	})
	fields := map[string]interface{}{
		"size_gb":     sizeGB,
		"description": description,
	}
	return s.writeData(ctx, "disk_details", allTags, fields, deviceID, "disk_detail")
}

// WriteLogMetric writes one log entry to the "logs" measurement, tagged with
// the device and log source. The time parameter is currently not used; the
// point is stamped with the write time by writeData.
func (s *Storage) WriteLogMetric(ctx context.Context, deviceID string, sequence int, source string, time time.Time, message string, tags map[string]string) error {
	allTags := mergeTags(tags, map[string]string{
		"device_id": deviceID,
		"source":    source,
	})
	fields := map[string]interface{}{
		"sequence": sequence,
		"message":  message,
	}
	return s.writeData(ctx, "logs", allTags, fields, deviceID, "log")
}

// fromMeasurement builds the common query prefix: bucket, range and
// measurement filter.
func (s *Storage) fromMeasurement(measurement, rangeClause string) string {
	return `from(bucket: ` + fluxString(s.bucket) + `)` + rangeClause +
		` |> filter(fn: (r) => r["_measurement"] == ` + fluxString(measurement) + `)`
}

// queryRows runs query and converts every record into a map holding "time"
// plus the requested columns. Columns absent from a record map to nil.
func (s *Storage) queryRows(ctx context.Context, query string, columns []string) ([]map[string]interface{}, error) {
	result, err := s.client.QueryAPI(s.org).Query(ctx, query)
	if err != nil {
		return nil, err
	}
	defer result.Close()

	rows := make([]map[string]interface{}, 0)
	// Every record is kept. TableChanged() is true on the first record of a
	// new table, and that record is still a valid one.
	for result.Next() {
		record := result.Record()
		row := make(map[string]interface{}, len(columns)+1)
		row["time"] = record.Time()
		for _, column := range columns {
			row[column] = record.ValueByKey(column)
		}
		rows = append(rows, row)
	}
	if err := result.Err(); err != nil {
		return nil, err
	}
	return rows, nil
}

// QueryMetrics returns the "value" samples of metricType from the "metrics"
// measurement between startTime and endTime, optionally restricted to one
// device. Bounds may be now(), a relative duration such as -24h, or an
// absolute time. The optional limit caps the rows returned per result table
// and defaults to 10000. Records whose value is not numeric are skipped.
func (s *Storage) QueryMetrics(ctx context.Context, deviceID, metricType, startTime, endTime string, limit ...int) ([]MetricPoint, error) {
	rangeClause, err := fluxRange(startTime, endTime)
	if err != nil {
		return nil, err
	}

	maxLimit := 10000
	if len(limit) > 0 && limit[0] > 0 {
		maxLimit = limit[0]
	}

	query := s.fromMeasurement("metrics", rangeClause) +
		` |> filter(fn: (r) => r["_field"] == "value")` +
		deviceFilter(deviceID) +
		` |> filter(fn: (r) => r["type"] == ` + fluxString(metricType) + `)` +
		` |> sort(columns: ["_time"])` +
		` |> limit(n: ` + strconv.Itoa(maxLimit) + `)`

	result, err := s.client.QueryAPI(s.org).Query(ctx, query)
	if err != nil {
		return nil, err
	}
	defer result.Close()

	var points []MetricPoint
	for result.Next() {
		record := result.Record()

		value, ok := toFloat64(record.Value())
		if !ok {
			continue
		}

		// Every string-valued column except the built-in ones is reported as
		// a tag.
		tags := make(map[string]string)
		for k, v := range record.Values() {
			switch k {
			case "_time", "_value", "_field", "_measurement":
				continue
			}
			if text, ok := v.(string); ok {
				tags[k] = text
			}
		}

		points = append(points, MetricPoint{
			Time:  record.Time(),
			Value: value,
			Tags:  tags,
		})
	}
	if err := result.Err(); err != nil {
		return nil, err
	}
	return points, nil
}

// QueryAllDevices lists the devices that reported to the "metrics"
// measurement in the last 30 days as {"id", "name"} maps sorted by id. The
// name falls back to the id when no agent name is recorded. The call is
// best-effort: on a query failure, or when no device is found, it returns a
// single placeholder device and a nil error.
func (s *Storage) QueryAllDevices(ctx context.Context) ([]map[string]string, error) {
	query := s.fromMeasurement("metrics", ` |> range(start: -30d)`) +
		` |> group(columns: ["device_id", "agent_name"])` +
		` |> last()` +
		` |> keep(columns: ["device_id", "agent_name"])`

	result, err := s.client.QueryAPI(s.org).Query(ctx, query)
	if err != nil {
		log.Printf("Warning: device query failed, returning default device: %v", err)
		return defaultDevices(), nil
	}
	defer result.Close()

	names := make(map[string]string)
	for result.Next() {
		record := result.Record()
		deviceID, ok := record.ValueByKey("device_id").(string)
		if !ok {
			// Series written without a device_id tag cannot be listed.
			continue
		}
		agentName, _ := record.ValueByKey("agent_name").(string)
		if agentName == "" {
			agentName = deviceID
		}
		names[deviceID] = agentName
	}
	if err := result.Err(); err != nil {
		log.Printf("Warning: reading device list failed, returning default device: %v", err)
		return defaultDevices(), nil
	}
	if len(names) == 0 {
		return defaultDevices(), nil
	}

	ids := make([]string, 0, len(names))
	for id := range names {
		ids = append(ids, id)
	}
	sort.Strings(ids)

	devices := make([]map[string]string, 0, len(ids))
	for _, id := range ids {
		devices = append(devices, map[string]string{"id": id, "name": names[id]})
	}
	return devices, nil
}

// QueryDeviceStatus returns the agent name of deviceID and its most recent
// value per metric type from the last hour. When the device has no data in
// that window it returns an empty name and an empty map. Records without a
// string "type" tag or a numeric value are ignored.
func (s *Storage) QueryDeviceStatus(ctx context.Context, deviceID string) (string, map[string]float64, error) {
	query := s.fromMeasurement("metrics", ` |> range(start: -1h)`) +
		` |> filter(fn: (r) => r["device_id"] == ` + fluxString(deviceID) + `)` +
		` |> last()`

	result, err := s.client.QueryAPI(s.org).Query(ctx, query)
	if err != nil {
		return "", nil, err
	}
	defer result.Close()

	status := make(map[string]float64)
	agentName := deviceID
	hasData := false

	for result.Next() {
		hasData = true
		record := result.Record()

		if name, ok := record.ValueByKey("agent_name").(string); ok && name != "" {
			agentName = name
		}

		metricType, ok := record.ValueByKey("type").(string)
		if !ok {
			continue
		}
		value, ok := toFloat64(record.Value())
		if !ok {
			continue
		}
		status[metricType] = value
	}
	if err := result.Err(); err != nil {
		return "", nil, err
	}
	if !hasData {
		return "", map[string]float64{}, nil
	}
	return agentName, status, nil
}

// QueryProcessMetrics returns the latest record of every process series in
// the "processes" measurement within the given range, optionally restricted
// to one device. Fields are pivoted into columns so cpu_usage, memory_usage,
// path and cmdline are populated in each row.
func (s *Storage) QueryProcessMetrics(ctx context.Context, deviceID string, startTime, endTime string) ([]map[string]interface{}, error) {
	rangeClause, err := fluxRange(startTime, endTime)
	if err != nil {
		return nil, err
	}
	query := s.fromMeasurement("processes", rangeClause) +
		deviceFilter(deviceID) +
		` |> last()` +
		pivotFields

	return s.queryRows(ctx, query, []string{
		"device_id", "process_name", "username", "pid",
		"cpu_usage", "memory_usage", "path", "cmdline",
		"ports", "agent_name",
	})
}

// QueryLogMetrics returns the 100 most recent log entries in the given range,
// newest first, optionally restricted to one device. Fields are pivoted into
// columns and all series are merged before sorting so the limit applies to
// the result as a whole.
func (s *Storage) QueryLogMetrics(ctx context.Context, deviceID string, startTime, endTime string) ([]map[string]interface{}, error) {
	rangeClause, err := fluxRange(startTime, endTime)
	if err != nil {
		return nil, err
	}
	query := s.fromMeasurement("logs", rangeClause) +
		deviceFilter(deviceID) +
		pivotFields +
		` |> group()` +
		` |> sort(columns: ["_time"], desc: true)` +
		` |> limit(n: 100)`

	return s.queryRows(ctx, query, []string{
		"device_id", "source", "sequence", "message", "agent_name",
	})
}

// QueryDiskDetails returns the latest record of every disk series in the
// "disk_details" measurement within the given range, optionally restricted to
// one device. Fields are pivoted into columns so size_gb and description are
// populated in each row.
func (s *Storage) QueryDiskDetails(ctx context.Context, deviceID string, startTime, endTime string) ([]map[string]interface{}, error) {
	rangeClause, err := fluxRange(startTime, endTime)
	if err != nil {
		return nil, err
	}
	query := s.fromMeasurement("disk_details", rangeClause) +
		deviceFilter(deviceID) +
		` |> last()` +
		pivotFields

	return s.queryRows(ctx, query, []string{
		"device_id", "disk_id", "status", "type", "size_gb",
		"model", "interface_type", "description", "agent_name",
	})
}