package handler import ( "context" "crypto/rand" "encoding/hex" "encoding/json" "log" "net/http" "os" "sync" "time" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" "github.com/monitor/backend/internal/device" "github.com/monitor/backend/internal/storage" ) // Handler API处理器 type Handler struct { storage *storage.Storage } // NewHandler 创建新的API处理器 func NewHandler(storage *storage.Storage) *Handler { return &Handler{ storage: storage, } } // RegisterRoutes 注册所有路由 func RegisterRoutes(r *gin.Engine) { // API路由组 api := r.Group("/api") { // 监控数据路由 metrics := api.Group("/metrics") { metrics.GET("/cpu", GetCPUMetrics) metrics.GET("/memory", GetMemoryMetrics) metrics.GET("/disk", GetDiskMetrics) metrics.GET("/network", GetNetworkMetrics) // 添加POST端点,接收Agent发送的指标数据 metrics.POST("/", HandleMetricsPost) } // 设备管理路由 devices := api.Group("/devices") { devices.GET("/", GetDevices) // 获取活跃设备列表 devices.GET("/all", GetAllDevices) // 获取所有设备 devices.GET("/:id", GetDevice) // 获取单个设备详情 devices.POST("/", AddDevice) // 添加设备 devices.PUT("/:id", UpdateDevice) // 更新设备信息 devices.DELETE("/:id", DeleteDevice) // 删除设备 // 添加获取设备状态的端点 devices.GET("/status", GetAllDeviceStatus) // 获取所有活跃设备状态 devices.GET("/:id/status", GetDeviceStatus) // 获取单个设备状态 } // WebSocket端点 api.GET("/ws", WebSocketHandler) // 添加设备状态概览端点,作为/api/devices/status的别名 api.GET("/status", GetAllDeviceStatus) } } // MetricsRequest 指标请求结构 type MetricsRequest struct { CPU float64 `json:"cpu"` Memory float64 `json:"memory"` Disk map[string]float64 `json:"disk"` Network struct { BytesSent uint64 `json:"bytes_sent"` BytesReceived uint64 `json:"bytes_received"` } `json:"network"` } // HandleMetricsPost 处理Agent发送的指标数据 func HandleMetricsPost(c *gin.Context) { // 获取设备令牌 token := c.GetHeader("X-Device-Token") if token == "" { c.JSON(http.StatusUnauthorized, gin.H{ "error": "Device token is required", }) return } // 通过令牌获取设备 device, ok := deviceStorage.GetDeviceByToken(token) if !ok { c.JSON(http.StatusUnauthorized, gin.H{ "error": "Invalid device token", }) return } // 获取设备ID,用于后续处理 deviceID := device.ID // 获取Agent名称 agentName := c.GetHeader("X-Agent-Name") if agentName == "" { // 如果没有提供名称,使用设备名称 agentName = device.Name } // 更新设备状态为active if err := deviceStorage.UpdateDeviceStatus(deviceID, "active"); err != nil { // 只记录警告,不影响指标处理 log.Printf("Warning: Failed to update device status: %v", err) } // 创建基础标签,包含Agent名称 baseTags := map[string]string{ "agent_name": agentName, } // 处理请求体,支持单指标对象和指标数组 var metricsList []MetricsRequest // 首先尝试解析为数组 if err := c.ShouldBindJSON(&metricsList); err != nil { // 如果解析数组失败,尝试解析为单个对象 var singleMetric MetricsRequest if err := c.ShouldBindJSON(&singleMetric); err != nil { c.JSON(http.StatusBadRequest, gin.H{ "error": "Invalid request body", }) return } // 将单个对象添加到列表中 metricsList = append(metricsList, singleMetric) } // 处理所有指标 for i, req := range metricsList { // 写入CPU指标 if err := globalStorage.WriteMetric(c.Request.Context(), deviceID, "cpu", req.CPU, baseTags); err != nil { // 只记录警告,不影响后续指标处理 log.Printf("Warning: Failed to write CPU metrics: %v", err) } // 写入内存指标 if err := globalStorage.WriteMetric(c.Request.Context(), deviceID, "memory", req.Memory, baseTags); err != nil { // 只记录警告,不影响后续指标处理 log.Printf("Warning: Failed to write memory metrics: %v", err) } // 写入磁盘指标,支持多个挂载点 for mountpoint, usage := range req.Disk { // 为每个挂载点创建标签,包含基础标签和挂载点 tags := make(map[string]string) // 复制基础标签 for k, v := range baseTags { tags[k] = v } // 添加挂载点标签 tags["mountpoint"] = mountpoint // 写入磁盘指标 if err := globalStorage.WriteMetric(c.Request.Context(), deviceID, "disk", usage, tags); err != nil { // 只记录警告,不影响后续指标处理 log.Printf("Warning: Failed to write disk metrics for mountpoint %s: %v", mountpoint, err) } } // 写入网络发送指标 if err := globalStorage.WriteMetric(c.Request.Context(), deviceID, "network_sent", float64(req.Network.BytesSent), baseTags); err != nil { // 只记录警告,不影响后续指标处理 log.Printf("Warning: Failed to write network sent metrics: %v", err) } // 写入网络接收指标 if err := globalStorage.WriteMetric(c.Request.Context(), deviceID, "network_received", float64(req.Network.BytesReceived), baseTags); err != nil { // 只记录警告,不影响后续指标处理 log.Printf("Warning: Failed to write network received metrics: %v", err) } // 广播指标更新消息,只广播最后一个指标 if i == len(metricsList)-1 { metrics := map[string]interface{}{ "cpu": req.CPU, "memory": req.Memory, "disk": req.Disk, "network": map[string]uint64{ "bytes_sent": req.Network.BytesSent, "bytes_received": req.Network.BytesReceived, }, } broadcastMetricsUpdate(deviceID, metrics) } } // 返回成功响应 c.JSON(http.StatusOK, gin.H{ "message": "Metrics received successfully", "count": len(metricsList), }) } // 全局存储实例(简化处理,实际应该使用依赖注入) var globalStorage *storage.Storage var deviceStorage device.Storage // SetStorage 设置全局存储实例 func SetStorage(s *storage.Storage) { globalStorage = s } // SetDeviceStorage 设置设备存储实例 func SetDeviceStorage(s device.Storage) { deviceStorage = s } // 生成安全的设备令牌 func generateDeviceToken() string { // 生成32字节的随机数据 bytes := make([]byte, 16) if _, err := rand.Read(bytes); err != nil { // 如果生成随机数失败,使用时间戳和PID作为备选方案 return hex.EncodeToString([]byte(time.Now().String() + "-" + string(os.Getpid()))) } // 转换为32位的十六进制字符串 return hex.EncodeToString(bytes) } // WebSocket相关配置 var ( // 升级器,用于将HTTP连接升级为WebSocket连接 upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true // 允许所有来源,生产环境应该限制 }, } // 客户端连接管理 clients = make(map[*websocket.Conn]bool) clientsLock sync.Mutex // 消息广播通道 broadcast = make(chan map[string]interface{}) ) // 启动WebSocket消息处理 func init() { go handleMessages() } // 处理WebSocket消息 func handleMessages() { for { // 从广播通道接收消息 message := <-broadcast // 序列化消息 jsonMessage, err := json.Marshal(message) if err != nil { continue } // 广播消息给所有客户端 clientsLock.Lock() for client := range clients { if err := client.WriteMessage(websocket.TextMessage, jsonMessage); err != nil { // 连接出错,关闭连接并从客户端列表中移除 client.Close() delete(clients, client) } } clientsLock.Unlock() } } // WebSocket端点,用于客户端连接 func WebSocketHandler(c *gin.Context) { // 升级HTTP连接为WebSocket连接 conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { return } defer conn.Close() // 将客户端添加到客户端列表 clientsLock.Lock() clients[conn] = true clientsLock.Unlock() // 处理客户端消息(这里我们只发送消息,不处理接收的消息) for { // 读取消息(如果客户端发送消息,我们不处理,直接忽略) _, _, err := conn.ReadMessage() if err != nil { // 连接出错,从客户端列表中移除 clientsLock.Lock() delete(clients, conn) clientsLock.Unlock() break } } } // 广播指标更新消息 func broadcastMetricsUpdate(deviceID string, metrics map[string]interface{}) { message := map[string]interface{}{ "type": "metrics_update", "device_id": deviceID, "metrics": metrics, } broadcast <- message } // GetCPUMetrics 获取CPU指标 func GetCPUMetrics(c *gin.Context) { // 获取查询参数 deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备 startTime := c.DefaultQuery("start_time", "-24h") endTime := c.DefaultQuery("end_time", "now()") aggregation := c.DefaultQuery("aggregation", "average") interval := c.DefaultQuery("interval", "10s") // 添加interval参数,默认10秒 // 查询数据 points, err := globalStorage.QueryMetrics(context.Background(), deviceID, "cpu", startTime, endTime) if err != nil { // 只记录警告,返回空数据 log.Printf("Warning: Failed to query CPU metrics: %v", err) c.JSON(http.StatusOK, gin.H{ "data": []MetricData{}, }) return } // 处理数据,传递interval、startTime和endTime参数 processedData := ProcessMetrics(points, aggregation, interval, startTime, endTime) c.JSON(http.StatusOK, gin.H{ "data": processedData, }) } // GetMemoryMetrics 获取内存指标 func GetMemoryMetrics(c *gin.Context) { // 获取查询参数 deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备 startTime := c.DefaultQuery("start_time", "-24h") endTime := c.DefaultQuery("end_time", "now()") aggregation := c.DefaultQuery("aggregation", "average") interval := c.DefaultQuery("interval", "10s") // 添加interval参数,默认10秒 // 查询数据 points, err := globalStorage.QueryMetrics(context.Background(), deviceID, "memory", startTime, endTime) if err != nil { // 只记录警告,返回空数据 log.Printf("Warning: Failed to query memory metrics: %v", err) c.JSON(http.StatusOK, gin.H{ "data": []MetricData{}, }) return } // 处理数据,传递interval、startTime和endTime参数 processedData := ProcessMetrics(points, aggregation, interval, startTime, endTime) c.JSON(http.StatusOK, gin.H{ "data": processedData, }) } // GetDiskMetrics 获取磁盘指标 func GetDiskMetrics(c *gin.Context) { // 获取查询参数 deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备 startTime := c.DefaultQuery("start_time", "-24h") endTime := c.DefaultQuery("end_time", "now()") aggregation := c.DefaultQuery("aggregation", "average") interval := c.DefaultQuery("interval", "10s") // 添加interval参数,默认10秒 // 查询数据 points, err := globalStorage.QueryMetrics(context.Background(), deviceID, "disk", startTime, endTime) if err != nil { // 只记录警告,返回空数据 log.Printf("Warning: Failed to query disk metrics: %v", err) c.JSON(http.StatusOK, gin.H{ "data": map[string][]MetricData{}, }) return } // 按挂载点分组 mountpointData := make(map[string][]storage.MetricPoint) for _, point := range points { // 获取挂载点标签 mountpoint := "unknown" if point.Tags != nil && point.Tags["mountpoint"] != "" { mountpoint = point.Tags["mountpoint"] } mountpointData[mountpoint] = append(mountpointData[mountpoint], point) } // 处理数据,为每个挂载点创建独立的数据集 result := make(map[string][]MetricData) for mountpoint, mountpointPoints := range mountpointData { processedData := ProcessMetrics(mountpointPoints, aggregation, interval, startTime, endTime) result[mountpoint] = processedData } c.JSON(http.StatusOK, gin.H{ "data": result, }) } // GetNetworkMetrics 获取网络指标 func GetNetworkMetrics(c *gin.Context) { // 获取查询参数 deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备 startTime := c.DefaultQuery("start_time", "-24h") endTime := c.DefaultQuery("end_time", "now()") aggregation := c.DefaultQuery("aggregation", "average") interval := c.DefaultQuery("interval", "10s") // 添加interval参数,默认10秒 // 查询发送和接收的网络指标 sentPoints, err1 := globalStorage.QueryMetrics(context.Background(), deviceID, "network_sent", startTime, endTime) receivedPoints, err2 := globalStorage.QueryMetrics(context.Background(), deviceID, "network_received", startTime, endTime) // 处理错误 if err1 != nil { log.Printf("Warning: Failed to query network sent metrics: %v", err1) sentPoints = []storage.MetricPoint{} } if err2 != nil { log.Printf("Warning: Failed to query network received metrics: %v", err2) receivedPoints = []storage.MetricPoint{} } // 处理数据,传递interval、startTime和endTime参数 processedSentData := ProcessMetrics(sentPoints, aggregation, interval, startTime, endTime) processedReceivedData := ProcessMetrics(receivedPoints, aggregation, interval, startTime, endTime) c.JSON(http.StatusOK, gin.H{ "data": map[string][]MetricData{ "sent": processedSentData, "received": processedReceivedData, }, }) } // GetDevices 获取设备列表 func GetDevices(c *gin.Context) { // 从设备存储获取所有设备 allDevices := deviceStorage.GetDevices() // 转换为前端需要的格式 deviceList := make([]map[string]string, 0, len(allDevices)) for _, device := range allDevices { // 只返回活跃设备 if device.Status == "active" { deviceList = append(deviceList, map[string]string{ "id": device.ID, "name": device.Name, }) } } c.JSON(http.StatusOK, gin.H{ "devices": deviceList, }) } // GetDeviceStatus 获取设备状态 func GetDeviceStatus(c *gin.Context) { // 获取设备ID deviceID := c.Param("id") if deviceID == "" { c.JSON(http.StatusBadRequest, gin.H{ "error": "Device ID is required", }) return } // 从设备存储获取设备信息 device, ok := deviceStorage.GetDevice(deviceID) if !ok { c.JSON(http.StatusNotFound, gin.H{ "error": "Device not found", }) return } // 查询设备监控数据 _, status, err := globalStorage.QueryDeviceStatus(context.Background(), deviceID) if err != nil { // 如果查询监控数据失败,返回设备基本信息和空状态 c.JSON(http.StatusOK, gin.H{ "device_id": deviceID, "name": device.Name, "status": make(map[string]float64), }) return } c.JSON(http.StatusOK, gin.H{ "device_id": deviceID, "name": device.Name, "status": status, }) } // GetAllDeviceStatus 获取所有设备状态 func GetAllDeviceStatus(c *gin.Context) { // 从设备存储获取所有设备 allDevices := deviceStorage.GetDevices() // 查询每个设备的状态 result := make([]map[string]interface{}, 0, len(allDevices)) for _, device := range allDevices { // 查询设备监控数据 _, status, _ := globalStorage.QueryDeviceStatus(context.Background(), device.ID) // 总是返回设备信息,无论是否有监控数据 result = append(result, map[string]interface{}{ "device_id": device.ID, "name": device.Name, "status": status, }) } c.JSON(http.StatusOK, gin.H{ "devices": result, }) } // AddDevice 添加设备 func AddDevice(c *gin.Context) { var req struct { ID string `json:"id" binding:"required"` Name string `json:"name" binding:"required"` IP string `json:"ip"` } if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{ "error": "Invalid request: " + err.Error(), }) return } // 生成设备令牌 token := generateDeviceToken() newDevice := device.Device{ ID: req.ID, Name: req.Name, IP: req.IP, Token: token, Status: "inactive", CreatedAt: time.Now().Unix(), UpdatedAt: time.Now().Unix(), } if err := deviceStorage.AddDevice(newDevice); err != nil { c.JSON(http.StatusInternalServerError, gin.H{ "error": "Failed to add device: " + err.Error(), }) return } c.JSON(http.StatusOK, gin.H{ "message": "Device added successfully", "device": newDevice, }) } // DeleteDevice 删除设备 func DeleteDevice(c *gin.Context) { deviceID := c.Param("id") if deviceID == "" { c.JSON(http.StatusBadRequest, gin.H{ "error": "Device ID is required", }) return } if err := deviceStorage.DeleteDevice(deviceID); err != nil { c.JSON(http.StatusInternalServerError, gin.H{ "error": "Failed to delete device: " + err.Error(), }) return } c.JSON(http.StatusOK, gin.H{ "message": "Device deleted successfully", }) } // UpdateDevice 更新设备 func UpdateDevice(c *gin.Context) { deviceID := c.Param("id") if deviceID == "" { c.JSON(http.StatusBadRequest, gin.H{ "error": "Device ID is required", }) return } var req struct { Name string `json:"name"` IP string `json:"ip"` Status string `json:"status"` } if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{ "error": "Invalid request: " + err.Error(), }) return } // 获取现有设备 existingDevice, ok := deviceStorage.GetDevice(deviceID) if !ok { c.JSON(http.StatusNotFound, gin.H{ "error": "Device not found", }) return } // 更新设备信息,保留原有token if req.Name != "" { existingDevice.Name = req.Name } if req.IP != "" { existingDevice.IP = req.IP } if req.Status != "" { existingDevice.Status = req.Status } existingDevice.UpdatedAt = time.Now().Unix() if err := deviceStorage.UpdateDevice(existingDevice); err != nil { c.JSON(http.StatusInternalServerError, gin.H{ "error": "Failed to update device: " + err.Error(), }) return } c.JSON(http.StatusOK, gin.H{ "message": "Device updated successfully", "device": existingDevice, }) } // GetDevice 获取单个设备 func GetDevice(c *gin.Context) { deviceID := c.Param("id") if deviceID == "" { c.JSON(http.StatusBadRequest, gin.H{ "error": "Device ID is required", }) return } device, ok := deviceStorage.GetDevice(deviceID) if !ok { c.JSON(http.StatusNotFound, gin.H{ "error": "Device not found", }) return } c.JSON(http.StatusOK, gin.H{ "device": device, }) } // GetAllDevices 获取所有设备 func GetAllDevices(c *gin.Context) { devices := deviceStorage.GetDevices() c.JSON(http.StatusOK, gin.H{ "devices": devices, }) }