增加日志进程等信息采集

This commit is contained in:
Alex Yang
2025-12-04 16:29:05 +08:00
parent 1a80c5acb8
commit 057a2ea9ee
15 changed files with 2090 additions and 379 deletions

View File

@@ -2,7 +2,9 @@ package storage
import (
"context"
"fmt"
"log"
"math/rand"
"strings"
"time"
@@ -10,6 +12,28 @@ import (
"github.com/monitor/backend/config"
)
// formatTags renders a tag map as the comma-separated key=value list that
// forms the tag set of an InfluxDB line-protocol record.
//
// Entries whose key or value is empty are dropped, because neither can be
// represented in the tag set. Keys are escaped with the same helper as
// values: commas, spaces and equals signs are delimiters in both positions,
// so an unescaped key would corrupt the record just like an unescaped value.
//
// The result is "" when no entry survives. Pairs are emitted in Go map
// iteration order, so the order is not stable between calls.
func formatTags(tags map[string]string) string {
	tagList := make([]string, 0, len(tags))
	for k, v := range tags {
		// Skip entries that would produce an unparsable "key=" or "=value" pair.
		if k == "" || v == "" {
			continue
		}
		tagList = append(tagList, fmt.Sprintf("%s=%s", escapeTagValue(k), escapeTagValue(v)))
	}
	return strings.Join(tagList, ",")
}
// tagEscaper rewrites, in a single pass, every character that would otherwise
// be read as a delimiter inside the tag set of a line-protocol record.
var tagEscaper = strings.NewReplacer(
	",", `\,`,
	" ", `\ `,
	"=", `\=`,
	// A raw line break would end the record early and turn the rest of the
	// value into a second, malformed record; tabs and carriage returns are
	// escaped alongside it so the whole value stays on one line.
	"\n", `\n`,
	"\r", `\r`,
	"\t", `\t`,
)

// escapeTagValue returns value with commas, spaces and equals signs prefixed
// by a backslash, and with newline, carriage return and tab replaced by the
// two-character sequences \n, \r and \t. All other characters, including
// backslashes already present in value, are returned unchanged.
func escapeTagValue(value string) string {
	return tagEscaper.Replace(value)
}
// MetricPoint 自定义监控指标点
type MetricPoint struct {
Time time.Time `json:"time"`
@@ -39,8 +63,10 @@ func NewStorage(cfg *config.Config) *Storage {
client = influxdb2.NewClient(cfg.InfluxDB.URL, "")
}
// 配置InfluxDB客户端选项
options := client.Options()
// 禁用InfluxDB客户端的调试日志
client.Options().SetLogLevel(0)
options.SetLogLevel(0)
return &Storage{
client: client,
@@ -54,10 +80,70 @@ func (s *Storage) Close() {
s.client.Close()
}
// WriteMetric 写入监控指标
func (s *Storage) WriteMetric(ctx context.Context, deviceID, metricType string, value float64, tags map[string]string) error {
writeAPI := s.client.WriteAPIBlocking(s.org, s.bucket)
// writeData encodes one point as an InfluxDB line-protocol record and writes
// it through the blocking write API, retrying failed writes.
//
// measurement, tags and fields describe the point. deviceID and metricType
// are used only in the retry log message.
//
// The record and its timestamp are built once, before the first attempt, so
// every retry sends exactly the same line. Field values are encoded by type:
// strings and any unrecognised type are written as quoted strings, bools as
// true/false, and float64/int/int32/int64 as bare numbers (integers carry no
// "i" suffix).
//
// A failed write is retried up to two more times with exponential backoff
// plus a small random jitter. The function returns nil on the first
// successful write, ctx.Err() if ctx is done before or between attempts, an
// error if fields is empty, and otherwise the error of the last attempt.
func (s *Storage) writeData(ctx context.Context, measurement string, tags map[string]string, fields map[string]interface{}, deviceID, metricType string) error {
	// The retry budget is kept small so the whole call fits in the caller's timeout.
	const (
		maxRetries = 2
		baseDelay  = 200 * time.Millisecond
	)

	// A record without fields is not valid line protocol; fail fast instead
	// of sending it and retrying a write that can never succeed.
	if len(fields) == 0 {
		return fmt.Errorf("write %s: no fields to write", measurement)
	}

	fieldList := make([]string, 0, len(fields))
	for k, v := range fields {
		var fieldStr string
		switch v := v.(type) {
		case string:
			fieldStr = fmt.Sprintf("%s=%q", k, v)
		case float64, int, int32, int64:
			fieldStr = fmt.Sprintf("%s=%v", k, v)
		case bool:
			fieldStr = fmt.Sprintf("%s=%t", k, v)
		default:
			// Anything else is stored as its quoted string form.
			fieldStr = fmt.Sprintf("%s=%q", k, fmt.Sprintf("%v", v))
		}
		fieldList = append(fieldList, fieldStr)
	}

	// Only add the separating comma when there is a tag set to separate:
	// "measurement, field=1" (comma followed by a space) does not parse.
	head := measurement
	if tagSet := formatTags(tags); tagSet != "" {
		head += "," + tagSet
	}
	line := fmt.Sprintf("%s %s %d", head, strings.Join(fieldList, ","), time.Now().UnixNano())

	writeAPI := s.client.WriteAPIBlocking(s.org, s.bucket)

	for i := 0; ; i++ {
		// Stop immediately if the caller has already given up.
		if err := ctx.Err(); err != nil {
			return err
		}

		err := writeAPI.WriteRecord(ctx, line)
		if err == nil {
			return nil
		}

		// Out of retries: report the last failure.
		if i == maxRetries {
			return err
		}

		// Exponential backoff (200ms, 400ms, ...) plus up to 50ms of jitter.
		delay := baseDelay*time.Duration(1<<i) + time.Duration(rand.Intn(50))*time.Millisecond
		log.Printf("Warning: InfluxDB write failed for device %s, metric %s, retrying in %v... (Attempt %d/%d)\nError: %v", deviceID, metricType, delay, i+1, maxRetries, err)

		select {
		case <-time.After(delay):
			// Backoff elapsed; try again.
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// WriteMetric 写入监控指标,带重试机制
func (s *Storage) WriteMetric(ctx context.Context, deviceID, metricType string, value float64, tags map[string]string) error {
// 创建标签映射,合并原有标签和新标签
allTags := make(map[string]string)
// 复制原有标签
@@ -69,18 +155,77 @@ func (s *Storage) WriteMetric(ctx context.Context, deviceID, metricType string,
// 添加指标类型标签
allTags["type"] = metricType
// 创建数据点
point := influxdb2.NewPoint(
"metrics",
allTags,
map[string]interface{}{
"value": value,
},
time.Now(),
)
// 创建字段映射
fields := map[string]interface{}{
"value": value,
}
// 写入数据点
return writeAPI.WritePoint(ctx, point)
// 使用新的writeData方法
return s.writeData(ctx, "metrics", allTags, fields, deviceID, metricType)
}
// WriteProcessMetric records one process sample in the "processes"
// measurement.
//
// The caller's tags are copied first and then overlaid with device_id,
// process_name, username, pid and ports, so these fixed tags win on a name
// clash. Only the first five entries of ports are kept, joined with commas.
// cpu, memory, path and cmdline are stored as the fields cpu_usage,
// memory_usage, path and cmdline. The write itself, including retries, is
// delegated to writeData, whose error is returned unchanged.
func (s *Storage) WriteProcessMetric(ctx context.Context, deviceID string, processName, username string, pid int32, cpu, memory float64, path, cmdline string, ports []int, tags map[string]string) error {
	// Cap the port list before formatting it.
	const maxPorts = 5
	kept := ports
	if len(kept) > maxPorts {
		kept = kept[:maxPorts]
	}
	portTexts := make([]string, len(kept))
	for idx, p := range kept {
		portTexts[idx] = fmt.Sprintf("%d", p)
	}

	// Start from a private copy so the caller's map is never modified.
	merged := make(map[string]string)
	for name, val := range tags {
		merged[name] = val
	}
	merged["device_id"] = deviceID
	merged["process_name"] = processName
	merged["username"] = username
	merged["pid"] = fmt.Sprintf("%d", pid)
	merged["ports"] = strings.Join(portTexts, ",")

	return s.writeData(ctx, "processes", merged, map[string]interface{}{
		"cpu_usage":    cpu,
		"memory_usage": memory,
		"path":         path,
		"cmdline":      cmdline,
	}, deviceID, "process")
}
// WriteDiskDetailMetric records the static details of one disk in the
// "disk_details" measurement.
//
// The caller's tags are copied first and then overlaid with device_id,
// disk_id, status, type, model and interface_type, so these fixed tags win on
// a name clash. sizeGB and description are stored as the fields size_gb and
// description. The write itself, including retries, is delegated to
// writeData, whose error is returned unchanged.
func (s *Storage) WriteDiskDetailMetric(ctx context.Context, deviceID, diskDeviceID, status, diskType string, sizeGB float64, model, interfaceType, description string, tags map[string]string) error {
	// Start from a private copy so the caller's map is never modified.
	merged := make(map[string]string)
	for name, val := range tags {
		merged[name] = val
	}

	// Fixed tags are applied after the copy so they take precedence.
	fixed := [...][2]string{
		{"device_id", deviceID},
		{"disk_id", diskDeviceID},
		{"status", status},
		{"type", diskType},
		{"model", model},
		{"interface_type", interfaceType},
	}
	for _, pair := range fixed {
		merged[pair[0]] = pair[1]
	}

	values := map[string]interface{}{
		"size_gb":     sizeGB,
		"description": description,
	}
	return s.writeData(ctx, "disk_details", merged, values, deviceID, "disk_detail")
}
// QueryMetrics 查询监控指标
@@ -315,3 +460,132 @@ func (s *Storage) QueryDeviceStatus(ctx context.Context, deviceID string) (strin
return agentName, status, nil
}
// QueryProcessMetrics returns the most recent sample of every process series
// in the "processes" measurement within [startTime, endTime).
//
// deviceID restricts the result to one device; an empty deviceID returns all
// devices. It is escaped before being placed in the Flux string literal.
// startTime and endTime are inserted into range() verbatim, so they must be
// valid, trusted Flux time expressions.
// NOTE(review): validate them in the caller if they can come from user input.
//
// The query takes last() of each field and then pivots the fields into
// columns, so each returned map describes one process and carries the keys
// time, device_id, process_name, username, pid, cpu_usage, memory_usage,
// path, cmdline, ports and agent_name. A key whose column is missing from the
// result maps to nil. The returned slice is empty, never nil, when nothing
// matches.
func (s *Storage) QueryProcessMetrics(ctx context.Context, deviceID string, startTime, endTime string) ([]map[string]interface{}, error) {
	queryAPI := s.client.QueryAPI(s.org)

	query := `from(bucket: "` + s.bucket + `")
  |> range(start: ` + startTime + `, stop: ` + endTime + `)
  |> filter(fn: (r) => r["_measurement"] == "processes")`

	if deviceID != "" {
		// Neutralise everything that could end the string literal or start
		// an interpolated expression inside it.
		safeID := strings.NewReplacer(`\`, `\\`, `"`, `\"`, `${`, `\${`).Replace(deviceID)
		query += `
  |> filter(fn: (r) => r["device_id"] == "` + safeID + `")`
	}

	// last() keeps the newest row of every field; without the pivot each row
	// would expose only _field/_value and the per-field lookups below would
	// all come back nil.
	query += `
  |> last()
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")`

	queryResult, err := queryAPI.Query(ctx, query)
	if err != nil {
		return nil, err
	}
	defer queryResult.Close()

	processes := make([]map[string]interface{}, 0)

	// Every record returned by Next is consumed. TableChanged only signals
	// that this record opens a new table; the record itself is valid data.
	for queryResult.Next() {
		record := queryResult.Record()
		processes = append(processes, map[string]interface{}{
			"time":         record.Time(),
			"device_id":    record.ValueByKey("device_id"),
			"process_name": record.ValueByKey("process_name"),
			"username":     record.ValueByKey("username"),
			"pid":          record.ValueByKey("pid"),
			"cpu_usage":    record.ValueByKey("cpu_usage"),
			"memory_usage": record.ValueByKey("memory_usage"),
			"path":         record.ValueByKey("path"),
			"cmdline":      record.ValueByKey("cmdline"),
			"ports":        record.ValueByKey("ports"),
			"agent_name":   record.ValueByKey("agent_name"),
		})
	}
	if err := queryResult.Err(); err != nil {
		return nil, err
	}

	return processes, nil
}
// QueryDiskDetails returns the most recent record of every disk series in the
// "disk_details" measurement within [startTime, endTime).
//
// deviceID restricts the result to one device; an empty deviceID returns all
// devices. It is escaped before being placed in the Flux string literal.
// startTime and endTime are inserted into range() verbatim, so they must be
// valid, trusted Flux time expressions.
// NOTE(review): validate them in the caller if they can come from user input.
//
// The query takes last() of each field and then pivots the fields into
// columns, so each returned map describes one disk and carries the keys time,
// device_id, disk_id, status, type, size_gb, model, interface_type,
// description and agent_name. A key whose column is missing from the result
// maps to nil. The returned slice is empty, never nil, when nothing matches.
func (s *Storage) QueryDiskDetails(ctx context.Context, deviceID string, startTime, endTime string) ([]map[string]interface{}, error) {
	queryAPI := s.client.QueryAPI(s.org)

	query := `from(bucket: "` + s.bucket + `")
  |> range(start: ` + startTime + `, stop: ` + endTime + `)
  |> filter(fn: (r) => r["_measurement"] == "disk_details")`

	if deviceID != "" {
		// Neutralise everything that could end the string literal or start
		// an interpolated expression inside it.
		safeID := strings.NewReplacer(`\`, `\\`, `"`, `\"`, `${`, `\${`).Replace(deviceID)
		query += `
  |> filter(fn: (r) => r["device_id"] == "` + safeID + `")`
	}

	// last() keeps the newest row of every field; without the pivot each row
	// would expose only _field/_value and the per-field lookups below would
	// all come back nil.
	query += `
  |> last()
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")`

	queryResult, err := queryAPI.Query(ctx, query)
	if err != nil {
		return nil, err
	}
	defer queryResult.Close()

	diskDetails := make([]map[string]interface{}, 0)

	// Every record returned by Next is consumed. TableChanged only signals
	// that this record opens a new table; the record itself is valid data.
	for queryResult.Next() {
		record := queryResult.Record()
		diskDetails = append(diskDetails, map[string]interface{}{
			"time":           record.Time(),
			"device_id":      record.ValueByKey("device_id"),
			"disk_id":        record.ValueByKey("disk_id"),
			"status":         record.ValueByKey("status"),
			"type":           record.ValueByKey("type"),
			"size_gb":        record.ValueByKey("size_gb"),
			"model":          record.ValueByKey("model"),
			"interface_type": record.ValueByKey("interface_type"),
			"description":    record.ValueByKey("description"),
			"agent_name":     record.ValueByKey("agent_name"),
		})
	}
	if err := queryResult.Err(); err != nil {
		return nil, err
	}

	return diskDetails, nil
}