Files
monitor/backend/internal/storage/storage.go
2025-12-05 00:03:44 +08:00

676 lines
18 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package storage
import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"sort"
	"strings"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go"
	"github.com/monitor/backend/config"
)
// formatTags renders a tag map as the tag-set portion of an InfluxDB
// line-protocol record, i.e. "k1=v1,k2=v2".
//
// Tags whose key or value is empty are dropped, because an empty side of the
// "=" makes the record unparseable. Keys are emitted in sorted order so the
// output is deterministic (Go map iteration order is random) and matches the
// line-protocol recommendation to send tags sorted by key. Both keys and
// values are escaped. An empty or nil map yields "".
func formatTags(tags map[string]string) string {
	keys := make([]string, 0, len(tags))
	for k, v := range tags {
		if k == "" || v == "" {
			continue
		}
		keys = append(keys, k)
	}
	sort.Strings(keys)

	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		// Tag keys and tag values share the same escape set in line protocol.
		pairs = append(pairs, escapeTagValue(k)+"="+escapeTagValue(tags[k]))
	}
	return strings.Join(pairs, ",")
}

// escapeTagValue backslash-escapes the characters that delimit tags in line
// protocol: comma, space and equals sign. All other characters are returned
// unchanged.
func escapeTagValue(value string) string {
	return strings.NewReplacer(",", `\,`, " ", `\ `, "=", `\=`).Replace(value)
}
// MetricPoint is a single metric sample as returned by QueryMetrics.
type MetricPoint struct {
// Time is the timestamp of the sample.
Time time.Time `json:"time"`
// Value is the numeric value of the sample.
Value float64 `json:"value"`
// Tags holds the string-valued columns attached to the sample; it is
// omitted from the JSON encoding when empty.
Tags map[string]string `json:"tags,omitempty"`
}
// Storage bundles an InfluxDB client with the organisation and bucket that
// the read and write methods of this type operate on.
type Storage struct {
client influxdb2.Client // underlying InfluxDB client
org string // organisation passed to the client's query and write APIs
bucket string // bucket that points are written to and queried from
}
// NewStorage builds a Storage from cfg.
//
// The InfluxDB client is created with cfg.InfluxDB.URL and
// cfg.InfluxDB.Token. An empty token is still passed through to the client,
// but a warning is logged first because InfluxDB 2.x needs a valid token to
// accept writes. The client's log level is set to 0 to silence its debug
// logging.
func NewStorage(cfg *config.Config) *Storage {
	token := cfg.InfluxDB.Token
	if token == "" {
		log.Printf("Warning: InfluxDB Token is empty, write operations may fail")
	}

	client := influxdb2.NewClient(cfg.InfluxDB.URL, token)
	client.Options().SetLogLevel(0)

	return &Storage{
		client: client,
		org:    cfg.InfluxDB.Org,
		bucket: cfg.InfluxDB.Bucket,
	}
}
// Close shuts down the storage by closing the underlying InfluxDB client.
func (s *Storage) Close() {
s.client.Close()
}
// writeData serialises one point as InfluxDB line protocol and writes it to
// the configured org/bucket, retrying on failure.
//
// Parameters:
//   - measurement: line-protocol measurement name.
//   - tags: tag set; rendered by formatTags. May be empty.
//   - fields: field set; must contain at least one entry.
//   - deviceID, metricType: used only in log and error messages.
//
// The record (including its timestamp, taken once from time.Now) is built a
// single time before the retry loop, so every retry sends the exact same
// point. InfluxDB overwrites points with identical measurement, tag set and
// timestamp, which makes a retry after a partially successful write
// idempotent instead of creating a near-duplicate point.
//
// Up to 3 attempts are made (1 initial + 2 retries) with exponential backoff
// starting at 200ms plus up to 50ms of jitter. The function returns nil on
// success, ctx.Err() if the context ends before or between attempts, or the
// error of the last attempt.
func (s *Storage) writeData(ctx context.Context, measurement string, tags map[string]string, fields map[string]interface{}, deviceID, metricType string) error {
	const (
		maxRetries = 2
		baseDelay  = 200 * time.Millisecond
	)

	// A record without fields is invalid line protocol; fail fast instead of
	// burning all retries on a request the server can only reject.
	if len(fields) == 0 {
		return fmt.Errorf("writing %s point for device %s: no fields", measurement, deviceID)
	}

	fieldList := make([]string, 0, len(fields))
	for k, v := range fields {
		switch v := v.(type) {
		case string:
			fieldList = append(fieldList, fmt.Sprintf("%s=%q", k, v))
		case float64, int, int32, int64:
			// Integers are intentionally written without the "i" suffix, as
			// before, so the stored field type does not change.
			fieldList = append(fieldList, fmt.Sprintf("%s=%v", k, v))
		case bool:
			fieldList = append(fieldList, fmt.Sprintf("%s=%t", k, v))
		default:
			// Anything else is stored as its quoted string representation.
			fieldList = append(fieldList, fmt.Sprintf("%s=%q", k, fmt.Sprintf("%v", v)))
		}
	}

	// Only emit the comma separator when there is a tag set; "m, f=1" is
	// rejected by the line-protocol parser.
	head := measurement
	if tagSet := formatTags(tags); tagSet != "" {
		head += "," + tagSet
	}
	line := fmt.Sprintf("%s %s %d", head, strings.Join(fieldList, ","), time.Now().UnixNano())

	writeAPI := s.client.WriteAPIBlocking(s.org, s.bucket)

	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		// Do not start an attempt on a context that is already done.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		if err = writeAPI.WriteRecord(ctx, line); err == nil {
			return nil
		}
		if attempt == maxRetries {
			break
		}

		// Exponential backoff with a little jitter.
		delay := baseDelay*time.Duration(1<<attempt) + time.Duration(rand.Intn(50))*time.Millisecond
		log.Printf("Warning: InfluxDB write failed for device %s, metric %s, retrying in %v... (Attempt %d/%d)\nError: %v", deviceID, metricType, delay, attempt+1, maxRetries, err)

		timer := time.NewTimer(delay)
		select {
		case <-timer.C:
			// Back-off elapsed; try again.
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		}
	}
	return err
}
// WriteMetric stores a single numeric sample in the "metrics" measurement.
//
// The caller's tags are copied (the input map is not modified) and then the
// "device_id" and "type" tags are set from deviceID and metricType,
// overriding any tags of the same name. The sample is written as the field
// "value" through writeData, whose error is returned unchanged.
func (s *Storage) WriteMetric(ctx context.Context, deviceID, metricType string, value float64, tags map[string]string) error {
	merged := make(map[string]string, len(tags)+2)
	for key, val := range tags {
		merged[key] = val
	}
	merged["device_id"] = deviceID
	merged["type"] = metricType

	payload := map[string]interface{}{"value": value}
	return s.writeData(ctx, "metrics", merged, payload, deviceID, metricType)
}
// WriteProcessMetric stores one process snapshot in the "processes"
// measurement.
//
// Tags: a copy of the caller's tags plus "device_id", "process_name",
// "username", "pid" and "ports" (at most the first five ports, joined with
// commas). Fields: "cpu_usage", "memory_usage", "path" and "cmdline".
// The point is written through writeData with metric type "process" and its
// error is returned unchanged.
func (s *Storage) WriteProcessMetric(ctx context.Context, deviceID string, processName, username string, pid int32, cpu, memory float64, path, cmdline string, ports []int, tags map[string]string) error {
	// Keep the tag value bounded: only the first five ports are recorded.
	limit := len(ports)
	if limit > 5 {
		limit = 5
	}
	portLabels := make([]string, limit)
	for idx := 0; idx < limit; idx++ {
		portLabels[idx] = fmt.Sprintf("%d", ports[idx])
	}

	merged := make(map[string]string, len(tags)+5)
	for key, val := range tags {
		merged[key] = val
	}
	merged["device_id"] = deviceID
	merged["process_name"] = processName
	merged["username"] = username
	merged["pid"] = fmt.Sprintf("%d", pid)
	merged["ports"] = strings.Join(portLabels, ",")

	payload := map[string]interface{}{
		"cpu_usage":    cpu,
		"memory_usage": memory,
		"path":         path,
		"cmdline":      cmdline,
	}
	return s.writeData(ctx, "processes", merged, payload, deviceID, "process")
}
// WriteDiskDetailMetric stores the description of one disk in the
// "disk_details" measurement.
//
// Tags: a copy of the caller's tags plus "device_id", "disk_id", "status",
// "type", "model" and "interface_type", which override caller tags of the
// same name. Fields: "size_gb" and "description". The point is written
// through writeData with metric type "disk_detail" and its error is returned
// unchanged.
func (s *Storage) WriteDiskDetailMetric(ctx context.Context, deviceID, diskDeviceID, status, diskType string, sizeGB float64, model, interfaceType, description string, tags map[string]string) error {
	diskTags := map[string]string{
		"device_id":      deviceID,
		"disk_id":        diskDeviceID,
		"status":         status,
		"type":           diskType,
		"model":          model,
		"interface_type": interfaceType,
	}

	merged := make(map[string]string, len(tags)+len(diskTags))
	for key, val := range tags {
		merged[key] = val
	}
	// Applied after the caller's tags so the disk attributes always win.
	for key, val := range diskTags {
		merged[key] = val
	}

	payload := map[string]interface{}{
		"size_gb":     sizeGB,
		"description": description,
	}
	return s.writeData(ctx, "disk_details", merged, payload, deviceID, "disk_detail")
}
// WriteLogMetric stores one log entry in the "logs" measurement.
//
// Tags: a copy of the caller's tags plus "device_id" and "source". Fields:
// "sequence" and "message". The point is written through writeData with
// metric type "log" and its error is returned unchanged.
//
// NOTE(review): the time argument is never read, so the point is not stored
// under the log entry's own timestamp. Confirm whether the event time should
// be persisted. The parameter also shadows the time package in this function.
func (s *Storage) WriteLogMetric(ctx context.Context, deviceID string, sequence int, source string, time time.Time, message string, tags map[string]string) error {
	merged := make(map[string]string, len(tags)+2)
	for key, val := range tags {
		merged[key] = val
	}
	merged["device_id"] = deviceID
	merged["source"] = source

	payload := map[string]interface{}{
		"sequence": sequence,
		"message":  message,
	}
	return s.writeData(ctx, "logs", merged, payload, deviceID, "log")
}
// QueryMetrics returns the "value" samples of one metric type from the
// "metrics" measurement, sorted by time.
//
// Parameters:
//   - deviceID: when non-empty, restricts the result to that device.
//   - metricType: value of the "type" tag to match.
//   - startTime, endTime: bounds for Flux range(). Relative values (starting
//     with "-") and "now()" are passed through. Absolute values in RFC3339,
//     "2006-01-02T15:04" or "2006-01-02 15:04" form are normalised to
//     RFC3339. Anything else is inserted verbatim, so these two arguments
//     must not come from untrusted input without validation.
//
// deviceID and metricType are escaped before being embedded in Flux string
// literals, so quotes, backslashes or "${" in them cannot break out of the
// literal or alter the query.
//
// Each returned point carries every string-valued column of the record
// except _time, _value, _field and _measurement as Tags. Records whose
// value is not numeric are skipped rather than causing a panic.
func (s *Storage) QueryMetrics(ctx context.Context, deviceID, metricType, startTime, endTime string) ([]MetricPoint, error) {
	// normalizeTime converts a recognised absolute timestamp to RFC3339 and
	// leaves everything else untouched.
	normalizeTime := func(raw string) string {
		if raw == "" || strings.HasPrefix(raw, "-") || raw == "now()" {
			return raw
		}
		for _, layout := range []string{time.RFC3339, "2006-01-02T15:04", "2006-01-02 15:04"} {
			if t, err := time.Parse(layout, raw); err == nil {
				return t.Format(time.RFC3339)
			}
		}
		return raw
	}

	// fluxQuote renders v as a Flux string literal.
	fluxEscaper := strings.NewReplacer(`\`, `\\`, `"`, `\"`, "${", `\${`)
	fluxQuote := func(v string) string {
		return `"` + fluxEscaper.Replace(v) + `"`
	}

	query := `from(bucket: "` + s.bucket + `")
	|> range(start: ` + normalizeTime(startTime) + `, stop: ` + normalizeTime(endTime) + `)
	|> filter(fn: (r) => r["_measurement"] == "metrics")
	|> filter(fn: (r) => r["_field"] == "value")`
	if deviceID != "" {
		query += `
	|> filter(fn: (r) => r["device_id"] == ` + fluxQuote(deviceID) + `)`
	}
	query += `
	|> filter(fn: (r) => r["type"] == ` + fluxQuote(metricType) + `)
	|> sort(columns: ["_time"])`

	result, err := s.client.QueryAPI(s.org).Query(ctx, query)
	if err != nil {
		return nil, err
	}
	defer result.Close()

	var points []MetricPoint
	// Every record is processed, including the first one of each table.
	for result.Next() {
		record := result.Record()

		var value float64
		switch v := record.Value().(type) {
		case float64:
			value = v
		case int64:
			value = float64(v)
		case uint64:
			value = float64(v)
		default:
			// Null or non-numeric value: nothing sensible to report.
			continue
		}

		tags := make(map[string]string)
		for k, v := range record.Values() {
			switch k {
			case "_time", "_value", "_field", "_measurement":
				continue
			}
			if str, ok := v.(string); ok {
				tags[k] = str
			}
		}

		points = append(points, MetricPoint{
			Time:  record.Time(),
			Value: value,
			Tags:  tags,
		})
	}
	if err := result.Err(); err != nil {
		return nil, err
	}
	return points, nil
}
// QueryAllDevices lists the devices that reported to the "metrics"
// measurement during the last 30 days.
//
// Each element of the result is a map with the keys "id" (the device_id tag)
// and "name" (the agent_name tag, falling back to the device ID when the tag
// is missing or empty). The order of the result is unspecified.
//
// This method is best-effort: if the query fails, or no device is found, it
// returns a single placeholder device {"id": "default", "name": "Default"}.
// The returned error is always nil; query failures are logged instead.
func (s *Storage) QueryAllDevices(ctx context.Context) ([]map[string]string, error) {
	defaultDevices := func() []map[string]string {
		return []map[string]string{{"id": "default", "name": "Default"}}
	}

	query := `from(bucket: "` + s.bucket + `")
	|> range(start: -30d)
	|> filter(fn: (r) => r["_measurement"] == "metrics")
	|> group(columns: ["device_id", "agent_name"])
	|> last()
	|> keep(columns: ["device_id", "agent_name"])`

	queryResult, err := s.client.QueryAPI(s.org).Query(ctx, query)
	if err != nil {
		log.Printf("Warning: querying device list failed, returning default device: %v", err)
		return defaultDevices(), nil
	}
	defer queryResult.Close()

	deviceMap := make(map[string]string)
	// Every record is read. TableChanged() is true for the first record of a
	// new table, and that record still carries data, so it must not be
	// skipped.
	for queryResult.Next() {
		record := queryResult.Record()

		deviceID, ok := record.ValueByKey("device_id").(string)
		if !ok {
			// Series without a device_id tag cannot be attributed to a device.
			continue
		}
		agentName, _ := record.ValueByKey("agent_name").(string)
		if agentName == "" {
			agentName = deviceID
		}
		deviceMap[deviceID] = agentName
	}
	if err := queryResult.Err(); err != nil {
		log.Printf("Warning: reading device list failed, returning default device: %v", err)
		return defaultDevices(), nil
	}

	if len(deviceMap) == 0 {
		return defaultDevices(), nil
	}

	deviceList := make([]map[string]string, 0, len(deviceMap))
	for deviceID, agentName := range deviceMap {
		deviceList = append(deviceList, map[string]string{
			"id":   deviceID,
			"name": agentName,
		})
	}
	return deviceList, nil
}
// QueryDeviceStatus returns the most recent value of every metric type that
// the given device reported to the "metrics" measurement within the last
// hour.
//
// Returns:
//   - the agent name (the agent_name tag if any record carries a non-empty
//     one, otherwise deviceID), or "" when no usable record was found;
//   - a map from metric type (the "type" tag) to its latest value, empty
//     when no usable record was found;
//   - an error if the query itself or reading its result failed.
//
// deviceID is escaped before being embedded in the Flux string literal.
// Records without a "type" tag or with a non-numeric value are skipped
// rather than causing a panic.
func (s *Storage) QueryDeviceStatus(ctx context.Context, deviceID string) (string, map[string]float64, error) {
	// fluxQuote renders v as a Flux string literal.
	fluxEscaper := strings.NewReplacer(`\`, `\\`, `"`, `\"`, "${", `\${`)
	fluxQuote := func(v string) string {
		return `"` + fluxEscaper.Replace(v) + `"`
	}

	query := `from(bucket: "` + s.bucket + `")
	|> range(start: -1h)
	|> filter(fn: (r) => r["_measurement"] == "metrics")
	|> filter(fn: (r) => r["device_id"] == ` + fluxQuote(deviceID) + `)
	|> last()`

	queryResult, err := s.client.QueryAPI(s.org).Query(ctx, query)
	if err != nil {
		return "", nil, err
	}
	defer queryResult.Close()

	status := make(map[string]float64)
	agentName := deviceID
	// Every record is read. TableChanged() is true for the first record of a
	// new table, and that record still carries data. After last() each
	// series contributes exactly one record, so skipping those would discard
	// results.
	for queryResult.Next() {
		record := queryResult.Record()

		metricType, ok := record.ValueByKey("type").(string)
		if !ok {
			continue
		}
		switch v := record.Value().(type) {
		case float64:
			status[metricType] = v
		case int64:
			status[metricType] = float64(v)
		case uint64:
			status[metricType] = float64(v)
		default:
			continue
		}

		if name, ok := record.ValueByKey("agent_name").(string); ok && name != "" {
			agentName = name
		}
	}
	if err := queryResult.Err(); err != nil {
		return "", nil, err
	}

	if len(status) == 0 {
		// No data for this device in the window: report an empty status.
		return "", map[string]float64{}, nil
	}
	return agentName, status, nil
}
// QueryProcessMetrics returns the latest snapshot of every process series in
// the "processes" measurement within the given time range.
//
// Parameters:
//   - deviceID: when non-empty, restricts the result to that device.
//   - startTime, endTime: inserted verbatim as the bounds of Flux range();
//     they must be valid Flux time expressions and must not come from
//     untrusted input without validation.
//
// Each element of the result has the keys "time", "device_id",
// "process_name", "username", "pid", "cpu_usage", "memory_usage", "path",
// "cmdline", "ports" and "agent_name". A value is nil when the corresponding
// column is absent.
//
// The query pivots fields into columns after last(). Without the pivot the
// fields cpu_usage, memory_usage, path and cmdline only exist as
// _field/_value pairs, so looking them up by name always yielded nil and
// every process appeared once per field.
//
// deviceID is escaped before being embedded in the Flux string literal.
func (s *Storage) QueryProcessMetrics(ctx context.Context, deviceID string, startTime, endTime string) ([]map[string]interface{}, error) {
	// fluxQuote renders v as a Flux string literal.
	fluxEscaper := strings.NewReplacer(`\`, `\\`, `"`, `\"`, "${", `\${`)
	fluxQuote := func(v string) string {
		return `"` + fluxEscaper.Replace(v) + `"`
	}

	query := `from(bucket: "` + s.bucket + `")
	|> range(start: ` + startTime + `, stop: ` + endTime + `)
	|> filter(fn: (r) => r["_measurement"] == "processes")`
	if deviceID != "" {
		query += `
	|> filter(fn: (r) => r["device_id"] == ` + fluxQuote(deviceID) + `)`
	}
	// Latest point per series, then one row per process with fields as columns.
	query += `
	|> last()
	|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")`

	queryResult, err := s.client.QueryAPI(s.org).Query(ctx, query)
	if err != nil {
		return nil, err
	}
	defer queryResult.Close()

	processes := make([]map[string]interface{}, 0)
	// Every record is read. TableChanged() is true for the first record of a
	// new table, and that record still carries data, so it must not be
	// skipped.
	for queryResult.Next() {
		record := queryResult.Record()
		processes = append(processes, map[string]interface{}{
			"time":         record.Time(),
			"device_id":    record.ValueByKey("device_id"),
			"process_name": record.ValueByKey("process_name"),
			"username":     record.ValueByKey("username"),
			"pid":          record.ValueByKey("pid"),
			"cpu_usage":    record.ValueByKey("cpu_usage"),
			"memory_usage": record.ValueByKey("memory_usage"),
			"path":         record.ValueByKey("path"),
			"cmdline":      record.ValueByKey("cmdline"),
			"ports":        record.ValueByKey("ports"),
			"agent_name":   record.ValueByKey("agent_name"),
		})
	}
	if err := queryResult.Err(); err != nil {
		return nil, err
	}
	return processes, nil
}
// QueryLogMetrics returns the 100 most recent log entries from the "logs"
// measurement within the given time range, newest first.
//
// Parameters:
//   - deviceID: when non-empty, restricts the result to that device.
//   - startTime, endTime: inserted verbatim as the bounds of Flux range();
//     they must be valid Flux time expressions and must not come from
//     untrusted input without validation.
//
// Each element of the result has the keys "time", "device_id", "source",
// "sequence", "message" and "agent_name". A value is nil when the
// corresponding column is absent.
//
// The query pivots fields into columns so that "sequence" and "message"
// (which are fields, not tags) can be read by name. It then merges all
// series into a single table before sorting and limiting. Without the merge,
// sort() and limit() apply to each series separately and the result is
// neither globally ordered nor capped at 100 entries.
//
// deviceID is escaped before being embedded in the Flux string literal.
func (s *Storage) QueryLogMetrics(ctx context.Context, deviceID string, startTime, endTime string) ([]map[string]interface{}, error) {
	// fluxQuote renders v as a Flux string literal.
	fluxEscaper := strings.NewReplacer(`\`, `\\`, `"`, `\"`, "${", `\${`)
	fluxQuote := func(v string) string {
		return `"` + fluxEscaper.Replace(v) + `"`
	}

	query := `from(bucket: "` + s.bucket + `")
	|> range(start: ` + startTime + `, stop: ` + endTime + `)
	|> filter(fn: (r) => r["_measurement"] == "logs")`
	if deviceID != "" {
		query += `
	|> filter(fn: (r) => r["device_id"] == ` + fluxQuote(deviceID) + `)`
	}
	// One row per log entry, all series merged, newest 100 overall.
	query += `
	|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
	|> group()
	|> sort(columns: ["_time"], desc: true)
	|> limit(n: 100)`

	queryResult, err := s.client.QueryAPI(s.org).Query(ctx, query)
	if err != nil {
		return nil, err
	}
	defer queryResult.Close()

	logs := make([]map[string]interface{}, 0)
	// Every record is read. TableChanged() is true for the first record of a
	// new table, and that record still carries data, so it must not be
	// skipped.
	for queryResult.Next() {
		record := queryResult.Record()
		logs = append(logs, map[string]interface{}{
			"time":       record.Time(),
			"device_id":  record.ValueByKey("device_id"),
			"source":     record.ValueByKey("source"),
			"sequence":   record.ValueByKey("sequence"),
			"message":    record.ValueByKey("message"),
			"agent_name": record.ValueByKey("agent_name"),
		})
	}
	if err := queryResult.Err(); err != nil {
		return nil, err
	}
	return logs, nil
}
// QueryDiskDetails returns the latest description of every disk series in
// the "disk_details" measurement within the given time range.
//
// Parameters:
//   - deviceID: when non-empty, restricts the result to that device.
//   - startTime, endTime: inserted verbatim as the bounds of Flux range();
//     they must be valid Flux time expressions and must not come from
//     untrusted input without validation.
//
// Each element of the result has the keys "time", "device_id", "disk_id",
// "status", "type", "size_gb", "model", "interface_type", "description" and
// "agent_name". A value is nil when the corresponding column is absent.
//
// The query pivots fields into columns after last(). Without the pivot the
// fields size_gb and description only exist as _field/_value pairs, so
// looking them up by name always yielded nil and every disk appeared once
// per field.
//
// deviceID is escaped before being embedded in the Flux string literal.
func (s *Storage) QueryDiskDetails(ctx context.Context, deviceID string, startTime, endTime string) ([]map[string]interface{}, error) {
	// fluxQuote renders v as a Flux string literal.
	fluxEscaper := strings.NewReplacer(`\`, `\\`, `"`, `\"`, "${", `\${`)
	fluxQuote := func(v string) string {
		return `"` + fluxEscaper.Replace(v) + `"`
	}

	query := `from(bucket: "` + s.bucket + `")
	|> range(start: ` + startTime + `, stop: ` + endTime + `)
	|> filter(fn: (r) => r["_measurement"] == "disk_details")`
	if deviceID != "" {
		query += `
	|> filter(fn: (r) => r["device_id"] == ` + fluxQuote(deviceID) + `)`
	}
	// Latest point per series, then one row per disk with fields as columns.
	query += `
	|> last()
	|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")`

	queryResult, err := s.client.QueryAPI(s.org).Query(ctx, query)
	if err != nil {
		return nil, err
	}
	defer queryResult.Close()

	diskDetails := make([]map[string]interface{}, 0)
	// Every record is read. TableChanged() is true for the first record of a
	// new table, and that record still carries data, so it must not be
	// skipped.
	for queryResult.Next() {
		record := queryResult.Record()
		diskDetails = append(diskDetails, map[string]interface{}{
			"time":           record.Time(),
			"device_id":      record.ValueByKey("device_id"),
			"disk_id":        record.ValueByKey("disk_id"),
			"status":         record.ValueByKey("status"),
			"type":           record.ValueByKey("type"),
			"size_gb":        record.ValueByKey("size_gb"),
			"model":          record.ValueByKey("model"),
			"interface_type": record.ValueByKey("interface_type"),
			"description":    record.ValueByKey("description"),
			"agent_name":     record.ValueByKey("agent_name"),
		})
	}
	if err := queryResult.Err(); err != nil {
		return nil, err
	}
	return diskDetails, nil
}