增加威胁域名审计
This commit is contained in:
@@ -0,0 +1,382 @@
|
||||
package log
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"dns-server/logger"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
// ArchiveQueryEngine 归档查询引擎
|
||||
type ArchiveQueryEngine struct {
|
||||
mainStore *SQLiteStore
|
||||
archiveMgr *ArchiveManager
|
||||
config *QueryLogConfig
|
||||
|
||||
// 临时目录管理
|
||||
tempDirs map[string]string
|
||||
tempDirsMu sync.Mutex
|
||||
}
|
||||
|
||||
// NewArchiveQueryEngine 创建归档查询引擎
|
||||
func NewArchiveQueryEngine(mainStore *SQLiteStore, archiveMgr *ArchiveManager, config *QueryLogConfig) (*ArchiveQueryEngine, error) {
|
||||
aqe := &ArchiveQueryEngine{
|
||||
mainStore: mainStore,
|
||||
archiveMgr: archiveMgr,
|
||||
config: config,
|
||||
tempDirs: make(map[string]string),
|
||||
}
|
||||
|
||||
// 启动临时目录清理协程
|
||||
go aqe.cleanupTempDirsLoop()
|
||||
|
||||
return aqe, nil
|
||||
}
|
||||
|
||||
// QueryLogs 查询日志(透明查询主库和所有归档)
|
||||
func (aqe *ArchiveQueryEngine) QueryLogs(filter LogFilter, page PageParams) ([]QueryLog, int64, error) {
|
||||
// 1. 查询主库
|
||||
mainLogs, mainTotal, err := aqe.queryMainStore(filter, page)
|
||||
if err != nil {
|
||||
logger.Error("查询主库失败", "error", err)
|
||||
mainLogs = []QueryLog{}
|
||||
mainTotal = 0
|
||||
}
|
||||
|
||||
// 2. 智能优化:如果主库数据足够且是第一页,直接返回
|
||||
if page.Offset == 0 && len(mainLogs) >= page.Limit {
|
||||
return mainLogs, mainTotal, nil
|
||||
}
|
||||
|
||||
// 3. 查询所有归档库
|
||||
archiveLogs, archiveTotal, err := aqe.queryArchives(filter, page)
|
||||
if err != nil {
|
||||
logger.Error("查询归档失败", "error", err)
|
||||
archiveLogs = []QueryLog{}
|
||||
archiveTotal = 0
|
||||
}
|
||||
|
||||
// 4. 合并结果
|
||||
allLogs := append(mainLogs, archiveLogs...)
|
||||
total := mainTotal + archiveTotal
|
||||
|
||||
// 5. 排序(默认按时间倒序)
|
||||
if page.SortField == "" || page.SortField == "timestamp" {
|
||||
if page.SortDirection == "" || page.SortDirection == "desc" {
|
||||
sort.Slice(allLogs, func(i, j int) bool {
|
||||
return allLogs[i].Timestamp.After(allLogs[j].Timestamp)
|
||||
})
|
||||
} else {
|
||||
sort.Slice(allLogs, func(i, j int) bool {
|
||||
return allLogs[i].Timestamp.Before(allLogs[j].Timestamp)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// 6. 分页
|
||||
start := page.Offset
|
||||
if start >= len(allLogs) {
|
||||
return []QueryLog{}, total, nil
|
||||
}
|
||||
|
||||
end := start + page.Limit
|
||||
if end > len(allLogs) {
|
||||
end = len(allLogs)
|
||||
}
|
||||
|
||||
return allLogs[start:end], total, nil
|
||||
}
|
||||
|
||||
// queryMainStore 查询主库
|
||||
func (aqe *ArchiveQueryEngine) queryMainStore(filter LogFilter, page PageParams) ([]QueryLog, int64, error) {
|
||||
if aqe.mainStore == nil {
|
||||
return []QueryLog{}, 0, nil
|
||||
}
|
||||
|
||||
// 使用 SQLiteStore 的查询方法
|
||||
return aqe.mainStore.QueryLogs(filter, page)
|
||||
}
|
||||
|
||||
// queryArchives 查询所有归档库
|
||||
func (aqe *ArchiveQueryEngine) queryArchives(filter LogFilter, page PageParams) ([]QueryLog, int64, error) {
|
||||
// 获取所有归档文件列表
|
||||
archives, err := aqe.archiveMgr.GetArchiveList()
|
||||
if err != nil {
|
||||
logger.Error("获取归档列表失败", "error", err)
|
||||
return []QueryLog{}, 0, err
|
||||
}
|
||||
|
||||
if len(archives) == 0 {
|
||||
return []QueryLog{}, 0, nil
|
||||
}
|
||||
|
||||
var allLogs []QueryLog
|
||||
var totalCount int64 = 0
|
||||
|
||||
// 从新到旧遍历归档
|
||||
for _, archive := range archives {
|
||||
// 如果已经有足够的数据,提前退出
|
||||
if int64(len(allLogs)) >= int64(page.Limit) && page.Offset == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
// 查询单个归档
|
||||
logs, count, err := aqe.querySingleArchive(&archive, filter, page)
|
||||
if err != nil {
|
||||
logger.Warn("查询归档失败", "file", archive.FilePath, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
allLogs = append(allLogs, logs...)
|
||||
totalCount += count
|
||||
}
|
||||
|
||||
return allLogs, totalCount, nil
|
||||
}
|
||||
|
||||
// querySingleArchive 查询单个归档
|
||||
func (aqe *ArchiveQueryEngine) querySingleArchive(archive *ArchiveMetadata, filter LogFilter, page PageParams) ([]QueryLog, int64, error) {
|
||||
// 1. 解压归档到临时目录
|
||||
tempDir, err := aqe.extractArchive(archive.FilePath)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("解压归档失败:%w", err)
|
||||
}
|
||||
|
||||
// 2. 连接临时数据库
|
||||
dbPath := filepath.Join(tempDir, filepath.Base(archive.FilePath))
|
||||
dbPath = dbPath[:len(dbPath)-3] // 去掉 .gz 后缀
|
||||
|
||||
tempDB, err := sql.Open("sqlite3", dbPath)
|
||||
if err != nil {
|
||||
aqe.cleanupTempDir(tempDir)
|
||||
return nil, 0, fmt.Errorf("连接临时数据库失败:%w", err)
|
||||
}
|
||||
defer tempDB.Close()
|
||||
|
||||
// 3. 查询数据
|
||||
return aqe.queryDatabase(tempDB, filter, page)
|
||||
}
|
||||
|
||||
// queryDatabase 从数据库查询数据
|
||||
func (aqe *ArchiveQueryEngine) queryDatabase(db *sql.DB, filter LogFilter, page PageParams) ([]QueryLog, int64, error) {
|
||||
// 构建查询条件
|
||||
whereClause := "1=1"
|
||||
args := []interface{}{}
|
||||
|
||||
if filter.Result != "" {
|
||||
whereClause += " AND result = ?"
|
||||
args = append(args, filter.Result)
|
||||
}
|
||||
|
||||
if filter.QueryType != "" {
|
||||
whereClause += " AND query_type = ?"
|
||||
args = append(args, filter.QueryType)
|
||||
}
|
||||
|
||||
if !filter.StartTime.IsZero() {
|
||||
whereClause += " AND timestamp >= ?"
|
||||
args = append(args, filter.StartTime)
|
||||
}
|
||||
|
||||
if !filter.EndTime.IsZero() {
|
||||
whereClause += " AND timestamp <= ?"
|
||||
args = append(args, filter.EndTime)
|
||||
}
|
||||
|
||||
if filter.SearchTerm != "" {
|
||||
whereClause += " AND (domain LIKE ? OR client_ip LIKE ?)"
|
||||
searchTerm := "%" + filter.SearchTerm + "%"
|
||||
args = append(args, searchTerm, searchTerm)
|
||||
}
|
||||
|
||||
// 构建排序
|
||||
sortField := page.SortField
|
||||
if sortField == "" {
|
||||
sortField = "timestamp"
|
||||
}
|
||||
sortDirection := page.SortDirection
|
||||
if sortDirection == "" {
|
||||
sortDirection = "DESC"
|
||||
}
|
||||
|
||||
// 查询
|
||||
query := fmt.Sprintf(`
|
||||
SELECT
|
||||
id, timestamp, client_ip, domain, query_type, response_time,
|
||||
result, block_rule, block_type, from_cache, dnssec, edns,
|
||||
dns_server, dnssec_server, answers, response_code
|
||||
FROM query_logs
|
||||
WHERE %s
|
||||
ORDER BY %s %s
|
||||
LIMIT ? OFFSET ?
|
||||
`, whereClause, sortField, sortDirection)
|
||||
|
||||
queryArgs := make([]interface{}, len(args)+2)
|
||||
copy(queryArgs, args)
|
||||
queryArgs[len(args)] = page.Limit
|
||||
queryArgs[len(args)+1] = page.Offset
|
||||
|
||||
rows, err := db.Query(query, queryArgs...)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("查询数据库失败:%w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var logs []QueryLog
|
||||
for rows.Next() {
|
||||
var log QueryLog
|
||||
err := rows.Scan(
|
||||
&log.ID,
|
||||
&log.Timestamp,
|
||||
&log.ClientIP,
|
||||
&log.Domain,
|
||||
&log.QueryType,
|
||||
&log.ResponseTime,
|
||||
&log.Result,
|
||||
&log.BlockRule,
|
||||
&log.BlockType,
|
||||
&log.FromCache,
|
||||
&log.DNSSEC,
|
||||
&log.EDNS,
|
||||
&log.DNSServer,
|
||||
&log.DNSSECServer,
|
||||
&log.Answers,
|
||||
&log.ResponseCode,
|
||||
)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
logs = append(logs, log)
|
||||
}
|
||||
|
||||
// 获取总数(使用单独的 COUNT 查询)
|
||||
countQuery := fmt.Sprintf(`SELECT COUNT(*) FROM query_logs WHERE %s`, whereClause)
|
||||
var totalCount int64
|
||||
err = db.QueryRow(countQuery, args...).Scan(&totalCount)
|
||||
if err != nil {
|
||||
logger.Warn("获取总数失败", "error", err)
|
||||
totalCount = int64(len(logs))
|
||||
}
|
||||
|
||||
return logs, totalCount, nil
|
||||
}
|
||||
|
||||
// extractArchive 解压归档文件到临时目录
|
||||
func (aqe *ArchiveQueryEngine) extractArchive(archivePath string) (string, error) {
|
||||
aqe.tempDirsMu.Lock()
|
||||
defer aqe.tempDirsMu.Unlock()
|
||||
|
||||
// 创建临时目录
|
||||
tempDir, err := os.MkdirTemp("", "dns_archive_*")
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("创建临时目录失败:%w", err)
|
||||
}
|
||||
|
||||
// 打开压缩文件
|
||||
gzFile, err := os.Open(archivePath)
|
||||
if err != nil {
|
||||
os.RemoveAll(tempDir)
|
||||
return "", fmt.Errorf("打开压缩文件失败:%w", err)
|
||||
}
|
||||
defer gzFile.Close()
|
||||
|
||||
// 创建 gzip 读取器
|
||||
gzReader, err := gzip.NewReader(gzFile)
|
||||
if err != nil {
|
||||
os.RemoveAll(tempDir)
|
||||
return "", fmt.Errorf("创建 gzip 读取器失败:%w", err)
|
||||
}
|
||||
defer gzReader.Close()
|
||||
|
||||
// 解压到临时目录
|
||||
dstPath := filepath.Join(tempDir, filepath.Base(archivePath))
|
||||
dstPath = dstPath[:len(dstPath)-3] // 去掉 .gz 后缀
|
||||
|
||||
dstFile, err := os.Create(dstPath)
|
||||
if err != nil {
|
||||
os.RemoveAll(tempDir)
|
||||
return "", fmt.Errorf("创建目标文件失败:%w", err)
|
||||
}
|
||||
defer dstFile.Close()
|
||||
|
||||
_, err = io.Copy(dstFile, gzReader)
|
||||
if err != nil {
|
||||
os.RemoveAll(tempDir)
|
||||
return "", fmt.Errorf("解压文件失败:%w", err)
|
||||
}
|
||||
|
||||
// 记录临时目录
|
||||
aqe.tempDirs[tempDir] = archivePath
|
||||
|
||||
return tempDir, nil
|
||||
}
|
||||
|
||||
// cleanupTempDir 清理临时目录
|
||||
func (aqe *ArchiveQueryEngine) cleanupTempDir(tempDir string) {
|
||||
aqe.tempDirsMu.Lock()
|
||||
defer aqe.tempDirsMu.Unlock()
|
||||
|
||||
if err := os.RemoveAll(tempDir); err != nil {
|
||||
logger.Warn("清理临时目录失败", "dir", tempDir, "error", err)
|
||||
}
|
||||
delete(aqe.tempDirs, tempDir)
|
||||
}
|
||||
|
||||
// cleanupTempDirsLoop 定期清理临时目录
|
||||
func (aqe *ArchiveQueryEngine) cleanupTempDirsLoop() {
|
||||
ticker := time.NewTicker(5 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
for range ticker.C {
|
||||
aqe.tempDirsMu.Lock()
|
||||
for tempDir := range aqe.tempDirs {
|
||||
if err := os.RemoveAll(tempDir); err != nil {
|
||||
logger.Warn("清理临时目录失败", "dir", tempDir, "error", err)
|
||||
}
|
||||
delete(aqe.tempDirs, tempDir)
|
||||
}
|
||||
aqe.tempDirsMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// GetStats 获取统计信息(包括所有归档)
|
||||
func (aqe *ArchiveQueryEngine) GetStats(timeRange TimeRange) (*LogStats, error) {
|
||||
// 1. 获取主库统计
|
||||
stats, err := aqe.mainStore.GetStats(timeRange)
|
||||
if err != nil {
|
||||
logger.Error("获取主库统计失败", "error", err)
|
||||
stats = &LogStats{
|
||||
QueryTypes: make(map[string]int64),
|
||||
}
|
||||
}
|
||||
|
||||
// 2. 获取归档统计(简化处理,只统计记录数)
|
||||
archives, err := aqe.archiveMgr.GetArchiveList()
|
||||
if err == nil {
|
||||
for _, archive := range archives {
|
||||
stats.TotalQueries += archive.RecordCount
|
||||
}
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// Close 关闭查询引擎
|
||||
func (aqe *ArchiveQueryEngine) Close() error {
|
||||
// 清理所有临时目录
|
||||
aqe.tempDirsMu.Lock()
|
||||
for tempDir := range aqe.tempDirs {
|
||||
os.RemoveAll(tempDir)
|
||||
}
|
||||
aqe.tempDirsMu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user