diff --git a/agent/agent b/agent/agent index 184ab45..30e71ae 100755 Binary files a/agent/agent and b/agent/agent differ diff --git a/agent/agent.json b/agent/agent.json index f4b92c8..ce3330f 100644 --- a/agent/agent.json +++ b/agent/agent.json @@ -4,5 +4,7 @@ "name": "cloud", "device_id": "yunc", "token": "0b1f00e76e28beaed3be71d13e25aceb", - "interval": "10s" + "interval": "10s", + "debug": true, + "api_port": 8082 } \ No newline at end of file diff --git a/agent/main.go b/agent/main.go index 679e9aa..d9771ac 100644 --- a/agent/main.go +++ b/agent/main.go @@ -7,6 +7,8 @@ import ( "log" "net/http" "os" + "strconv" + "sync" "time" "github.com/shirou/gopsutil/cpu" @@ -23,6 +25,8 @@ type Config struct { DeviceID string `json:"device_id"` // 向后兼容,保留 Token string `json:"token"` // 设备认证令牌 Interval string `json:"interval"` // 采集间隔 + Debug bool `json:"debug"` // 调试模式 + APIPort int `json:"api_port"` // API端口 } // Metrics 监控指标 @@ -49,6 +53,15 @@ var ( lastCollectTime time.Time ) +// 保存采集到的数据点 +var metricsBuffer []*Metrics +var metricsBufferMutex sync.Mutex + +// 初始化互斥锁 +func init() { + metricsBuffer = make([]*Metrics, 0) +} + // 初始化配置 func initConfig() { // 默认配置 @@ -59,6 +72,8 @@ func initConfig() { DeviceID: "default", // 向后兼容,保留 Token: "", // 设备认证令牌,从配置或环境变量读取 Interval: "10s", + Debug: false, // 默认非调试模式 + APIPort: 8081, // 默认API端口8081 } // 读取配置文件 @@ -85,7 +100,11 @@ func initConfig() { } // 打印配置信息 - log.Printf("Agent ID: %s, Name: %s, DeviceID: %s", config.ID, config.Name, config.DeviceID) + if config.Debug { + log.Printf("Agent ID: %s, Name: %s, DeviceID: %s, Debug: %v, API Port: %d", config.ID, config.Name, config.DeviceID, config.Debug, config.APIPort) + } else { + log.Printf("Agent ID: %s, Name: %s, DeviceID: %s", config.ID, config.Name, config.DeviceID) + } } // 从环境变量读取配置 @@ -113,6 +132,20 @@ func loadFromEnv() { if intervalStr := os.Getenv("AGENT_INTERVAL"); intervalStr != "" { config.Interval = intervalStr } + + if debugStr := os.Getenv("AGENT_DEBUG"); debugStr != "" { + debug, err := strconv.ParseBool(debugStr) + if err == nil { + config.Debug = debug + } + } + + if apiPortStr := os.Getenv("AGENT_API_PORT"); apiPortStr != "" { + apiPort, err := strconv.Atoi(apiPortStr) + if err == nil { + config.APIPort = apiPort + } + } } // 确保Agent ID存在,不存在则生成 @@ -194,7 +227,7 @@ func readConfigFile() { return } - // 合并配置:只覆盖非零值 + // 合并配置:所有字段都从配置文件覆盖,除了空字符串 if fileConfig.ServerURL != "" { config.ServerURL = fileConfig.ServerURL } @@ -219,6 +252,15 @@ func readConfigFile() { config.Interval = fileConfig.Interval } + // 对于bool类型,总是从配置文件覆盖 + config.Debug = fileConfig.Debug + // 对于int类型,如果配置文件中的值大于0,则使用配置文件中的值,否则使用默认值 + if fileConfig.APIPort > 0 { + config.APIPort = fileConfig.APIPort + } else { + config.APIPort = 8081 // 默认API端口 + } + log.Printf("Config loaded from %s", configFile) } @@ -352,14 +394,14 @@ func collectMetrics() (*Metrics, error) { } // 发送指标到服务器 -func sendMetrics(metrics *Metrics) error { +func sendMetrics(metricsList []*Metrics) error { // 创建HTTP客户端 client := &http.Client{ Timeout: 10 * time.Second, } // 将指标转换为JSON - jsonData, err := json.Marshal(metrics) + jsonData, err := json.Marshal(metricsList) if err != nil { return err } @@ -399,60 +441,126 @@ func sendMetrics(metrics *Metrics) error { } // 启动HTTP服务器,提供本地指标查询 -// func startHTTPServer() { -// // 指标查询端点 -// http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { -// metrics, err := collectMetrics() -// if err != nil { -// http.Error(w, err.Error(), http.StatusInternalServerError) -// return -// } +// 启动HTTP服务器 +func startHTTPServer() { + // 指标查询端点 + http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { + if config.Debug { + log.Printf("API Request: %s %s", r.Method, r.URL.Path) + } + metrics, err := collectMetrics() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } -// // 设置响应头 -// w.Header().Set("Content-Type", "application/json") -// // 返回JSON响应 -// json.NewEncoder(w).Encode(metrics) -// }) + // 设置响应头 + w.Header().Set("Content-Type", "application/json") + // 返回JSON响应 + json.NewEncoder(w).Encode(metrics) + }) -// // 健康检查端点 -// http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { -// w.WriteHeader(http.StatusOK) -// w.Write([]byte("OK")) -// }) + // 健康检查端点 + http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + if config.Debug { + log.Printf("API Request: %s %s", r.Method, r.URL.Path) + } + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{ + "status": "ok", + "agent_id": config.ID, + }) + }) -// // 启动服务器 -// log.Println("Starting HTTP server on :8081") -// if err := http.ListenAndServe(":8081", nil); err != nil { -// log.Fatalf("Failed to start HTTP server: %v", err) -// } -// } + // 获取配置端点 + http.HandleFunc("/config", func(w http.ResponseWriter, r *http.Request) { + if config.Debug { + log.Printf("API Request: %s %s", r.Method, r.URL.Path) + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(config) + }) + + // 获取状态端点 + http.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) { + if config.Debug { + log.Printf("API Request: %s %s", r.Method, r.URL.Path) + } + // 采集当前状态 + cpu, _ := collectCPU() + memory, _ := collectMemory() + disk, _ := collectDisk() + + status := map[string]interface{}{ + "status": "running", + "agent_id": config.ID, + "name": config.Name, + "device_id": config.DeviceID, + "debug": config.Debug, + "interval": config.Interval, + "cpu": cpu, + "memory": memory, + "disk": disk, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(status) + }) + + // 立即采集和发送指标端点 + http.HandleFunc("/collect", func(w http.ResponseWriter, r *http.Request) { + if config.Debug { + log.Printf("API Request: %s %s", r.Method, r.URL.Path) + } + go collectAndSendMetrics() + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{ + "status": "ok", + "message": "Metrics collection triggered", + }) + }) + + // 启动服务器 + addr := fmt.Sprintf(":%d", config.APIPort) + log.Printf("Starting HTTP server on %s", addr) + if err := http.ListenAndServe(addr, nil); err != nil { + log.Fatalf("Failed to start HTTP server: %v", err) + } +} func main() { // 初始化配置 initConfig() // 启动HTTP服务器(异步) - // go startHTTPServer() + go startHTTPServer() - log.Printf("Agent started, reporting to %s every %v", config.ServerURL, config.Interval) + log.Printf("Agent started, reporting to %s every %v, collecting data every 1s", config.ServerURL, config.Interval) - // 启动时立即采集和发送一次指标 - collectAndSendMetrics() + // 启动时立即采集一次数据 + collectMetricsToBuffer() - // 定期采集和发送指标 - ticker := time.NewTicker(parsedInterval) - defer ticker.Stop() + // 创建两个ticker:一个用于每秒采集数据,一个用于定期发送数据 + collectTicker := time.NewTicker(1 * time.Second) + sendTicker := time.NewTicker(parsedInterval) + defer collectTicker.Stop() + defer sendTicker.Stop() for { select { - case <-ticker.C: + case <-collectTicker.C: + // 每秒采集一次数据 + collectMetricsToBuffer() + case <-sendTicker.C: + // 定期发送采集到的数据 collectAndSendMetrics() } } } -// 采集并发送指标 -func collectAndSendMetrics() { +// 每秒采集数据并添加到缓冲区 +func collectMetricsToBuffer() { // 采集指标 metrics, err := collectMetrics() if err != nil { @@ -460,24 +568,78 @@ func collectAndSendMetrics() { return } + // 将指标添加到缓冲区 + metricsBufferMutex.Lock() + metricsBuffer = append(metricsBuffer, metrics) + metricsBufferMutex.Unlock() + + if config.Debug { + // 计算平均磁盘使用率 + totalDiskUsage := 0.0 + diskCount := 0 + for _, usage := range metrics.Disk { + totalDiskUsage += usage + diskCount++ + } + averageDiskUsage := 0.0 + if diskCount > 0 { + averageDiskUsage = totalDiskUsage / float64(diskCount) + } + log.Printf("Metrics collected: Agent=%s, CPU=%.2f%%, Memory=%.2f%%, Disk=%.2f%%", + config.Name, metrics.CPU, metrics.Memory, averageDiskUsage) + } +} + +// 发送缓冲区中的所有指标 +func collectAndSendMetrics() { + // 从缓冲区获取所有指标 + metricsBufferMutex.Lock() + if len(metricsBuffer) == 0 { + metricsBufferMutex.Unlock() + return + } + // 创建一个副本并清空缓冲区 + metricsToSend := make([]*Metrics, len(metricsBuffer)) + copy(metricsToSend, metricsBuffer) + metricsBuffer = make([]*Metrics, 0) + metricsBufferMutex.Unlock() + // 发送指标 - if err := sendMetrics(metrics); err != nil { + if err := sendMetrics(metricsToSend); err != nil { log.Printf("Failed to send metrics: %v", err) return } - // 计算平均磁盘使用率 - totalDiskUsage := 0.0 - diskCount := 0 - for _, usage := range metrics.Disk { - totalDiskUsage += usage - diskCount++ + // 计算平均指标值 + var totalCPU, totalMemory, totalDiskUsage float64 + var diskCount int + pointCount := len(metricsToSend) + + for _, metrics := range metricsToSend { + totalCPU += metrics.CPU + totalMemory += metrics.Memory + // 计算磁盘使用率 + for _, usage := range metrics.Disk { + totalDiskUsage += usage + diskCount++ + } } + + // 计算平均值 + averageCPU := 0.0 + averageMemory := 0.0 averageDiskUsage := 0.0 + + if pointCount > 0 { + averageCPU = totalCPU / float64(pointCount) + averageMemory = totalMemory / float64(pointCount) + } + if diskCount > 0 { averageDiskUsage = totalDiskUsage / float64(diskCount) } + // 只在production模式下显示基本指标信息 log.Printf("Metrics sent successfully: Agent=%s, CPU=%.2f%%, Memory=%.2f%%, Disk=%.2f%%", - config.Name, metrics.CPU, metrics.Memory, averageDiskUsage) + config.Name, averageCPU, averageMemory, averageDiskUsage) }