1061 lines
32 KiB
Go
1061 lines
32 KiB
Go
package handler
|
||
|
||
import (
|
||
"context"
|
||
"crypto/rand"
|
||
"encoding/hex"
|
||
"encoding/json"
|
||
"log"
|
||
"net/http"
|
||
"os"
|
||
"strconv"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/gin-gonic/gin"
|
||
"github.com/gorilla/websocket"
|
||
"github.com/monitor/backend/internal/device"
|
||
"github.com/monitor/backend/internal/storage"
|
||
)
|
||
|
||
// Handler API处理器
|
||
type Handler struct {
|
||
storage *storage.Storage
|
||
}
|
||
|
||
// NewHandler 创建新的API处理器
|
||
func NewHandler(storage *storage.Storage) *Handler {
|
||
return &Handler{
|
||
storage: storage,
|
||
}
|
||
}
|
||
|
||
// RegisterRoutes 注册所有路由
|
||
func RegisterRoutes(r *gin.Engine) {
|
||
// API路由组
|
||
api := r.Group("/api")
|
||
{
|
||
// 监控数据路由
|
||
metrics := api.Group("/metrics")
|
||
{
|
||
metrics.GET("/cpu", GetCPUMetrics) // 添加CPU信息查询端点
|
||
metrics.GET("/memory", GetMemoryMetrics) // 添加内存信息查询端点
|
||
metrics.GET("/disk", GetDiskMetrics) // 添加磁盘信息查询端点
|
||
metrics.GET("/network", GetNetworkMetrics) // 添加网络信息查询端点
|
||
metrics.GET("/processes", GetProcessMetrics) // 添加进程信息查询端点
|
||
metrics.GET("/disk_details", GetDiskDetails) // 添加磁盘详细信息查询端点
|
||
metrics.GET("/logs", GetLogs) // 添加系统日志查询端点
|
||
// 添加POST端点,接收Agent发送的指标数据
|
||
metrics.POST("/", HandleMetricsPost)
|
||
}
|
||
|
||
// 设备管理路由
|
||
devices := api.Group("/devices")
|
||
{
|
||
devices.GET("/", GetDevices) // 获取活跃设备列表
|
||
devices.GET("/all", GetAllDevices) // 获取所有设备
|
||
devices.GET("/:id", GetDevice) // 获取单个设备详情
|
||
devices.POST("/", AddDevice) // 添加设备
|
||
devices.PUT("/:id", UpdateDevice) // 更新设备信息
|
||
devices.DELETE("/:id", DeleteDevice) // 删除设备
|
||
// 添加获取设备状态的端点
|
||
devices.GET("/status", GetAllDeviceStatus) // 获取所有活跃设备状态
|
||
devices.GET("/:id/status", GetDeviceStatus) // 获取单个设备状态
|
||
}
|
||
|
||
// WebSocket端点
|
||
api.GET("/ws", WebSocketHandler)
|
||
|
||
// 添加设备状态概览端点,作为/api/devices/status的别名
|
||
api.GET("/status", GetAllDeviceStatus)
|
||
}
|
||
}
|
||
|
||
// DiskMetrics 磁盘监控指标
|
||
type DiskMetrics struct {
|
||
UsedPercent float64 `json:"used_percent"` // 使用率百分比
|
||
Total uint64 `json:"total"` // 总容量 (bytes)
|
||
}
|
||
|
||
// ProcessMetrics 进程监控指标
|
||
type ProcessMetrics struct {
|
||
Name string `json:"name"` // 进程名
|
||
Username string `json:"username"` // 用户名
|
||
PID int32 `json:"pid"` // 进程ID
|
||
CPU float64 `json:"cpu"` // CPU使用率
|
||
Memory float64 `json:"memory"` // 内存使用率
|
||
Path string `json:"path"` // 路径
|
||
Cmdline string `json:"cmdline"` // 命令行
|
||
Ports []int `json:"ports"` // 占用端口
|
||
}
|
||
|
||
// DiskDetailMetrics 磁盘详细信息
|
||
type DiskDetailMetrics struct {
|
||
DeviceID string `json:"device_id"` // 设备ID
|
||
Status string `json:"status"` // 设备状态
|
||
Type string `json:"type"` // 设备类型
|
||
SizeGB float64 `json:"size_gb"` // 设备大小(GB)
|
||
Model string `json:"model"` // 设备型号
|
||
InterfaceType string `json:"interface_type"` // 接口类型
|
||
Description string `json:"description"` // 设备描述
|
||
}
|
||
|
||
// LogMetrics 系统日志指标
|
||
type LogMetrics struct {
|
||
Source string `json:"source"` // 日志来源
|
||
Time string `json:"time"` // 日志时间
|
||
Message string `json:"message"` // 日志内容
|
||
}
|
||
|
||
// NetworkInterfaceMetrics 网卡监控指标
|
||
type NetworkInterfaceMetrics struct {
|
||
BytesSent uint64 `json:"bytes_sent"` // 发送速率 (bytes/s)
|
||
BytesReceived uint64 `json:"bytes_received"` // 接收速率 (bytes/s)
|
||
TxBytes uint64 `json:"tx_bytes"` // 累计发送字节数
|
||
RxBytes uint64 `json:"rx_bytes"` // 累计接收字节数
|
||
}
|
||
|
||
// LogEntry 系统日志条目
|
||
type LogEntry struct {
|
||
Sequence int `json:"sequence"` // 日志序号
|
||
Source string `json:"source"` // 来源
|
||
Time time.Time `json:"time"` // 发生时间
|
||
Message string `json:"message"` // 内容
|
||
}
|
||
|
||
// MetricsRequest 指标请求结构
|
||
type MetricsRequest struct {
|
||
CPU float64 `json:"cpu"`
|
||
CPUHz float64 `json:"cpu_hz"` // CPU频率 (MHz)
|
||
Memory float64 `json:"memory"`
|
||
Disk map[string]DiskMetrics `json:"disk"`
|
||
DiskDetails []DiskDetailMetrics `json:"disk_details"` // 磁盘详细信息
|
||
Network map[string]NetworkInterfaceMetrics `json:"network"`
|
||
Processes []ProcessMetrics `json:"processes"` // 进程信息
|
||
Logs []LogEntry `json:"logs"` // 系统日志
|
||
RxTotal uint64 `json:"rx_total"` // 所有网卡累计接收字节数总和
|
||
TxTotal uint64 `json:"tx_total"` // 所有网卡累计发送字节数总和
|
||
RxRate uint64 `json:"rx_rate"` // 所有网卡实时接收速率总和 (bytes/s)
|
||
TxRate uint64 `json:"tx_rate"` // 所有网卡实时发送速率总和 (bytes/s)
|
||
}
|
||
|
||
// HandleMetricsPost 处理Agent发送的指标数据
|
||
func HandleMetricsPost(c *gin.Context) {
|
||
// 获取设备令牌
|
||
token := c.GetHeader("X-Device-Token")
|
||
if token == "" {
|
||
c.JSON(http.StatusUnauthorized, gin.H{
|
||
"error": "Device token is required",
|
||
})
|
||
return
|
||
}
|
||
|
||
// 通过令牌获取设备
|
||
device, ok := deviceStorage.GetDeviceByToken(token)
|
||
if !ok {
|
||
c.JSON(http.StatusUnauthorized, gin.H{
|
||
"error": "Invalid device token",
|
||
})
|
||
return
|
||
}
|
||
|
||
// 获取设备ID,用于后续处理
|
||
deviceID := device.ID
|
||
|
||
// 获取Agent名称
|
||
agentName := c.GetHeader("X-Agent-Name")
|
||
if agentName == "" {
|
||
// 如果没有提供名称,使用设备名称
|
||
agentName = device.Name
|
||
}
|
||
|
||
// 确保设备状态为active
|
||
if device.Status != "active" {
|
||
// 更新设备状态为active
|
||
if err := deviceStorage.UpdateDeviceStatus(deviceID, "active"); err != nil {
|
||
// 只记录警告,不影响指标处理
|
||
log.Printf("Warning: Failed to update device status: %v", err)
|
||
} else {
|
||
log.Printf("Device %s activated successfully", deviceID)
|
||
}
|
||
}
|
||
|
||
// 创建基础标签,包含Agent名称
|
||
baseTags := map[string]string{
|
||
"agent_name": agentName,
|
||
}
|
||
|
||
// 处理请求体,支持单指标对象和指标数组
|
||
var metricsList []MetricsRequest
|
||
|
||
// 读取请求体到缓冲区,以便多次解析
|
||
body, err := c.GetRawData()
|
||
if err != nil {
|
||
c.JSON(http.StatusBadRequest, gin.H{
|
||
"error": "Failed to read request body",
|
||
})
|
||
return
|
||
}
|
||
|
||
// 首先尝试解析为数组
|
||
if err := json.Unmarshal(body, &metricsList); err != nil {
|
||
// 如果解析数组失败,尝试解析为单个对象
|
||
var singleMetric MetricsRequest
|
||
if err := json.Unmarshal(body, &singleMetric); err != nil {
|
||
c.JSON(http.StatusBadRequest, gin.H{
|
||
"error": "Invalid request body",
|
||
})
|
||
return
|
||
}
|
||
// 将单个对象添加到列表中
|
||
metricsList = append(metricsList, singleMetric)
|
||
}
|
||
|
||
// 创建单独的上下文用于InfluxDB写入,避免HTTP请求结束时上下文被取消
|
||
writeCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||
defer cancel()
|
||
|
||
// 处理所有指标
|
||
for i, req := range metricsList {
|
||
// 写入CPU使用率指标
|
||
if err := globalStorage.WriteMetric(writeCtx, deviceID, "cpu", req.CPU, baseTags); err != nil {
|
||
// 只记录警告,不影响后续指标处理
|
||
log.Printf("Warning: Failed to write CPU metrics: %v", err)
|
||
}
|
||
|
||
// 写入CPU频率指标(如果有)
|
||
if req.CPUHz > 0 {
|
||
if err := globalStorage.WriteMetric(writeCtx, deviceID, "cpu_hz", req.CPUHz, baseTags); err != nil {
|
||
// 只记录警告,不影响后续指标处理
|
||
log.Printf("Warning: Failed to write CPU Hz metrics: %v", err)
|
||
}
|
||
}
|
||
|
||
// 写入内存指标
|
||
if err := globalStorage.WriteMetric(writeCtx, deviceID, "memory", req.Memory, baseTags); err != nil {
|
||
// 只记录警告,不影响后续指标处理
|
||
log.Printf("Warning: Failed to write memory metrics: %v", err)
|
||
}
|
||
|
||
// 写入磁盘指标,支持多个挂载点
|
||
for mountpoint, diskMetrics := range req.Disk {
|
||
// 为每个挂载点创建标签,包含基础标签和挂载点
|
||
tags := make(map[string]string)
|
||
// 复制基础标签
|
||
for k, v := range baseTags {
|
||
tags[k] = v
|
||
}
|
||
// 添加挂载点标签
|
||
tags["mountpoint"] = mountpoint
|
||
|
||
// 写入磁盘使用率指标
|
||
if err := globalStorage.WriteMetric(writeCtx, deviceID, "disk", diskMetrics.UsedPercent, tags); err != nil {
|
||
// 只记录警告,不影响后续指标处理
|
||
log.Printf("Warning: Failed to write disk metrics for mountpoint %s: %v", mountpoint, err)
|
||
}
|
||
}
|
||
|
||
// 写入网络指标,支持多个网卡
|
||
var totalBytesSent, totalBytesReceived uint64
|
||
var totalTxBytes, totalRxBytes uint64 // 累计总流量
|
||
for interfaceName, networkMetrics := range req.Network {
|
||
// 跳过空名称的网卡
|
||
if interfaceName == "" {
|
||
continue
|
||
}
|
||
// 为每个网卡创建标签,包含基础标签和网卡名称
|
||
interfaceTags := make(map[string]string)
|
||
// 复制基础标签
|
||
for k, v := range baseTags {
|
||
interfaceTags[k] = v
|
||
}
|
||
// 添加网卡标签
|
||
interfaceTags["interface"] = interfaceName
|
||
|
||
// 写入网络发送速率指标
|
||
if err := globalStorage.WriteMetric(writeCtx, deviceID, "network_sent", float64(networkMetrics.BytesSent), interfaceTags); err != nil {
|
||
// 只记录警告,不影响后续指标处理
|
||
log.Printf("Warning: Failed to write network sent metrics for interface %s: %v", interfaceName, err)
|
||
}
|
||
|
||
// 写入网络接收速率指标
|
||
if err := globalStorage.WriteMetric(writeCtx, deviceID, "network_received", float64(networkMetrics.BytesReceived), interfaceTags); err != nil {
|
||
// 只记录警告,不影响后续指标处理
|
||
log.Printf("Warning: Failed to write network received metrics for interface %s: %v", interfaceName, err)
|
||
}
|
||
|
||
// 写入累计发送字节数指标
|
||
if err := globalStorage.WriteMetric(writeCtx, deviceID, "network_tx_bytes", float64(networkMetrics.TxBytes), interfaceTags); err != nil {
|
||
// 只记录警告,不影响后续指标处理
|
||
log.Printf("Warning: Failed to write network tx_bytes metrics for interface %s: %v", interfaceName, err)
|
||
}
|
||
|
||
// 写入累计接收字节数指标
|
||
if err := globalStorage.WriteMetric(writeCtx, deviceID, "network_rx_bytes", float64(networkMetrics.RxBytes), interfaceTags); err != nil {
|
||
// 只记录警告,不影响后续指标处理
|
||
log.Printf("Warning: Failed to write network rx_bytes metrics for interface %s: %v", interfaceName, err)
|
||
}
|
||
|
||
// 累加总流量速率
|
||
totalBytesSent += networkMetrics.BytesSent
|
||
totalBytesReceived += networkMetrics.BytesReceived
|
||
|
||
// 累加累计总流量
|
||
totalTxBytes += networkMetrics.TxBytes
|
||
totalRxBytes += networkMetrics.RxBytes
|
||
}
|
||
|
||
// 写入进程信息
|
||
for _, proc := range req.Processes {
|
||
if err := globalStorage.WriteProcessMetric(writeCtx, deviceID, proc.Name, proc.Username, proc.PID, proc.CPU, proc.Memory, proc.Path, proc.Cmdline, proc.Ports, baseTags); err != nil {
|
||
// 只记录警告,不影响后续指标处理
|
||
log.Printf("Warning: Failed to write process metrics for PID %d: %v", proc.PID, err)
|
||
}
|
||
}
|
||
|
||
// 写入磁盘详细信息
|
||
for _, diskDetail := range req.DiskDetails {
|
||
if err := globalStorage.WriteDiskDetailMetric(writeCtx, deviceID, diskDetail.DeviceID, diskDetail.Status, diskDetail.Type, diskDetail.SizeGB, diskDetail.Model, diskDetail.InterfaceType, diskDetail.Description, baseTags); err != nil {
|
||
// 只记录警告,不影响后续指标处理
|
||
log.Printf("Warning: Failed to write disk details for device %s: %v", diskDetail.DeviceID, err)
|
||
}
|
||
}
|
||
|
||
// 写入日志数据
|
||
for _, logEntry := range req.Logs {
|
||
if err := globalStorage.WriteLogMetric(writeCtx, deviceID, logEntry.Sequence, logEntry.Source, logEntry.Time, logEntry.Message, baseTags); err != nil {
|
||
// 只记录警告,不影响后续指标处理
|
||
log.Printf("Warning: Failed to write log for device %s: %v", deviceID, err)
|
||
}
|
||
}
|
||
|
||
// 广播指标更新消息,只广播最后一个指标
|
||
if i == len(metricsList)-1 {
|
||
// 准备广播的磁盘使用率数据(兼容旧格式)
|
||
compatDisk := make(map[string]float64)
|
||
for mountpoint, diskMetrics := range req.Disk {
|
||
compatDisk[mountpoint] = diskMetrics.UsedPercent
|
||
}
|
||
|
||
metrics := map[string]interface{}{
|
||
"cpu": req.CPU,
|
||
"cpu_hz": req.CPUHz,
|
||
"memory": req.Memory,
|
||
"disk": compatDisk, // 使用兼容格式的磁盘数据
|
||
"network": map[string]uint64{
|
||
"bytes_sent": totalBytesSent,
|
||
"bytes_received": totalBytesReceived,
|
||
"tx_bytes": totalTxBytes,
|
||
"rx_bytes": totalRxBytes,
|
||
},
|
||
"network_interfaces": req.Network,
|
||
}
|
||
broadcastMetricsUpdate(deviceID, metrics)
|
||
}
|
||
}
|
||
|
||
// 返回成功响应
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"message": "Metrics received successfully",
|
||
"count": len(metricsList),
|
||
})
|
||
}
|
||
|
||
// 全局存储实例(简化处理,实际应该使用依赖注入)
|
||
var globalStorage *storage.Storage
|
||
var deviceStorage device.Storage
|
||
|
||
// SetStorage 设置全局存储实例
|
||
func SetStorage(s *storage.Storage) {
|
||
globalStorage = s
|
||
}
|
||
|
||
// SetDeviceStorage 设置设备存储实例
|
||
func SetDeviceStorage(s device.Storage) {
|
||
deviceStorage = s
|
||
}
|
||
|
||
// 生成安全的设备令牌
|
||
func generateDeviceToken() string {
|
||
// 生成32字节的随机数据
|
||
bytes := make([]byte, 16)
|
||
if _, err := rand.Read(bytes); err != nil {
|
||
// 如果生成随机数失败,使用时间戳和PID作为备选方案
|
||
return hex.EncodeToString([]byte(time.Now().String() + "-" + string(os.Getpid())))
|
||
}
|
||
// 转换为32位的十六进制字符串
|
||
return hex.EncodeToString(bytes)
|
||
}
|
||
|
||
// WebSocket相关配置
|
||
var (
|
||
// 升级器,用于将HTTP连接升级为WebSocket连接
|
||
upgrader = websocket.Upgrader{
|
||
CheckOrigin: func(r *http.Request) bool {
|
||
return true // 允许所有来源,生产环境应该限制
|
||
},
|
||
}
|
||
|
||
// 客户端连接管理
|
||
clients = make(map[*websocket.Conn]bool)
|
||
clientsLock sync.Mutex
|
||
|
||
// 消息广播通道
|
||
broadcast = make(chan map[string]interface{})
|
||
)
|
||
|
||
// 启动WebSocket消息处理
|
||
func init() {
|
||
go handleMessages()
|
||
}
|
||
|
||
// 处理WebSocket消息
|
||
func handleMessages() {
|
||
for {
|
||
// 从广播通道接收消息
|
||
message := <-broadcast
|
||
|
||
// 序列化消息
|
||
jsonMessage, err := json.Marshal(message)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
|
||
// 广播消息给所有客户端
|
||
clientsLock.Lock()
|
||
for client := range clients {
|
||
if err := client.WriteMessage(websocket.TextMessage, jsonMessage); err != nil {
|
||
// 连接出错,关闭连接并从客户端列表中移除
|
||
client.Close()
|
||
delete(clients, client)
|
||
}
|
||
}
|
||
clientsLock.Unlock()
|
||
}
|
||
}
|
||
|
||
// WebSocket端点,用于客户端连接
|
||
func WebSocketHandler(c *gin.Context) {
|
||
// 升级HTTP连接为WebSocket连接
|
||
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
||
if err != nil {
|
||
return
|
||
}
|
||
defer conn.Close()
|
||
|
||
// 将客户端添加到客户端列表
|
||
clientsLock.Lock()
|
||
clients[conn] = true
|
||
clientsLock.Unlock()
|
||
|
||
// 处理客户端消息(这里我们只发送消息,不处理接收的消息)
|
||
for {
|
||
// 读取消息(如果客户端发送消息,我们不处理,直接忽略)
|
||
_, _, err := conn.ReadMessage()
|
||
if err != nil {
|
||
// 连接出错,从客户端列表中移除
|
||
clientsLock.Lock()
|
||
delete(clients, conn)
|
||
clientsLock.Unlock()
|
||
break
|
||
}
|
||
}
|
||
}
|
||
|
||
// 广播指标更新消息
|
||
func broadcastMetricsUpdate(deviceID string, metrics map[string]interface{}) {
|
||
message := map[string]interface{}{
|
||
"type": "metrics_update",
|
||
"device_id": deviceID,
|
||
"metrics": metrics,
|
||
}
|
||
broadcast <- message
|
||
}
|
||
|
||
// GetCPUMetrics 获取CPU指标
|
||
func GetCPUMetrics(c *gin.Context) {
|
||
// 获取查询参数
|
||
deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备
|
||
startTime := c.DefaultQuery("start_time", "-24h")
|
||
endTime := c.DefaultQuery("end_time", "now()")
|
||
aggregation := c.DefaultQuery("aggregation", "average")
|
||
interval := c.DefaultQuery("interval", "10s") // 添加interval参数,默认10秒
|
||
|
||
// 查询数据
|
||
points, err := globalStorage.QueryMetrics(context.Background(), deviceID, "cpu", startTime, endTime)
|
||
if err != nil {
|
||
// 只记录警告,返回空数据
|
||
log.Printf("Warning: Failed to query CPU metrics: %v", err)
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"data": []MetricData{},
|
||
})
|
||
return
|
||
}
|
||
|
||
// 处理数据,传递interval、startTime和endTime参数
|
||
processedData := ProcessMetricData(points, aggregation, interval, startTime, endTime)
|
||
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"data": processedData,
|
||
})
|
||
}
|
||
|
||
// GetMemoryMetrics 获取内存指标
|
||
func GetMemoryMetrics(c *gin.Context) {
|
||
// 获取查询参数
|
||
deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备
|
||
startTime := c.DefaultQuery("start_time", "-24h")
|
||
endTime := c.DefaultQuery("end_time", "now()")
|
||
aggregation := c.DefaultQuery("aggregation", "average")
|
||
interval := c.DefaultQuery("interval", "10s") // 添加interval参数,默认10秒
|
||
|
||
// 查询数据
|
||
points, err := globalStorage.QueryMetrics(context.Background(), deviceID, "memory", startTime, endTime)
|
||
if err != nil {
|
||
// 只记录警告,返回空数据
|
||
log.Printf("Warning: Failed to query memory metrics: %v", err)
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"data": []MetricData{},
|
||
})
|
||
return
|
||
}
|
||
|
||
// 处理数据,传递interval、startTime和endTime参数
|
||
processedData := ProcessMetricData(points, aggregation, interval, startTime, endTime)
|
||
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"data": processedData,
|
||
})
|
||
}
|
||
|
||
// GetDiskMetrics 获取磁盘指标
|
||
func GetDiskMetrics(c *gin.Context) {
|
||
// 获取查询参数
|
||
deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备
|
||
startTime := c.DefaultQuery("start_time", "-24h")
|
||
endTime := c.DefaultQuery("end_time", "now()")
|
||
aggregation := c.DefaultQuery("aggregation", "average")
|
||
interval := c.DefaultQuery("interval", "10s") // 添加interval参数,默认10秒
|
||
|
||
// 查询数据
|
||
points, err := globalStorage.QueryMetrics(context.Background(), deviceID, "disk", startTime, endTime)
|
||
if err != nil {
|
||
// 只记录警告,返回空数据
|
||
log.Printf("Warning: Failed to query disk metrics: %v", err)
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"data": map[string][]MetricData{},
|
||
})
|
||
return
|
||
}
|
||
|
||
// 按挂载点分组
|
||
mountpointData := make(map[string][]storage.MetricPoint)
|
||
for _, point := range points {
|
||
// 获取挂载点标签
|
||
mountpoint := "unknown"
|
||
if point.Tags != nil && point.Tags["mountpoint"] != "" {
|
||
mountpoint = point.Tags["mountpoint"]
|
||
}
|
||
mountpointData[mountpoint] = append(mountpointData[mountpoint], point)
|
||
}
|
||
|
||
// 处理数据,为每个挂载点创建独立的数据集
|
||
result := make(map[string][]MetricData)
|
||
for mountpoint, mountpointPoints := range mountpointData {
|
||
processedData := ProcessMetricData(mountpointPoints, aggregation, interval, startTime, endTime)
|
||
result[mountpoint] = processedData
|
||
}
|
||
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"data": result,
|
||
})
|
||
}
|
||
|
||
// GetNetworkMetrics 获取网络指标
|
||
func GetNetworkMetrics(c *gin.Context) {
|
||
// 获取查询参数
|
||
deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备
|
||
startTime := c.DefaultQuery("start_time", "-24h")
|
||
endTime := c.DefaultQuery("end_time", "now()")
|
||
aggregation := c.DefaultQuery("aggregation", "average")
|
||
interval := c.DefaultQuery("interval", "10s") // 添加interval参数,默认10秒
|
||
|
||
// 查询发送和接收的网络速率指标
|
||
sentPoints, err1 := globalStorage.QueryMetrics(context.Background(), deviceID, "network_sent", startTime, endTime)
|
||
receivedPoints, err2 := globalStorage.QueryMetrics(context.Background(), deviceID, "network_received", startTime, endTime)
|
||
|
||
// 查询发送和接收的累积总流量指标
|
||
txBytesPoints, err3 := globalStorage.QueryMetrics(context.Background(), deviceID, "network_tx_bytes", startTime, endTime)
|
||
rxBytesPoints, err4 := globalStorage.QueryMetrics(context.Background(), deviceID, "network_rx_bytes", startTime, endTime)
|
||
|
||
// 处理错误
|
||
if err1 != nil {
|
||
log.Printf("Warning: Failed to query network sent metrics: %v", err1)
|
||
sentPoints = []storage.MetricPoint{}
|
||
}
|
||
if err2 != nil {
|
||
log.Printf("Warning: Failed to query network received metrics: %v", err2)
|
||
receivedPoints = []storage.MetricPoint{}
|
||
}
|
||
if err3 != nil {
|
||
log.Printf("Warning: Failed to query network_total_tx_bytes metrics: %v", err3)
|
||
txBytesPoints = []storage.MetricPoint{}
|
||
}
|
||
if err4 != nil {
|
||
log.Printf("Warning: Failed to query network_total_rx_bytes metrics: %v", err4)
|
||
rxBytesPoints = []storage.MetricPoint{}
|
||
}
|
||
|
||
// 按网卡名称分组发送和接收的速率指标
|
||
sentByInterface := make(map[string][]storage.MetricPoint)
|
||
receivedByInterface := make(map[string][]storage.MetricPoint)
|
||
|
||
// 按网卡名称分组发送和接收的累积总流量指标
|
||
txBytesByInterface := make(map[string][]storage.MetricPoint)
|
||
rxBytesByInterface := make(map[string][]storage.MetricPoint)
|
||
|
||
// 分组发送的网络速率指标
|
||
for _, point := range sentPoints {
|
||
// 获取网卡名称,默认使用"all"表示所有网卡
|
||
interfaceName := point.Tags["interface"]
|
||
if interfaceName == "" {
|
||
interfaceName = "all"
|
||
}
|
||
sentByInterface[interfaceName] = append(sentByInterface[interfaceName], point)
|
||
}
|
||
|
||
// 分组接收的网络速率指标
|
||
for _, point := range receivedPoints {
|
||
// 获取网卡名称,默认使用"all"表示所有网卡
|
||
interfaceName := point.Tags["interface"]
|
||
if interfaceName == "" {
|
||
interfaceName = "all"
|
||
}
|
||
receivedByInterface[interfaceName] = append(receivedByInterface[interfaceName], point)
|
||
}
|
||
|
||
// 分组发送的累积总流量指标
|
||
for _, point := range txBytesPoints {
|
||
// 获取网卡名称,默认使用"all"表示所有网卡
|
||
interfaceName := point.Tags["interface"]
|
||
if interfaceName == "" {
|
||
interfaceName = "all"
|
||
}
|
||
txBytesByInterface[interfaceName] = append(txBytesByInterface[interfaceName], point)
|
||
}
|
||
|
||
// 分组接收的累积总流量指标
|
||
for _, point := range rxBytesPoints {
|
||
// 获取网卡名称,默认使用"all"表示所有网卡
|
||
interfaceName := point.Tags["interface"]
|
||
if interfaceName == "" {
|
||
interfaceName = "all"
|
||
}
|
||
rxBytesByInterface[interfaceName] = append(rxBytesByInterface[interfaceName], point)
|
||
}
|
||
|
||
// 处理数据,为每个网卡创建独立的数据集
|
||
result := make(map[string]map[string][]MetricData)
|
||
|
||
// 合并所有网卡名称
|
||
allInterfaces := make(map[string]bool)
|
||
for iface := range sentByInterface {
|
||
allInterfaces[iface] = true
|
||
}
|
||
for iface := range receivedByInterface {
|
||
allInterfaces[iface] = true
|
||
}
|
||
for iface := range txBytesByInterface {
|
||
allInterfaces[iface] = true
|
||
}
|
||
for iface := range rxBytesByInterface {
|
||
allInterfaces[iface] = true
|
||
}
|
||
|
||
// 为每个网卡处理数据
|
||
for iface := range allInterfaces {
|
||
// 获取该网卡的速率指标
|
||
ifaceSentPoints := sentByInterface[iface]
|
||
ifaceReceivedPoints := receivedByInterface[iface]
|
||
|
||
// 获取该网卡的累积总流量指标
|
||
ifaceTxBytesPoints := txBytesByInterface[iface]
|
||
ifaceRxBytesPoints := rxBytesByInterface[iface]
|
||
|
||
// 处理速率数据
|
||
processedSentData := ProcessMetricData(ifaceSentPoints, aggregation, interval, startTime, endTime)
|
||
processedReceivedData := ProcessMetricData(ifaceReceivedPoints, aggregation, interval, startTime, endTime)
|
||
|
||
// 处理累积总流量数据
|
||
processedTxBytesData := ProcessMetricData(ifaceTxBytesPoints, aggregation, interval, startTime, endTime)
|
||
processedRxBytesData := ProcessMetricData(ifaceRxBytesPoints, aggregation, interval, startTime, endTime)
|
||
|
||
// 保存结果
|
||
result[iface] = map[string][]MetricData{
|
||
"sent": processedSentData, // 发送速率数据
|
||
"received": processedReceivedData, // 接收速率数据
|
||
"tx_bytes": processedTxBytesData, // 发送累积总流量数据
|
||
"rx_bytes": processedRxBytesData, // 接收累积总流量数据
|
||
}
|
||
}
|
||
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"data": result,
|
||
})
|
||
}
|
||
|
||
// GetDevices 获取设备列表
|
||
func GetDevices(c *gin.Context) {
|
||
// 从设备存储获取所有设备
|
||
allDevices := deviceStorage.GetDevices()
|
||
|
||
// 转换为前端需要的格式
|
||
deviceList := make([]map[string]string, 0, len(allDevices))
|
||
for _, device := range allDevices {
|
||
// 只返回活跃设备
|
||
if device.Status == "active" {
|
||
deviceList = append(deviceList, map[string]string{
|
||
"id": device.ID,
|
||
"name": device.Name,
|
||
})
|
||
}
|
||
}
|
||
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"devices": deviceList,
|
||
})
|
||
}
|
||
|
||
// GetDeviceStatus 获取设备状态
|
||
func GetDeviceStatus(c *gin.Context) {
|
||
// 获取设备ID
|
||
deviceID := c.Param("id")
|
||
if deviceID == "" {
|
||
c.JSON(http.StatusBadRequest, gin.H{
|
||
"error": "Device ID is required",
|
||
})
|
||
return
|
||
}
|
||
|
||
// 从设备存储获取设备信息
|
||
device, ok := deviceStorage.GetDevice(deviceID)
|
||
if !ok {
|
||
c.JSON(http.StatusNotFound, gin.H{
|
||
"error": "Device not found",
|
||
})
|
||
return
|
||
}
|
||
|
||
// 查询设备监控数据
|
||
_, status, err := globalStorage.QueryDeviceStatus(context.Background(), deviceID)
|
||
if err != nil {
|
||
// 如果查询监控数据失败,返回设备基本信息和空状态
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"device_id": deviceID,
|
||
"name": device.Name,
|
||
"status": make(map[string]float64),
|
||
})
|
||
return
|
||
}
|
||
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"device_id": deviceID,
|
||
"name": device.Name,
|
||
"status": status,
|
||
})
|
||
}
|
||
|
||
// GetAllDeviceStatus 获取所有设备状态
|
||
func GetAllDeviceStatus(c *gin.Context) {
|
||
// 从设备存储获取所有设备
|
||
allDevices := deviceStorage.GetDevices()
|
||
|
||
// 查询每个设备的状态
|
||
result := make([]map[string]interface{}, 0, len(allDevices))
|
||
for _, device := range allDevices {
|
||
// 查询设备监控数据
|
||
_, status, _ := globalStorage.QueryDeviceStatus(context.Background(), device.ID)
|
||
|
||
// 总是返回设备信息,无论是否有监控数据
|
||
result = append(result, map[string]interface{}{
|
||
"device_id": device.ID,
|
||
"name": device.Name,
|
||
"status": status,
|
||
})
|
||
}
|
||
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"devices": result,
|
||
})
|
||
}
|
||
|
||
// AddDevice 添加设备
|
||
func AddDevice(c *gin.Context) {
|
||
var req struct {
|
||
ID string `json:"id" binding:"required"`
|
||
Name string `json:"name" binding:"required"`
|
||
IP string `json:"ip"`
|
||
}
|
||
|
||
if err := c.ShouldBindJSON(&req); err != nil {
|
||
c.JSON(http.StatusBadRequest, gin.H{
|
||
"error": "Invalid request: " + err.Error(),
|
||
})
|
||
return
|
||
}
|
||
|
||
// 生成设备令牌
|
||
token := generateDeviceToken()
|
||
|
||
newDevice := device.Device{
|
||
ID: req.ID,
|
||
Name: req.Name,
|
||
IP: req.IP,
|
||
Token: token,
|
||
Status: "inactive",
|
||
CreatedAt: time.Now().Unix(),
|
||
UpdatedAt: time.Now().Unix(),
|
||
}
|
||
|
||
if err := deviceStorage.AddDevice(newDevice); err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{
|
||
"error": "Failed to add device: " + err.Error(),
|
||
})
|
||
return
|
||
}
|
||
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"message": "Device added successfully",
|
||
"device": newDevice,
|
||
})
|
||
}
|
||
|
||
// DeleteDevice 删除设备
|
||
func DeleteDevice(c *gin.Context) {
|
||
deviceID := c.Param("id")
|
||
if deviceID == "" {
|
||
c.JSON(http.StatusBadRequest, gin.H{
|
||
"error": "Device ID is required",
|
||
})
|
||
return
|
||
}
|
||
|
||
if err := deviceStorage.DeleteDevice(deviceID); err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{
|
||
"error": "Failed to delete device: " + err.Error(),
|
||
})
|
||
return
|
||
}
|
||
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"message": "Device deleted successfully",
|
||
})
|
||
}
|
||
|
||
// UpdateDevice 更新设备
|
||
func UpdateDevice(c *gin.Context) {
|
||
deviceID := c.Param("id")
|
||
if deviceID == "" {
|
||
c.JSON(http.StatusBadRequest, gin.H{
|
||
"error": "Device ID is required",
|
||
})
|
||
return
|
||
}
|
||
|
||
var req struct {
|
||
Name string `json:"name"`
|
||
IP string `json:"ip"`
|
||
Status string `json:"status"`
|
||
}
|
||
|
||
if err := c.ShouldBindJSON(&req); err != nil {
|
||
c.JSON(http.StatusBadRequest, gin.H{
|
||
"error": "Invalid request: " + err.Error(),
|
||
})
|
||
return
|
||
}
|
||
|
||
// 获取现有设备
|
||
existingDevice, ok := deviceStorage.GetDevice(deviceID)
|
||
if !ok {
|
||
c.JSON(http.StatusNotFound, gin.H{
|
||
"error": "Device not found",
|
||
})
|
||
return
|
||
}
|
||
|
||
// 更新设备信息,保留原有token
|
||
if req.Name != "" {
|
||
existingDevice.Name = req.Name
|
||
}
|
||
if req.IP != "" {
|
||
existingDevice.IP = req.IP
|
||
}
|
||
if req.Status != "" {
|
||
existingDevice.Status = req.Status
|
||
}
|
||
existingDevice.UpdatedAt = time.Now().Unix()
|
||
|
||
if err := deviceStorage.UpdateDevice(existingDevice); err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{
|
||
"error": "Failed to update device: " + err.Error(),
|
||
})
|
||
return
|
||
}
|
||
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"message": "Device updated successfully",
|
||
"device": existingDevice,
|
||
})
|
||
}
|
||
|
||
// GetDevice 获取单个设备
|
||
func GetDevice(c *gin.Context) {
|
||
deviceID := c.Param("id")
|
||
if deviceID == "" {
|
||
c.JSON(http.StatusBadRequest, gin.H{
|
||
"error": "Device ID is required",
|
||
})
|
||
return
|
||
}
|
||
|
||
device, ok := deviceStorage.GetDevice(deviceID)
|
||
if !ok {
|
||
c.JSON(http.StatusNotFound, gin.H{
|
||
"error": "Device not found",
|
||
})
|
||
return
|
||
}
|
||
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"device": device,
|
||
})
|
||
}
|
||
|
||
// GetAllDevices 获取所有设备
|
||
func GetAllDevices(c *gin.Context) {
|
||
devices := deviceStorage.GetDevices()
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"devices": devices,
|
||
})
|
||
}
|
||
|
||
// GetProcessMetrics 获取进程指标
|
||
func GetProcessMetrics(c *gin.Context) {
|
||
// 获取查询参数
|
||
deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备
|
||
startTime := c.DefaultQuery("start_time", "-24h")
|
||
endTime := c.DefaultQuery("end_time", "now()")
|
||
page, _ := strconv.Atoi(c.DefaultQuery("page", "1"))
|
||
limit, _ := strconv.Atoi(c.DefaultQuery("limit", "10"))
|
||
sortBy := c.DefaultQuery("sort_by", "cpu")
|
||
sortOrder := c.DefaultQuery("sort_order", "desc")
|
||
|
||
// 查询数据
|
||
processes, err := globalStorage.QueryProcessMetrics(context.Background(), deviceID, startTime, endTime)
|
||
if err != nil {
|
||
// 只记录警告,返回空数据
|
||
log.Printf("Warning: Failed to query process metrics: %v", err)
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"data": []ProcessMetrics{},
|
||
"page": page,
|
||
"limit": limit,
|
||
"total": 0,
|
||
"sort_by": sortBy,
|
||
"sort_order": sortOrder,
|
||
})
|
||
return
|
||
}
|
||
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"data": processes,
|
||
"page": page,
|
||
"limit": limit,
|
||
"total": len(processes),
|
||
"sort_by": sortBy,
|
||
"sort_order": sortOrder,
|
||
})
|
||
}
|
||
|
||
// GetDiskDetails 获取磁盘详细信息
|
||
func GetDiskDetails(c *gin.Context) {
|
||
// 获取查询参数
|
||
deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备
|
||
startTime := c.DefaultQuery("start_time", "-24h")
|
||
endTime := c.DefaultQuery("end_time", "now()")
|
||
|
||
// 查询数据
|
||
diskDetails, err := globalStorage.QueryDiskDetails(context.Background(), deviceID, startTime, endTime)
|
||
if err != nil {
|
||
// 只记录警告,返回空数据
|
||
log.Printf("Warning: Failed to query disk details: %v", err)
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"data": []DiskDetailMetrics{},
|
||
})
|
||
return
|
||
}
|
||
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"data": diskDetails,
|
||
})
|
||
}
|
||
|
||
// GetLogs 获取系统日志
|
||
func GetLogs(c *gin.Context) {
|
||
// 获取查询参数
|
||
deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备
|
||
startTime := c.DefaultQuery("start_time", "-24h")
|
||
endTime := c.DefaultQuery("end_time", "now()")
|
||
|
||
// 查询数据
|
||
logData, err := globalStorage.QueryLogMetrics(context.Background(), deviceID, startTime, endTime)
|
||
if err != nil {
|
||
// 只记录警告,返回空数据
|
||
log.Printf("Warning: Failed to query logs: %v", err)
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"data": []LogMetrics{},
|
||
})
|
||
return
|
||
}
|
||
|
||
// 转换为前端需要的格式
|
||
logs := make([]LogMetrics, 0, len(logData))
|
||
for _, log := range logData {
|
||
// 将time.Time转换为字符串
|
||
timeValue, ok := log["time"].(time.Time)
|
||
var timeStr string
|
||
if ok {
|
||
timeStr = timeValue.Format(time.RFC3339)
|
||
} else {
|
||
// 如果不是time.Time类型,尝试转换
|
||
if timeStrVal, ok := log["time"].(string); ok {
|
||
timeStr = timeStrVal
|
||
} else {
|
||
timeStr = ""
|
||
}
|
||
}
|
||
|
||
// 获取其他字段
|
||
source := ""
|
||
if sourceVal, ok := log["source"].(string); ok {
|
||
source = sourceVal
|
||
}
|
||
|
||
message := ""
|
||
if messageVal, ok := log["message"].(string); ok {
|
||
message = messageVal
|
||
}
|
||
|
||
// 添加到结果列表
|
||
logs = append(logs, LogMetrics{
|
||
Source: source,
|
||
Time: timeStr,
|
||
Message: message,
|
||
})
|
||
}
|
||
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"data": logs,
|
||
})
|
||
}
|