package storage import ( "context" "fmt" "log" "math/rand" "sort" "strconv" "strings" "time" influxdb2 "github.com/influxdata/influxdb-client-go" "github.com/monitor/backend/config" ) // formatTags 将标签映射格式化为InfluxDB行协议格式 func formatTags(tags map[string]string) string { var tagList []string for k, v := range tags { // 跳过空值的标签,避免InfluxDB解析错误 if v == "" { continue } tagList = append(tagList, fmt.Sprintf("%s=%s", k, escapeTagValue(v))) } return strings.Join(tagList, ",") } // escapeTagValue 转义标签值中的特殊字符 func escapeTagValue(value string) string { // 替换逗号、空格和等号为转义后的形式 escaped := strings.ReplaceAll(value, ",", "\\,") escaped = strings.ReplaceAll(escaped, " ", "\\ ") escaped = strings.ReplaceAll(escaped, "=", "\\=") return escaped } // MetricPoint 自定义监控指标点 type MetricPoint struct { Time time.Time `json:"time"` Value float64 `json:"value"` Tags map[string]string `json:"tags,omitempty"` } type Storage struct { client influxdb2.Client org string bucket string } // NewStorage 创建新的存储实例 func NewStorage(cfg *config.Config) *Storage { var client influxdb2.Client // InfluxDB 2.x版本使用Token认证 // 检查Token是否为空 if cfg.InfluxDB.Token != "" { // 使用Token认证(适用于InfluxDB 2.x) client = influxdb2.NewClient(cfg.InfluxDB.URL, cfg.InfluxDB.Token) } else { // Token为空,尝试使用默认客户端 // 注意:InfluxDB 2.x需要有效的Token才能写入数据 log.Printf("Warning: InfluxDB Token is empty, write operations may fail") client = influxdb2.NewClient(cfg.InfluxDB.URL, "") } // 配置InfluxDB客户端选项 options := client.Options() // 禁用InfluxDB客户端的调试日志 options.SetLogLevel(0) return &Storage{ client: client, org: cfg.InfluxDB.Org, bucket: cfg.InfluxDB.Bucket, } } // Close 关闭存储连接 func (s *Storage) Close() { s.client.Close() } // 写入数据到InfluxDB,带重试机制 func (s *Storage) writeData(ctx context.Context, measurement string, tags map[string]string, fields map[string]interface{}, deviceID, metricType string, timestamp ...time.Time) error { // 重试配置 - 调整重试策略,减少重试次数,增加单次写入超时 maxRetries := 1 baseDelay := 100 * time.Millisecond for i := 0; i <= maxRetries; i++ { // 创建独立的写入上下文,设置较短的超时时间 writeCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() // 写入数据点 writeAPI := s.client.WriteAPIBlocking(s.org, s.bucket) // 构建行协议字符串 var fieldList []string for k, v := range fields { var fieldStr string // 根据字段类型格式化 switch v := v.(type) { case string: fieldStr = fmt.Sprintf("%s=%q", k, v) case float64, int, int32, int64: fieldStr = fmt.Sprintf("%s=%v", k, v) case bool: fieldStr = fmt.Sprintf("%s=%t", k, v) default: // 转换为字符串 fieldStr = fmt.Sprintf("%s=%q", k, fmt.Sprintf("%v", v)) } fieldList = append(fieldList, fieldStr) } // 确定时间戳 var ts int64 if len(timestamp) > 0 { ts = timestamp[0].UnixNano() } else { ts = time.Now().UnixNano() } line := fmt.Sprintf("%s,%s %s %d", measurement, formatTags(tags), strings.Join(fieldList, ","), ts) err := writeAPI.WriteRecord(writeCtx, line) if err == nil { // 写入成功,直接返回 return nil } // 如果是最后一次重试,返回错误 if i == maxRetries { return err } // 计算重试延迟(指数退避) delay := baseDelay*time.Duration(1<= 5 { break } portsStr = append(portsStr, fmt.Sprintf("%d", port)) } allTags["ports"] = strings.Join(portsStr, ",") // 创建字段映射 fields := map[string]interface{}{ "cpu_usage": cpu, "memory_usage": memory, "path": path, "cmdline": cmdline, } // 使用新的writeData方法 return s.writeData(ctx, "processes", allTags, fields, deviceID, "process") } // WriteDiskDetailMetric 写入磁盘详细信息 func (s *Storage) WriteDiskDetailMetric(ctx context.Context, deviceID, diskPath, status, diskType string, sizeGB float64, model, vendor, interfaceType, fileSystem, diskUUID, description string, tags map[string]string) error { // 创建标签映射,合并原有标签和新标签 allTags := make(map[string]string) // 复制原有标签 for k, v := range tags { allTags[k] = v } // 添加设备ID标签 allTags["device_id"] = deviceID // 添加磁盘相关标签 allTags["disk_path"] = diskPath allTags["status"] = status allTags["type"] = diskType allTags["model"] = model allTags["vendor"] = vendor allTags["interface_type"] = interfaceType allTags["file_system"] = fileSystem // 创建字段映射 fields := map[string]interface{}{ "size_gb": sizeGB, "disk_uuid": diskUUID, "description": description, } // 使用新的writeData方法 return s.writeData(ctx, "disk_details", allTags, fields, deviceID, "disk_detail") } // WriteLogMetric 写入日志指标 func (s *Storage) WriteLogMetric(ctx context.Context, deviceID string, sequence int, source string, time time.Time, message string, tags map[string]string) error { // 创建标签映射,合并原有标签和新标签 allTags := make(map[string]string) // 复制原有标签 for k, v := range tags { allTags[k] = v } // 添加设备ID标签 allTags["device_id"] = deviceID // 添加日志来源标签 allTags["source"] = source // 创建字段映射 fields := map[string]interface{}{ "sequence": sequence, "message": message, } // 使用新的writeData方法,传入日志的实际时间 return s.writeData(ctx, "logs", allTags, fields, deviceID, "log", time) } // WriteHardwareMetric 写入硬件信息指标 func (s *Storage) WriteHardwareMetric(ctx context.Context, deviceID string, hardwareType string, hardwareData map[string]interface{}, tags map[string]string) error { // 创建标签映射,合并原有标签和新标签 allTags := make(map[string]string) // 复制原有标签 for k, v := range tags { allTags[k] = v } // 添加设备ID标签 allTags["device_id"] = deviceID // 添加硬件类型标签 allTags["type"] = hardwareType // 使用新的writeData方法 return s.writeData(ctx, "hardware", allTags, hardwareData, deviceID, hardwareType) } // QueryMetrics 查询监控指标,支持采样 func (s *Storage) QueryMetrics(ctx context.Context, deviceID, metricType, startTime, endTime string, limit ...int) ([]MetricPoint, error) { queryAPI := s.client.QueryAPI(s.org) // 处理时间参数 processedStartTime := startTime processedEndTime := endTime // 检查并修复时间格式,确保InfluxDB能正确识别 // 对于相对时间(如-24h)或now(),直接使用 // 对于绝对时间,确保格式正确 if len(startTime) > 0 { // 检查是否是相对时间(以-开头或为now()) isRelativeTime := strings.HasPrefix(startTime, "-") || startTime == "now()" if !isRelativeTime { // 是绝对时间,尝试解析为time.Time类型 t, err := time.Parse(time.RFC3339, startTime) if err != nil { // 尝试其他常见格式 t, err = time.Parse("2006-01-02T15:04", startTime) if err != nil { // 尝试YYYY-MM-DD HH:MM格式 t, err = time.Parse("2006-01-02 15:04", startTime) } } if err == nil { // 解析成功,使用RFC3339格式(InfluxDB可以识别) processedStartTime = t.Format(time.RFC3339) } } } if len(endTime) > 0 { // 检查是否是相对时间(以-开头或为now()) isRelativeTime := strings.HasPrefix(endTime, "-") || endTime == "now()" if !isRelativeTime { // 是绝对时间,尝试解析为time.Time类型 t, err := time.Parse(time.RFC3339, endTime) if err != nil { // 尝试其他常见格式 t, err = time.Parse("2006-01-02T15:04", endTime) if err != nil { // 尝试YYYY-MM-DD HH:MM格式 t, err = time.Parse("2006-01-02 15:04", endTime) } } if err == nil { // 解析成功,使用RFC3339格式(InfluxDB可以识别) processedEndTime = t.Format(time.RFC3339) } } } // 构建查询语句,包含标签信息和字段过滤 query := `from(bucket: "` + s.bucket + `") |> range(start: ` + processedStartTime + `, stop: ` + processedEndTime + `) |> filter(fn: (r) => r["_measurement"] == "metrics") |> filter(fn: (r) => r["_field"] == "value")` // 只有当deviceID非空时才添加device_id过滤 if deviceID != "" { query += ` |> filter(fn: (r) => r["device_id"] == "` + deviceID + `")` } query += ` |> filter(fn: (r) => r["type"] == "` + metricType + `") |> sort(columns: ["_time"])` // 设置默认限制 maxLimit := 10000 if len(limit) > 0 && limit[0] > 0 { maxLimit = limit[0] } query += ` |> limit(n: ` + strconv.Itoa(maxLimit) + `)` // 限制返回的数据点数量,防止内存溢出 // 执行查询 result, err := queryAPI.Query(ctx, query) if err != nil { return nil, err } defer result.Close() var points []MetricPoint // 处理查询结果 for result.Next() { // 表结构变化也需要处理数据 // 获取记录 record := result.Record() // 获取所有标签 tags := make(map[string]string) for k, v := range record.Values() { // 跳过内置字段,只保留自定义标签 if k != "_time" && k != "_value" && k != "_field" && k != "_measurement" { if strValue, ok := v.(string); ok { tags[k] = strValue } } } // 转换为自定义MetricPoint point := MetricPoint{ Time: record.Time(), Value: record.Value().(float64), Tags: tags, } points = append(points, point) } if result.Err() != nil { return nil, result.Err() } return points, nil } // DeviceInfo 设备信息 func (s *Storage) QueryAllDevices(ctx context.Context) ([]map[string]string, error) { queryAPI := s.client.QueryAPI(s.org) // 构建查询语句,获取所有唯一的device_id和对应的agent_name query := `from(bucket: "` + s.bucket + `") |> range(start: -30d) |> filter(fn: (r) => r["_measurement"] == "metrics") |> group(columns: ["device_id", "agent_name"]) |> last() |> keep(columns: ["device_id", "agent_name"])` // 执行查询 queryResult, err := queryAPI.Query(ctx, query) if err != nil { return []map[string]string{{"id": "default", "name": "Default"}}, nil } defer queryResult.Close() deviceMap := make(map[string]string) // 处理查询结果 for queryResult.Next() { if queryResult.TableChanged() { // 表结构变化,跳过 continue } // 获取记录 record := queryResult.Record() deviceID := record.ValueByKey("device_id").(string) agentName := "" if name, ok := record.ValueByKey("agent_name").(string); ok { agentName = name } if agentName == "" { agentName = deviceID } deviceMap[deviceID] = agentName } if queryResult.Err() != nil { return []map[string]string{{"id": "default", "name": "Default"}}, nil } // 转换为切片 deviceList := make([]map[string]string, 0, len(deviceMap)) for deviceID, agentName := range deviceMap { deviceList = append(deviceList, map[string]string{ "id": deviceID, "name": agentName, }) } // 如果没有设备,返回默认设备 if len(deviceList) == 0 { return []map[string]string{{"id": "default", "name": "Default"}}, nil } return deviceList, nil } // QueryHardwareMetrics 查询硬件信息指标 func (s *Storage) QueryHardwareMetrics(ctx context.Context, deviceID string) (map[string]interface{}, error) { queryAPI := s.client.QueryAPI(s.org) // 初始化硬件信息结果 hardwareInfo := make(map[string]interface{}) // 定义要查询的硬件类型列表 hardwareTypes := []string{"os", "cpu", "memory", "disk", "network"} // 为每个硬件类型单独查询,避免类型冲突 for _, hardwareType := range hardwareTypes { // 构建查询语句 var query string if hardwareType == "os" || hardwareType == "cpu" || hardwareType == "memory" { // 对于os、cpu、memory类型,我们只需要一个结果 query = `from(bucket: "` + s.bucket + `") |> range(start: -24h) |> filter(fn: (r) => r["_measurement"] == "hardware") |> filter(fn: (r) => r["device_id"] == "` + deviceID + `") |> filter(fn: (r) => r["type"] == "` + hardwareType + `") |> last()` } else { // 对于disk和network类型,我们需要获取所有设备的所有字段记录 query = `from(bucket: "` + s.bucket + `") |> range(start: -24h) |> filter(fn: (r) => r["_measurement"] == "hardware") |> filter(fn: (r) => r["device_id"] == "` + deviceID + `") |> filter(fn: (r) => r["type"] == "` + hardwareType + `")` } // 执行查询 queryResult, err := queryAPI.Query(ctx, query) if err != nil { // 查询单个硬件类型失败,继续查询其他类型 log.Printf("Warning: Failed to query %s hardware metrics: %v", hardwareType, err) continue } defer queryResult.Close() // 对于os、cpu、memory类型,我们只需要一个结果 if hardwareType == "os" || hardwareType == "cpu" || hardwareType == "memory" { // 获取字段值映射 fieldMap := make(map[string]interface{}) // 遍历所有结果行,收集所有字段 for queryResult.Next() { record := queryResult.Record() fieldName := record.Field() fieldValue := record.Value() fieldMap[fieldName] = fieldValue } // 如果有字段数据,添加到硬件信息结果中 if len(fieldMap) > 0 { hardwareInfo[hardwareType] = fieldMap } } else { // 对于disk和network类型,我们可能有多个结果,每个字段作为单独的行返回 var allResults []map[string]interface{} // 创建一个map来存储每个设备的所有字段值 // key是设备ID或index deviceFields := make(map[string]map[string]interface{}) // 遍历所有结果行,收集所有结果 for queryResult.Next() { // 获取记录 record := queryResult.Record() // 获取字段名和值 fieldName := record.Field() fieldValue := record.Value() // 获取设备唯一标识 deviceKey := "" // 优先使用id字段作为设备标识 if id, isString := fieldValue.(string); fieldName == "id" && isString { deviceKey = id // 创建设备字段映射 deviceFields[deviceKey] = make(map[string]interface{}) deviceFields[deviceKey]["id"] = id } else { // 否则,查找已存在的设备映射 // 遍历所有设备映射,查找当前记录对应的设备 for key, fields := range deviceFields { // 如果设备映射中没有id字段,或者id字段为空,跳过 if id, ok := fields["id"].(string); ok && id != "" { // 假设当前记录属于该设备 deviceKey = key break } } } // 如果没有找到设备标识,使用index作为设备标识 if deviceKey == "" { index := 0 if idx, ok := record.ValueByKey("index").(int); ok { index = idx } deviceKey = fmt.Sprintf("%d", index) // 创建设备字段映射 deviceFields[deviceKey] = make(map[string]interface{}) } // 添加字段值到设备字段映射中 deviceFields[deviceKey][fieldName] = fieldValue } // 将deviceFields转换为切片 for _, fields := range deviceFields { allResults = append(allResults, fields) } // 按id排序 sort.Slice(allResults, func(i, j int) bool { iID, iOk := allResults[i]["id"].(string) jID, jOk := allResults[j]["id"].(string) if iOk && jOk { return iID < jID } return false }) // 如果有结果,添加到硬件信息结果中 if len(allResults) > 0 { hardwareInfo[hardwareType] = allResults } } // 检查查询结果是否有错误 if queryResult.Err() != nil { // 查询单个硬件类型失败,继续查询其他类型 log.Printf("Warning: Failed to process %s hardware query result: %v", hardwareType, queryResult.Err()) } } return hardwareInfo, nil } // DeviceStatus 设备状态 func (s *Storage) QueryDeviceStatus(ctx context.Context, deviceID string) (string, map[string]float64, error) { queryAPI := s.client.QueryAPI(s.org) // 构建查询语句,获取设备最新的指标数据和agent名称 query := `from(bucket: "` + s.bucket + `") |> range(start: -1h) |> filter(fn: (r) => r["_measurement"] == "metrics") |> filter(fn: (r) => r["device_id"] == "` + deviceID + `") |> last()` // 执行查询 queryResult, err := queryAPI.Query(ctx, query) if err != nil { return "", nil, err } defer queryResult.Close() status := make(map[string]float64) agentName := deviceID hasData := false // 处理查询结果 for queryResult.Next() { if queryResult.TableChanged() { // 表结构变化,跳过 continue } hasData = true // 获取记录 record := queryResult.Record() metricType := record.ValueByKey("type").(string) value := record.Value().(float64) status[metricType] = value // 获取agent名称 if name, ok := record.ValueByKey("agent_name").(string); ok && name != "" { agentName = name } } if queryResult.Err() != nil { return "", nil, queryResult.Err() } if !hasData { // 如果没有数据,返回空状态 return "", map[string]float64{}, nil } return agentName, status, nil } // QueryProcessMetrics 查询进程指标 func (s *Storage) QueryProcessMetrics(ctx context.Context, deviceID string, startTime, endTime string) ([]map[string]interface{}, error) { queryAPI := s.client.QueryAPI(s.org) // 构建查询语句 query := `from(bucket: "` + s.bucket + `") |> range(start: ` + startTime + `, stop: ` + endTime + `) |> filter(fn: (r) => r["_measurement"] == "processes")` // 如果指定了设备ID,添加设备ID过滤 if deviceID != "" { query += ` |> filter(fn: (r) => r["device_id"] == "` + deviceID + `")` } // 获取最新的进程数据 query += ` |> last()` // 执行查询 queryResult, err := queryAPI.Query(ctx, query) if err != nil { return nil, err } defer queryResult.Close() // 存储进程数据 processes := make([]map[string]interface{}, 0) // 处理查询结果 for queryResult.Next() { if queryResult.TableChanged() { // 表结构变化,跳过 continue } // 获取记录 record := queryResult.Record() // 构建进程数据 processData := map[string]interface{}{ "time": record.Time(), "device_id": record.ValueByKey("device_id"), "process_name": record.ValueByKey("process_name"), "username": record.ValueByKey("username"), "pid": record.ValueByKey("pid"), "cpu_usage": record.ValueByKey("cpu_usage"), "memory_usage": record.ValueByKey("memory_usage"), "path": record.ValueByKey("path"), "cmdline": record.ValueByKey("cmdline"), "ports": record.ValueByKey("ports"), "agent_name": record.ValueByKey("agent_name"), } // 添加到进程列表 processes = append(processes, processData) } if queryResult.Err() != nil { return nil, queryResult.Err() } return processes, nil } // QueryLogMetrics 查询日志指标 func (s *Storage) QueryLogMetrics(ctx context.Context, deviceID string, startTime, endTime string) ([]map[string]interface{}, error) { queryAPI := s.client.QueryAPI(s.org) // 构建查询语句 query := `from(bucket: "` + s.bucket + `") |> range(start: ` + startTime + `, stop: ` + endTime + `) |> filter(fn: (r) => r["_measurement"] == "logs")` // 如果指定了设备ID,添加设备ID过滤 if deviceID != "" { query += ` |> filter(fn: (r) => r["device_id"] == "` + deviceID + `")` } // 按时间倒序排列,获取最新的日志 query += ` |> sort(columns: ["_time"], desc: true) |> limit(n: 200)` // 限制返回200条记录(因为message和sequence是分开存储的) // 执行查询 queryResult, err := queryAPI.Query(ctx, query) if err != nil { return nil, err } defer queryResult.Close() // 使用map存储日志数据,key是时间戳和source的组合 logMap := make(map[string]map[string]interface{}) // 处理查询结果 for queryResult.Next() { // 获取记录 record := queryResult.Record() // 获取时间和source作为唯一键 timeStr := record.Time().Format(time.RFC3339Nano) source := record.ValueByKey("source").(string) key := timeStr + "-" + source // 检查是否已经有这个日志条目的基础信息 logData, exists := logMap[key] if !exists { // 创建新的日志条目 logData = map[string]interface{}{ "time": record.Time(), "device_id": record.ValueByKey("device_id"), "source": source, "sequence": nil, "message": nil, "agent_name": record.ValueByKey("agent_name"), } logMap[key] = logData } // 根据字段类型更新相应的值 field := record.Field() switch field { case "message": logData["message"] = record.Value() case "sequence": logData["sequence"] = record.Value() } } if queryResult.Err() != nil { return nil, queryResult.Err() } // 将map转换为切片 logs := make([]map[string]interface{}, 0, len(logMap)) for _, logData := range logMap { logs = append(logs, logData) } // 按时间倒序排序 sort.Slice(logs, func(i, j int) bool { timeI := logs[i]["time"].(time.Time) timeJ := logs[j]["time"].(time.Time) return timeI.After(timeJ) }) // 限制返回100条最新日志 if len(logs) > 100 { logs = logs[:100] } return logs, nil } // QueryDiskDetails 查询磁盘详细信息 func (s *Storage) QueryDiskDetails(ctx context.Context, deviceID string, startTime, endTime string) ([]map[string]interface{}, error) { queryAPI := s.client.QueryAPI(s.org) // 构建查询语句 query := `from(bucket: "` + s.bucket + `") |> range(start: ` + startTime + `, stop: ` + endTime + `) |> filter(fn: (r) => r["_measurement"] == "disk_details")` // 如果指定了设备ID,添加设备ID过滤 if deviceID != "" { query += ` |> filter(fn: (r) => r["device_id"] == "` + deviceID + `")` } // 获取最新的磁盘详细信息 query += ` |> last()` // 执行查询 queryResult, err := queryAPI.Query(ctx, query) if err != nil { return nil, err } defer queryResult.Close() // 存储磁盘详细信息 diskDetails := make([]map[string]interface{}, 0) // 处理查询结果 for queryResult.Next() { if queryResult.TableChanged() { // 表结构变化,跳过 continue } // 获取记录 record := queryResult.Record() // 构建磁盘详细信息 diskData := map[string]interface{}{ "time": record.Time(), "device_id": record.ValueByKey("device_id"), "disk_id": record.ValueByKey("disk_id"), "status": record.ValueByKey("status"), "type": record.ValueByKey("type"), "size_gb": record.ValueByKey("size_gb"), "model": record.ValueByKey("model"), "vendor": record.ValueByKey("vendor"), "interface_type": record.ValueByKey("interface_type"), "file_system": record.ValueByKey("file_system"), "description": record.ValueByKey("description"), "agent_name": record.ValueByKey("agent_name"), } // 添加到磁盘详细信息列表 diskDetails = append(diskDetails, diskData) } if queryResult.Err() != nil { return nil, queryResult.Err() } return diskDetails, nil }