Files
monitor/agent/main.go
Alex Yang bd70dc1748 web修复
2025-12-03 10:27:45 +08:00

675 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"strconv"
"sync"
"time"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/disk"
"github.com/shirou/gopsutil/mem"
"github.com/shirou/gopsutil/net"
)
// Config holds the agent's runtime settings. initConfig starts from built-in
// defaults, overlays the JSON config file, then overlays AGENT_* environment
// variables.
type Config struct {
	// ServerURL is the base URL of the collection server.
	ServerURL string `json:"server_url"`
	// ID uniquely identifies this agent; generated when left empty.
	ID string `json:"id"`
	// Name is the human-readable agent name.
	Name string `json:"name"`
	// DeviceID is kept for backward compatibility; when non-empty it is sent
	// as X-Device-ID instead of ID.
	DeviceID string `json:"device_id"`
	// Token is the device authentication token.
	Token string `json:"token"`
	// Interval is the reporting period as a time.ParseDuration string, e.g. "10s".
	Interval string `json:"interval"`
	// Debug enables verbose logging.
	Debug bool `json:"debug"`
	// APIPort is the port of the local HTTP API.
	APIPort int `json:"api_port"`
}
// NetworkInterfaceMetrics is the per-interface network figure reported to the
// server. Despite the field names, collectNetwork fills both fields with
// throughput rates in bytes per second, not cumulative byte counters.
type NetworkInterfaceMetrics struct {
	BytesSent     uint64 `json:"bytes_sent"`     // outbound bytes/s
	BytesReceived uint64 `json:"bytes_received"` // inbound bytes/s
}
// Metrics is one sample of host resource usage, as produced by collectMetrics.
type Metrics struct {
	CPU     float64                            `json:"cpu"`     // overall CPU usage, percent
	Memory  float64                            `json:"memory"`  // virtual memory in use, percent
	Disk    map[string]float64                 `json:"disk"`    // mount point -> used space, percent
	Network map[string]NetworkInterfaceMetrics `json:"network"` // interface name -> throughput
}
// Process-wide agent settings, filled in by initConfig at startup.
var (
	// config is the effective agent configuration.
	config Config
	// parsedInterval is config.Interval parsed into a duration; main uses it
	// as the period for flushing metrics to the server.
	parsedInterval time.Duration
)
// NetworkStats remembers one interface's cumulative byte counters from the
// previous sample, so the next sample can turn the difference into a rate.
type NetworkStats struct {
	BytesSent     uint64
	BytesReceived uint64
}
// Mutable state shared by the collection and sending paths.
var (
	// lastNetworkStats holds each interface's cumulative counters from the
	// previous collectNetwork call; lastCollectTime is when that call ran.
	lastNetworkStats = make(map[string]NetworkStats)
	lastCollectTime  time.Time

	// metricsBuffer accumulates samples between sends and is guarded by
	// metricsBufferMutex. A nil slice is a valid empty buffer (append and len
	// both work on it), so no init() is needed to allocate it.
	metricsBuffer      []*Metrics
	metricsBufferMutex sync.Mutex
)
// initConfig builds the effective configuration in priority order: built-in
// defaults, then the JSON config file, then AGENT_* environment variables.
// It fills in a generated ID and name when missing, writes the result back to
// the config file, and parses Interval into parsedInterval.
func initConfig() {
	const defaultInterval = 10 * time.Second

	// Built-in defaults.
	config = Config{
		ServerURL: "http://localhost:8080/api",
		ID:        "",        // generated by ensureAgentID when still empty
		Name:      "",        // from config file/env, otherwise generated
		DeviceID:  "default", // kept for backward compatibility
		Token:     "",        // device auth token, from config file or env
		Interval:  "10s",
		Debug:     false,
		APIPort:   8081,
	}

	readConfigFile()
	// Environment variables take precedence over the config file.
	loadFromEnv()
	ensureAgentID()
	ensureAgentName()
	// Always persisted, not only on change. readConfigFile reloads ID on the
	// next start, which keeps a generated ID stable across restarts.
	saveConfigFile()

	// "0s" or "-5s" parse without error, but time.NewTicker panics on a
	// non-positive duration, so reject those as well as unparsable values.
	interval, err := time.ParseDuration(config.Interval)
	switch {
	case err != nil:
		log.Printf("Failed to parse interval: %v, using default 10s", err)
		interval = defaultInterval
	case interval <= 0:
		log.Printf("Invalid non-positive interval %q, using default 10s", config.Interval)
		interval = defaultInterval
	}
	parsedInterval = interval

	if config.Debug {
		log.Printf("Agent ID: %s, Name: %s, DeviceID: %s, Debug: %v, API Port: %d", config.ID, config.Name, config.DeviceID, config.Debug, config.APIPort)
	} else {
		log.Printf("Agent ID: %s, Name: %s, DeviceID: %s", config.ID, config.Name, config.DeviceID)
	}
}
// loadFromEnv overrides config fields with any non-empty AGENT_* environment
// variables; these take precedence over the config file. Malformed
// AGENT_DEBUG or AGENT_API_PORT values (including ports outside 1-65535) are
// logged and ignored instead of being dropped silently.
func loadFromEnv() {
	stringVars := []struct {
		name string
		dst  *string
	}{
		{"AGENT_SERVER_URL", &config.ServerURL},
		{"AGENT_ID", &config.ID},
		{"AGENT_NAME", &config.Name},
		{"AGENT_DEVICE_ID", &config.DeviceID},
		{"AGENT_TOKEN", &config.Token},
		{"AGENT_INTERVAL", &config.Interval},
	}
	for _, v := range stringVars {
		if val := os.Getenv(v.name); val != "" {
			*v.dst = val
		}
	}

	if raw := os.Getenv("AGENT_DEBUG"); raw != "" {
		debug, err := strconv.ParseBool(raw)
		if err != nil {
			log.Printf("Ignoring invalid AGENT_DEBUG %q: %v", raw, err)
		} else {
			config.Debug = debug
		}
	}

	if raw := os.Getenv("AGENT_API_PORT"); raw != "" {
		port, err := strconv.Atoi(raw)
		switch {
		case err != nil:
			log.Printf("Ignoring invalid AGENT_API_PORT %q: %v", raw, err)
		case port < 1 || port > 65535:
			log.Printf("Ignoring out-of-range AGENT_API_PORT %d", port)
		default:
			config.APIPort = port
		}
	}
}
// ensureAgentID assigns a generated ID when none was configured. The ID has
// the form "agent-<unix nanoseconds>-<pid>"; it contains no random component.
func ensureAgentID() {
	if config.ID == "" {
		config.ID = fmt.Sprintf("agent-%d-%d", time.Now().UnixNano(), os.Getpid())
		log.Printf("Generated new Agent ID: %s", config.ID)
	}
}
// ensureAgentName fills in config.Name when it is still empty. It prefers the
// host name; if that is unavailable, it falls back to "Agent-" followed by up
// to the first 8 characters of the agent ID.
func ensureAgentName() {
	if config.Name != "" {
		return
	}

	name, err := os.Hostname()
	if err != nil || name == "" {
		// The ID may come from the config file or AGENT_ID and be shorter than
		// 8 bytes; slicing it unconditionally would panic. Cut on runes so a
		// multibyte character is never split.
		prefix := config.ID
		if r := []rune(prefix); len(r) > 8 {
			prefix = string(r[:8])
		}
		name = "Agent-" + prefix
	}

	config.Name = name
	log.Printf("Generated new Agent Name: %s", config.Name)
}
// saveConfigFile writes the current config as indented JSON to the path in
// AGENT_CONFIG_FILE (default ./agent.json). Failures are logged, not returned.
//
// The write is skipped when the file already holds exactly this content. This
// avoids a rewrite on every start and keeps a read-only but up-to-date config
// file working. Because the config contains the device token, a newly created
// file gets mode 0600. os.WriteFile does not change the mode of an existing file.
func saveConfigFile() {
	configFile := os.Getenv("AGENT_CONFIG_FILE")
	if configFile == "" {
		configFile = "./agent.json"
	}

	jsonData, err := json.MarshalIndent(config, "", " ")
	if err != nil {
		log.Printf("Failed to marshal config: %v", err)
		return
	}

	if existing, readErr := os.ReadFile(configFile); readErr == nil && bytes.Equal(existing, jsonData) {
		return
	}

	if err := os.WriteFile(configFile, jsonData, 0600); err != nil {
		log.Printf("Failed to save config to file: %v", err)
		return
	}
	log.Printf("Config saved to %s", configFile)
}
// readConfigFile overlays settings from the JSON config file onto config.
// The path comes from AGENT_CONFIG_FILE (default ./agent.json). A missing,
// unreadable or malformed file is logged and leaves config untouched.
//
// Merge rules:
//   - non-empty string fields override the current values;
//   - Debug is always taken from the file (false when the key is absent);
//   - APIPort is taken from the file when it is a valid TCP port (1-65535),
//     otherwise it is reset to 8081.
func readConfigFile() {
	configFile := os.Getenv("AGENT_CONFIG_FILE")
	if configFile == "" {
		configFile = "./agent.json"
	}

	// Read first and classify the error, rather than Stat-then-Read, so a file
	// removed or created between two calls cannot be misreported.
	content, err := os.ReadFile(configFile)
	if err != nil {
		if os.IsNotExist(err) {
			log.Printf("Config file %s not found, using default config", configFile)
		} else {
			log.Printf("Failed to read config file %s: %v, using default config", configFile, err)
		}
		return
	}

	var fileConfig Config
	if err := json.Unmarshal(content, &fileConfig); err != nil {
		log.Printf("Failed to parse config file %s: %v, using default config", configFile, err)
		return
	}

	for _, f := range []struct {
		value string
		dst   *string
	}{
		{fileConfig.ServerURL, &config.ServerURL},
		{fileConfig.ID, &config.ID},
		{fileConfig.Name, &config.Name},
		{fileConfig.DeviceID, &config.DeviceID},
		{fileConfig.Token, &config.Token},
		{fileConfig.Interval, &config.Interval},
	} {
		if f.value != "" {
			*f.dst = f.value
		}
	}

	config.Debug = fileConfig.Debug

	if fileConfig.APIPort > 0 && fileConfig.APIPort <= 65535 {
		config.APIPort = fileConfig.APIPort
	} else {
		config.APIPort = 8081 // default API port
	}

	log.Printf("Config loaded from %s", configFile)
}
// collectCPU returns the overall CPU usage as a percentage. It calls
// cpu.Percent with interval 0 and percpu=false, which does not block; per the
// gopsutil docs, it measures against the previous call.
func collectCPU() (float64, error) {
	percentages, err := cpu.Percent(0, false)
	if err != nil {
		return 0, err
	}
	// Return an error rather than panicking on [0] if no value came back.
	if len(percentages) == 0 {
		return 0, fmt.Errorf("cpu.Percent returned no data")
	}
	return percentages[0], nil
}
// collectMemory reports the percentage of virtual memory currently in use.
func collectMemory() (float64, error) {
	stat, err := mem.VirtualMemory()
	if err == nil {
		return stat.UsedPercent, nil
	}
	return 0, err
}
// collectDisk returns the used-space percentage of each partition, keyed by
// mount point. Partitions are listed with disk.Partitions(false), which per
// the gopsutil docs returns physical devices only. Entries with an empty
// filesystem type, or whose usage cannot be read, are skipped silently.
func collectDisk() (map[string]float64, error) {
	parts, err := disk.Partitions(false)
	if err != nil {
		return nil, err
	}

	usageByMount := make(map[string]float64, len(parts))
	for _, p := range parts {
		if p.Fstype == "" {
			continue
		}
		if u, usageErr := disk.Usage(p.Mountpoint); usageErr == nil {
			usageByMount[p.Mountpoint] = u.UsedPercent
		}
	}
	return usageByMount, nil
}
// networkStatsMu guards lastNetworkStats and lastCollectTime. collectNetwork
// is reached from the main collection loop and also from the /metrics HTTP
// handler (via collectMetrics). Without the lock, those two goroutines would
// write the map concurrently, which crashes the Go runtime.
var networkStatsMu sync.Mutex

// collectNetwork returns, per network interface, the send and receive
// throughput in bytes per second since the previous call. The first call, and
// any interface seen for the first time, reports 0.
func collectNetwork() (map[string]NetworkInterfaceMetrics, error) {
	ioCounters, err := net.IOCounters(true)
	if err != nil {
		return nil, err
	}
	if len(ioCounters) == 0 {
		return make(map[string]NetworkInterfaceMetrics), nil
	}

	networkStatsMu.Lock()
	defer networkStatsMu.Unlock()

	now := time.Now()
	var elapsed float64 // seconds since the previous sample; 0 on the first call
	if !lastCollectTime.IsZero() {
		elapsed = now.Sub(lastCollectTime).Seconds()
	}

	networkMetrics := make(map[string]NetworkInterfaceMetrics, len(ioCounters))
	// Rebuild the baseline from scratch so interfaces that disappeared are
	// dropped instead of lingering in the map forever.
	currentStats := make(map[string]NetworkStats, len(ioCounters))
	for _, counter := range ioCounters {
		var sentRate, recvRate uint64
		if prev, ok := lastNetworkStats[counter.Name]; ok && elapsed > 0 {
			sentRate = counterRate(counter.BytesSent, prev.BytesSent, elapsed)
			recvRate = counterRate(counter.BytesRecv, prev.BytesReceived, elapsed)
		}
		currentStats[counter.Name] = NetworkStats{
			BytesSent:     counter.BytesSent,
			BytesReceived: counter.BytesRecv,
		}
		networkMetrics[counter.Name] = NetworkInterfaceMetrics{
			BytesSent:     sentRate,
			BytesReceived: recvRate,
		}
	}

	lastNetworkStats = currentStats
	lastCollectTime = now
	return networkMetrics, nil
}

// counterRate converts the growth of a cumulative counter over the given
// number of seconds into a per-second rate. If the counter went backwards
// (interface reset, counter wrap), unsigned subtraction would wrap to a huge
// bogus value, so 0 is reported for that sample instead.
func counterRate(current, previous uint64, seconds float64) uint64 {
	if current < previous || seconds <= 0 {
		return 0
	}
	return uint64(float64(current-previous) / seconds)
}
// collectMetrics gathers one complete sample: CPU, memory, disk and network,
// queried in that order. If any collector fails, no sample is returned and the
// error names the failing collector.
func collectMetrics() (*Metrics, error) {
	cpuUsage, err := collectCPU()
	if err != nil {
		return nil, fmt.Errorf("failed to collect CPU metrics: %w", err)
	}

	memUsage, err := collectMemory()
	if err != nil {
		return nil, fmt.Errorf("failed to collect memory metrics: %w", err)
	}

	diskUsage, err := collectDisk()
	if err != nil {
		return nil, fmt.Errorf("failed to collect disk metrics: %w", err)
	}

	netUsage, err := collectNetwork()
	if err != nil {
		return nil, fmt.Errorf("failed to collect network metrics: %w", err)
	}

	return &Metrics{
		CPU:     cpuUsage,
		Memory:  memUsage,
		Disk:    diskUsage,
		Network: netUsage,
	}, nil
}
// metricsHTTPClient is shared by every sendMetrics call, so keep-alive
// connections can be reused instead of building a new client per request.
var metricsHTTPClient = &http.Client{Timeout: 10 * time.Second}

// sendMetrics POSTs a batch of samples as a JSON array to
// <ServerURL>/metrics/. The agent is identified by these headers:
//   - X-Device-ID: DeviceID, falling back to ID when DeviceID is empty;
//   - X-Agent-Name;
//   - X-Device-Token.
//
// Any 2xx status counts as success. Other statuses and transport errors are
// returned to the caller.
func sendMetrics(metricsList []*Metrics) error {
	payload, err := json.Marshal(metricsList)
	if err != nil {
		return err
	}

	// Strip trailing slashes so "http://host/api/" does not become
	// "http://host/api//metrics/", which path-cleaning routers may redirect
	// or reject.
	baseURL := config.ServerURL
	for len(baseURL) > 0 && baseURL[len(baseURL)-1] == '/' {
		baseURL = baseURL[:len(baseURL)-1]
	}

	req, err := http.NewRequest(http.MethodPost, baseURL+"/metrics/", bytes.NewReader(payload))
	if err != nil {
		return err
	}

	req.Header.Set("Content-Type", "application/json")
	// DeviceID must match the ID registered in the server's device management.
	deviceID := config.DeviceID
	if deviceID == "" {
		deviceID = config.ID
	}
	req.Header.Set("X-Device-ID", deviceID)
	req.Header.Set("X-Agent-Name", config.Name)
	req.Header.Set("X-Device-Token", config.Token)

	resp, err := metricsHTTPClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("server returned status code %d", resp.StatusCode)
	}
	return nil
}
// startHTTPServer serves the local agent API on config.APIPort (all
// interfaces, default ServeMux) and blocks. It exits the process if the
// listener fails.
//
//	/metrics  collect and return a fresh sample
//	/health   liveness probe including the agent ID
//	/config   effective configuration, with the device token redacted
//	/status   agent identity plus current CPU/memory/disk usage
//	/collect  trigger an asynchronous send of the buffered metrics
func startHTTPServer() {
	logRequest := func(r *http.Request) {
		if config.Debug {
			log.Printf("API Request: %s %s", r.Method, r.URL.Path)
		}
	}
	// writeJSON sets Content-Type before any body byte is written. Headers set
	// after WriteHeader or Write are silently dropped. It also logs encode
	// failures, since the status can no longer be changed at that point.
	writeJSON := func(w http.ResponseWriter, v interface{}) {
		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(v); err != nil {
			log.Printf("Failed to write API response: %v", err)
		}
	}

	http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
		logRequest(r)
		metrics, err := collectMetrics()
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		writeJSON(w, metrics)
	})

	http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		logRequest(r)
		// Status 200 is sent implicitly on the first write, after Content-Type is set.
		writeJSON(w, map[string]string{
			"status":   "ok",
			"agent_id": config.ID,
		})
	})

	http.HandleFunc("/config", func(w http.ResponseWriter, r *http.Request) {
		logRequest(r)
		// This API has no authentication and listens on every interface, so
		// the device token is never echoed back.
		safe := config
		if safe.Token != "" {
			safe.Token = "******"
		}
		writeJSON(w, safe)
	})

	http.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
		logRequest(r)
		// Best effort: a failing collector reports its zero value instead of
		// failing the whole request.
		cpuUsage, _ := collectCPU()
		memUsage, _ := collectMemory()
		diskUsage, _ := collectDisk()
		writeJSON(w, map[string]interface{}{
			"status":    "running",
			"agent_id":  config.ID,
			"name":      config.Name,
			"device_id": config.DeviceID,
			"debug":     config.Debug,
			"interval":  config.Interval,
			"cpu":       cpuUsage,
			"memory":    memUsage,
			"disk":      diskUsage,
		})
	})

	http.HandleFunc("/collect", func(w http.ResponseWriter, r *http.Request) {
		logRequest(r)
		go collectAndSendMetrics()
		writeJSON(w, map[string]string{
			"status":  "ok",
			"message": "Metrics collection triggered",
		})
	})

	addr := fmt.Sprintf(":%d", config.APIPort)
	log.Printf("Starting HTTP server on %s", addr)
	// http.ListenAndServe has no timeouts; bound header reads so slow clients
	// cannot hold connections open indefinitely. Handler nil = DefaultServeMux.
	srv := &http.Server{
		Addr:              addr,
		ReadHeaderTimeout: 10 * time.Second,
	}
	if err := srv.ListenAndServe(); err != nil {
		log.Fatalf("Failed to start HTTP server: %v", err)
	}
}
// main loads the configuration and starts the local API in the background.
// It then loops forever: it samples metrics into the buffer every second and
// flushes the buffer to the server every parsedInterval.
func main() {
	initConfig()

	// time.NewTicker panics on a non-positive duration; fall back to the 10s
	// default if parsedInterval is not usable.
	if parsedInterval <= 0 {
		parsedInterval = 10 * time.Second
	}

	go startHTTPServer()

	// Log the effective period: config.Interval may be an unparsable string
	// that was replaced with the default.
	log.Printf("Agent started, reporting to %s every %v, collecting data every 1s", config.ServerURL, parsedInterval)

	// Take one sample right away rather than waiting for the first tick.
	collectMetricsToBuffer()

	collectTicker := time.NewTicker(1 * time.Second)
	defer collectTicker.Stop()
	sendTicker := time.NewTicker(parsedInterval)
	defer sendTicker.Stop()

	for {
		select {
		case <-collectTicker.C:
			collectMetricsToBuffer()
		case <-sendTicker.C:
			collectAndSendMetrics()
		}
	}
}
// collectMetricsToBuffer takes one sample and appends it to metricsBuffer
// under metricsBufferMutex. A collection error is logged and the sample is
// skipped. In debug mode the sample is logged together with the mean disk
// usage across all mount points.
func collectMetricsToBuffer() {
	sample, err := collectMetrics()
	if err != nil {
		log.Printf("Failed to collect metrics: %v", err)
		return
	}

	metricsBufferMutex.Lock()
	metricsBuffer = append(metricsBuffer, sample)
	metricsBufferMutex.Unlock()

	if !config.Debug {
		return
	}

	var diskSum float64
	for _, pct := range sample.Disk {
		diskSum += pct
	}
	diskMean := 0.0
	if n := len(sample.Disk); n > 0 {
		diskMean = diskSum / float64(n)
	}
	log.Printf("Metrics collected: Agent=%s, CPU=%.2f%%, Memory=%.2f%%, Disk=%.2f%%",
		config.Name, sample.CPU, sample.Memory, diskMean)
}
// 发送缓冲区中的所有指标
func collectAndSendMetrics() {
// 从缓冲区获取所有指标
metricsBufferMutex.Lock()
if len(metricsBuffer) == 0 {
metricsBufferMutex.Unlock()
return
}
// 创建一个副本并清空缓冲区
metricsToSend := make([]*Metrics, len(metricsBuffer))
copy(metricsToSend, metricsBuffer)
metricsBuffer = make([]*Metrics, 0)
metricsBufferMutex.Unlock()
// 发送指标
if err := sendMetrics(metricsToSend); err != nil {
log.Printf("Failed to send metrics: %v", err)
return
}
// 计算平均指标值
var totalCPU, totalMemory, totalDiskUsage float64
var diskCount int
pointCount := len(metricsToSend)
for _, metrics := range metricsToSend {
totalCPU += metrics.CPU
totalMemory += metrics.Memory
// 计算磁盘使用率
for _, usage := range metrics.Disk {
totalDiskUsage += usage
diskCount++
}
}
// 计算平均值
averageCPU := 0.0
averageMemory := 0.0
averageDiskUsage := 0.0
if pointCount > 0 {
averageCPU = totalCPU / float64(pointCount)
averageMemory = totalMemory / float64(pointCount)
}
if diskCount > 0 {
averageDiskUsage = totalDiskUsage / float64(diskCount)
}
// 只在production模式下显示基本指标信息
log.Printf("Metrics sent successfully: Agent=%s, CPU=%.2f%%, Memory=%.2f%%, Disk=%.2f%%",
config.Name, averageCPU, averageMemory, averageDiskUsage)
}