Files
dns-server/log/archive_manager.go
T
2026-04-03 10:04:07 +08:00

541 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package log
import (
"compress/gzip"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"time"
"dns-server/logger"
)
// QueryLogConfig 查询日志配置(本地定义,避免循环依赖)
type QueryLogConfig struct {
Enabled bool `json:"enabled"`
RingBufferSize int `json:"ringBufferSize"`
DatabasePath string `json:"databasePath"`
MaxDatabaseSizeMB int `json:"maxDatabaseSizeMB"`
EnableWAL bool `json:"enableWAL"`
// 归档配置
ArchiveEnabled bool `json:"archiveEnabled"`
ArchiveDir string `json:"archiveDir"`
ArchivePrefix string `json:"archivePrefix"`
CompressionLevel int `json:"compressionLevel"`
// 清理配置
RetentionDays int `json:"retentionDays"`
RetentionMonths int `json:"retentionMonths"`
// 查询配置
QueryTimeout int `json:"queryTimeout"`
EnableCache bool `json:"enableCache"`
CacheTTL int `json:"cacheTTL"`
}
// ArchiveManager 归档管理器
type ArchiveManager struct {
config *QueryLogConfig
dbPath string
archiveDir string
currentSize int64
sizeThreshold int64
mu sync.Mutex
isArchiving bool
// 定时任务
monthTicker *time.Ticker
sizeTicker *time.Ticker
stopChan chan struct{}
// 元数据缓存
metadataCache map[string]*ArchiveMetadata
cacheMutex sync.RWMutex
}
// ArchiveMetadata 归档元数据
type ArchiveMetadata struct {
ArchiveDate time.Time `json:"archiveDate"`
Month string `json:"month"` // 格式:2026-04
FilePath string `json:"filePath"`
OriginalSize int64 `json:"originalSize"`
CompressedSize int64 `json:"compressedSize"`
RecordCount int64 `json:"recordCount"`
StartTime time.Time `json:"startTime"`
EndTime time.Time `json:"endTime"`
CompressionLevel int `json:"compressionLevel"`
Checksum string `json:"checksum"`
}
// NewArchiveManager 创建归档管理器
func NewArchiveManager(config *QueryLogConfig, dbPath string) (*ArchiveManager, error) {
am := &ArchiveManager{
config: config,
dbPath: dbPath,
archiveDir: config.ArchiveDir,
sizeThreshold: int64(config.MaxDatabaseSizeMB) * 1024 * 1024,
metadataCache: make(map[string]*ArchiveMetadata),
stopChan: make(chan struct{}),
}
// 创建归档目录
if err := os.MkdirAll(am.archiveDir, 0755); err != nil {
return nil, fmt.Errorf("创建归档目录失败:%w", err)
}
// 加载现有元数据
if err := am.loadAllMetadata(); err != nil {
logger.Warn("加载元数据失败", "error", err)
}
// 获取当前数据库大小
if size, err := am.GetDatabaseSize(); err == nil {
am.currentSize = size
}
return am, nil
}
// StartWatching 启动监控(大小 + 月度归档)
func (am *ArchiveManager) StartWatching() {
// 启动大小监控(每分钟检查一次)
am.sizeTicker = time.NewTicker(1 * time.Minute)
go func() {
for {
select {
case <-am.sizeTicker.C:
am.CheckAndArchive()
case <-am.stopChan:
am.sizeTicker.Stop()
return
}
}
}()
// 启动月度归档监控(每天检查一次)
am.monthTicker = time.NewTicker(24 * time.Hour)
go func() {
for {
select {
case <-am.monthTicker.C:
// 检查是否是每月 1 号
now := time.Now()
if now.Day() == 1 && now.Hour() == 0 && now.Minute() == 0 {
am.ArchiveByMonth()
}
case <-am.stopChan:
am.monthTicker.Stop()
return
}
}
}()
logger.Info("归档管理器监控启动")
}
// StopWatching 停止监控
func (am *ArchiveManager) StopWatching() {
close(am.stopChan)
if am.sizeTicker != nil {
am.sizeTicker.Stop()
}
if am.monthTicker != nil {
am.monthTicker.Stop()
}
logger.Info("归档管理器监控停止")
}
// CheckAndArchive 检查是否需要归档(大小或时间)
func (am *ArchiveManager) CheckAndArchive() error {
// 检查数据库大小
if am.currentSize >= am.sizeThreshold {
logger.Info("数据库大小达到阈值,触发归档", "size", am.currentSize, "threshold", am.sizeThreshold)
return am.ArchiveBySize()
}
return nil
}
// ArchiveByMonth 按月归档(每月 1 号执行)
func (am *ArchiveManager) ArchiveByMonth() error {
now := time.Now()
// 只在每月 1 号 00:00 执行
if now.Day() != 1 || now.Hour() != 0 || now.Minute() != 0 {
return nil
}
am.mu.Lock()
defer am.mu.Unlock()
if am.isArchiving {
return fmt.Errorf("正在归档中")
}
am.isArchiving = true
defer func() { am.isArchiving = false }()
// 生成归档文件名(格式:querylog-20260401.db
archiveName := fmt.Sprintf("%s-%s.db", am.config.ArchivePrefix, now.Format("20060102"))
// 创建月份目录(格式:2026-04)
monthDir := filepath.Join(am.archiveDir, now.Format("2006-01"))
if err := os.MkdirAll(monthDir, 0755); err != nil {
return fmt.Errorf("创建月份目录失败:%w", err)
}
// 执行归档
archivePath := filepath.Join(monthDir, archiveName)
if err := am.archiveDatabase(archivePath); err != nil {
return fmt.Errorf("归档数据库失败:%w", err)
}
// 记录元数据
metadata := &ArchiveMetadata{
ArchiveDate: now,
Month: now.Format("2006-01"),
FilePath: archivePath + ".gz",
OriginalSize: am.currentSize,
CompressionLevel: am.config.CompressionLevel,
}
if err := am.saveMetadata(metadata); err != nil {
logger.Error("保存元数据失败", "error", err)
}
logger.Info("月度归档完成",
"month", now.Format("2006-01"),
"file", archivePath+".gz",
"size", am.currentSize)
return nil
}
// ArchiveBySize 按大小归档
func (am *ArchiveManager) ArchiveBySize() error {
am.mu.Lock()
defer am.mu.Unlock()
if am.isArchiving {
return fmt.Errorf("正在归档中")
}
am.isArchiving = true
defer func() { am.isArchiving = false }()
// 生成归档文件名(使用当前日期)
now := time.Now()
archiveName := fmt.Sprintf("%s-%s.db", am.config.ArchivePrefix, now.Format("20060102"))
// 创建月份目录
monthDir := filepath.Join(am.archiveDir, now.Format("2006-01"))
if err := os.MkdirAll(monthDir, 0755); err != nil {
return fmt.Errorf("创建月份目录失败:%w", err)
}
// 执行归档
archivePath := filepath.Join(monthDir, archiveName)
if err := am.archiveDatabase(archivePath); err != nil {
return fmt.Errorf("归档数据库失败:%w", err)
}
// 记录元数据
metadata := &ArchiveMetadata{
ArchiveDate: now,
Month: now.Format("2006-01"),
FilePath: archivePath + ".gz",
OriginalSize: am.currentSize,
CompressionLevel: am.config.CompressionLevel,
}
if err := am.saveMetadata(metadata); err != nil {
logger.Error("保存元数据失败", "error", err)
}
logger.Info("按大小归档完成",
"file", archivePath+".gz",
"size", am.currentSize)
return nil
}
// archiveDatabase 执行归档操作
func (am *ArchiveManager) archiveDatabase(archivePath string) error {
// 1. 压缩归档
if err := am.compressArchive(am.dbPath, archivePath+".gz"); err != nil {
return fmt.Errorf("压缩归档失败:%w", err)
}
// 2. 获取压缩后大小
compressedSize, err := getFileSize(archivePath + ".gz")
if err != nil {
logger.Warn("获取压缩文件大小失败", "error", err)
}
// 3. 获取记录数量(通过元数据或查询数据库)
recordCount := am.getRecordCount()
// 4. 更新时间范围
startTime, endTime := am.getTimeRange()
// 5. 更新元数据
metadata := &ArchiveMetadata{
FilePath: archivePath + ".gz",
CompressedSize: compressedSize,
RecordCount: recordCount,
StartTime: startTime,
EndTime: endTime,
}
return am.saveMetadata(metadata)
}
// compressArchive 压缩归档文件
func (am *ArchiveManager) compressArchive(srcPath, dstPath string) error {
// 打开源文件
srcFile, err := os.Open(srcPath)
if err != nil {
return fmt.Errorf("打开源文件失败:%w", err)
}
defer srcFile.Close()
// 创建目标文件
dstFile, err := os.Create(dstPath)
if err != nil {
return fmt.Errorf("创建目标文件失败:%w", err)
}
defer dstFile.Close()
// 创建 gzip 写入器
gzWriter, err := gzip.NewWriterLevel(dstFile, am.config.CompressionLevel)
if err != nil {
return fmt.Errorf("创建 gzip 写入器失败:%w", err)
}
defer gzWriter.Close()
// 复制数据
_, err = io.Copy(gzWriter, srcFile)
if err != nil {
return fmt.Errorf("压缩数据失败:%w", err)
}
return nil
}
// saveMetadata 保存元数据
func (am *ArchiveManager) saveMetadata(metadata *ArchiveMetadata) error {
if metadata.Month == "" {
metadata.Month = time.Now().Format("2006-01")
}
// 元数据文件路径
metaPath := filepath.Join(am.archiveDir, metadata.Month,
filepath.Base(metadata.FilePath)+".meta.json")
// 写入元数据文件
data, err := json.MarshalIndent(metadata, "", " ")
if err != nil {
return fmt.Errorf("序列化元数据失败:%w", err)
}
if err := os.WriteFile(metaPath, data, 0644); err != nil {
return fmt.Errorf("写入元数据文件失败:%w", err)
}
// 更新缓存
am.cacheMutex.Lock()
key := metadata.Month + "_" + filepath.Base(metadata.FilePath)
am.metadataCache[key] = metadata
am.cacheMutex.Unlock()
return nil
}
// loadMetadata 加载元数据
func (am *ArchiveManager) loadMetadata(month, fileName string) (*ArchiveMetadata, error) {
key := month + "_" + fileName
// 先查缓存
am.cacheMutex.RLock()
if meta, ok := am.metadataCache[key]; ok {
am.cacheMutex.RUnlock()
return meta, nil
}
am.cacheMutex.RUnlock()
// 从文件加载
metaPath := filepath.Join(am.archiveDir, month, fileName+".meta.json")
data, err := os.ReadFile(metaPath)
if err != nil {
return nil, fmt.Errorf("读取元数据文件失败:%w", err)
}
var metadata ArchiveMetadata
if err := json.Unmarshal(data, &metadata); err != nil {
return nil, fmt.Errorf("解析元数据失败:%w", err)
}
// 更新缓存
am.cacheMutex.Lock()
am.metadataCache[key] = &metadata
am.cacheMutex.Unlock()
return &metadata, nil
}
// loadAllMetadata 加载所有元数据
func (am *ArchiveManager) loadAllMetadata() error {
// 遍历归档目录
months, err := os.ReadDir(am.archiveDir)
if err != nil {
if os.IsNotExist(err) {
return nil // 目录不存在,正常
}
return fmt.Errorf("读取归档目录失败:%w", err)
}
for _, monthDir := range months {
if !monthDir.IsDir() {
continue
}
month := monthDir.Name()
files, err := os.ReadDir(filepath.Join(am.archiveDir, month))
if err != nil {
continue
}
for _, file := range files {
if filepath.Ext(file.Name()) == ".meta.json" {
fileName := file.Name()[:len(file.Name())-len(".meta.json")]
_, err := am.loadMetadata(month, fileName)
if err != nil {
logger.Warn("加载元数据失败", "file", file.Name(), "error", err)
}
}
}
}
logger.Info("元数据加载完成", "count", len(am.metadataCache))
return nil
}
// GetArchiveList 获取所有归档文件列表
func (am *ArchiveManager) GetArchiveList() ([]ArchiveMetadata, error) {
am.cacheMutex.RLock()
defer am.cacheMutex.RUnlock()
archives := make([]ArchiveMetadata, 0, len(am.metadataCache))
for _, meta := range am.metadataCache {
archives = append(archives, *meta)
}
// 按归档日期排序
for i := 0; i < len(archives)-1; i++ {
for j := i + 1; j < len(archives); j++ {
if archives[i].ArchiveDate.Before(archives[j].ArchiveDate) {
archives[i], archives[j] = archives[j], archives[i]
}
}
}
return archives, nil
}
// GetDatabaseSize 获取当前数据库大小
func (am *ArchiveManager) GetDatabaseSize() (int64, error) {
return getFileSize(am.dbPath)
}
// getRecordCount 获取记录数量
func (am *ArchiveManager) getRecordCount() int64 {
// 这里简化处理,返回 0
// 实际实现需要查询数据库
return 0
}
// getTimeRange 获取时间范围
func (am *ArchiveManager) getTimeRange() (time.Time, time.Time) {
now := time.Now()
// 简化处理,返回当前时间
return now, now
}
// getFileSize 获取文件大小
func getFileSize(path string) (int64, error) {
info, err := os.Stat(path)
if err != nil {
return 0, err
}
return info.Size(), nil
}
// CleanupOldArchives 清理旧归档
func (am *ArchiveManager) CleanupOldArchives() (int64, error) {
am.mu.Lock()
defer am.mu.Unlock()
archives, err := am.GetArchiveList()
if err != nil {
return 0, err
}
now := time.Now()
deleted := int64(0)
for _, archive := range archives {
shouldDelete := false
// 按天数清理
if am.config.RetentionDays > 0 {
age := now.Sub(archive.ArchiveDate)
if age > time.Duration(am.config.RetentionDays)*24*time.Hour {
shouldDelete = true
}
}
// 按月数清理
if am.config.RetentionMonths > 0 {
months := int(now.Sub(archive.ArchiveDate).Hours() / 24 / 30)
if months > am.config.RetentionMonths {
shouldDelete = true
}
}
if shouldDelete {
if err := am.deleteArchive(archive); err != nil {
logger.Error("删除归档失败", "file", archive.FilePath, "error", err)
} else {
deleted++
logger.Info("删除旧归档", "file", archive.FilePath, "month", archive.Month)
}
}
}
return deleted, nil
}
// deleteArchive 删除单个归档
func (am *ArchiveManager) deleteArchive(archive ArchiveMetadata) error {
// 删除压缩文件
if err := os.Remove(archive.FilePath); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("删除归档文件失败:%w", err)
}
// 删除元数据文件
metaPath := archive.FilePath + ".meta.json"
if err := os.Remove(metaPath); err != nil && !os.IsNotExist(err) {
logger.Warn("删除元数据文件失败", "file", metaPath, "error", err)
}
// 从缓存中移除
am.cacheMutex.Lock()
key := archive.Month + "_" + filepath.Base(archive.FilePath)
delete(am.metadataCache, key)
am.cacheMutex.Unlock()
return nil
}