实现日志收集功能
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -37,12 +38,13 @@ func RegisterRoutes(r *gin.Engine) {
|
||||
// 监控数据路由
|
||||
metrics := api.Group("/metrics")
|
||||
{
|
||||
metrics.GET("/cpu", GetCPUMetrics)
|
||||
metrics.GET("/memory", GetMemoryMetrics)
|
||||
metrics.GET("/disk", GetDiskMetrics)
|
||||
metrics.GET("/network", GetNetworkMetrics)
|
||||
metrics.GET("/cpu", GetCPUMetrics) // 添加CPU信息查询端点
|
||||
metrics.GET("/memory", GetMemoryMetrics) // 添加内存信息查询端点
|
||||
metrics.GET("/disk", GetDiskMetrics) // 添加磁盘信息查询端点
|
||||
metrics.GET("/network", GetNetworkMetrics) // 添加网络信息查询端点
|
||||
metrics.GET("/processes", GetProcessMetrics) // 添加进程信息查询端点
|
||||
metrics.GET("/disk_details", GetDiskDetails) // 添加磁盘详细信息查询端点
|
||||
metrics.GET("/logs", GetLogs) // 添加系统日志查询端点
|
||||
// 添加POST端点,接收Agent发送的指标数据
|
||||
metrics.POST("/", HandleMetricsPost)
|
||||
}
|
||||
@@ -98,6 +100,13 @@ type DiskDetailMetrics struct {
|
||||
Description string `json:"description"` // 设备描述
|
||||
}
|
||||
|
||||
// LogMetrics 系统日志指标
|
||||
type LogMetrics struct {
|
||||
Source string `json:"source"` // 日志来源
|
||||
Time string `json:"time"` // 日志时间
|
||||
Message string `json:"message"` // 日志内容
|
||||
}
|
||||
|
||||
// NetworkInterfaceMetrics 网卡监控指标
|
||||
type NetworkInterfaceMetrics struct {
|
||||
BytesSent uint64 `json:"bytes_sent"` // 发送速率 (bytes/s)
|
||||
@@ -106,6 +115,14 @@ type NetworkInterfaceMetrics struct {
|
||||
RxBytes uint64 `json:"rx_bytes"` // 累计接收字节数
|
||||
}
|
||||
|
||||
// LogEntry 系统日志条目
|
||||
type LogEntry struct {
|
||||
Sequence int `json:"sequence"` // 日志序号
|
||||
Source string `json:"source"` // 来源
|
||||
Time time.Time `json:"time"` // 发生时间
|
||||
Message string `json:"message"` // 内容
|
||||
}
|
||||
|
||||
// MetricsRequest 指标请求结构
|
||||
type MetricsRequest struct {
|
||||
CPU float64 `json:"cpu"`
|
||||
@@ -115,6 +132,7 @@ type MetricsRequest struct {
|
||||
DiskDetails []DiskDetailMetrics `json:"disk_details"` // 磁盘详细信息
|
||||
Network map[string]NetworkInterfaceMetrics `json:"network"`
|
||||
Processes []ProcessMetrics `json:"processes"` // 进程信息
|
||||
Logs []LogEntry `json:"logs"` // 系统日志
|
||||
RxTotal uint64 `json:"rx_total"` // 所有网卡累计接收字节数总和
|
||||
TxTotal uint64 `json:"tx_total"` // 所有网卡累计发送字节数总和
|
||||
RxRate uint64 `json:"rx_rate"` // 所有网卡实时接收速率总和 (bytes/s)
|
||||
@@ -303,6 +321,14 @@ func HandleMetricsPost(c *gin.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// 写入日志数据
|
||||
for _, logEntry := range req.Logs {
|
||||
if err := globalStorage.WriteLogMetric(writeCtx, deviceID, logEntry.Sequence, logEntry.Source, logEntry.Time, logEntry.Message, baseTags); err != nil {
|
||||
// 只记录警告,不影响后续指标处理
|
||||
log.Printf("Warning: Failed to write log for device %s: %v", deviceID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// 广播指标更新消息,只广播最后一个指标
|
||||
if i == len(metricsList)-1 {
|
||||
// 准备广播的磁盘使用率数据(兼容旧格式)
|
||||
@@ -559,8 +585,8 @@ func GetNetworkMetrics(c *gin.Context) {
|
||||
receivedPoints, err2 := globalStorage.QueryMetrics(context.Background(), deviceID, "network_received", startTime, endTime)
|
||||
|
||||
// 查询发送和接收的累积总流量指标
|
||||
txBytesPoints, err3 := globalStorage.QueryMetrics(context.Background(), deviceID, "network_total_tx_bytes", startTime, endTime)
|
||||
rxBytesPoints, err4 := globalStorage.QueryMetrics(context.Background(), deviceID, "network_total_rx_bytes", startTime, endTime)
|
||||
txBytesPoints, err3 := globalStorage.QueryMetrics(context.Background(), deviceID, "network_tx_bytes", startTime, endTime)
|
||||
rxBytesPoints, err4 := globalStorage.QueryMetrics(context.Background(), deviceID, "network_rx_bytes", startTime, endTime)
|
||||
|
||||
// 处理错误
|
||||
if err1 != nil {
|
||||
@@ -918,8 +944,12 @@ func GetAllDevices(c *gin.Context) {
|
||||
func GetProcessMetrics(c *gin.Context) {
|
||||
// 获取查询参数
|
||||
deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备
|
||||
startTime := c.DefaultQuery("start_time", "-1h")
|
||||
startTime := c.DefaultQuery("start_time", "-24h")
|
||||
endTime := c.DefaultQuery("end_time", "now()")
|
||||
page, _ := strconv.Atoi(c.DefaultQuery("page", "1"))
|
||||
limit, _ := strconv.Atoi(c.DefaultQuery("limit", "10"))
|
||||
sortBy := c.DefaultQuery("sort_by", "cpu")
|
||||
sortOrder := c.DefaultQuery("sort_order", "desc")
|
||||
|
||||
// 查询数据
|
||||
processes, err := globalStorage.QueryProcessMetrics(context.Background(), deviceID, startTime, endTime)
|
||||
@@ -927,13 +957,23 @@ func GetProcessMetrics(c *gin.Context) {
|
||||
// 只记录警告,返回空数据
|
||||
log.Printf("Warning: Failed to query process metrics: %v", err)
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"data": []map[string]interface{}{},
|
||||
"data": []ProcessMetrics{},
|
||||
"page": page,
|
||||
"limit": limit,
|
||||
"total": 0,
|
||||
"sort_by": sortBy,
|
||||
"sort_order": sortOrder,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"data": processes,
|
||||
"data": processes,
|
||||
"page": page,
|
||||
"limit": limit,
|
||||
"total": len(processes),
|
||||
"sort_by": sortBy,
|
||||
"sort_order": sortOrder,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -941,7 +981,7 @@ func GetProcessMetrics(c *gin.Context) {
|
||||
func GetDiskDetails(c *gin.Context) {
|
||||
// 获取查询参数
|
||||
deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备
|
||||
startTime := c.DefaultQuery("start_time", "-1h")
|
||||
startTime := c.DefaultQuery("start_time", "-24h")
|
||||
endTime := c.DefaultQuery("end_time", "now()")
|
||||
|
||||
// 查询数据
|
||||
@@ -950,7 +990,7 @@ func GetDiskDetails(c *gin.Context) {
|
||||
// 只记录警告,返回空数据
|
||||
log.Printf("Warning: Failed to query disk details: %v", err)
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"data": []map[string]interface{}{},
|
||||
"data": []DiskDetailMetrics{},
|
||||
})
|
||||
return
|
||||
}
|
||||
@@ -959,3 +999,62 @@ func GetDiskDetails(c *gin.Context) {
|
||||
"data": diskDetails,
|
||||
})
|
||||
}
|
||||
|
||||
// GetLogs 获取系统日志
|
||||
func GetLogs(c *gin.Context) {
|
||||
// 获取查询参数
|
||||
deviceID := c.Query("device_id") // 不使用默认值,空值表示查询所有设备
|
||||
startTime := c.DefaultQuery("start_time", "-24h")
|
||||
endTime := c.DefaultQuery("end_time", "now()")
|
||||
|
||||
// 查询数据
|
||||
logData, err := globalStorage.QueryLogMetrics(context.Background(), deviceID, startTime, endTime)
|
||||
if err != nil {
|
||||
// 只记录警告,返回空数据
|
||||
log.Printf("Warning: Failed to query logs: %v", err)
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"data": []LogMetrics{},
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// 转换为前端需要的格式
|
||||
logs := make([]LogMetrics, 0, len(logData))
|
||||
for _, log := range logData {
|
||||
// 将time.Time转换为字符串
|
||||
timeValue, ok := log["time"].(time.Time)
|
||||
var timeStr string
|
||||
if ok {
|
||||
timeStr = timeValue.Format(time.RFC3339)
|
||||
} else {
|
||||
// 如果不是time.Time类型,尝试转换
|
||||
if timeStrVal, ok := log["time"].(string); ok {
|
||||
timeStr = timeStrVal
|
||||
} else {
|
||||
timeStr = ""
|
||||
}
|
||||
}
|
||||
|
||||
// 获取其他字段
|
||||
source := ""
|
||||
if sourceVal, ok := log["source"].(string); ok {
|
||||
source = sourceVal
|
||||
}
|
||||
|
||||
message := ""
|
||||
if messageVal, ok := log["message"].(string); ok {
|
||||
message = messageVal
|
||||
}
|
||||
|
||||
// 添加到结果列表
|
||||
logs = append(logs, LogMetrics{
|
||||
Source: source,
|
||||
Time: timeStr,
|
||||
Message: message,
|
||||
})
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"data": logs,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -228,6 +228,29 @@ func (s *Storage) WriteDiskDetailMetric(ctx context.Context, deviceID, diskDevic
|
||||
return s.writeData(ctx, "disk_details", allTags, fields, deviceID, "disk_detail")
|
||||
}
|
||||
|
||||
// WriteLogMetric 写入日志指标
|
||||
func (s *Storage) WriteLogMetric(ctx context.Context, deviceID string, sequence int, source string, time time.Time, message string, tags map[string]string) error {
|
||||
// 创建标签映射,合并原有标签和新标签
|
||||
allTags := make(map[string]string)
|
||||
// 复制原有标签
|
||||
for k, v := range tags {
|
||||
allTags[k] = v
|
||||
}
|
||||
// 添加设备ID标签
|
||||
allTags["device_id"] = deviceID
|
||||
// 添加日志来源标签
|
||||
allTags["source"] = source
|
||||
|
||||
// 创建字段映射
|
||||
fields := map[string]interface{}{
|
||||
"sequence": sequence,
|
||||
"message": message,
|
||||
}
|
||||
|
||||
// 使用新的writeData方法
|
||||
return s.writeData(ctx, "logs", allTags, fields, deviceID, "log")
|
||||
}
|
||||
|
||||
// QueryMetrics 查询监控指标
|
||||
func (s *Storage) QueryMetrics(ctx context.Context, deviceID, metricType, startTime, endTime string) ([]MetricPoint, error) {
|
||||
queryAPI := s.client.QueryAPI(s.org)
|
||||
@@ -526,6 +549,67 @@ func (s *Storage) QueryProcessMetrics(ctx context.Context, deviceID string, star
|
||||
return processes, nil
|
||||
}
|
||||
|
||||
// QueryLogMetrics 查询日志指标
|
||||
func (s *Storage) QueryLogMetrics(ctx context.Context, deviceID string, startTime, endTime string) ([]map[string]interface{}, error) {
|
||||
queryAPI := s.client.QueryAPI(s.org)
|
||||
|
||||
// 构建查询语句
|
||||
query := `from(bucket: "` + s.bucket + `")
|
||||
|> range(start: ` + startTime + `, stop: ` + endTime + `)
|
||||
|> filter(fn: (r) => r["_measurement"] == "logs")`
|
||||
|
||||
// 如果指定了设备ID,添加设备ID过滤
|
||||
if deviceID != "" {
|
||||
query += `
|
||||
|> filter(fn: (r) => r["device_id"] == "` + deviceID + `")`
|
||||
}
|
||||
|
||||
// 按时间倒序排列,获取最新的日志
|
||||
query += `
|
||||
|> sort(columns: ["_time"], desc: true)
|
||||
|> limit(n: 100)` // 限制返回100条最新日志
|
||||
|
||||
// 执行查询
|
||||
queryResult, err := queryAPI.Query(ctx, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer queryResult.Close()
|
||||
|
||||
// 存储日志数据
|
||||
logs := make([]map[string]interface{}, 0)
|
||||
|
||||
// 处理查询结果
|
||||
for queryResult.Next() {
|
||||
if queryResult.TableChanged() {
|
||||
// 表结构变化,跳过
|
||||
continue
|
||||
}
|
||||
|
||||
// 获取记录
|
||||
record := queryResult.Record()
|
||||
|
||||
// 构建日志数据
|
||||
logData := map[string]interface{}{
|
||||
"time": record.Time(),
|
||||
"device_id": record.ValueByKey("device_id"),
|
||||
"source": record.ValueByKey("source"),
|
||||
"sequence": record.ValueByKey("sequence"),
|
||||
"message": record.ValueByKey("message"),
|
||||
"agent_name": record.ValueByKey("agent_name"),
|
||||
}
|
||||
|
||||
// 添加到日志列表
|
||||
logs = append(logs, logData)
|
||||
}
|
||||
|
||||
if queryResult.Err() != nil {
|
||||
return nil, queryResult.Err()
|
||||
}
|
||||
|
||||
return logs, nil
|
||||
}
|
||||
|
||||
// QueryDiskDetails 查询磁盘详细信息
|
||||
func (s *Storage) QueryDiskDetails(ctx context.Context, deviceID string, startTime, endTime string) ([]map[string]interface{}, error) {
|
||||
queryAPI := s.client.QueryAPI(s.org)
|
||||
|
||||
Reference in New Issue
Block a user