// (removed page-scrape artifact: "416 lines / 10 KiB / Go")
package log

import (
	"compress/gzip"
	"database/sql"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"

	"dns-server/logger"

	_ "github.com/mattn/go-sqlite3"
)
|
|
|
|
// ArchiveQueryEngine 归档查询引擎
|
|
type ArchiveQueryEngine struct {
|
|
mainStore *SQLiteStore
|
|
archiveMgr *ArchiveManager
|
|
config *QueryLogConfig
|
|
|
|
// 临时目录管理
|
|
tempDirs map[string]string
|
|
tempDirsMu sync.Mutex
|
|
}
|
|
|
|
// NewArchiveQueryEngine 创建归档查询引擎
|
|
func NewArchiveQueryEngine(mainStore *SQLiteStore, archiveMgr *ArchiveManager, config *QueryLogConfig) (*ArchiveQueryEngine, error) {
|
|
aqe := &ArchiveQueryEngine{
|
|
mainStore: mainStore,
|
|
archiveMgr: archiveMgr,
|
|
config: config,
|
|
tempDirs: make(map[string]string),
|
|
}
|
|
|
|
// 启动临时目录清理协程
|
|
go aqe.cleanupTempDirsLoop()
|
|
|
|
return aqe, nil
|
|
}
|
|
|
|
// QueryLogs 查询日志(透明查询主库和所有归档)
|
|
func (aqe *ArchiveQueryEngine) QueryLogs(filter LogFilter, page PageParams) ([]QueryLog, int64, error) {
|
|
// 1. 查询主库
|
|
mainLogs, mainTotal, err := aqe.queryMainStore(filter, page)
|
|
if err != nil {
|
|
logger.Error("查询主库失败", "error", err)
|
|
mainLogs = []QueryLog{}
|
|
mainTotal = 0
|
|
}
|
|
|
|
// 2. 智能优化:如果主库数据足够,直接返回
|
|
if len(mainLogs) > page.Offset {
|
|
// 计算主库中需要返回的数据范围
|
|
start := page.Offset
|
|
end := start + page.Limit
|
|
if end > len(mainLogs) {
|
|
end = len(mainLogs)
|
|
}
|
|
return mainLogs[start:end], mainTotal, nil
|
|
}
|
|
|
|
// 3. 计算需要从归档中获取的数据量
|
|
// 注意:归档库查询时,offset 应该始终为 0,因为我们需要从归档的开头开始查询,然后与主库数据合并后再进行分页
|
|
remainingLimit := page.Offset + page.Limit - len(mainLogs)
|
|
if remainingLimit < 0 {
|
|
remainingLimit = 0
|
|
}
|
|
archivePage := PageParams{
|
|
Limit: remainingLimit,
|
|
Offset: 0,
|
|
SortField: page.SortField,
|
|
SortDirection: page.SortDirection,
|
|
}
|
|
|
|
// 4. 查询归档库
|
|
archiveLogs, archiveTotal, err := aqe.queryArchives(filter, archivePage)
|
|
if err != nil {
|
|
logger.Error("查询归档失败", "error", err)
|
|
archiveLogs = []QueryLog{}
|
|
archiveTotal = 0
|
|
}
|
|
|
|
// 5. 合并结果
|
|
allLogs := append(mainLogs, archiveLogs...)
|
|
total := mainTotal + archiveTotal
|
|
|
|
// 6. 排序(默认按时间倒序)
|
|
if page.SortField == "" || page.SortField == "timestamp" {
|
|
if page.SortDirection == "" || page.SortDirection == "desc" {
|
|
sort.Slice(allLogs, func(i, j int) bool {
|
|
return allLogs[i].Timestamp.After(allLogs[j].Timestamp)
|
|
})
|
|
} else {
|
|
sort.Slice(allLogs, func(i, j int) bool {
|
|
return allLogs[i].Timestamp.Before(allLogs[j].Timestamp)
|
|
})
|
|
}
|
|
}
|
|
|
|
// 7. 分页
|
|
start := 0
|
|
end := page.Limit
|
|
if end > len(allLogs) {
|
|
end = len(allLogs)
|
|
}
|
|
|
|
return allLogs[start:end], total, nil
|
|
}
|
|
|
|
// queryMainStore 查询主库
|
|
func (aqe *ArchiveQueryEngine) queryMainStore(filter LogFilter, page PageParams) ([]QueryLog, int64, error) {
|
|
if aqe.mainStore == nil {
|
|
return []QueryLog{}, 0, nil
|
|
}
|
|
|
|
// 注意:查询主库时,不使用 offset,只使用 limit + offset,这样可以获取足够的数据与归档库数据合并后再进行分页
|
|
mainPage := PageParams{
|
|
Limit: page.Offset + page.Limit,
|
|
Offset: 0,
|
|
SortField: page.SortField,
|
|
SortDirection: page.SortDirection,
|
|
}
|
|
|
|
// 使用 SQLiteStore 的查询方法
|
|
return aqe.mainStore.QueryLogs(filter, mainPage)
|
|
}
|
|
|
|
// queryArchives 查询所有归档库
|
|
func (aqe *ArchiveQueryEngine) queryArchives(filter LogFilter, page PageParams) ([]QueryLog, int64, error) {
|
|
// 获取所有归档文件列表
|
|
archives, err := aqe.archiveMgr.GetArchiveList()
|
|
if err != nil {
|
|
logger.Error("获取归档列表失败", "error", err)
|
|
return []QueryLog{}, 0, err
|
|
}
|
|
|
|
if len(archives) == 0 {
|
|
return []QueryLog{}, 0, nil
|
|
}
|
|
|
|
var allLogs []QueryLog
|
|
var totalCount int64 = 0
|
|
|
|
// 计算需要从归档中获取的总数据量
|
|
requiredTotal := int64(page.Offset + page.Limit)
|
|
|
|
// 从新到旧遍历归档
|
|
for _, archive := range archives {
|
|
// 如果已经有足够的数据,提前退出
|
|
if int64(len(allLogs)) >= requiredTotal {
|
|
break
|
|
}
|
|
|
|
// 计算当前归档需要查询的数据量
|
|
remaining := requiredTotal - int64(len(allLogs))
|
|
archivePage := page
|
|
archivePage.Limit = int(remaining)
|
|
archivePage.Offset = 0
|
|
|
|
// 查询单个归档
|
|
logs, count, err := aqe.querySingleArchive(&archive, filter, archivePage)
|
|
if err != nil {
|
|
logger.Warn("查询归档失败", "file", archive.FilePath, "error", err)
|
|
continue
|
|
}
|
|
|
|
allLogs = append(allLogs, logs...)
|
|
totalCount += count
|
|
}
|
|
|
|
return allLogs, totalCount, nil
|
|
}
|
|
|
|
// querySingleArchive 查询单个归档
|
|
func (aqe *ArchiveQueryEngine) querySingleArchive(archive *ArchiveMetadata, filter LogFilter, page PageParams) ([]QueryLog, int64, error) {
|
|
// 1. 解压归档到临时目录
|
|
tempDir, err := aqe.extractArchive(archive.FilePath)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("解压归档失败:%w", err)
|
|
}
|
|
|
|
// 2. 连接临时数据库
|
|
dbPath := filepath.Join(tempDir, filepath.Base(archive.FilePath))
|
|
dbPath = dbPath[:len(dbPath)-3] // 去掉 .gz 后缀
|
|
|
|
tempDB, err := sql.Open("sqlite3", dbPath)
|
|
if err != nil {
|
|
aqe.cleanupTempDir(tempDir)
|
|
return nil, 0, fmt.Errorf("连接临时数据库失败:%w", err)
|
|
}
|
|
defer tempDB.Close()
|
|
|
|
// 3. 查询数据
|
|
return aqe.queryDatabase(tempDB, filter, page)
|
|
}
|
|
|
|
// queryDatabase 从数据库查询数据
|
|
func (aqe *ArchiveQueryEngine) queryDatabase(db *sql.DB, filter LogFilter, page PageParams) ([]QueryLog, int64, error) {
|
|
// 构建查询条件
|
|
whereClause := "1=1"
|
|
args := []interface{}{}
|
|
|
|
if filter.Result != "" {
|
|
whereClause += " AND result = ?"
|
|
args = append(args, filter.Result)
|
|
}
|
|
|
|
if filter.QueryType != "" {
|
|
whereClause += " AND query_type = ?"
|
|
args = append(args, filter.QueryType)
|
|
}
|
|
|
|
if !filter.StartTime.IsZero() {
|
|
whereClause += " AND timestamp >= ?"
|
|
args = append(args, filter.StartTime)
|
|
}
|
|
|
|
if !filter.EndTime.IsZero() {
|
|
whereClause += " AND timestamp <= ?"
|
|
args = append(args, filter.EndTime)
|
|
}
|
|
|
|
if filter.SearchTerm != "" {
|
|
whereClause += " AND (domain LIKE ? OR client_ip LIKE ?)"
|
|
searchTerm := "%" + filter.SearchTerm + "%"
|
|
args = append(args, searchTerm, searchTerm)
|
|
}
|
|
|
|
// 构建排序
|
|
sortField := page.SortField
|
|
if sortField == "" {
|
|
sortField = "timestamp"
|
|
}
|
|
sortDirection := page.SortDirection
|
|
if sortDirection == "" {
|
|
sortDirection = "DESC"
|
|
}
|
|
|
|
// 查询
|
|
query := fmt.Sprintf(`
|
|
SELECT
|
|
id, timestamp, client_ip, domain, query_type, response_time,
|
|
result, block_rule, block_type, from_cache, dnssec, edns,
|
|
dns_server, dnssec_server, answers, response_code
|
|
FROM query_logs
|
|
WHERE %s
|
|
ORDER BY %s %s
|
|
LIMIT ? OFFSET ?
|
|
`, whereClause, sortField, sortDirection)
|
|
|
|
queryArgs := make([]interface{}, len(args)+2)
|
|
copy(queryArgs, args)
|
|
queryArgs[len(args)] = page.Limit
|
|
queryArgs[len(args)+1] = page.Offset
|
|
|
|
rows, err := db.Query(query, queryArgs...)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("查询数据库失败:%w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var logs []QueryLog
|
|
for rows.Next() {
|
|
var log QueryLog
|
|
err := rows.Scan(
|
|
&log.ID,
|
|
&log.Timestamp,
|
|
&log.ClientIP,
|
|
&log.Domain,
|
|
&log.QueryType,
|
|
&log.ResponseTime,
|
|
&log.Result,
|
|
&log.BlockRule,
|
|
&log.BlockType,
|
|
&log.FromCache,
|
|
&log.DNSSEC,
|
|
&log.EDNS,
|
|
&log.DNSServer,
|
|
&log.DNSSECServer,
|
|
&log.Answers,
|
|
&log.ResponseCode,
|
|
)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
logs = append(logs, log)
|
|
}
|
|
|
|
// 获取总数(使用单独的 COUNT 查询)
|
|
countQuery := fmt.Sprintf(`SELECT COUNT(*) FROM query_logs WHERE %s`, whereClause)
|
|
var totalCount int64
|
|
err = db.QueryRow(countQuery, args...).Scan(&totalCount)
|
|
if err != nil {
|
|
logger.Warn("获取总数失败", "error", err)
|
|
totalCount = int64(len(logs))
|
|
}
|
|
|
|
return logs, totalCount, nil
|
|
}
|
|
|
|
// extractArchive 解压归档文件到临时目录
|
|
func (aqe *ArchiveQueryEngine) extractArchive(archivePath string) (string, error) {
|
|
aqe.tempDirsMu.Lock()
|
|
defer aqe.tempDirsMu.Unlock()
|
|
|
|
// 创建临时目录
|
|
tempDir, err := os.MkdirTemp("", "dns_archive_*")
|
|
if err != nil {
|
|
return "", fmt.Errorf("创建临时目录失败:%w", err)
|
|
}
|
|
|
|
// 打开压缩文件
|
|
gzFile, err := os.Open(archivePath)
|
|
if err != nil {
|
|
os.RemoveAll(tempDir)
|
|
return "", fmt.Errorf("打开压缩文件失败:%w", err)
|
|
}
|
|
defer gzFile.Close()
|
|
|
|
// 创建 gzip 读取器
|
|
gzReader, err := gzip.NewReader(gzFile)
|
|
if err != nil {
|
|
os.RemoveAll(tempDir)
|
|
return "", fmt.Errorf("创建 gzip 读取器失败:%w", err)
|
|
}
|
|
defer gzReader.Close()
|
|
|
|
// 解压到临时目录
|
|
dstPath := filepath.Join(tempDir, filepath.Base(archivePath))
|
|
dstPath = dstPath[:len(dstPath)-3] // 去掉 .gz 后缀
|
|
|
|
dstFile, err := os.Create(dstPath)
|
|
if err != nil {
|
|
os.RemoveAll(tempDir)
|
|
return "", fmt.Errorf("创建目标文件失败:%w", err)
|
|
}
|
|
defer dstFile.Close()
|
|
|
|
_, err = io.Copy(dstFile, gzReader)
|
|
if err != nil {
|
|
os.RemoveAll(tempDir)
|
|
return "", fmt.Errorf("解压文件失败:%w", err)
|
|
}
|
|
|
|
// 记录临时目录
|
|
aqe.tempDirs[tempDir] = archivePath
|
|
|
|
return tempDir, nil
|
|
}
|
|
|
|
// cleanupTempDir 清理临时目录
|
|
func (aqe *ArchiveQueryEngine) cleanupTempDir(tempDir string) {
|
|
aqe.tempDirsMu.Lock()
|
|
defer aqe.tempDirsMu.Unlock()
|
|
|
|
if err := os.RemoveAll(tempDir); err != nil {
|
|
logger.Warn("清理临时目录失败", "dir", tempDir, "error", err)
|
|
}
|
|
delete(aqe.tempDirs, tempDir)
|
|
}
|
|
|
|
// cleanupTempDirsLoop 定期清理临时目录
|
|
func (aqe *ArchiveQueryEngine) cleanupTempDirsLoop() {
|
|
ticker := time.NewTicker(5 * time.Minute)
|
|
defer ticker.Stop()
|
|
|
|
for range ticker.C {
|
|
aqe.tempDirsMu.Lock()
|
|
for tempDir := range aqe.tempDirs {
|
|
if err := os.RemoveAll(tempDir); err != nil {
|
|
logger.Warn("清理临时目录失败", "dir", tempDir, "error", err)
|
|
}
|
|
delete(aqe.tempDirs, tempDir)
|
|
}
|
|
aqe.tempDirsMu.Unlock()
|
|
}
|
|
}
|
|
|
|
// GetStats 获取统计信息(包括所有归档)
|
|
func (aqe *ArchiveQueryEngine) GetStats(timeRange TimeRange) (*LogStats, error) {
|
|
// 1. 获取主库统计
|
|
stats, err := aqe.mainStore.GetStats(timeRange)
|
|
if err != nil {
|
|
logger.Error("获取主库统计失败", "error", err)
|
|
stats = &LogStats{
|
|
QueryTypes: make(map[string]int64),
|
|
}
|
|
}
|
|
|
|
// 2. 获取归档统计(简化处理,只统计记录数)
|
|
archives, err := aqe.archiveMgr.GetArchiveList()
|
|
if err == nil {
|
|
for _, archive := range archives {
|
|
stats.TotalQueries += archive.RecordCount
|
|
}
|
|
}
|
|
|
|
return stats, nil
|
|
}
|
|
|
|
// Close 关闭查询引擎
|
|
func (aqe *ArchiveQueryEngine) Close() error {
|
|
// 清理所有临时目录
|
|
aqe.tempDirsMu.Lock()
|
|
for tempDir := range aqe.tempDirs {
|
|
os.RemoveAll(tempDir)
|
|
}
|
|
aqe.tempDirsMu.Unlock()
|
|
|
|
return nil
|
|
}
|