885 lines
25 KiB
Go
885 lines
25 KiB
Go
package storage
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"log"
|
||
"math/rand"
|
||
"sort"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
|
||
influxdb2 "github.com/influxdata/influxdb-client-go"
|
||
"github.com/monitor/backend/config"
|
||
)
|
||
|
||
// formatTags 将标签映射格式化为InfluxDB行协议格式
|
||
func formatTags(tags map[string]string) string {
|
||
var tagList []string
|
||
for k, v := range tags {
|
||
// 跳过空值的标签,避免InfluxDB解析错误
|
||
if v == "" {
|
||
continue
|
||
}
|
||
tagList = append(tagList, fmt.Sprintf("%s=%s", k, escapeTagValue(v)))
|
||
}
|
||
return strings.Join(tagList, ",")
|
||
}
|
||
|
||
// escapeTagValue 转义标签值中的特殊字符
|
||
func escapeTagValue(value string) string {
|
||
// 替换逗号、空格和等号为转义后的形式
|
||
escaped := strings.ReplaceAll(value, ",", "\\,")
|
||
escaped = strings.ReplaceAll(escaped, " ", "\\ ")
|
||
escaped = strings.ReplaceAll(escaped, "=", "\\=")
|
||
return escaped
|
||
}
|
||
|
||
// MetricPoint 自定义监控指标点
|
||
type MetricPoint struct {
|
||
Time time.Time `json:"time"`
|
||
Value float64 `json:"value"`
|
||
Tags map[string]string `json:"tags,omitempty"`
|
||
}
|
||
|
||
type Storage struct {
|
||
client influxdb2.Client
|
||
org string
|
||
bucket string
|
||
}
|
||
|
||
// NewStorage 创建新的存储实例
|
||
func NewStorage(cfg *config.Config) *Storage {
|
||
var client influxdb2.Client
|
||
|
||
// InfluxDB 2.x版本使用Token认证
|
||
// 检查Token是否为空
|
||
if cfg.InfluxDB.Token != "" {
|
||
// 使用Token认证(适用于InfluxDB 2.x)
|
||
client = influxdb2.NewClient(cfg.InfluxDB.URL, cfg.InfluxDB.Token)
|
||
} else {
|
||
// Token为空,尝试使用默认客户端
|
||
// 注意:InfluxDB 2.x需要有效的Token才能写入数据
|
||
log.Printf("Warning: InfluxDB Token is empty, write operations may fail")
|
||
client = influxdb2.NewClient(cfg.InfluxDB.URL, "")
|
||
}
|
||
|
||
// 配置InfluxDB客户端选项
|
||
options := client.Options()
|
||
// 禁用InfluxDB客户端的调试日志
|
||
options.SetLogLevel(0)
|
||
|
||
return &Storage{
|
||
client: client,
|
||
org: cfg.InfluxDB.Org,
|
||
bucket: cfg.InfluxDB.Bucket,
|
||
}
|
||
}
|
||
|
||
// Close 关闭存储连接
|
||
func (s *Storage) Close() {
|
||
s.client.Close()
|
||
}
|
||
|
||
// 写入数据到InfluxDB,带重试机制
|
||
func (s *Storage) writeData(ctx context.Context, measurement string, tags map[string]string, fields map[string]interface{}, deviceID, metricType string, timestamp ...time.Time) error {
|
||
// 重试配置 - 调整重试策略,减少重试次数,增加单次写入超时
|
||
maxRetries := 1
|
||
baseDelay := 100 * time.Millisecond
|
||
|
||
for i := 0; i <= maxRetries; i++ {
|
||
// 创建独立的写入上下文,设置较短的超时时间
|
||
writeCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||
defer cancel()
|
||
|
||
// 写入数据点
|
||
writeAPI := s.client.WriteAPIBlocking(s.org, s.bucket)
|
||
// 构建行协议字符串
|
||
var fieldList []string
|
||
for k, v := range fields {
|
||
var fieldStr string
|
||
// 根据字段类型格式化
|
||
switch v := v.(type) {
|
||
case string:
|
||
fieldStr = fmt.Sprintf("%s=%q", k, v)
|
||
case float64, int, int32, int64:
|
||
fieldStr = fmt.Sprintf("%s=%v", k, v)
|
||
case bool:
|
||
fieldStr = fmt.Sprintf("%s=%t", k, v)
|
||
default:
|
||
// 转换为字符串
|
||
fieldStr = fmt.Sprintf("%s=%q", k, fmt.Sprintf("%v", v))
|
||
}
|
||
fieldList = append(fieldList, fieldStr)
|
||
}
|
||
// 确定时间戳
|
||
var ts int64
|
||
if len(timestamp) > 0 {
|
||
ts = timestamp[0].UnixNano()
|
||
} else {
|
||
ts = time.Now().UnixNano()
|
||
}
|
||
line := fmt.Sprintf("%s,%s %s %d", measurement, formatTags(tags), strings.Join(fieldList, ","), ts)
|
||
err := writeAPI.WriteRecord(writeCtx, line)
|
||
|
||
if err == nil {
|
||
// 写入成功,直接返回
|
||
return nil
|
||
}
|
||
|
||
// 如果是最后一次重试,返回错误
|
||
if i == maxRetries {
|
||
return err
|
||
}
|
||
|
||
// 计算重试延迟(指数退避)
|
||
delay := baseDelay*time.Duration(1<<i) + time.Duration(rand.Intn(50))*time.Millisecond
|
||
log.Printf("Warning: InfluxDB write failed for device %s, metric %s, retrying in %v... (Attempt %d/%d)\nError: %v", deviceID, metricType, delay, i+1, maxRetries, err)
|
||
|
||
// 等待重试
|
||
select {
|
||
case <-time.After(delay):
|
||
// 继续重试
|
||
case <-ctx.Done():
|
||
// 上下文取消,返回错误
|
||
return ctx.Err()
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// WriteMetric 写入监控指标,带重试机制
|
||
func (s *Storage) WriteMetric(ctx context.Context, deviceID, metricType string, value float64, tags map[string]string) error {
|
||
// 创建标签映射,合并原有标签和新标签
|
||
allTags := make(map[string]string)
|
||
// 复制原有标签
|
||
for k, v := range tags {
|
||
allTags[k] = v
|
||
}
|
||
// 添加设备ID标签
|
||
allTags["device_id"] = deviceID
|
||
// 添加指标类型标签
|
||
allTags["type"] = metricType
|
||
|
||
// 创建字段映射
|
||
fields := map[string]interface{}{
|
||
"value": value,
|
||
}
|
||
|
||
// 使用新的writeData方法
|
||
return s.writeData(ctx, "metrics", allTags, fields, deviceID, metricType)
|
||
}
|
||
|
||
// WriteProcessMetric 写入进程指标
|
||
func (s *Storage) WriteProcessMetric(ctx context.Context, deviceID string, processName, username string, pid int32, cpu, memory float64, path, cmdline string, ports []int, tags map[string]string) error {
|
||
// 创建标签映射,合并原有标签和新标签
|
||
allTags := make(map[string]string)
|
||
// 复制原有标签
|
||
for k, v := range tags {
|
||
allTags[k] = v
|
||
}
|
||
// 添加设备ID标签
|
||
allTags["device_id"] = deviceID
|
||
// 添加进程相关标签
|
||
allTags["process_name"] = processName
|
||
allTags["username"] = username
|
||
allTags["pid"] = fmt.Sprintf("%d", pid)
|
||
|
||
// 处理端口标签,只取前5个端口
|
||
portsStr := make([]string, 0, len(ports))
|
||
for i, port := range ports {
|
||
if i >= 5 {
|
||
break
|
||
}
|
||
portsStr = append(portsStr, fmt.Sprintf("%d", port))
|
||
}
|
||
allTags["ports"] = strings.Join(portsStr, ",")
|
||
|
||
// 创建字段映射
|
||
fields := map[string]interface{}{
|
||
"cpu_usage": cpu,
|
||
"memory_usage": memory,
|
||
"path": path,
|
||
"cmdline": cmdline,
|
||
}
|
||
|
||
// 使用新的writeData方法
|
||
return s.writeData(ctx, "processes", allTags, fields, deviceID, "process")
|
||
}
|
||
|
||
// WriteDiskDetailMetric 写入磁盘详细信息
|
||
func (s *Storage) WriteDiskDetailMetric(ctx context.Context, deviceID, diskPath, status, diskType string, sizeGB float64, model, vendor, interfaceType, fileSystem, diskUUID, description string, tags map[string]string) error {
|
||
// 创建标签映射,合并原有标签和新标签
|
||
allTags := make(map[string]string)
|
||
// 复制原有标签
|
||
for k, v := range tags {
|
||
allTags[k] = v
|
||
}
|
||
// 添加设备ID标签
|
||
allTags["device_id"] = deviceID
|
||
// 添加磁盘相关标签
|
||
allTags["disk_path"] = diskPath
|
||
allTags["status"] = status
|
||
allTags["type"] = diskType
|
||
allTags["model"] = model
|
||
allTags["vendor"] = vendor
|
||
allTags["interface_type"] = interfaceType
|
||
allTags["file_system"] = fileSystem
|
||
|
||
// 创建字段映射
|
||
fields := map[string]interface{}{
|
||
"size_gb": sizeGB,
|
||
"disk_uuid": diskUUID,
|
||
"description": description,
|
||
}
|
||
|
||
// 使用新的writeData方法
|
||
return s.writeData(ctx, "disk_details", allTags, fields, deviceID, "disk_detail")
|
||
}
|
||
|
||
// WriteLogMetric 写入日志指标
|
||
func (s *Storage) WriteLogMetric(ctx context.Context, deviceID string, sequence int, source string, time time.Time, message string, tags map[string]string) error {
|
||
// 创建标签映射,合并原有标签和新标签
|
||
allTags := make(map[string]string)
|
||
// 复制原有标签
|
||
for k, v := range tags {
|
||
allTags[k] = v
|
||
}
|
||
// 添加设备ID标签
|
||
allTags["device_id"] = deviceID
|
||
// 添加日志来源标签
|
||
allTags["source"] = source
|
||
|
||
// 创建字段映射
|
||
fields := map[string]interface{}{
|
||
"sequence": sequence,
|
||
"message": message,
|
||
}
|
||
|
||
// 使用新的writeData方法,传入日志的实际时间
|
||
return s.writeData(ctx, "logs", allTags, fields, deviceID, "log", time)
|
||
}
|
||
|
||
// WriteHardwareMetric 写入硬件信息指标
|
||
func (s *Storage) WriteHardwareMetric(ctx context.Context, deviceID string, hardwareType string, hardwareData map[string]interface{}, tags map[string]string) error {
|
||
// 创建标签映射,合并原有标签和新标签
|
||
allTags := make(map[string]string)
|
||
// 复制原有标签
|
||
for k, v := range tags {
|
||
allTags[k] = v
|
||
}
|
||
// 添加设备ID标签
|
||
allTags["device_id"] = deviceID
|
||
// 添加硬件类型标签
|
||
allTags["type"] = hardwareType
|
||
|
||
// 使用新的writeData方法
|
||
return s.writeData(ctx, "hardware", allTags, hardwareData, deviceID, hardwareType)
|
||
}
|
||
|
||
// QueryMetrics 查询监控指标,支持采样
|
||
func (s *Storage) QueryMetrics(ctx context.Context, deviceID, metricType, startTime, endTime string, limit ...int) ([]MetricPoint, error) {
|
||
queryAPI := s.client.QueryAPI(s.org)
|
||
|
||
// 处理时间参数
|
||
processedStartTime := startTime
|
||
processedEndTime := endTime
|
||
|
||
// 检查并修复时间格式,确保InfluxDB能正确识别
|
||
// 对于相对时间(如-24h)或now(),直接使用
|
||
// 对于绝对时间,确保格式正确
|
||
if len(startTime) > 0 {
|
||
// 检查是否是相对时间(以-开头或为now())
|
||
isRelativeTime := strings.HasPrefix(startTime, "-") || startTime == "now()"
|
||
|
||
if !isRelativeTime {
|
||
// 是绝对时间,尝试解析为time.Time类型
|
||
t, err := time.Parse(time.RFC3339, startTime)
|
||
if err != nil {
|
||
// 尝试其他常见格式
|
||
t, err = time.Parse("2006-01-02T15:04", startTime)
|
||
if err != nil {
|
||
// 尝试YYYY-MM-DD HH:MM格式
|
||
t, err = time.Parse("2006-01-02 15:04", startTime)
|
||
}
|
||
}
|
||
|
||
if err == nil {
|
||
// 解析成功,使用RFC3339格式(InfluxDB可以识别)
|
||
processedStartTime = t.Format(time.RFC3339)
|
||
}
|
||
}
|
||
}
|
||
|
||
if len(endTime) > 0 {
|
||
// 检查是否是相对时间(以-开头或为now())
|
||
isRelativeTime := strings.HasPrefix(endTime, "-") || endTime == "now()"
|
||
|
||
if !isRelativeTime {
|
||
// 是绝对时间,尝试解析为time.Time类型
|
||
t, err := time.Parse(time.RFC3339, endTime)
|
||
if err != nil {
|
||
// 尝试其他常见格式
|
||
t, err = time.Parse("2006-01-02T15:04", endTime)
|
||
if err != nil {
|
||
// 尝试YYYY-MM-DD HH:MM格式
|
||
t, err = time.Parse("2006-01-02 15:04", endTime)
|
||
}
|
||
}
|
||
|
||
if err == nil {
|
||
// 解析成功,使用RFC3339格式(InfluxDB可以识别)
|
||
processedEndTime = t.Format(time.RFC3339)
|
||
}
|
||
}
|
||
}
|
||
|
||
// 构建查询语句,包含标签信息和字段过滤
|
||
query := `from(bucket: "` + s.bucket + `")
|
||
|> range(start: ` + processedStartTime + `, stop: ` + processedEndTime + `)
|
||
|> filter(fn: (r) => r["_measurement"] == "metrics")
|
||
|> filter(fn: (r) => r["_field"] == "value")`
|
||
|
||
// 只有当deviceID非空时才添加device_id过滤
|
||
if deviceID != "" {
|
||
query += `
|
||
|> filter(fn: (r) => r["device_id"] == "` + deviceID + `")`
|
||
}
|
||
|
||
query += `
|
||
|> filter(fn: (r) => r["type"] == "` + metricType + `")
|
||
|> sort(columns: ["_time"])`
|
||
|
||
// 设置默认限制
|
||
maxLimit := 10000
|
||
if len(limit) > 0 && limit[0] > 0 {
|
||
maxLimit = limit[0]
|
||
}
|
||
|
||
query += `
|
||
|> limit(n: ` + strconv.Itoa(maxLimit) + `)` // 限制返回的数据点数量,防止内存溢出
|
||
|
||
// 执行查询
|
||
result, err := queryAPI.Query(ctx, query)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer result.Close()
|
||
|
||
var points []MetricPoint
|
||
|
||
// 处理查询结果
|
||
for result.Next() {
|
||
// 表结构变化也需要处理数据
|
||
|
||
// 获取记录
|
||
record := result.Record()
|
||
|
||
// 获取所有标签
|
||
tags := make(map[string]string)
|
||
for k, v := range record.Values() {
|
||
// 跳过内置字段,只保留自定义标签
|
||
if k != "_time" && k != "_value" && k != "_field" && k != "_measurement" {
|
||
if strValue, ok := v.(string); ok {
|
||
tags[k] = strValue
|
||
}
|
||
}
|
||
}
|
||
|
||
// 转换为自定义MetricPoint
|
||
point := MetricPoint{
|
||
Time: record.Time(),
|
||
Value: record.Value().(float64),
|
||
Tags: tags,
|
||
}
|
||
|
||
points = append(points, point)
|
||
}
|
||
|
||
if result.Err() != nil {
|
||
return nil, result.Err()
|
||
}
|
||
|
||
return points, nil
|
||
}
|
||
|
||
// DeviceInfo 设备信息
|
||
func (s *Storage) QueryAllDevices(ctx context.Context) ([]map[string]string, error) {
|
||
queryAPI := s.client.QueryAPI(s.org)
|
||
|
||
// 构建查询语句,获取所有唯一的device_id和对应的agent_name
|
||
query := `from(bucket: "` + s.bucket + `")
|
||
|> range(start: -30d)
|
||
|> filter(fn: (r) => r["_measurement"] == "metrics")
|
||
|> group(columns: ["device_id", "agent_name"])
|
||
|> last()
|
||
|> keep(columns: ["device_id", "agent_name"])`
|
||
|
||
// 执行查询
|
||
queryResult, err := queryAPI.Query(ctx, query)
|
||
if err != nil {
|
||
return []map[string]string{{"id": "default", "name": "Default"}}, nil
|
||
}
|
||
defer queryResult.Close()
|
||
|
||
deviceMap := make(map[string]string)
|
||
|
||
// 处理查询结果
|
||
for queryResult.Next() {
|
||
if queryResult.TableChanged() {
|
||
// 表结构变化,跳过
|
||
continue
|
||
}
|
||
|
||
// 获取记录
|
||
record := queryResult.Record()
|
||
deviceID := record.ValueByKey("device_id").(string)
|
||
agentName := ""
|
||
if name, ok := record.ValueByKey("agent_name").(string); ok {
|
||
agentName = name
|
||
}
|
||
if agentName == "" {
|
||
agentName = deviceID
|
||
}
|
||
deviceMap[deviceID] = agentName
|
||
}
|
||
|
||
if queryResult.Err() != nil {
|
||
return []map[string]string{{"id": "default", "name": "Default"}}, nil
|
||
}
|
||
|
||
// 转换为切片
|
||
deviceList := make([]map[string]string, 0, len(deviceMap))
|
||
for deviceID, agentName := range deviceMap {
|
||
deviceList = append(deviceList, map[string]string{
|
||
"id": deviceID,
|
||
"name": agentName,
|
||
})
|
||
}
|
||
|
||
// 如果没有设备,返回默认设备
|
||
if len(deviceList) == 0 {
|
||
return []map[string]string{{"id": "default", "name": "Default"}}, nil
|
||
}
|
||
|
||
return deviceList, nil
|
||
}
|
||
|
||
// QueryHardwareMetrics 查询硬件信息指标
|
||
func (s *Storage) QueryHardwareMetrics(ctx context.Context, deviceID string) (map[string]interface{}, error) {
|
||
queryAPI := s.client.QueryAPI(s.org)
|
||
|
||
// 初始化硬件信息结果
|
||
hardwareInfo := make(map[string]interface{})
|
||
|
||
// 定义要查询的硬件类型列表
|
||
hardwareTypes := []string{"os", "cpu", "memory", "disk", "network"}
|
||
|
||
// 为每个硬件类型单独查询,避免类型冲突
|
||
for _, hardwareType := range hardwareTypes {
|
||
// 构建查询语句
|
||
var query string
|
||
if hardwareType == "os" || hardwareType == "cpu" || hardwareType == "memory" {
|
||
// 对于os、cpu、memory类型,我们只需要一个结果
|
||
query = `from(bucket: "` + s.bucket + `")
|
||
|> range(start: -24h)
|
||
|> filter(fn: (r) => r["_measurement"] == "hardware")
|
||
|> filter(fn: (r) => r["device_id"] == "` + deviceID + `")
|
||
|> filter(fn: (r) => r["type"] == "` + hardwareType + `")
|
||
|> last()`
|
||
} else {
|
||
// 对于disk和network类型,我们需要获取所有设备的所有字段记录
|
||
query = `from(bucket: "` + s.bucket + `")
|
||
|> range(start: -24h)
|
||
|> filter(fn: (r) => r["_measurement"] == "hardware")
|
||
|> filter(fn: (r) => r["device_id"] == "` + deviceID + `")
|
||
|> filter(fn: (r) => r["type"] == "` + hardwareType + `")`
|
||
}
|
||
|
||
// 执行查询
|
||
queryResult, err := queryAPI.Query(ctx, query)
|
||
if err != nil {
|
||
// 查询单个硬件类型失败,继续查询其他类型
|
||
log.Printf("Warning: Failed to query %s hardware metrics: %v", hardwareType, err)
|
||
continue
|
||
}
|
||
defer queryResult.Close()
|
||
|
||
// 对于os、cpu、memory类型,我们只需要一个结果
|
||
if hardwareType == "os" || hardwareType == "cpu" || hardwareType == "memory" {
|
||
// 获取字段值映射
|
||
fieldMap := make(map[string]interface{})
|
||
|
||
// 遍历所有结果行,收集所有字段
|
||
for queryResult.Next() {
|
||
record := queryResult.Record()
|
||
fieldName := record.Field()
|
||
fieldValue := record.Value()
|
||
fieldMap[fieldName] = fieldValue
|
||
}
|
||
|
||
// 如果有字段数据,添加到硬件信息结果中
|
||
if len(fieldMap) > 0 {
|
||
hardwareInfo[hardwareType] = fieldMap
|
||
}
|
||
} else {
|
||
// 对于disk和network类型,我们可能有多个结果,每个字段作为单独的行返回
|
||
var allResults []map[string]interface{}
|
||
|
||
// 创建一个map来存储每个设备的所有字段值
|
||
// key是设备ID或index
|
||
deviceFields := make(map[string]map[string]interface{})
|
||
|
||
// 遍历所有结果行,收集所有结果
|
||
for queryResult.Next() {
|
||
// 获取记录
|
||
record := queryResult.Record()
|
||
|
||
// 获取字段名和值
|
||
fieldName := record.Field()
|
||
fieldValue := record.Value()
|
||
|
||
// 获取设备唯一标识
|
||
deviceKey := ""
|
||
// 优先使用id字段作为设备标识
|
||
if id, isString := fieldValue.(string); fieldName == "id" && isString {
|
||
deviceKey = id
|
||
// 创建设备字段映射
|
||
deviceFields[deviceKey] = make(map[string]interface{})
|
||
deviceFields[deviceKey]["id"] = id
|
||
} else {
|
||
// 否则,查找已存在的设备映射
|
||
// 遍历所有设备映射,查找当前记录对应的设备
|
||
for key, fields := range deviceFields {
|
||
// 如果设备映射中没有id字段,或者id字段为空,跳过
|
||
if id, ok := fields["id"].(string); ok && id != "" {
|
||
// 假设当前记录属于该设备
|
||
deviceKey = key
|
||
break
|
||
}
|
||
}
|
||
}
|
||
|
||
// 如果没有找到设备标识,使用index作为设备标识
|
||
if deviceKey == "" {
|
||
index := 0
|
||
if idx, ok := record.ValueByKey("index").(int); ok {
|
||
index = idx
|
||
}
|
||
deviceKey = fmt.Sprintf("%d", index)
|
||
// 创建设备字段映射
|
||
deviceFields[deviceKey] = make(map[string]interface{})
|
||
}
|
||
|
||
// 添加字段值到设备字段映射中
|
||
deviceFields[deviceKey][fieldName] = fieldValue
|
||
}
|
||
|
||
// 将deviceFields转换为切片
|
||
for _, fields := range deviceFields {
|
||
allResults = append(allResults, fields)
|
||
}
|
||
|
||
// 按id排序
|
||
sort.Slice(allResults, func(i, j int) bool {
|
||
iID, iOk := allResults[i]["id"].(string)
|
||
jID, jOk := allResults[j]["id"].(string)
|
||
if iOk && jOk {
|
||
return iID < jID
|
||
}
|
||
return false
|
||
})
|
||
|
||
// 如果有结果,添加到硬件信息结果中
|
||
if len(allResults) > 0 {
|
||
hardwareInfo[hardwareType] = allResults
|
||
}
|
||
}
|
||
|
||
// 检查查询结果是否有错误
|
||
if queryResult.Err() != nil {
|
||
// 查询单个硬件类型失败,继续查询其他类型
|
||
log.Printf("Warning: Failed to process %s hardware query result: %v", hardwareType, queryResult.Err())
|
||
}
|
||
}
|
||
|
||
return hardwareInfo, nil
|
||
}
|
||
|
||
// DeviceStatus 设备状态
|
||
func (s *Storage) QueryDeviceStatus(ctx context.Context, deviceID string) (string, map[string]float64, error) {
|
||
queryAPI := s.client.QueryAPI(s.org)
|
||
|
||
// 构建查询语句,获取设备最新的指标数据和agent名称
|
||
query := `from(bucket: "` + s.bucket + `")
|
||
|> range(start: -1h)
|
||
|> filter(fn: (r) => r["_measurement"] == "metrics")
|
||
|> filter(fn: (r) => r["device_id"] == "` + deviceID + `")
|
||
|> last()`
|
||
|
||
// 执行查询
|
||
queryResult, err := queryAPI.Query(ctx, query)
|
||
if err != nil {
|
||
return "", nil, err
|
||
}
|
||
defer queryResult.Close()
|
||
|
||
status := make(map[string]float64)
|
||
agentName := deviceID
|
||
hasData := false
|
||
|
||
// 处理查询结果
|
||
for queryResult.Next() {
|
||
if queryResult.TableChanged() {
|
||
// 表结构变化,跳过
|
||
continue
|
||
}
|
||
|
||
hasData = true
|
||
// 获取记录
|
||
record := queryResult.Record()
|
||
metricType := record.ValueByKey("type").(string)
|
||
value := record.Value().(float64)
|
||
status[metricType] = value
|
||
|
||
// 获取agent名称
|
||
if name, ok := record.ValueByKey("agent_name").(string); ok && name != "" {
|
||
agentName = name
|
||
}
|
||
}
|
||
|
||
if queryResult.Err() != nil {
|
||
return "", nil, queryResult.Err()
|
||
}
|
||
|
||
if !hasData {
|
||
// 如果没有数据,返回空状态
|
||
return "", map[string]float64{}, nil
|
||
}
|
||
|
||
return agentName, status, nil
|
||
}
|
||
|
||
// QueryProcessMetrics 查询进程指标
|
||
func (s *Storage) QueryProcessMetrics(ctx context.Context, deviceID string, startTime, endTime string) ([]map[string]interface{}, error) {
|
||
queryAPI := s.client.QueryAPI(s.org)
|
||
|
||
// 构建查询语句
|
||
query := `from(bucket: "` + s.bucket + `")
|
||
|> range(start: ` + startTime + `, stop: ` + endTime + `)
|
||
|> filter(fn: (r) => r["_measurement"] == "processes")`
|
||
|
||
// 如果指定了设备ID,添加设备ID过滤
|
||
if deviceID != "" {
|
||
query += `
|
||
|> filter(fn: (r) => r["device_id"] == "` + deviceID + `")`
|
||
}
|
||
|
||
// 获取最新的进程数据
|
||
query += `
|
||
|> last()`
|
||
|
||
// 执行查询
|
||
queryResult, err := queryAPI.Query(ctx, query)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer queryResult.Close()
|
||
|
||
// 存储进程数据
|
||
processes := make([]map[string]interface{}, 0)
|
||
|
||
// 处理查询结果
|
||
for queryResult.Next() {
|
||
if queryResult.TableChanged() {
|
||
// 表结构变化,跳过
|
||
continue
|
||
}
|
||
|
||
// 获取记录
|
||
record := queryResult.Record()
|
||
|
||
// 构建进程数据
|
||
processData := map[string]interface{}{
|
||
"time": record.Time(),
|
||
"device_id": record.ValueByKey("device_id"),
|
||
"process_name": record.ValueByKey("process_name"),
|
||
"username": record.ValueByKey("username"),
|
||
"pid": record.ValueByKey("pid"),
|
||
"cpu_usage": record.ValueByKey("cpu_usage"),
|
||
"memory_usage": record.ValueByKey("memory_usage"),
|
||
"path": record.ValueByKey("path"),
|
||
"cmdline": record.ValueByKey("cmdline"),
|
||
"ports": record.ValueByKey("ports"),
|
||
"agent_name": record.ValueByKey("agent_name"),
|
||
}
|
||
|
||
// 添加到进程列表
|
||
processes = append(processes, processData)
|
||
}
|
||
|
||
if queryResult.Err() != nil {
|
||
return nil, queryResult.Err()
|
||
}
|
||
|
||
return processes, nil
|
||
}
|
||
|
||
// QueryLogMetrics 查询日志指标
|
||
func (s *Storage) QueryLogMetrics(ctx context.Context, deviceID string, startTime, endTime string) ([]map[string]interface{}, error) {
|
||
queryAPI := s.client.QueryAPI(s.org)
|
||
|
||
// 构建查询语句
|
||
query := `from(bucket: "` + s.bucket + `")
|
||
|> range(start: ` + startTime + `, stop: ` + endTime + `)
|
||
|> filter(fn: (r) => r["_measurement"] == "logs")`
|
||
|
||
// 如果指定了设备ID,添加设备ID过滤
|
||
if deviceID != "" {
|
||
query += `
|
||
|> filter(fn: (r) => r["device_id"] == "` + deviceID + `")`
|
||
}
|
||
|
||
// 按时间倒序排列,获取最新的日志
|
||
query += `
|
||
|> sort(columns: ["_time"], desc: true)
|
||
|> limit(n: 200)` // 限制返回200条记录(因为message和sequence是分开存储的)
|
||
|
||
// 执行查询
|
||
queryResult, err := queryAPI.Query(ctx, query)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer queryResult.Close()
|
||
|
||
// 使用map存储日志数据,key是时间戳和source的组合
|
||
logMap := make(map[string]map[string]interface{})
|
||
|
||
// 处理查询结果
|
||
for queryResult.Next() {
|
||
// 获取记录
|
||
record := queryResult.Record()
|
||
|
||
// 获取时间和source作为唯一键
|
||
timeStr := record.Time().Format(time.RFC3339Nano)
|
||
source := record.ValueByKey("source").(string)
|
||
key := timeStr + "-" + source
|
||
|
||
// 检查是否已经有这个日志条目的基础信息
|
||
logData, exists := logMap[key]
|
||
if !exists {
|
||
// 创建新的日志条目
|
||
logData = map[string]interface{}{
|
||
"time": record.Time(),
|
||
"device_id": record.ValueByKey("device_id"),
|
||
"source": source,
|
||
"sequence": nil,
|
||
"message": nil,
|
||
"agent_name": record.ValueByKey("agent_name"),
|
||
}
|
||
logMap[key] = logData
|
||
}
|
||
|
||
// 根据字段类型更新相应的值
|
||
field := record.Field()
|
||
switch field {
|
||
case "message":
|
||
logData["message"] = record.Value()
|
||
case "sequence":
|
||
logData["sequence"] = record.Value()
|
||
}
|
||
}
|
||
|
||
if queryResult.Err() != nil {
|
||
return nil, queryResult.Err()
|
||
}
|
||
|
||
// 将map转换为切片
|
||
logs := make([]map[string]interface{}, 0, len(logMap))
|
||
for _, logData := range logMap {
|
||
logs = append(logs, logData)
|
||
}
|
||
|
||
// 按时间倒序排序
|
||
sort.Slice(logs, func(i, j int) bool {
|
||
timeI := logs[i]["time"].(time.Time)
|
||
timeJ := logs[j]["time"].(time.Time)
|
||
return timeI.After(timeJ)
|
||
})
|
||
|
||
// 限制返回100条最新日志
|
||
if len(logs) > 100 {
|
||
logs = logs[:100]
|
||
}
|
||
|
||
return logs, nil
|
||
}
|
||
|
||
// QueryDiskDetails 查询磁盘详细信息
|
||
func (s *Storage) QueryDiskDetails(ctx context.Context, deviceID string, startTime, endTime string) ([]map[string]interface{}, error) {
|
||
queryAPI := s.client.QueryAPI(s.org)
|
||
|
||
// 构建查询语句
|
||
query := `from(bucket: "` + s.bucket + `")
|
||
|> range(start: ` + startTime + `, stop: ` + endTime + `)
|
||
|> filter(fn: (r) => r["_measurement"] == "disk_details")`
|
||
|
||
// 如果指定了设备ID,添加设备ID过滤
|
||
if deviceID != "" {
|
||
query += `
|
||
|> filter(fn: (r) => r["device_id"] == "` + deviceID + `")`
|
||
}
|
||
|
||
// 获取最新的磁盘详细信息
|
||
query += `
|
||
|> last()`
|
||
|
||
// 执行查询
|
||
queryResult, err := queryAPI.Query(ctx, query)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer queryResult.Close()
|
||
|
||
// 存储磁盘详细信息
|
||
diskDetails := make([]map[string]interface{}, 0)
|
||
|
||
// 处理查询结果
|
||
for queryResult.Next() {
|
||
if queryResult.TableChanged() {
|
||
// 表结构变化,跳过
|
||
continue
|
||
}
|
||
|
||
// 获取记录
|
||
record := queryResult.Record()
|
||
|
||
// 构建磁盘详细信息
|
||
diskData := map[string]interface{}{
|
||
"time": record.Time(),
|
||
"device_id": record.ValueByKey("device_id"),
|
||
"disk_id": record.ValueByKey("disk_id"),
|
||
"status": record.ValueByKey("status"),
|
||
"type": record.ValueByKey("type"),
|
||
"size_gb": record.ValueByKey("size_gb"),
|
||
"model": record.ValueByKey("model"),
|
||
"vendor": record.ValueByKey("vendor"),
|
||
"interface_type": record.ValueByKey("interface_type"),
|
||
"file_system": record.ValueByKey("file_system"),
|
||
"description": record.ValueByKey("description"),
|
||
"agent_name": record.ValueByKey("agent_name"),
|
||
}
|
||
|
||
// 添加到磁盘详细信息列表
|
||
diskDetails = append(diskDetails, diskData)
|
||
}
|
||
|
||
if queryResult.Err() != nil {
|
||
return nil, queryResult.Err()
|
||
}
|
||
|
||
return diskDetails, nil
|
||
}
|