修复图表时间和查询区间不匹配问题

This commit is contained in:
Alex Yang
2025-12-02 23:20:10 +08:00
commit 4d66bdf633
49 changed files with 16275 additions and 0 deletions

View File

@@ -0,0 +1,96 @@
package collector
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"time"
)
// AgentCollector Agent采集器
type AgentCollector struct {
agentURL string
client *http.Client
}
// AgentMetrics Agent返回的监控指标
type AgentMetrics struct {
CPU float64 `json:"cpu"`
Memory float64 `json:"memory"`
Disk float64 `json:"disk"`
Network struct {
BytesSent uint64 `json:"bytes_sent"`
BytesReceived uint64 `json:"bytes_received"`
} `json:"network"`
}
// NewAgentCollector 创建新的Agent采集器
func NewAgentCollector(agentURL string) *AgentCollector {
return &AgentCollector{
agentURL: agentURL,
client: &http.Client{
Timeout: 10 * time.Second,
},
}
}
// GetMetrics 获取Agent监控指标
func (c *AgentCollector) GetMetrics() (*AgentMetrics, error) {
// 发送HTTP请求到Agent
resp, err := c.client.Get(fmt.Sprintf("%s/metrics", c.agentURL))
if err != nil {
return nil, err
}
defer resp.Body.Close()
// 检查响应状态码
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("agent returned status code %d", resp.StatusCode)
}
// 读取响应体
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
// 解析JSON响应
var metrics AgentMetrics
if err := json.Unmarshal(body, &metrics); err != nil {
return nil, err
}
return &metrics, nil
}
// GetCPUUsage 获取CPU使用率
func (c *AgentCollector) GetCPUUsage() (float64, error) {
metrics, err := c.GetMetrics()
if err != nil {
return 0, err
}
return metrics.CPU, nil
}
// GetMemoryUsage 获取内存使用率
func (c *AgentCollector) GetMemoryUsage() (float64, error) {
metrics, err := c.GetMetrics()
if err != nil {
return 0, err
}
return metrics.Memory, nil
}
// GetDiskUsage 获取磁盘使用率
func (c *AgentCollector) GetDiskUsage(partition string) (float64, error) {
metrics, err := c.GetMetrics()
if err != nil {
return 0, err
}
// 目前Agent只返回整体磁盘使用率忽略partition参数
return metrics.Disk, nil
}

View File

@@ -0,0 +1,89 @@
package collector
import (
"github.com/gosnmp/gosnmp"
)
// SNMPCollector SNMP采集器
type SNMPCollector struct {
client *gosnmp.GoSNMP
}
// NewSNMPCollector 创建新的SNMP采集器
func NewSNMPCollector(target, community string, version gosnmp.SnmpVersion) (*SNMPCollector, error) {
client := &gosnmp.GoSNMP{
Target: target,
Community: community,
Version: version,
Port: 161,
}
// 连接SNMP设备
if err := client.Connect(); err != nil {
return nil, err
}
return &SNMPCollector{
client: client,
}, nil
}
// Close 关闭SNMP连接
func (c *SNMPCollector) Close() error {
return c.client.Conn.Close()
}
// GetCPUUsage 获取CPU使用率
func (c *SNMPCollector) GetCPUUsage() (float64, error) {
// 这里使用UCD-SNMP-MIB::ssCpuIdle.0 OID来获取CPU空闲率
// 实际使用时需要根据设备类型和SNMP MIB进行调整
oids := []string{"1.3.6.1.4.1.2021.11.11.0"}
result, err := c.client.Get(oids)
if err != nil {
return 0, err
}
// CPU使用率 = 100 - 空闲率
idle := float64(result.Variables[0].Value.(uint))
return 100 - idle, nil
}
// GetMemoryUsage 获取内存使用率
func (c *SNMPCollector) GetMemoryUsage() (float64, error) {
// 这里使用UCD-SNMP-MIB::memTotalReal.0和memAvailReal.0 OID来获取内存使用情况
oids := []string{
"1.3.6.1.4.1.2021.4.5.0", // 总内存
"1.3.6.1.4.1.2021.4.6.0", // 可用内存
}
result, err := c.client.Get(oids)
if err != nil {
return 0, err
}
total := float64(result.Variables[0].Value.(uint))
available := float64(result.Variables[1].Value.(uint))
return ((total - available) / total) * 100, nil
}
// GetDiskUsage 获取磁盘使用率
func (c *SNMPCollector) GetDiskUsage(partition string) (float64, error) {
// 这里使用UCD-SNMP-MIB::dskTotal和dskAvail OID来获取磁盘使用情况
// 实际使用时需要根据分区索引进行调整
oids := []string{
"1.3.6.1.4.1.2021.9.1.6.1", // 总磁盘空间
"1.3.6.1.4.1.2021.9.1.7.1", // 可用磁盘空间
}
result, err := c.client.Get(oids)
if err != nil {
return 0, err
}
total := float64(result.Variables[0].Value.(uint))
available := float64(result.Variables[1].Value.(uint))
return ((total - available) / total) * 100, nil
}

View File

@@ -0,0 +1,131 @@
package collector
import (
"bytes"
"fmt"
"strconv"
"golang.org/x/crypto/ssh"
)
// SSHCollector SSH采集器
type SSHCollector struct {
client *ssh.Client
}
// SSHConfig SSH连接配置
type SSHConfig struct {
Host string
Port int
Username string
Password string
KeyPath string
}
// NewSSHCollector 创建新的SSH采集器
func NewSSHCollector(cfg *SSHConfig) (*SSHCollector, error) {
// 构建SSH客户端配置
sshConfig := &ssh.ClientConfig{
User: cfg.Username,
Auth: []ssh.AuthMethod{},
HostKeyCallback: ssh.InsecureIgnoreHostKey(), // 生产环境中应该使用更安全的HostKeyCallback
}
// 添加认证方式
if cfg.Password != "" {
sshConfig.Auth = append(sshConfig.Auth, ssh.Password(cfg.Password))
}
// 连接SSH服务器
addr := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
client, err := ssh.Dial("tcp", addr, sshConfig)
if err != nil {
return nil, err
}
return &SSHCollector{
client: client,
}, nil
}
// Close 关闭SSH连接
func (c *SSHCollector) Close() error {
return c.client.Close()
}
// executeCommand 执行SSH命令
func (c *SSHCollector) executeCommand(cmd string) (string, error) {
// 创建SSH会话
session, err := c.client.NewSession()
if err != nil {
return "", err
}
defer session.Close()
// 执行命令并获取输出
var stdout bytes.Buffer
session.Stdout = &stdout
if err := session.Run(cmd); err != nil {
return "", err
}
return stdout.String(), nil
}
// GetCPUUsage 获取CPU使用率
func (c *SSHCollector) GetCPUUsage() (float64, error) {
// 使用top命令获取CPU使用率
cmd := "top -bn1 | grep 'Cpu(s)' | awk '{print $2 + $4}'"
output, err := c.executeCommand(cmd)
if err != nil {
return 0, err
}
// 解析输出
usage, err := strconv.ParseFloat(output, 64)
if err != nil {
return 0, err
}
return usage, nil
}
// GetMemoryUsage 获取内存使用率
func (c *SSHCollector) GetMemoryUsage() (float64, error) {
// 使用free命令获取内存使用情况
cmd := "free | grep Mem | awk '{print ($3/$2)*100}'"
output, err := c.executeCommand(cmd)
if err != nil {
return 0, err
}
// 解析输出
usage, err := strconv.ParseFloat(output, 64)
if err != nil {
return 0, err
}
return usage, nil
}
// GetDiskUsage 获取磁盘使用率
func (c *SSHCollector) GetDiskUsage(partition string) (float64, error) {
// 使用df命令获取磁盘使用情况
cmd := fmt.Sprintf("df -h %s | tail -1 | awk '{print $5}' | sed 's/%%//'", partition)
output, err := c.executeCommand(cmd)
if err != nil {
return 0, err
}
// 解析输出
usage, err := strconv.ParseFloat(output, 64)
if err != nil {
return 0, err
}
return usage, nil
}

39
backend/internal/db/db.go Normal file
View File

@@ -0,0 +1,39 @@
package db
import (
"context"
"database/sql"
)
// DB 定义数据库连接接口
type DB interface {
// 获取原始数据库连接
GetDB() *sql.DB
// 执行查询,返回多行结果
Query(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
// 执行查询,返回单行结果
QueryRow(ctx context.Context, query string, args ...interface{}) *sql.Row
// 执行更新操作INSERT、UPDATE、DELETE
Exec(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
// 开始事务
BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
// 关闭数据库连接
Close() error
}
// Config 数据库配置结构体
type Config struct {
Type string
Host string
Port int
Username string
Password string
Database string
SSLMode string
Charset string
}

View File

@@ -0,0 +1,49 @@
package db
import (
"context"
"database/sql"
"log"
)
// NewDB 根据配置创建数据库连接实例
func NewDB(cfg Config) (DB, error) {
// 只支持MySQL数据库
return NewMySQLDB(cfg)
}
// 基础数据库连接结构体
type baseDB struct {
db *sql.DB
}
// GetDB 获取原始数据库连接
func (b *baseDB) GetDB() *sql.DB {
return b.db
}
// Query 执行查询,返回多行结果
func (b *baseDB) Query(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
return b.db.QueryContext(ctx, query, args...)
}
// QueryRow 执行查询,返回单行结果
func (b *baseDB) QueryRow(ctx context.Context, query string, args ...interface{}) *sql.Row {
return b.db.QueryRowContext(ctx, query, args...)
}
// Exec 执行更新操作INSERT、UPDATE、DELETE
func (b *baseDB) Exec(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
return b.db.ExecContext(ctx, query, args...)
}
// BeginTx 开始事务
func (b *baseDB) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) {
return b.db.BeginTx(ctx, opts)
}
// Close 关闭数据库连接
func (b *baseDB) Close() error {
log.Println("Closing database connection")
return b.db.Close()
}

View File

@@ -0,0 +1,51 @@
package db
import (
"context"
"database/sql"
"fmt"
"log"
"time"
// 导入MySQL驱动
_ "github.com/go-sql-driver/mysql"
)
// MySQLDB MySQL数据库连接结构体
type MySQLDB struct {
baseDB
}
// NewMySQLDB 创建MySQL数据库连接实例
func NewMySQLDB(cfg Config) (DB, error) {
// 构建MySQL连接字符串
// 格式: username:password@tcp(host:port)/database?charset=utf8mb4&parseTime=True&loc=Local
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=%s&parseTime=True&loc=Local",
cfg.Username, cfg.Password, cfg.Host, cfg.Port, cfg.Database, cfg.Charset)
// 打开数据库连接
db, err := sql.Open("mysql", dsn)
if err != nil {
return nil, fmt.Errorf("failed to open mysql connection: %w", err)
}
// 配置连接池
db.SetMaxOpenConns(20) // 最大打开连接数
db.SetMaxIdleConns(10) // 最大空闲连接数
db.SetConnMaxLifetime(30 * time.Minute) // 连接最大生命周期
db.SetConnMaxIdleTime(10 * time.Minute) // 连接最大空闲时间
// 测试连接
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := db.PingContext(ctx); err != nil {
return nil, fmt.Errorf("failed to ping mysql: %w", err)
}
log.Println("MySQL connection established successfully")
return &MySQLDB{
baseDB: baseDB{db: db},
}, nil
}

View File

@@ -0,0 +1,135 @@
package device
import (
"sync"
)
// Device 设备信息
type Device struct {
ID string `json:"id"`
Name string `json:"name"`
IP string `json:"ip"`
Token string `json:"token"` // 设备认证令牌
Status string `json:"status"` // active, inactive, offline
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
}
// Storage 设备存储接口
type Storage interface {
// GetDevices 获取所有设备
GetDevices() []Device
// GetDevice 获取指定设备
GetDevice(id string) (Device, bool)
// GetDeviceByToken 通过令牌获取设备
GetDeviceByToken(token string) (Device, bool)
// AddDevice 添加设备
AddDevice(device Device) error
// UpdateDevice 更新设备
UpdateDevice(device Device) error
// DeleteDevice 删除设备
DeleteDevice(id string) error
// UpdateDeviceStatus 更新设备状态
UpdateDeviceStatus(id string, status string) error
}
// MemoryStorage 内存设备存储
type MemoryStorage struct {
devices map[string]Device
mutex sync.RWMutex
}
// NewMemoryStorage 创建内存设备存储实例
func NewMemoryStorage() *MemoryStorage {
return &MemoryStorage{
devices: make(map[string]Device),
}
}
// GetDevices 获取所有设备
func (s *MemoryStorage) GetDevices() []Device {
s.mutex.RLock()
defer s.mutex.RUnlock()
devices := make([]Device, 0, len(s.devices))
for _, device := range s.devices {
devices = append(devices, device)
}
return devices
}
// GetDevice 获取指定设备
func (s *MemoryStorage) GetDevice(id string) (Device, bool) {
s.mutex.RLock()
defer s.mutex.RUnlock()
device, ok := s.devices[id]
return device, ok
}
// AddDevice 添加设备
func (s *MemoryStorage) AddDevice(device Device) error {
s.mutex.Lock()
defer s.mutex.Unlock()
// 检查设备是否已存在
if _, ok := s.devices[device.ID]; ok {
return nil // 设备已存在,不重复添加
}
s.devices[device.ID] = device
return nil
}
// UpdateDevice 更新设备
func (s *MemoryStorage) UpdateDevice(device Device) error {
s.mutex.Lock()
defer s.mutex.Unlock()
// 检查设备是否存在
if _, ok := s.devices[device.ID]; !ok {
return nil // 设备不存在,不更新
}
s.devices[device.ID] = device
return nil
}
// DeleteDevice 删除设备
func (s *MemoryStorage) DeleteDevice(id string) error {
s.mutex.Lock()
defer s.mutex.Unlock()
delete(s.devices, id)
return nil
}
// GetDeviceByToken 通过令牌获取设备
func (s *MemoryStorage) GetDeviceByToken(token string) (Device, bool) {
s.mutex.RLock()
defer s.mutex.RUnlock()
for _, device := range s.devices {
if device.Token == token {
return device, true
}
}
return Device{}, false
}
// UpdateDeviceStatus 更新设备状态
func (s *MemoryStorage) UpdateDeviceStatus(id string, status string) error {
s.mutex.Lock()
defer s.mutex.Unlock()
device, ok := s.devices[id]
if !ok {
return nil // 设备不存在,不更新
}
device.Status = status
s.devices[id] = device
return nil
}

View File

@@ -0,0 +1,116 @@
package device
import (
"database/sql"
"time"
)
// MySQLStorage MySQL设备存储
type MySQLStorage struct {
db *sql.DB
}
// NewMySQLStorage 创建MySQL设备存储实例
func NewMySQLStorage(db *sql.DB) (*MySQLStorage, error) {
// 创建表
if err := createDeviceTable(db); err != nil {
return nil, err
}
return &MySQLStorage{db: db}, nil
}
// createDeviceTable 创建设备表
func createDeviceTable(db *sql.DB) error {
query := `
CREATE TABLE IF NOT EXISTS devices (
id VARCHAR(64) PRIMARY KEY,
name VARCHAR(128) NOT NULL,
ip VARCHAR(64),
token VARCHAR(64) NOT NULL,
status VARCHAR(32) NOT NULL DEFAULT 'inactive',
created_at BIGINT NOT NULL,
updated_at BIGINT NOT NULL,
UNIQUE KEY uk_token (token)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
`
_, err := db.Exec(query)
return err
}
// GetDevices 获取所有设备
func (s *MySQLStorage) GetDevices() []Device {
query := "SELECT id, name, ip, token, status, created_at, updated_at FROM devices"
rows, err := s.db.Query(query)
if err != nil {
return []Device{}
}
defer rows.Close()
devices := make([]Device, 0)
for rows.Next() {
var device Device
if err := rows.Scan(&device.ID, &device.Name, &device.IP, &device.Token, &device.Status, &device.CreatedAt, &device.UpdatedAt); err != nil {
continue
}
devices = append(devices, device)
}
return devices
}
// GetDevice 获取指定设备
func (s *MySQLStorage) GetDevice(id string) (Device, bool) {
query := "SELECT id, name, ip, token, status, created_at, updated_at FROM devices WHERE id = ?"
row := s.db.QueryRow(query, id)
var device Device
err := row.Scan(&device.ID, &device.Name, &device.IP, &device.Token, &device.Status, &device.CreatedAt, &device.UpdatedAt)
if err != nil {
return Device{}, false
}
return device, true
}
// GetDeviceByToken 通过令牌获取设备
func (s *MySQLStorage) GetDeviceByToken(token string) (Device, bool) {
query := "SELECT id, name, ip, token, status, created_at, updated_at FROM devices WHERE token = ?"
row := s.db.QueryRow(query, token)
var device Device
err := row.Scan(&device.ID, &device.Name, &device.IP, &device.Token, &device.Status, &device.CreatedAt, &device.UpdatedAt)
if err != nil {
return Device{}, false
}
return device, true
}
// AddDevice 添加设备
func (s *MySQLStorage) AddDevice(device Device) error {
query := "INSERT INTO devices (id, name, ip, token, status, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE name = VALUES(name), ip = VALUES(ip), updated_at = VALUES(updated_at)"
_, err := s.db.Exec(query, device.ID, device.Name, device.IP, device.Token, device.Status, device.CreatedAt, device.UpdatedAt)
return err
}
// UpdateDevice 更新设备
func (s *MySQLStorage) UpdateDevice(device Device) error {
query := "UPDATE devices SET name = ?, ip = ?, status = ?, updated_at = ? WHERE id = ?"
_, err := s.db.Exec(query, device.Name, device.IP, device.Status, device.UpdatedAt, device.ID)
return err
}
// DeleteDevice 删除设备
func (s *MySQLStorage) DeleteDevice(id string) error {
query := "DELETE FROM devices WHERE id = ?"
_, err := s.db.Exec(query, id)
return err
}
// UpdateDeviceStatus 更新设备状态
func (s *MySQLStorage) UpdateDeviceStatus(id string, status string) error {
query := "UPDATE devices SET status = ?, updated_at = ? WHERE id = ?"
_, err := s.db.Exec(query, status, time.Now().Unix(), id)
return err
}

View File

@@ -0,0 +1,670 @@
package handler
import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"log"
"net/http"
"os"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/monitor/backend/internal/device"
"github.com/monitor/backend/internal/storage"
)
// Handler API处理器
type Handler struct {
storage *storage.Storage
}
// NewHandler 创建新的API处理器
func NewHandler(storage *storage.Storage) *Handler {
return &Handler{
storage: storage,
}
}
// RegisterRoutes 注册所有路由
func RegisterRoutes(r *gin.Engine) {
// API路由组
api := r.Group("/api")
{
// 监控数据路由
metrics := api.Group("/metrics")
{
metrics.GET("/cpu", GetCPUMetrics)
metrics.GET("/memory", GetMemoryMetrics)
metrics.GET("/disk", GetDiskMetrics)
metrics.GET("/network", GetNetworkMetrics)
// 添加POST端点接收Agent发送的指标数据
metrics.POST("/", HandleMetricsPost)
}
// 设备管理路由
devices := api.Group("/devices")
{
devices.GET("/", GetDevices) // 获取活跃设备列表
devices.GET("/all", GetAllDevices) // 获取所有设备
devices.GET("/:id", GetDevice) // 获取单个设备详情
devices.POST("/", AddDevice) // 添加设备
devices.PUT("/:id", UpdateDevice) // 更新设备信息
devices.DELETE("/:id", DeleteDevice) // 删除设备
// 添加获取设备状态的端点
devices.GET("/status", GetAllDeviceStatus) // 获取所有活跃设备状态
devices.GET("/:id/status", GetDeviceStatus) // 获取单个设备状态
}
// WebSocket端点
api.GET("/ws", WebSocketHandler)
// 添加设备状态概览端点,作为/api/devices/status的别名
api.GET("/status", GetAllDeviceStatus)
}
}
// MetricsRequest 指标请求结构
type MetricsRequest struct {
CPU float64 `json:"cpu"`
Memory float64 `json:"memory"`
Disk map[string]float64 `json:"disk"`
Network struct {
BytesSent uint64 `json:"bytes_sent"`
BytesReceived uint64 `json:"bytes_received"`
} `json:"network"`
}
// HandleMetricsPost 处理Agent发送的指标数据
func HandleMetricsPost(c *gin.Context) {
// 获取设备令牌
token := c.GetHeader("X-Device-Token")
if token == "" {
c.JSON(http.StatusUnauthorized, gin.H{
"error": "Device token is required",
})
return
}
// 通过令牌获取设备
device, ok := deviceStorage.GetDeviceByToken(token)
if !ok {
c.JSON(http.StatusUnauthorized, gin.H{
"error": "Invalid device token",
})
return
}
// 获取设备ID用于后续处理
deviceID := device.ID
// 获取Agent名称
agentName := c.GetHeader("X-Agent-Name")
if agentName == "" {
// 如果没有提供名称,使用设备名称
agentName = device.Name
}
// 解析请求体
var req MetricsRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"error": "Invalid request body",
})
return
}
// 更新设备状态为active
if err := deviceStorage.UpdateDeviceStatus(deviceID, "active"); err != nil {
// 只记录警告,不影响指标处理
log.Printf("Warning: Failed to update device status: %v", err)
}
// 创建基础标签包含Agent名称
baseTags := map[string]string{
"agent_name": agentName,
}
// 写入CPU指标
if err := globalStorage.WriteMetric(c.Request.Context(), deviceID, "cpu", req.CPU, baseTags); err != nil {
// 只记录警告,不影响后续指标处理
log.Printf("Warning: Failed to write CPU metrics: %v", err)
}
// 写入内存指标
if err := globalStorage.WriteMetric(c.Request.Context(), deviceID, "memory", req.Memory, baseTags); err != nil {
// 只记录警告,不影响后续指标处理
log.Printf("Warning: Failed to write memory metrics: %v", err)
}
// 写入磁盘指标,支持多个挂载点
for mountpoint, usage := range req.Disk {
// 为每个挂载点创建标签,包含基础标签和挂载点
tags := make(map[string]string)
// 复制基础标签
for k, v := range baseTags {
tags[k] = v
}
// 添加挂载点标签
tags["mountpoint"] = mountpoint
// 写入磁盘指标
if err := globalStorage.WriteMetric(c.Request.Context(), deviceID, "disk", usage, tags); err != nil {
// 只记录警告,不影响后续指标处理
log.Printf("Warning: Failed to write disk metrics for mountpoint %s: %v", mountpoint, err)
}
}
// 写入网络发送指标
if err := globalStorage.WriteMetric(c.Request.Context(), deviceID, "network_sent", float64(req.Network.BytesSent), baseTags); err != nil {
// 只记录警告,不影响后续指标处理
log.Printf("Warning: Failed to write network sent metrics: %v", err)
}
// 写入网络接收指标
if err := globalStorage.WriteMetric(c.Request.Context(), deviceID, "network_received", float64(req.Network.BytesReceived), baseTags); err != nil {
// 只记录警告,不影响后续指标处理
log.Printf("Warning: Failed to write network received metrics: %v", err)
}
// 广播指标更新消息
metrics := map[string]interface{}{
"cpu": req.CPU,
"memory": req.Memory,
"disk": req.Disk,
"network": map[string]uint64{
"bytes_sent": req.Network.BytesSent,
"bytes_received": req.Network.BytesReceived,
},
}
broadcastMetricsUpdate(deviceID, metrics)
// 返回成功响应
c.JSON(http.StatusOK, gin.H{
"message": "Metrics received successfully",
})
}
// 全局存储实例(简化处理,实际应该使用依赖注入)
var globalStorage *storage.Storage
var deviceStorage device.Storage
// SetStorage 设置全局存储实例
func SetStorage(s *storage.Storage) {
globalStorage = s
}
// SetDeviceStorage 设置设备存储实例
func SetDeviceStorage(s device.Storage) {
deviceStorage = s
}
// 生成安全的设备令牌
func generateDeviceToken() string {
// 生成32字节的随机数据
bytes := make([]byte, 16)
if _, err := rand.Read(bytes); err != nil {
// 如果生成随机数失败使用时间戳和PID作为备选方案
return hex.EncodeToString([]byte(time.Now().String() + "-" + string(os.Getpid())))
}
// 转换为32位的十六进制字符串
return hex.EncodeToString(bytes)
}
// WebSocket相关配置
var (
// 升级器用于将HTTP连接升级为WebSocket连接
upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true // 允许所有来源,生产环境应该限制
},
}
// 客户端连接管理
clients = make(map[*websocket.Conn]bool)
clientsLock sync.Mutex
// 消息广播通道
broadcast = make(chan map[string]interface{})
)
// 启动WebSocket消息处理
func init() {
go handleMessages()
}
// 处理WebSocket消息
func handleMessages() {
for {
// 从广播通道接收消息
message := <-broadcast
// 序列化消息
jsonMessage, err := json.Marshal(message)
if err != nil {
continue
}
// 广播消息给所有客户端
clientsLock.Lock()
for client := range clients {
if err := client.WriteMessage(websocket.TextMessage, jsonMessage); err != nil {
// 连接出错,关闭连接并从客户端列表中移除
client.Close()
delete(clients, client)
}
}
clientsLock.Unlock()
}
}
// WebSocket端点用于客户端连接
func WebSocketHandler(c *gin.Context) {
// 升级HTTP连接为WebSocket连接
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
return
}
defer conn.Close()
// 将客户端添加到客户端列表
clientsLock.Lock()
clients[conn] = true
clientsLock.Unlock()
// 处理客户端消息(这里我们只发送消息,不处理接收的消息)
for {
// 读取消息(如果客户端发送消息,我们不处理,直接忽略)
_, _, err := conn.ReadMessage()
if err != nil {
// 连接出错,从客户端列表中移除
clientsLock.Lock()
delete(clients, conn)
clientsLock.Unlock()
break
}
}
}
// 广播指标更新消息
func broadcastMetricsUpdate(deviceID string, metrics map[string]interface{}) {
message := map[string]interface{}{
"type": "metrics_update",
"device_id": deviceID,
"metrics": metrics,
}
broadcast <- message
}
// GetCPUMetrics 获取CPU指标
func GetCPUMetrics(c *gin.Context) {
// 获取查询参数
deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备
startTime := c.DefaultQuery("start_time", "-24h")
endTime := c.DefaultQuery("end_time", "now()")
aggregation := c.DefaultQuery("aggregation", "average")
interval := c.DefaultQuery("interval", "10s") // 添加interval参数默认10秒
// 查询数据
points, err := globalStorage.QueryMetrics(context.Background(), deviceID, "cpu", startTime, endTime)
if err != nil {
// 只记录警告,返回空数据
log.Printf("Warning: Failed to query CPU metrics: %v", err)
c.JSON(http.StatusOK, gin.H{
"data": []MetricData{},
})
return
}
// 处理数据传递interval、startTime和endTime参数
processedData := ProcessMetrics(points, aggregation, interval, startTime, endTime)
c.JSON(http.StatusOK, gin.H{
"data": processedData,
})
}
// GetMemoryMetrics 获取内存指标
func GetMemoryMetrics(c *gin.Context) {
// 获取查询参数
deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备
startTime := c.DefaultQuery("start_time", "-24h")
endTime := c.DefaultQuery("end_time", "now()")
aggregation := c.DefaultQuery("aggregation", "average")
interval := c.DefaultQuery("interval", "10s") // 添加interval参数默认10秒
// 查询数据
points, err := globalStorage.QueryMetrics(context.Background(), deviceID, "memory", startTime, endTime)
if err != nil {
// 只记录警告,返回空数据
log.Printf("Warning: Failed to query memory metrics: %v", err)
c.JSON(http.StatusOK, gin.H{
"data": []MetricData{},
})
return
}
// 处理数据传递interval、startTime和endTime参数
processedData := ProcessMetrics(points, aggregation, interval, startTime, endTime)
c.JSON(http.StatusOK, gin.H{
"data": processedData,
})
}
// GetDiskMetrics 获取磁盘指标
func GetDiskMetrics(c *gin.Context) {
// 获取查询参数
deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备
startTime := c.DefaultQuery("start_time", "-24h")
endTime := c.DefaultQuery("end_time", "now()")
aggregation := c.DefaultQuery("aggregation", "average")
interval := c.DefaultQuery("interval", "10s") // 添加interval参数默认10秒
// 查询数据
points, err := globalStorage.QueryMetrics(context.Background(), deviceID, "disk", startTime, endTime)
if err != nil {
// 只记录警告,返回空数据
log.Printf("Warning: Failed to query disk metrics: %v", err)
c.JSON(http.StatusOK, gin.H{
"data": map[string][]MetricData{},
})
return
}
// 按挂载点分组
mountpointData := make(map[string][]storage.MetricPoint)
for _, point := range points {
// 获取挂载点标签
mountpoint := "unknown"
if point.Tags != nil && point.Tags["mountpoint"] != "" {
mountpoint = point.Tags["mountpoint"]
}
mountpointData[mountpoint] = append(mountpointData[mountpoint], point)
}
// 处理数据,为每个挂载点创建独立的数据集
result := make(map[string][]MetricData)
for mountpoint, mountpointPoints := range mountpointData {
processedData := ProcessMetrics(mountpointPoints, aggregation, interval, startTime, endTime)
result[mountpoint] = processedData
}
c.JSON(http.StatusOK, gin.H{
"data": result,
})
}
// GetNetworkMetrics 获取网络指标
func GetNetworkMetrics(c *gin.Context) {
// 获取查询参数
deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备
startTime := c.DefaultQuery("start_time", "-24h")
endTime := c.DefaultQuery("end_time", "now()")
aggregation := c.DefaultQuery("aggregation", "average")
interval := c.DefaultQuery("interval", "10s") // 添加interval参数默认10秒
// 查询发送和接收的网络指标
sentPoints, err1 := globalStorage.QueryMetrics(context.Background(), deviceID, "network_sent", startTime, endTime)
receivedPoints, err2 := globalStorage.QueryMetrics(context.Background(), deviceID, "network_received", startTime, endTime)
// 处理错误
if err1 != nil {
log.Printf("Warning: Failed to query network sent metrics: %v", err1)
sentPoints = []storage.MetricPoint{}
}
if err2 != nil {
log.Printf("Warning: Failed to query network received metrics: %v", err2)
receivedPoints = []storage.MetricPoint{}
}
// 处理数据传递interval、startTime和endTime参数
processedSentData := ProcessMetrics(sentPoints, aggregation, interval, startTime, endTime)
processedReceivedData := ProcessMetrics(receivedPoints, aggregation, interval, startTime, endTime)
c.JSON(http.StatusOK, gin.H{
"data": map[string][]MetricData{
"sent": processedSentData,
"received": processedReceivedData,
},
})
}
// GetDevices 获取设备列表
func GetDevices(c *gin.Context) {
// 从设备存储获取所有设备
allDevices := deviceStorage.GetDevices()
// 转换为前端需要的格式
deviceList := make([]map[string]string, 0, len(allDevices))
for _, device := range allDevices {
// 只返回活跃设备
if device.Status == "active" {
deviceList = append(deviceList, map[string]string{
"id": device.ID,
"name": device.Name,
})
}
}
c.JSON(http.StatusOK, gin.H{
"devices": deviceList,
})
}
// GetDeviceStatus 获取设备状态
func GetDeviceStatus(c *gin.Context) {
// 获取设备ID
deviceID := c.Param("id")
if deviceID == "" {
c.JSON(http.StatusBadRequest, gin.H{
"error": "Device ID is required",
})
return
}
// 从设备存储获取设备信息
device, ok := deviceStorage.GetDevice(deviceID)
if !ok {
c.JSON(http.StatusNotFound, gin.H{
"error": "Device not found",
})
return
}
// 查询设备监控数据
_, status, err := globalStorage.QueryDeviceStatus(context.Background(), deviceID)
if err != nil {
// 如果查询监控数据失败,返回设备基本信息和空状态
c.JSON(http.StatusOK, gin.H{
"device_id": deviceID,
"name": device.Name,
"status": make(map[string]float64),
})
return
}
c.JSON(http.StatusOK, gin.H{
"device_id": deviceID,
"name": device.Name,
"status": status,
})
}
// GetAllDeviceStatus 获取所有设备状态
func GetAllDeviceStatus(c *gin.Context) {
// 从设备存储获取所有设备
allDevices := deviceStorage.GetDevices()
// 查询每个设备的状态
result := make([]map[string]interface{}, 0, len(allDevices))
for _, device := range allDevices {
// 查询设备监控数据
_, status, _ := globalStorage.QueryDeviceStatus(context.Background(), device.ID)
// 总是返回设备信息,无论是否有监控数据
result = append(result, map[string]interface{}{
"device_id": device.ID,
"name": device.Name,
"status": status,
})
}
c.JSON(http.StatusOK, gin.H{
"devices": result,
})
}
// AddDevice 添加设备
func AddDevice(c *gin.Context) {
var req struct {
ID string `json:"id" binding:"required"`
Name string `json:"name" binding:"required"`
IP string `json:"ip"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"error": "Invalid request: " + err.Error(),
})
return
}
// 生成设备令牌
token := generateDeviceToken()
newDevice := device.Device{
ID: req.ID,
Name: req.Name,
IP: req.IP,
Token: token,
Status: "inactive",
CreatedAt: time.Now().Unix(),
UpdatedAt: time.Now().Unix(),
}
if err := deviceStorage.AddDevice(newDevice); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{
"error": "Failed to add device: " + err.Error(),
})
return
}
c.JSON(http.StatusOK, gin.H{
"message": "Device added successfully",
"device": newDevice,
})
}
// DeleteDevice 删除设备
func DeleteDevice(c *gin.Context) {
deviceID := c.Param("id")
if deviceID == "" {
c.JSON(http.StatusBadRequest, gin.H{
"error": "Device ID is required",
})
return
}
if err := deviceStorage.DeleteDevice(deviceID); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{
"error": "Failed to delete device: " + err.Error(),
})
return
}
c.JSON(http.StatusOK, gin.H{
"message": "Device deleted successfully",
})
}
// UpdateDevice 更新设备
func UpdateDevice(c *gin.Context) {
deviceID := c.Param("id")
if deviceID == "" {
c.JSON(http.StatusBadRequest, gin.H{
"error": "Device ID is required",
})
return
}
var req struct {
Name string `json:"name"`
IP string `json:"ip"`
Status string `json:"status"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"error": "Invalid request: " + err.Error(),
})
return
}
// 获取现有设备
existingDevice, ok := deviceStorage.GetDevice(deviceID)
if !ok {
c.JSON(http.StatusNotFound, gin.H{
"error": "Device not found",
})
return
}
// 更新设备信息保留原有token
if req.Name != "" {
existingDevice.Name = req.Name
}
if req.IP != "" {
existingDevice.IP = req.IP
}
if req.Status != "" {
existingDevice.Status = req.Status
}
existingDevice.UpdatedAt = time.Now().Unix()
if err := deviceStorage.UpdateDevice(existingDevice); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{
"error": "Failed to update device: " + err.Error(),
})
return
}
c.JSON(http.StatusOK, gin.H{
"message": "Device updated successfully",
"device": existingDevice,
})
}
// GetDevice 获取单个设备
func GetDevice(c *gin.Context) {
deviceID := c.Param("id")
if deviceID == "" {
c.JSON(http.StatusBadRequest, gin.H{
"error": "Device ID is required",
})
return
}
device, ok := deviceStorage.GetDevice(deviceID)
if !ok {
c.JSON(http.StatusNotFound, gin.H{
"error": "Device not found",
})
return
}
c.JSON(http.StatusOK, gin.H{
"device": device,
})
}
// GetAllDevices 获取所有设备
func GetAllDevices(c *gin.Context) {
devices := deviceStorage.GetDevices()
c.JSON(http.StatusOK, gin.H{
"devices": devices,
})
}

View File

@@ -0,0 +1,437 @@
package handler
import (
"fmt"
"sort"
"strconv"
"strings"
"time"
"github.com/monitor/backend/internal/storage"
)
// MetricData 处理后的监控数据
type MetricData struct {
Time time.Time `json:"time"`
Value float64 `json:"value"`
}
// ParseInterval 解析时间区间字符串,返回秒数
// 支持的格式10s, 1m, 5m, 1h, 1d等
func ParseInterval(intervalStr string) (int, error) {
if intervalStr == "" {
return 10, nil // 默认10秒
}
intervalStr = strings.ToLower(intervalStr)
var multiplier int
// 解析时间单位
switch {
case strings.HasSuffix(intervalStr, "s"):
multiplier = 1
intervalStr = strings.TrimSuffix(intervalStr, "s")
case strings.HasSuffix(intervalStr, "m"):
multiplier = 60
intervalStr = strings.TrimSuffix(intervalStr, "m")
case strings.HasSuffix(intervalStr, "h"):
multiplier = 3600
intervalStr = strings.TrimSuffix(intervalStr, "h")
case strings.HasSuffix(intervalStr, "d"):
multiplier = 86400
intervalStr = strings.TrimSuffix(intervalStr, "d")
default:
return 0, fmt.Errorf("unsupported interval format: %s", intervalStr)
}
// 解析数字部分
value, err := strconv.Atoi(intervalStr)
if err != nil {
return 0, fmt.Errorf("invalid interval value: %s", intervalStr)
}
return value * multiplier, nil
}
// FormatTimeByInterval 根据时间区间格式化时间
func FormatTimeByInterval(t time.Time, intervalSeconds int) string {
// 按指定秒数对齐时间
rounded := t.Truncate(time.Duration(intervalSeconds) * time.Second)
// 根据区间长度选择不同的格式化字符串
if intervalSeconds < 60 {
// 秒级,显示到秒
return rounded.Format("2006-01-02 15:04:05")
} else if intervalSeconds < 3600 {
// 分钟级,显示到分钟
return rounded.Format("2006-01-02 15:04:00")
} else {
// 小时级,显示到小时
return rounded.Format("2006-01-02 15:00:00")
}
}
// ProcessMetrics 处理监控数据,支持动态时间区间
func ProcessMetrics(points []storage.MetricPoint, aggregation string, intervalStr string, startTime, endTime string) []MetricData {
// 解析时间区间
intervalSeconds, err := ParseInterval(intervalStr)
if err != nil {
// 解析失败使用默认值10秒
intervalSeconds = 10
}
// 解析开始和结束时间
var start, end time.Time
var parseErr error
// 解析开始时间
if strings.HasPrefix(startTime, "-") || startTime == "now()" {
// 相对时间,使用当前时间作为基准
now := time.Now()
if startTime == "now()" {
start = now
} else {
// 相对时间,如-24h
relDuration, parseErr := time.ParseDuration(startTime)
if parseErr == nil {
start = now.Add(relDuration)
} else {
// 解析失败,使用默认时间
start = now.Add(-24 * time.Hour)
}
}
} else {
// 绝对时间,尝试解析
start, parseErr = time.Parse(time.RFC3339, startTime)
if parseErr != nil {
// 尝试其他格式
start, parseErr = time.Parse("2006-01-02T15:04", startTime)
if parseErr != nil {
// 尝试YYYY-MM-DD HH:MM格式
start, parseErr = time.Parse("2006-01-02 15:04", startTime)
if parseErr != nil {
// 解析失败,使用默认时间
start = time.Now().Add(-24 * time.Hour)
}
}
}
}
// 解析结束时间
if endTime == "now()" {
end = time.Now()
} else if strings.HasPrefix(endTime, "-") {
// 相对时间,使用当前时间作为基准
now := time.Now()
relDuration, parseErr := time.ParseDuration(endTime)
if parseErr == nil {
end = now.Add(relDuration)
} else {
// 解析失败,使用当前时间
end = now
}
} else {
// 绝对时间,尝试解析
end, parseErr = time.Parse(time.RFC3339, endTime)
if parseErr != nil {
// 尝试其他格式
end, parseErr = time.Parse("2006-01-02T15:04", endTime)
if parseErr != nil {
// 尝试YYYY-MM-DD HH:MM格式
end, parseErr = time.Parse("2006-01-02 15:04", endTime)
if parseErr != nil {
// 解析失败,使用当前时间
end = time.Now()
}
}
}
}
// 如果没有数据点,生成覆盖整个时间范围的空数据点
if len(points) == 0 {
// 生成空数据点,确保覆盖整个时间范围
result := make([]MetricData, 0)
currentTime := start
// 循环生成数据点,直到超过结束时间
for currentTime.Before(end) {
result = append(result, MetricData{
Time: currentTime,
Value: 0, // 使用0表示无数据
})
currentTime = currentTime.Add(time.Duration(intervalSeconds) * time.Second)
}
return result
}
// 根据时间区段精细程度自动调整聚合策略
// 如果时间区段精细程度为1s显示全部数据去重后
// 如果时间区段精细程度>1s根据聚合类型进行聚合
if intervalSeconds == 1 {
// 1秒时间区段返回全部原始数据去重后
return processRawData(points)
} else {
// 大于1秒时间区段根据聚合类型处理
switch aggregation {
case "raw":
// 对于raw聚合类型返回去重后的原始数据
return processRawData(points)
case "average":
return processAverageData(points, intervalSeconds, start, end)
case "max":
return processMaxData(points, intervalSeconds, start, end)
case "min":
return processMinData(points, intervalSeconds, start, end)
case "sum":
return processSumData(points, intervalSeconds, start, end)
default:
// 默认返回平均值
return processAverageData(points, intervalSeconds, start, end)
}
}
}
// processRawData 处理原始数据,添加去重逻辑
func processRawData(points []storage.MetricPoint) []MetricData {
// 使用map进行去重key为时间戳精确到秒+值的组合
deduplicated := make(map[string]storage.MetricPoint)
for _, point := range points {
// 精确到秒的时间戳
key := fmt.Sprintf("%d_%.2f", point.Time.Unix(), point.Value)
deduplicated[key] = point
}
// 将去重后的数据转换为切片
result := make([]MetricData, 0, len(deduplicated))
for _, point := range deduplicated {
result = append(result, MetricData{
Time: point.Time,
Value: point.Value,
})
}
// 按时间排序
sort.Slice(result, func(i, j int) bool {
return result[i].Time.Before(result[j].Time)
})
return result
}
// processAverageData 处理平均值数据
func processAverageData(points []storage.MetricPoint, intervalSeconds int, start, end time.Time) []MetricData {
// 按指定时间区间聚合的平均值
// 按指定区间分组保留UTC时区信息
intervalData := make(map[string][]float64)
for _, point := range points {
// 使用UTC时间按指定区间对齐
utcTime := point.Time.UTC()
// 格式化时间键
intervalKey := FormatTimeByInterval(utcTime, intervalSeconds)
value := point.Value
intervalData[intervalKey] = append(intervalData[intervalKey], value)
}
// 生成覆盖整个请求时间范围的完整时间序列
result := make([]MetricData, 0)
currentTime := start.Truncate(time.Duration(intervalSeconds) * time.Second)
// 循环生成每个时间区间的数据点
for currentTime.Before(end.Add(time.Duration(intervalSeconds) * time.Second)) {
// 格式化当前时间为区间键
intervalKey := FormatTimeByInterval(currentTime, intervalSeconds)
// 计算当前区间的平均值
var value float64 = 0
if values, exists := intervalData[intervalKey]; exists && len(values) > 0 {
// 计算平均值
sum := 0.0
for _, v := range values {
sum += v
}
value = sum / float64(len(values))
}
// 添加到结果
result = append(result, MetricData{
Time: currentTime,
Value: value,
})
// 移动到下一个时间区间
currentTime = currentTime.Add(time.Duration(intervalSeconds) * time.Second)
}
return result
}
// processMaxData 处理最大值数据
func processMaxData(points []storage.MetricPoint, intervalSeconds int, start, end time.Time) []MetricData {
// 按指定时间区间聚合的最大值
// 按指定区间分组保留UTC时区信息
intervalData := make(map[string][]float64)
for _, point := range points {
// 使用UTC时间按指定区间对齐
utcTime := point.Time.UTC()
// 格式化时间键
intervalKey := FormatTimeByInterval(utcTime, intervalSeconds)
value := point.Value
intervalData[intervalKey] = append(intervalData[intervalKey], value)
}
// 生成覆盖整个请求时间范围的完整时间序列
result := make([]MetricData, 0)
currentTime := start.Truncate(time.Duration(intervalSeconds) * time.Second)
// 循环生成每个时间区间的数据点
for currentTime.Before(end.Add(time.Duration(intervalSeconds) * time.Second)) {
// 格式化当前时间为区间键
intervalKey := FormatTimeByInterval(currentTime, intervalSeconds)
// 计算当前区间的最大值
var value float64 = 0
if values, exists := intervalData[intervalKey]; exists && len(values) > 0 {
// 计算最大值
max := values[0]
for _, v := range values {
if v > max {
max = v
}
}
value = max
}
// 添加到结果
result = append(result, MetricData{
Time: currentTime,
Value: value,
})
// 移动到下一个时间区间
currentTime = currentTime.Add(time.Duration(intervalSeconds) * time.Second)
}
return result
}
// processMinData 处理最小值数据
func processMinData(points []storage.MetricPoint, intervalSeconds int, start, end time.Time) []MetricData {
// 按指定时间区间聚合的最小值
// 按指定区间分组保留UTC时区信息
intervalData := make(map[string][]float64)
for _, point := range points {
// 使用UTC时间按指定区间对齐
utcTime := point.Time.UTC()
// 格式化时间键
intervalKey := FormatTimeByInterval(utcTime, intervalSeconds)
value := point.Value
intervalData[intervalKey] = append(intervalData[intervalKey], value)
}
// 生成覆盖整个请求时间范围的完整时间序列
result := make([]MetricData, 0)
currentTime := start.Truncate(time.Duration(intervalSeconds) * time.Second)
// 循环生成每个时间区间的数据点
for currentTime.Before(end.Add(time.Duration(intervalSeconds) * time.Second)) {
// 格式化当前时间为区间键
intervalKey := FormatTimeByInterval(currentTime, intervalSeconds)
// 计算当前区间的最小值
var value float64 = 0
if values, exists := intervalData[intervalKey]; exists && len(values) > 0 {
// 计算最小值
min := values[0]
for _, v := range values {
if v < min {
min = v
}
}
value = min
}
// 添加到结果
result = append(result, MetricData{
Time: currentTime,
Value: value,
})
// 移动到下一个时间区间
currentTime = currentTime.Add(time.Duration(intervalSeconds) * time.Second)
}
return result
}
// processSumData 处理总和数据
func processSumData(points []storage.MetricPoint, intervalSeconds int, start, end time.Time) []MetricData {
// 按指定时间区间聚合的总和
// 按指定区间分组保留UTC时区信息
intervalData := make(map[string][]storage.MetricPoint)
for _, point := range points {
// 使用UTC时间按指定区间对齐
utcTime := point.Time.UTC()
// 格式化时间键
intervalKey := FormatTimeByInterval(utcTime, intervalSeconds)
intervalData[intervalKey] = append(intervalData[intervalKey], point)
}
// 生成覆盖整个请求时间范围的完整时间序列
result := make([]MetricData, 0)
currentTime := start.Truncate(time.Duration(intervalSeconds) * time.Second)
// 循环生成每个时间区间的数据点
for currentTime.Before(end.Add(time.Duration(intervalSeconds) * time.Second)) {
// 格式化当前时间为区间键
intervalKey := FormatTimeByInterval(currentTime, intervalSeconds)
// 计算当前区间的总和
sum := 0.0
if intervalPoints, exists := intervalData[intervalKey]; exists {
// 如果数据点数量小于2直接返回第一个数据点的值
if len(intervalPoints) < 2 {
if len(intervalPoints) > 0 {
sum = intervalPoints[0].Value
}
} else {
// 按时间排序
sort.Slice(intervalPoints, func(i, j int) bool {
return intervalPoints[i].Time.Before(intervalPoints[j].Time)
})
for i := 1; i < len(intervalPoints); i++ {
// 计算时间差(秒)
timeDiff := intervalPoints[i].Time.Sub(intervalPoints[i-1].Time).Seconds()
if timeDiff > 0 {
// 计算这个时间段的平均速率
averageRate := (intervalPoints[i].Value + intervalPoints[i-1].Value) / 2
// 计算这个时间段的流量
flow := averageRate * timeDiff
// 累加到总和
sum += flow
}
}
}
}
// 添加到结果
result = append(result, MetricData{
Time: currentTime,
Value: sum,
})
// 移动到下一个时间区间
currentTime = currentTime.Add(time.Duration(intervalSeconds) * time.Second)
}
return result
}

View File

@@ -0,0 +1,317 @@
package storage
import (
"context"
"log"
"strings"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go"
"github.com/monitor/backend/config"
)
// MetricPoint 自定义监控指标点
type MetricPoint struct {
Time time.Time `json:"time"`
Value float64 `json:"value"`
Tags map[string]string `json:"tags,omitempty"`
}
type Storage struct {
client influxdb2.Client
org string
bucket string
}
// NewStorage 创建新的存储实例
func NewStorage(cfg *config.Config) *Storage {
var client influxdb2.Client
// InfluxDB 2.x版本使用Token认证
// 检查Token是否为空
if cfg.InfluxDB.Token != "" {
// 使用Token认证适用于InfluxDB 2.x
client = influxdb2.NewClient(cfg.InfluxDB.URL, cfg.InfluxDB.Token)
} else {
// Token为空尝试使用默认客户端
// 注意InfluxDB 2.x需要有效的Token才能写入数据
log.Printf("Warning: InfluxDB Token is empty, write operations may fail")
client = influxdb2.NewClient(cfg.InfluxDB.URL, "")
}
// 禁用InfluxDB客户端的调试日志
client.Options().SetLogLevel(0)
return &Storage{
client: client,
org: cfg.InfluxDB.Org,
bucket: cfg.InfluxDB.Bucket,
}
}
// Close 关闭存储连接
func (s *Storage) Close() {
s.client.Close()
}
// WriteMetric 写入监控指标
func (s *Storage) WriteMetric(ctx context.Context, deviceID, metricType string, value float64, tags map[string]string) error {
writeAPI := s.client.WriteAPIBlocking(s.org, s.bucket)
// 创建标签映射,合并原有标签和新标签
allTags := make(map[string]string)
// 复制原有标签
for k, v := range tags {
allTags[k] = v
}
// 添加设备ID标签
allTags["device_id"] = deviceID
// 添加指标类型标签
allTags["type"] = metricType
// 创建数据点
point := influxdb2.NewPoint(
"metrics",
allTags,
map[string]interface{}{
"value": value,
},
time.Now(),
)
// 写入数据点
return writeAPI.WritePoint(ctx, point)
}
// QueryMetrics 查询监控指标
func (s *Storage) QueryMetrics(ctx context.Context, deviceID, metricType, startTime, endTime string) ([]MetricPoint, error) {
queryAPI := s.client.QueryAPI(s.org)
// 处理时间参数
processedStartTime := startTime
processedEndTime := endTime
// 检查并修复时间格式确保InfluxDB能正确识别
// 对于相对时间(如-24h或now(),直接使用
// 对于绝对时间,确保格式正确
if len(startTime) > 0 {
// 检查是否是相对时间(以-开头或为now()
isRelativeTime := strings.HasPrefix(startTime, "-") || startTime == "now()"
if !isRelativeTime {
// 是绝对时间尝试解析为time.Time类型
t, err := time.Parse(time.RFC3339, startTime)
if err != nil {
// 尝试其他常见格式
t, err = time.Parse("2006-01-02T15:04", startTime)
if err != nil {
// 尝试YYYY-MM-DD HH:MM格式
t, err = time.Parse("2006-01-02 15:04", startTime)
}
}
if err == nil {
// 解析成功使用RFC3339格式InfluxDB可以识别
processedStartTime = t.Format(time.RFC3339)
}
}
}
if len(endTime) > 0 {
// 检查是否是相对时间(以-开头或为now()
isRelativeTime := strings.HasPrefix(endTime, "-") || endTime == "now()"
if !isRelativeTime {
// 是绝对时间尝试解析为time.Time类型
t, err := time.Parse(time.RFC3339, endTime)
if err != nil {
// 尝试其他常见格式
t, err = time.Parse("2006-01-02T15:04", endTime)
if err != nil {
// 尝试YYYY-MM-DD HH:MM格式
t, err = time.Parse("2006-01-02 15:04", endTime)
}
}
if err == nil {
// 解析成功使用RFC3339格式InfluxDB可以识别
processedEndTime = t.Format(time.RFC3339)
}
}
}
// 构建查询语句,包含标签信息和字段过滤
query := `from(bucket: "` + s.bucket + `")
|> range(start: ` + processedStartTime + `, stop: ` + processedEndTime + `)
|> filter(fn: (r) => r["_measurement"] == "metrics")
|> filter(fn: (r) => r["_field"] == "value")`
// 只有当deviceID非空时才添加device_id过滤
if deviceID != "" {
query += `
|> filter(fn: (r) => r["device_id"] == "` + deviceID + `")`
}
query += `
|> filter(fn: (r) => r["type"] == "` + metricType + `")
|> sort(columns: ["_time"])`
// 执行查询
result, err := queryAPI.Query(ctx, query)
if err != nil {
return nil, err
}
defer result.Close()
var points []MetricPoint
// 处理查询结果
for result.Next() {
// 表结构变化也需要处理数据
// 获取记录
record := result.Record()
// 获取所有标签
tags := make(map[string]string)
for k, v := range record.Values() {
// 跳过内置字段,只保留自定义标签
if k != "_time" && k != "_value" && k != "_field" && k != "_measurement" {
if strValue, ok := v.(string); ok {
tags[k] = strValue
}
}
}
// 转换为自定义MetricPoint
point := MetricPoint{
Time: record.Time(),
Value: record.Value().(float64),
Tags: tags,
}
points = append(points, point)
}
if result.Err() != nil {
return nil, result.Err()
}
return points, nil
}
// DeviceInfo 设备信息
func (s *Storage) QueryAllDevices(ctx context.Context) ([]map[string]string, error) {
queryAPI := s.client.QueryAPI(s.org)
// 构建查询语句获取所有唯一的device_id和对应的agent_name
query := `from(bucket: "` + s.bucket + `")
|> range(start: -30d)
|> filter(fn: (r) => r["_measurement"] == "metrics")
|> group(columns: ["device_id", "agent_name"])
|> last()
|> keep(columns: ["device_id", "agent_name"])`
// 执行查询
queryResult, err := queryAPI.Query(ctx, query)
if err != nil {
return []map[string]string{{"id": "default", "name": "Default"}}, nil
}
defer queryResult.Close()
deviceMap := make(map[string]string)
// 处理查询结果
for queryResult.Next() {
if queryResult.TableChanged() {
// 表结构变化,跳过
continue
}
// 获取记录
record := queryResult.Record()
deviceID := record.ValueByKey("device_id").(string)
agentName := ""
if name, ok := record.ValueByKey("agent_name").(string); ok {
agentName = name
}
if agentName == "" {
agentName = deviceID
}
deviceMap[deviceID] = agentName
}
if queryResult.Err() != nil {
return []map[string]string{{"id": "default", "name": "Default"}}, nil
}
// 转换为切片
deviceList := make([]map[string]string, 0, len(deviceMap))
for deviceID, agentName := range deviceMap {
deviceList = append(deviceList, map[string]string{
"id": deviceID,
"name": agentName,
})
}
// 如果没有设备,返回默认设备
if len(deviceList) == 0 {
return []map[string]string{{"id": "default", "name": "Default"}}, nil
}
return deviceList, nil
}
// DeviceStatus 设备状态
func (s *Storage) QueryDeviceStatus(ctx context.Context, deviceID string) (string, map[string]float64, error) {
queryAPI := s.client.QueryAPI(s.org)
// 构建查询语句获取设备最新的指标数据和agent名称
query := `from(bucket: "` + s.bucket + `")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "metrics")
|> filter(fn: (r) => r["device_id"] == "` + deviceID + `")
|> last()`
// 执行查询
queryResult, err := queryAPI.Query(ctx, query)
if err != nil {
return "", nil, err
}
defer queryResult.Close()
status := make(map[string]float64)
agentName := deviceID
hasData := false
// 处理查询结果
for queryResult.Next() {
if queryResult.TableChanged() {
// 表结构变化,跳过
continue
}
hasData = true
// 获取记录
record := queryResult.Record()
metricType := record.ValueByKey("type").(string)
value := record.Value().(float64)
status[metricType] = value
// 获取agent名称
if name, ok := record.ValueByKey("agent_name").(string); ok && name != "" {
agentName = name
}
}
if queryResult.Err() != nil {
return "", nil, queryResult.Err()
}
if !hasData {
// 如果没有数据,返回空状态
return "", map[string]float64{}, nil
}
return agentName, status, nil
}