实现Agent的API数据和端口功能
This commit is contained in:
BIN
agent/agent
BIN
agent/agent
Binary file not shown.
@@ -4,5 +4,7 @@
|
|||||||
"name": "cloud",
|
"name": "cloud",
|
||||||
"device_id": "yunc",
|
"device_id": "yunc",
|
||||||
"token": "0b1f00e76e28beaed3be71d13e25aceb",
|
"token": "0b1f00e76e28beaed3be71d13e25aceb",
|
||||||
"interval": "10s"
|
"interval": "10s",
|
||||||
|
"debug": true,
|
||||||
|
"api_port": 8082
|
||||||
}
|
}
|
||||||
254
agent/main.go
254
agent/main.go
@@ -7,6 +7,8 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/shirou/gopsutil/cpu"
|
"github.com/shirou/gopsutil/cpu"
|
||||||
@@ -23,6 +25,8 @@ type Config struct {
|
|||||||
DeviceID string `json:"device_id"` // 向后兼容,保留
|
DeviceID string `json:"device_id"` // 向后兼容,保留
|
||||||
Token string `json:"token"` // 设备认证令牌
|
Token string `json:"token"` // 设备认证令牌
|
||||||
Interval string `json:"interval"` // 采集间隔
|
Interval string `json:"interval"` // 采集间隔
|
||||||
|
Debug bool `json:"debug"` // 调试模式
|
||||||
|
APIPort int `json:"api_port"` // API端口
|
||||||
}
|
}
|
||||||
|
|
||||||
// Metrics 监控指标
|
// Metrics 监控指标
|
||||||
@@ -49,6 +53,15 @@ var (
|
|||||||
lastCollectTime time.Time
|
lastCollectTime time.Time
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// 保存采集到的数据点
|
||||||
|
var metricsBuffer []*Metrics
|
||||||
|
var metricsBufferMutex sync.Mutex
|
||||||
|
|
||||||
|
// 初始化互斥锁
|
||||||
|
func init() {
|
||||||
|
metricsBuffer = make([]*Metrics, 0)
|
||||||
|
}
|
||||||
|
|
||||||
// 初始化配置
|
// 初始化配置
|
||||||
func initConfig() {
|
func initConfig() {
|
||||||
// 默认配置
|
// 默认配置
|
||||||
@@ -59,6 +72,8 @@ func initConfig() {
|
|||||||
DeviceID: "default", // 向后兼容,保留
|
DeviceID: "default", // 向后兼容,保留
|
||||||
Token: "", // 设备认证令牌,从配置或环境变量读取
|
Token: "", // 设备认证令牌,从配置或环境变量读取
|
||||||
Interval: "10s",
|
Interval: "10s",
|
||||||
|
Debug: false, // 默认非调试模式
|
||||||
|
APIPort: 8081, // 默认API端口8081
|
||||||
}
|
}
|
||||||
|
|
||||||
// 读取配置文件
|
// 读取配置文件
|
||||||
@@ -85,7 +100,11 @@ func initConfig() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 打印配置信息
|
// 打印配置信息
|
||||||
log.Printf("Agent ID: %s, Name: %s, DeviceID: %s", config.ID, config.Name, config.DeviceID)
|
if config.Debug {
|
||||||
|
log.Printf("Agent ID: %s, Name: %s, DeviceID: %s, Debug: %v, API Port: %d", config.ID, config.Name, config.DeviceID, config.Debug, config.APIPort)
|
||||||
|
} else {
|
||||||
|
log.Printf("Agent ID: %s, Name: %s, DeviceID: %s", config.ID, config.Name, config.DeviceID)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 从环境变量读取配置
|
// 从环境变量读取配置
|
||||||
@@ -113,6 +132,20 @@ func loadFromEnv() {
|
|||||||
if intervalStr := os.Getenv("AGENT_INTERVAL"); intervalStr != "" {
|
if intervalStr := os.Getenv("AGENT_INTERVAL"); intervalStr != "" {
|
||||||
config.Interval = intervalStr
|
config.Interval = intervalStr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if debugStr := os.Getenv("AGENT_DEBUG"); debugStr != "" {
|
||||||
|
debug, err := strconv.ParseBool(debugStr)
|
||||||
|
if err == nil {
|
||||||
|
config.Debug = debug
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if apiPortStr := os.Getenv("AGENT_API_PORT"); apiPortStr != "" {
|
||||||
|
apiPort, err := strconv.Atoi(apiPortStr)
|
||||||
|
if err == nil {
|
||||||
|
config.APIPort = apiPort
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 确保Agent ID存在,不存在则生成
|
// 确保Agent ID存在,不存在则生成
|
||||||
@@ -194,7 +227,7 @@ func readConfigFile() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 合并配置:只覆盖非零值
|
// 合并配置:所有字段都从配置文件覆盖,除了空字符串
|
||||||
if fileConfig.ServerURL != "" {
|
if fileConfig.ServerURL != "" {
|
||||||
config.ServerURL = fileConfig.ServerURL
|
config.ServerURL = fileConfig.ServerURL
|
||||||
}
|
}
|
||||||
@@ -219,6 +252,15 @@ func readConfigFile() {
|
|||||||
config.Interval = fileConfig.Interval
|
config.Interval = fileConfig.Interval
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 对于bool类型,总是从配置文件覆盖
|
||||||
|
config.Debug = fileConfig.Debug
|
||||||
|
// 对于int类型,如果配置文件中的值大于0,则使用配置文件中的值,否则使用默认值
|
||||||
|
if fileConfig.APIPort > 0 {
|
||||||
|
config.APIPort = fileConfig.APIPort
|
||||||
|
} else {
|
||||||
|
config.APIPort = 8081 // 默认API端口
|
||||||
|
}
|
||||||
|
|
||||||
log.Printf("Config loaded from %s", configFile)
|
log.Printf("Config loaded from %s", configFile)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -352,14 +394,14 @@ func collectMetrics() (*Metrics, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 发送指标到服务器
|
// 发送指标到服务器
|
||||||
func sendMetrics(metrics *Metrics) error {
|
func sendMetrics(metricsList []*Metrics) error {
|
||||||
// 创建HTTP客户端
|
// 创建HTTP客户端
|
||||||
client := &http.Client{
|
client := &http.Client{
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
}
|
}
|
||||||
|
|
||||||
// 将指标转换为JSON
|
// 将指标转换为JSON
|
||||||
jsonData, err := json.Marshal(metrics)
|
jsonData, err := json.Marshal(metricsList)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -399,60 +441,126 @@ func sendMetrics(metrics *Metrics) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 启动HTTP服务器,提供本地指标查询
|
// 启动HTTP服务器,提供本地指标查询
|
||||||
// func startHTTPServer() {
|
// 启动HTTP服务器
|
||||||
// // 指标查询端点
|
func startHTTPServer() {
|
||||||
// http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
|
// 指标查询端点
|
||||||
// metrics, err := collectMetrics()
|
http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
|
||||||
// if err != nil {
|
if config.Debug {
|
||||||
// http.Error(w, err.Error(), http.StatusInternalServerError)
|
log.Printf("API Request: %s %s", r.Method, r.URL.Path)
|
||||||
// return
|
}
|
||||||
// }
|
metrics, err := collectMetrics()
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// // 设置响应头
|
// 设置响应头
|
||||||
// w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
// // 返回JSON响应
|
// 返回JSON响应
|
||||||
// json.NewEncoder(w).Encode(metrics)
|
json.NewEncoder(w).Encode(metrics)
|
||||||
// })
|
})
|
||||||
|
|
||||||
// // 健康检查端点
|
// 健康检查端点
|
||||||
// http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
|
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
|
||||||
// w.WriteHeader(http.StatusOK)
|
if config.Debug {
|
||||||
// w.Write([]byte("OK"))
|
log.Printf("API Request: %s %s", r.Method, r.URL.Path)
|
||||||
// })
|
}
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
json.NewEncoder(w).Encode(map[string]string{
|
||||||
|
"status": "ok",
|
||||||
|
"agent_id": config.ID,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
// // 启动服务器
|
// 获取配置端点
|
||||||
// log.Println("Starting HTTP server on :8081")
|
http.HandleFunc("/config", func(w http.ResponseWriter, r *http.Request) {
|
||||||
// if err := http.ListenAndServe(":8081", nil); err != nil {
|
if config.Debug {
|
||||||
// log.Fatalf("Failed to start HTTP server: %v", err)
|
log.Printf("API Request: %s %s", r.Method, r.URL.Path)
|
||||||
// }
|
}
|
||||||
// }
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
json.NewEncoder(w).Encode(config)
|
||||||
|
})
|
||||||
|
|
||||||
|
// 获取状态端点
|
||||||
|
http.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if config.Debug {
|
||||||
|
log.Printf("API Request: %s %s", r.Method, r.URL.Path)
|
||||||
|
}
|
||||||
|
// 采集当前状态
|
||||||
|
cpu, _ := collectCPU()
|
||||||
|
memory, _ := collectMemory()
|
||||||
|
disk, _ := collectDisk()
|
||||||
|
|
||||||
|
status := map[string]interface{}{
|
||||||
|
"status": "running",
|
||||||
|
"agent_id": config.ID,
|
||||||
|
"name": config.Name,
|
||||||
|
"device_id": config.DeviceID,
|
||||||
|
"debug": config.Debug,
|
||||||
|
"interval": config.Interval,
|
||||||
|
"cpu": cpu,
|
||||||
|
"memory": memory,
|
||||||
|
"disk": disk,
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
json.NewEncoder(w).Encode(status)
|
||||||
|
})
|
||||||
|
|
||||||
|
// 立即采集和发送指标端点
|
||||||
|
http.HandleFunc("/collect", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if config.Debug {
|
||||||
|
log.Printf("API Request: %s %s", r.Method, r.URL.Path)
|
||||||
|
}
|
||||||
|
go collectAndSendMetrics()
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
json.NewEncoder(w).Encode(map[string]string{
|
||||||
|
"status": "ok",
|
||||||
|
"message": "Metrics collection triggered",
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
// 启动服务器
|
||||||
|
addr := fmt.Sprintf(":%d", config.APIPort)
|
||||||
|
log.Printf("Starting HTTP server on %s", addr)
|
||||||
|
if err := http.ListenAndServe(addr, nil); err != nil {
|
||||||
|
log.Fatalf("Failed to start HTTP server: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
// 初始化配置
|
// 初始化配置
|
||||||
initConfig()
|
initConfig()
|
||||||
|
|
||||||
// 启动HTTP服务器(异步)
|
// 启动HTTP服务器(异步)
|
||||||
// go startHTTPServer()
|
go startHTTPServer()
|
||||||
|
|
||||||
log.Printf("Agent started, reporting to %s every %v", config.ServerURL, config.Interval)
|
log.Printf("Agent started, reporting to %s every %v, collecting data every 1s", config.ServerURL, config.Interval)
|
||||||
|
|
||||||
// 启动时立即采集和发送一次指标
|
// 启动时立即采集一次数据
|
||||||
collectAndSendMetrics()
|
collectMetricsToBuffer()
|
||||||
|
|
||||||
// 定期采集和发送指标
|
// 创建两个ticker:一个用于每秒采集数据,一个用于定期发送数据
|
||||||
ticker := time.NewTicker(parsedInterval)
|
collectTicker := time.NewTicker(1 * time.Second)
|
||||||
defer ticker.Stop()
|
sendTicker := time.NewTicker(parsedInterval)
|
||||||
|
defer collectTicker.Stop()
|
||||||
|
defer sendTicker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-collectTicker.C:
|
||||||
|
// 每秒采集一次数据
|
||||||
|
collectMetricsToBuffer()
|
||||||
|
case <-sendTicker.C:
|
||||||
|
// 定期发送采集到的数据
|
||||||
collectAndSendMetrics()
|
collectAndSendMetrics()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 采集并发送指标
|
// 每秒采集数据并添加到缓冲区
|
||||||
func collectAndSendMetrics() {
|
func collectMetricsToBuffer() {
|
||||||
// 采集指标
|
// 采集指标
|
||||||
metrics, err := collectMetrics()
|
metrics, err := collectMetrics()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -460,24 +568,78 @@ func collectAndSendMetrics() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 将指标添加到缓冲区
|
||||||
|
metricsBufferMutex.Lock()
|
||||||
|
metricsBuffer = append(metricsBuffer, metrics)
|
||||||
|
metricsBufferMutex.Unlock()
|
||||||
|
|
||||||
|
if config.Debug {
|
||||||
|
// 计算平均磁盘使用率
|
||||||
|
totalDiskUsage := 0.0
|
||||||
|
diskCount := 0
|
||||||
|
for _, usage := range metrics.Disk {
|
||||||
|
totalDiskUsage += usage
|
||||||
|
diskCount++
|
||||||
|
}
|
||||||
|
averageDiskUsage := 0.0
|
||||||
|
if diskCount > 0 {
|
||||||
|
averageDiskUsage = totalDiskUsage / float64(diskCount)
|
||||||
|
}
|
||||||
|
log.Printf("Metrics collected: Agent=%s, CPU=%.2f%%, Memory=%.2f%%, Disk=%.2f%%",
|
||||||
|
config.Name, metrics.CPU, metrics.Memory, averageDiskUsage)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 发送缓冲区中的所有指标
|
||||||
|
func collectAndSendMetrics() {
|
||||||
|
// 从缓冲区获取所有指标
|
||||||
|
metricsBufferMutex.Lock()
|
||||||
|
if len(metricsBuffer) == 0 {
|
||||||
|
metricsBufferMutex.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// 创建一个副本并清空缓冲区
|
||||||
|
metricsToSend := make([]*Metrics, len(metricsBuffer))
|
||||||
|
copy(metricsToSend, metricsBuffer)
|
||||||
|
metricsBuffer = make([]*Metrics, 0)
|
||||||
|
metricsBufferMutex.Unlock()
|
||||||
|
|
||||||
// 发送指标
|
// 发送指标
|
||||||
if err := sendMetrics(metrics); err != nil {
|
if err := sendMetrics(metricsToSend); err != nil {
|
||||||
log.Printf("Failed to send metrics: %v", err)
|
log.Printf("Failed to send metrics: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 计算平均磁盘使用率
|
// 计算平均指标值
|
||||||
totalDiskUsage := 0.0
|
var totalCPU, totalMemory, totalDiskUsage float64
|
||||||
diskCount := 0
|
var diskCount int
|
||||||
for _, usage := range metrics.Disk {
|
pointCount := len(metricsToSend)
|
||||||
totalDiskUsage += usage
|
|
||||||
diskCount++
|
for _, metrics := range metricsToSend {
|
||||||
|
totalCPU += metrics.CPU
|
||||||
|
totalMemory += metrics.Memory
|
||||||
|
// 计算磁盘使用率
|
||||||
|
for _, usage := range metrics.Disk {
|
||||||
|
totalDiskUsage += usage
|
||||||
|
diskCount++
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 计算平均值
|
||||||
|
averageCPU := 0.0
|
||||||
|
averageMemory := 0.0
|
||||||
averageDiskUsage := 0.0
|
averageDiskUsage := 0.0
|
||||||
|
|
||||||
|
if pointCount > 0 {
|
||||||
|
averageCPU = totalCPU / float64(pointCount)
|
||||||
|
averageMemory = totalMemory / float64(pointCount)
|
||||||
|
}
|
||||||
|
|
||||||
if diskCount > 0 {
|
if diskCount > 0 {
|
||||||
averageDiskUsage = totalDiskUsage / float64(diskCount)
|
averageDiskUsage = totalDiskUsage / float64(diskCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 只在production模式下显示基本指标信息
|
||||||
log.Printf("Metrics sent successfully: Agent=%s, CPU=%.2f%%, Memory=%.2f%%, Disk=%.2f%%",
|
log.Printf("Metrics sent successfully: Agent=%s, CPU=%.2f%%, Memory=%.2f%%, Disk=%.2f%%",
|
||||||
config.Name, metrics.CPU, metrics.Memory, averageDiskUsage)
|
config.Name, averageCPU, averageMemory, averageDiskUsage)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user