package log

import (
	"compress/gzip"
	"database/sql"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"

	"dns-server/logger"

	_ "github.com/mattn/go-sqlite3"
)

// aqeTempDirCleanupInterval is how often the background goroutine sweeps
// extraction directories that were not removed by their owning query.
const aqeTempDirCleanupInterval = 5 * time.Minute

// aqeSortableColumns lists the query_logs columns that may appear in an
// ORDER BY clause. Identifiers cannot be bound as SQL parameters, so the
// caller-supplied sort field is validated against this set instead of being
// interpolated verbatim.
var aqeSortableColumns = map[string]struct{}{
	"id":            {},
	"timestamp":     {},
	"client_ip":     {},
	"domain":        {},
	"query_type":    {},
	"response_time": {},
	"result":        {},
	"block_rule":    {},
	"block_type":    {},
	"from_cache":    {},
	"dnssec":        {},
	"edns":          {},
	"dns_server":    {},
	"dnssec_server": {},
	"answers":       {},
	"response_code": {},
}

// aqeOrderBy converts a caller-supplied sort field and direction into a safe
// ORDER BY column and keyword. Unknown or empty fields fall back to
// "timestamp"; any direction other than "asc" (case-insensitive) becomes
// "DESC".
func aqeOrderBy(field, direction string) (string, string) {
	column := strings.ToLower(strings.TrimSpace(field))
	if _, ok := aqeSortableColumns[column]; !ok {
		column = "timestamp"
	}
	keyword := "DESC"
	if strings.EqualFold(strings.TrimSpace(direction), "asc") {
		keyword = "ASC"
	}
	return column, keyword
}

// ArchiveQueryEngine answers log queries from the main SQLite store and from
// gzip-compressed archive databases, which it extracts into temporary
// directories on demand.
type ArchiveQueryEngine struct {
	mainStore  *SQLiteStore
	archiveMgr *ArchiveManager
	config     *QueryLogConfig

	// tempDirs maps each live extraction directory to the archive it was
	// extracted from. Guarded by tempDirsMu.
	tempDirs   map[string]string
	tempDirsMu sync.Mutex

	// done is closed by Close to stop the background cleanup goroutine.
	done      chan struct{}
	closeOnce sync.Once
}

// NewArchiveQueryEngine creates an engine over the given main store and
// archive manager and starts the background temp-directory sweeper. Call
// Close to stop the sweeper and remove remaining temp directories. The
// returned error is currently always nil.
func NewArchiveQueryEngine(mainStore *SQLiteStore, archiveMgr *ArchiveManager, config *QueryLogConfig) (*ArchiveQueryEngine, error) {
	aqe := &ArchiveQueryEngine{
		mainStore:  mainStore,
		archiveMgr: archiveMgr,
		config:     config,
		tempDirs:   make(map[string]string),
		done:       make(chan struct{}),
	}

	go aqe.cleanupTempDirsLoop()

	return aqe, nil
}

// QueryLogs returns one page of logs matching filter, drawing transparently
// from the main store and, when the main store cannot fill the page, from the
// archives.
//
// Failures of either source are logged and treated as "no rows" so the other
// source can still answer; the returned error is always nil.
//
// NOTE(review): when the page is served entirely from the main store the
// returned total counts main-store rows only; archives are not consulted on
// that path. The merge also appears to assume archived rows are older than
// main-store rows — confirm against the archiving policy.
func (aqe *ArchiveQueryEngine) QueryLogs(filter LogFilter, page PageParams) ([]QueryLog, int64, error) {
	// Negative paging values would yield invalid slice bounds below.
	if page.Offset < 0 {
		page.Offset = 0
	}
	if page.Limit < 0 {
		page.Limit = 0
	}
	wanted := page.Offset + page.Limit

	// 1. Query the main store for the first offset+limit rows.
	mainLogs, mainTotal, err := aqe.queryMainStore(filter, page)
	if err != nil {
		logger.Error("查询主库失败", "error", err)
		mainLogs = []QueryLog{}
		mainTotal = 0
	}

	// 2. Fast path: the main store alone covers the whole requested page.
	if len(mainLogs) >= wanted {
		return mainLogs[page.Offset:wanted], mainTotal, nil
	}

	// 3. Ask the archives for just enough rows to reach offset+limit. The
	// archive offset is always 0: paging is applied after the merge.
	archivePage := PageParams{
		Limit:         wanted - len(mainLogs),
		Offset:        0,
		SortField:     page.SortField,
		SortDirection: page.SortDirection,
	}

	// 4. Query the archives.
	archiveLogs, archiveTotal, err := aqe.queryArchives(filter, archivePage)
	if err != nil {
		logger.Error("查询归档失败", "error", err)
		archiveLogs = []QueryLog{}
		archiveTotal = 0
	}

	// 5. Merge.
	allLogs := append(mainLogs, archiveLogs...)
	total := mainTotal + archiveTotal

	// 6. Re-sort the merged rows when ordering by timestamp (the default).
	// Other sort fields keep the per-source order. A stable sort keeps rows
	// with equal timestamps in a deterministic order across pages.
	if page.SortField == "" || page.SortField == "timestamp" {
		descending := page.SortDirection == "" || strings.EqualFold(page.SortDirection, "desc")
		sort.SliceStable(allLogs, func(i, j int) bool {
			if descending {
				return allLogs[i].Timestamp.After(allLogs[j].Timestamp)
			}
			return allLogs[i].Timestamp.Before(allLogs[j].Timestamp)
		})
	}

	// 7. Cut the requested window [offset, offset+limit) out of the merged
	// rows, clamped to what is actually available.
	start := page.Offset
	if start > len(allLogs) {
		start = len(allLogs)
	}
	end := wanted
	if end > len(allLogs) {
		end = len(allLogs)
	}
	return allLogs[start:end], total, nil
}

// queryMainStore fetches the first offset+limit rows from the main store so
// that the caller can merge them with archive rows before paging. It returns
// an empty result when no main store is configured.
func (aqe *ArchiveQueryEngine) queryMainStore(filter LogFilter, page PageParams) ([]QueryLog, int64, error) {
	if aqe.mainStore == nil {
		return []QueryLog{}, 0, nil
	}

	mainPage := PageParams{
		Limit:         page.Offset + page.Limit,
		Offset:        0,
		SortField:     page.SortField,
		SortDirection: page.SortDirection,
	}

	return aqe.mainStore.QueryLogs(filter, mainPage)
}

// queryArchives collects up to page.Offset+page.Limit matching rows by
// walking the archive list in the order GetArchiveList returns it and
// stopping as soon as enough rows have been gathered. Archives that fail to
// open or query are logged and skipped.
//
// NOTE(review): the returned count only covers archives that were actually
// visited; archives skipped by the early exit do not contribute to it.
func (aqe *ArchiveQueryEngine) queryArchives(filter LogFilter, page PageParams) ([]QueryLog, int64, error) {
	if aqe.archiveMgr == nil {
		return []QueryLog{}, 0, nil
	}

	archives, err := aqe.archiveMgr.GetArchiveList()
	if err != nil {
		logger.Error("获取归档列表失败", "error", err)
		return []QueryLog{}, 0, err
	}
	if len(archives) == 0 {
		return []QueryLog{}, 0, nil
	}

	var (
		allLogs    []QueryLog
		totalCount int64
	)
	required := page.Offset + page.Limit

	for i := range archives {
		if len(allLogs) >= required {
			break
		}
		archive := &archives[i]

		// Only request what is still missing from this archive.
		archivePage := page
		archivePage.Limit = required - len(allLogs)
		archivePage.Offset = 0

		logs, count, err := aqe.querySingleArchive(archive, filter, archivePage)
		if err != nil {
			logger.Warn("查询归档失败", "file", archive.FilePath, "error", err)
			continue
		}

		allLogs = append(allLogs, logs...)
		totalCount += count
	}

	return allLogs, totalCount, nil
}

// tempDBPath returns the path of the decompressed database for archivePath
// inside tempDir: the archive's base name with a trailing ".gz" removed.
func (aqe *ArchiveQueryEngine) tempDBPath(tempDir, archivePath string) string {
	return filepath.Join(tempDir, strings.TrimSuffix(filepath.Base(archivePath), ".gz"))
}

// querySingleArchive extracts one archive into a temporary directory, runs
// the query against the extracted database, and removes the directory again
// before returning.
func (aqe *ArchiveQueryEngine) querySingleArchive(archive *ArchiveMetadata, filter LogFilter, page PageParams) ([]QueryLog, int64, error) {
	// 1. Extract the archive.
	tempDir, err := aqe.extractArchive(archive.FilePath)
	if err != nil {
		return nil, 0, fmt.Errorf("解压归档失败:%w", err)
	}
	// Registered first so it runs last, after the database handle is closed.
	defer aqe.cleanupTempDir(tempDir)

	// 2. Open the extracted database.
	tempDB, err := sql.Open("sqlite3", aqe.tempDBPath(tempDir, archive.FilePath))
	if err != nil {
		return nil, 0, fmt.Errorf("连接临时数据库失败:%w", err)
	}
	defer tempDB.Close()

	// 3. Query it.
	return aqe.queryDatabase(tempDB, filter, page)
}

// queryDatabase runs the filtered, sorted, paged SELECT against db and a
// matching COUNT(*) for the total. Filter values are bound as parameters;
// the sort column and direction are validated by aqeOrderBy because they
// cannot be bound. If the COUNT query fails, the number of rows returned is
// used as the total.
func (aqe *ArchiveQueryEngine) queryDatabase(db *sql.DB, filter LogFilter, page PageParams) ([]QueryLog, int64, error) {
	// Build the WHERE clause from the non-empty filter fields.
	whereClause := "1=1"
	args := []interface{}{}

	if filter.Result != "" {
		whereClause += " AND result = ?"
		args = append(args, filter.Result)
	}
	if filter.QueryType != "" {
		whereClause += " AND query_type = ?"
		args = append(args, filter.QueryType)
	}
	if !filter.StartTime.IsZero() {
		whereClause += " AND timestamp >= ?"
		args = append(args, filter.StartTime)
	}
	if !filter.EndTime.IsZero() {
		whereClause += " AND timestamp <= ?"
		args = append(args, filter.EndTime)
	}
	if filter.SearchTerm != "" {
		whereClause += " AND (domain LIKE ? OR client_ip LIKE ?)"
		searchTerm := "%" + filter.SearchTerm + "%"
		args = append(args, searchTerm, searchTerm)
	}

	sortField, sortDirection := aqeOrderBy(page.SortField, page.SortDirection)

	query := fmt.Sprintf(`
		SELECT id, timestamp, client_ip, domain, query_type, response_time, result,
		       block_rule, block_type, from_cache, dnssec, edns, dns_server,
		       dnssec_server, answers, response_code
		FROM query_logs
		WHERE %s
		ORDER BY %s %s
		LIMIT ? OFFSET ?
	`, whereClause, sortField, sortDirection)

	queryArgs := make([]interface{}, 0, len(args)+2)
	queryArgs = append(queryArgs, args...)
	queryArgs = append(queryArgs, page.Limit, page.Offset)

	rows, err := db.Query(query, queryArgs...)
	if err != nil {
		return nil, 0, fmt.Errorf("查询数据库失败:%w", err)
	}
	// scanQueryLogs closes rows, so the COUNT query below does not run while
	// the result set is still open.
	logs, err := aqe.scanQueryLogs(rows)
	if err != nil {
		return nil, 0, fmt.Errorf("查询数据库失败:%w", err)
	}

	// Total number of matching rows, independent of paging.
	countQuery := fmt.Sprintf(`SELECT COUNT(*) FROM query_logs WHERE %s`, whereClause)
	var totalCount int64
	if err := db.QueryRow(countQuery, args...).Scan(&totalCount); err != nil {
		logger.Warn("获取总数失败", "error", err)
		totalCount = int64(len(logs))
	}

	return logs, totalCount, nil
}

// scanQueryLogs reads every row into a QueryLog and closes rows. Rows that
// fail to scan are skipped and reported once in a single warning; an
// iteration error reported by rows.Err is returned.
func (aqe *ArchiveQueryEngine) scanQueryLogs(rows *sql.Rows) ([]QueryLog, error) {
	defer rows.Close()

	var (
		logs    []QueryLog
		skipped int
		lastErr error
	)
	for rows.Next() {
		var entry QueryLog
		err := rows.Scan(
			&entry.ID, &entry.Timestamp, &entry.ClientIP, &entry.Domain, &entry.QueryType,
			&entry.ResponseTime, &entry.Result, &entry.BlockRule, &entry.BlockType,
			&entry.FromCache, &entry.DNSSEC, &entry.EDNS, &entry.DNSServer,
			&entry.DNSSECServer, &entry.Answers, &entry.ResponseCode,
		)
		if err != nil {
			skipped++
			lastErr = err
			continue
		}
		logs = append(logs, entry)
	}
	if skipped > 0 {
		logger.Warn("解析归档日志记录失败", "skipped", skipped, "error", lastErr)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return logs, nil
}

// extractArchive decompresses the gzip file at archivePath into a fresh
// temporary directory, records the directory in tempDirs, and returns its
// path. On failure the directory is removed again.
func (aqe *ArchiveQueryEngine) extractArchive(archivePath string) (string, error) {
	tempDir, err := os.MkdirTemp("", "dns_archive_*")
	if err != nil {
		return "", fmt.Errorf("创建临时目录失败:%w", err)
	}

	if err := aqe.gunzipTo(archivePath, aqe.tempDBPath(tempDir, archivePath)); err != nil {
		// Best effort: the extraction error is the one worth reporting.
		_ = os.RemoveAll(tempDir)
		return "", err
	}

	// The lock only guards the map; decompression runs outside it so
	// concurrent queries do not serialize on each other.
	aqe.tempDirsMu.Lock()
	if aqe.tempDirs == nil {
		aqe.tempDirs = make(map[string]string)
	}
	aqe.tempDirs[tempDir] = archivePath
	aqe.tempDirsMu.Unlock()

	return tempDir, nil
}

// gunzipTo streams the gzip file srcPath into a newly created file dstPath.
// All handles are closed before it returns, so the caller can delete the
// destination directory immediately on error.
func (aqe *ArchiveQueryEngine) gunzipTo(srcPath, dstPath string) error {
	src, err := os.Open(srcPath)
	if err != nil {
		return fmt.Errorf("打开压缩文件失败:%w", err)
	}
	defer src.Close()

	zr, err := gzip.NewReader(src)
	if err != nil {
		return fmt.Errorf("创建 gzip 读取器失败:%w", err)
	}
	defer zr.Close()

	dst, err := os.Create(dstPath)
	if err != nil {
		return fmt.Errorf("创建目标文件失败:%w", err)
	}

	if _, err := io.Copy(dst, zr); err != nil {
		dst.Close()
		return fmt.Errorf("解压文件失败:%w", err)
	}
	// A failed Close can mean buffered data never reached disk.
	if err := dst.Close(); err != nil {
		return fmt.Errorf("解压文件失败:%w", err)
	}
	return nil
}

// cleanupTempDir removes one extraction directory and forgets it.
func (aqe *ArchiveQueryEngine) cleanupTempDir(tempDir string) {
	aqe.tempDirsMu.Lock()
	defer aqe.tempDirsMu.Unlock()

	if err := os.RemoveAll(tempDir); err != nil {
		logger.Warn("清理临时目录失败", "dir", tempDir, "error", err)
	}
	delete(aqe.tempDirs, tempDir)
}

// removeAllTempDirs removes and forgets every tracked extraction directory.
func (aqe *ArchiveQueryEngine) removeAllTempDirs() {
	aqe.tempDirsMu.Lock()
	defer aqe.tempDirsMu.Unlock()

	for tempDir := range aqe.tempDirs {
		if err := os.RemoveAll(tempDir); err != nil {
			logger.Warn("清理临时目录失败", "dir", tempDir, "error", err)
		}
		delete(aqe.tempDirs, tempDir)
	}
}

// cleanupTempDirsLoop periodically sweeps all tracked extraction directories
// until Close is called. It is a safety net; queries normally remove their
// own directory when they finish.
//
// NOTE(review): the sweep does not check whether a directory is still being
// queried, so a query that is running when the ticker fires may lose its
// database file.
func (aqe *ArchiveQueryEngine) cleanupTempDirsLoop() {
	ticker := time.NewTicker(aqeTempDirCleanupInterval)
	defer ticker.Stop()

	for {
		select {
		case <-aqe.done:
			return
		case <-ticker.C:
			aqe.removeAllTempDirs()
		}
	}
}

// GetStats returns the main store's statistics for timeRange with the record
// counts of all archives added to TotalQueries. Archive counts are a
// simplification: they are not restricted to timeRange and only affect
// TotalQueries. Failures are logged and degrade to empty/partial statistics;
// the returned error is always nil.
func (aqe *ArchiveQueryEngine) GetStats(timeRange TimeRange) (*LogStats, error) {
	// 1. Main store statistics.
	var stats *LogStats
	if aqe.mainStore != nil {
		var err error
		stats, err = aqe.mainStore.GetStats(timeRange)
		if err != nil {
			logger.Error("获取主库统计失败", "error", err)
			stats = nil
		}
	}
	if stats == nil {
		stats = &LogStats{
			QueryTypes: make(map[string]int64),
		}
	}

	// 2. Archive statistics (record counts only).
	if aqe.archiveMgr == nil {
		return stats, nil
	}
	archives, err := aqe.archiveMgr.GetArchiveList()
	if err != nil {
		logger.Warn("获取归档列表失败", "error", err)
		return stats, nil
	}
	for i := range archives {
		stats.TotalQueries += archives[i].RecordCount
	}

	return stats, nil
}

// Close stops the background sweeper and removes every remaining extraction
// directory. It is safe to call more than once and always returns nil.
func (aqe *ArchiveQueryEngine) Close() error {
	aqe.closeOnce.Do(func() {
		// done is nil when the engine was not built by NewArchiveQueryEngine.
		if aqe.done != nil {
			close(aqe.done)
		}
	})
	aqe.removeAllTempDirs()
	return nil
}