541 lines
14 KiB
Go
541 lines
14 KiB
Go
package log
|
||
|
||
import (
|
||
"compress/gzip"
|
||
"encoding/json"
|
||
"fmt"
|
||
"io"
|
||
"os"
|
||
"path/filepath"
|
||
"sync"
|
||
"time"
|
||
|
||
"dns-server/logger"
|
||
)
|
||
|
||
// QueryLogConfig 查询日志配置(本地定义,避免循环依赖)
|
||
type QueryLogConfig struct {
|
||
Enabled bool `json:"enabled"`
|
||
RingBufferSize int `json:"ringBufferSize"`
|
||
DatabasePath string `json:"databasePath"`
|
||
MaxDatabaseSizeMB int `json:"maxDatabaseSizeMB"`
|
||
EnableWAL bool `json:"enableWAL"`
|
||
|
||
// 归档配置
|
||
ArchiveEnabled bool `json:"archiveEnabled"`
|
||
ArchiveDir string `json:"archiveDir"`
|
||
ArchivePrefix string `json:"archivePrefix"`
|
||
CompressionLevel int `json:"compressionLevel"`
|
||
|
||
// 清理配置
|
||
RetentionDays int `json:"retentionDays"`
|
||
RetentionMonths int `json:"retentionMonths"`
|
||
|
||
// 查询配置
|
||
QueryTimeout int `json:"queryTimeout"`
|
||
EnableCache bool `json:"enableCache"`
|
||
CacheTTL int `json:"cacheTTL"`
|
||
}
|
||
|
||
// ArchiveManager 归档管理器
|
||
type ArchiveManager struct {
|
||
config *QueryLogConfig
|
||
dbPath string
|
||
archiveDir string
|
||
currentSize int64
|
||
sizeThreshold int64
|
||
mu sync.Mutex
|
||
isArchiving bool
|
||
|
||
// 定时任务
|
||
monthTicker *time.Ticker
|
||
sizeTicker *time.Ticker
|
||
stopChan chan struct{}
|
||
|
||
// 元数据缓存
|
||
metadataCache map[string]*ArchiveMetadata
|
||
cacheMutex sync.RWMutex
|
||
}
|
||
|
||
// ArchiveMetadata 归档元数据
|
||
type ArchiveMetadata struct {
|
||
ArchiveDate time.Time `json:"archiveDate"`
|
||
Month string `json:"month"` // 格式:2026-04
|
||
FilePath string `json:"filePath"`
|
||
OriginalSize int64 `json:"originalSize"`
|
||
CompressedSize int64 `json:"compressedSize"`
|
||
RecordCount int64 `json:"recordCount"`
|
||
StartTime time.Time `json:"startTime"`
|
||
EndTime time.Time `json:"endTime"`
|
||
CompressionLevel int `json:"compressionLevel"`
|
||
Checksum string `json:"checksum"`
|
||
}
|
||
|
||
// NewArchiveManager 创建归档管理器
|
||
func NewArchiveManager(config *QueryLogConfig, dbPath string) (*ArchiveManager, error) {
|
||
am := &ArchiveManager{
|
||
config: config,
|
||
dbPath: dbPath,
|
||
archiveDir: config.ArchiveDir,
|
||
sizeThreshold: int64(config.MaxDatabaseSizeMB) * 1024 * 1024,
|
||
metadataCache: make(map[string]*ArchiveMetadata),
|
||
stopChan: make(chan struct{}),
|
||
}
|
||
|
||
// 创建归档目录
|
||
if err := os.MkdirAll(am.archiveDir, 0755); err != nil {
|
||
return nil, fmt.Errorf("创建归档目录失败:%w", err)
|
||
}
|
||
|
||
// 加载现有元数据
|
||
if err := am.loadAllMetadata(); err != nil {
|
||
logger.Warn("加载元数据失败", "error", err)
|
||
}
|
||
|
||
// 获取当前数据库大小
|
||
if size, err := am.GetDatabaseSize(); err == nil {
|
||
am.currentSize = size
|
||
}
|
||
|
||
return am, nil
|
||
}
|
||
|
||
// StartWatching 启动监控(大小 + 月度归档)
|
||
func (am *ArchiveManager) StartWatching() {
|
||
// 启动大小监控(每分钟检查一次)
|
||
am.sizeTicker = time.NewTicker(1 * time.Minute)
|
||
go func() {
|
||
for {
|
||
select {
|
||
case <-am.sizeTicker.C:
|
||
am.CheckAndArchive()
|
||
case <-am.stopChan:
|
||
am.sizeTicker.Stop()
|
||
return
|
||
}
|
||
}
|
||
}()
|
||
|
||
// 启动月度归档监控(每天检查一次)
|
||
am.monthTicker = time.NewTicker(24 * time.Hour)
|
||
go func() {
|
||
for {
|
||
select {
|
||
case <-am.monthTicker.C:
|
||
// 检查是否是每月 1 号
|
||
now := time.Now()
|
||
if now.Day() == 1 && now.Hour() == 0 && now.Minute() == 0 {
|
||
am.ArchiveByMonth()
|
||
}
|
||
case <-am.stopChan:
|
||
am.monthTicker.Stop()
|
||
return
|
||
}
|
||
}
|
||
}()
|
||
|
||
logger.Info("归档管理器监控启动")
|
||
}
|
||
|
||
// StopWatching 停止监控
|
||
func (am *ArchiveManager) StopWatching() {
|
||
close(am.stopChan)
|
||
if am.sizeTicker != nil {
|
||
am.sizeTicker.Stop()
|
||
}
|
||
if am.monthTicker != nil {
|
||
am.monthTicker.Stop()
|
||
}
|
||
logger.Info("归档管理器监控停止")
|
||
}
|
||
|
||
// CheckAndArchive 检查是否需要归档(大小或时间)
|
||
func (am *ArchiveManager) CheckAndArchive() error {
|
||
// 检查数据库大小
|
||
if am.currentSize >= am.sizeThreshold {
|
||
logger.Info("数据库大小达到阈值,触发归档", "size", am.currentSize, "threshold", am.sizeThreshold)
|
||
return am.ArchiveBySize()
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// ArchiveByMonth 按月归档(每月 1 号执行)
|
||
func (am *ArchiveManager) ArchiveByMonth() error {
|
||
now := time.Now()
|
||
// 只在每月 1 号 00:00 执行
|
||
if now.Day() != 1 || now.Hour() != 0 || now.Minute() != 0 {
|
||
return nil
|
||
}
|
||
|
||
am.mu.Lock()
|
||
defer am.mu.Unlock()
|
||
|
||
if am.isArchiving {
|
||
return fmt.Errorf("正在归档中")
|
||
}
|
||
|
||
am.isArchiving = true
|
||
defer func() { am.isArchiving = false }()
|
||
|
||
// 生成归档文件名(格式:querylog-20260401.db)
|
||
archiveName := fmt.Sprintf("%s-%s.db", am.config.ArchivePrefix, now.Format("20060102"))
|
||
|
||
// 创建月份目录(格式:2026-04)
|
||
monthDir := filepath.Join(am.archiveDir, now.Format("2006-01"))
|
||
if err := os.MkdirAll(monthDir, 0755); err != nil {
|
||
return fmt.Errorf("创建月份目录失败:%w", err)
|
||
}
|
||
|
||
// 执行归档
|
||
archivePath := filepath.Join(monthDir, archiveName)
|
||
if err := am.archiveDatabase(archivePath); err != nil {
|
||
return fmt.Errorf("归档数据库失败:%w", err)
|
||
}
|
||
|
||
// 记录元数据
|
||
metadata := &ArchiveMetadata{
|
||
ArchiveDate: now,
|
||
Month: now.Format("2006-01"),
|
||
FilePath: archivePath + ".gz",
|
||
OriginalSize: am.currentSize,
|
||
CompressionLevel: am.config.CompressionLevel,
|
||
}
|
||
|
||
if err := am.saveMetadata(metadata); err != nil {
|
||
logger.Error("保存元数据失败", "error", err)
|
||
}
|
||
|
||
logger.Info("月度归档完成",
|
||
"month", now.Format("2006-01"),
|
||
"file", archivePath+".gz",
|
||
"size", am.currentSize)
|
||
|
||
return nil
|
||
}
|
||
|
||
// ArchiveBySize 按大小归档
|
||
func (am *ArchiveManager) ArchiveBySize() error {
|
||
am.mu.Lock()
|
||
defer am.mu.Unlock()
|
||
|
||
if am.isArchiving {
|
||
return fmt.Errorf("正在归档中")
|
||
}
|
||
|
||
am.isArchiving = true
|
||
defer func() { am.isArchiving = false }()
|
||
|
||
// 生成归档文件名(使用当前日期)
|
||
now := time.Now()
|
||
archiveName := fmt.Sprintf("%s-%s.db", am.config.ArchivePrefix, now.Format("20060102"))
|
||
|
||
// 创建月份目录
|
||
monthDir := filepath.Join(am.archiveDir, now.Format("2006-01"))
|
||
if err := os.MkdirAll(monthDir, 0755); err != nil {
|
||
return fmt.Errorf("创建月份目录失败:%w", err)
|
||
}
|
||
|
||
// 执行归档
|
||
archivePath := filepath.Join(monthDir, archiveName)
|
||
if err := am.archiveDatabase(archivePath); err != nil {
|
||
return fmt.Errorf("归档数据库失败:%w", err)
|
||
}
|
||
|
||
// 记录元数据
|
||
metadata := &ArchiveMetadata{
|
||
ArchiveDate: now,
|
||
Month: now.Format("2006-01"),
|
||
FilePath: archivePath + ".gz",
|
||
OriginalSize: am.currentSize,
|
||
CompressionLevel: am.config.CompressionLevel,
|
||
}
|
||
|
||
if err := am.saveMetadata(metadata); err != nil {
|
||
logger.Error("保存元数据失败", "error", err)
|
||
}
|
||
|
||
logger.Info("按大小归档完成",
|
||
"file", archivePath+".gz",
|
||
"size", am.currentSize)
|
||
|
||
return nil
|
||
}
|
||
|
||
// archiveDatabase 执行归档操作
|
||
func (am *ArchiveManager) archiveDatabase(archivePath string) error {
|
||
// 1. 压缩归档
|
||
if err := am.compressArchive(am.dbPath, archivePath+".gz"); err != nil {
|
||
return fmt.Errorf("压缩归档失败:%w", err)
|
||
}
|
||
|
||
// 2. 获取压缩后大小
|
||
compressedSize, err := getFileSize(archivePath + ".gz")
|
||
if err != nil {
|
||
logger.Warn("获取压缩文件大小失败", "error", err)
|
||
}
|
||
|
||
// 3. 获取记录数量(通过元数据或查询数据库)
|
||
recordCount := am.getRecordCount()
|
||
|
||
// 4. 更新时间范围
|
||
startTime, endTime := am.getTimeRange()
|
||
|
||
// 5. 更新元数据
|
||
metadata := &ArchiveMetadata{
|
||
FilePath: archivePath + ".gz",
|
||
CompressedSize: compressedSize,
|
||
RecordCount: recordCount,
|
||
StartTime: startTime,
|
||
EndTime: endTime,
|
||
}
|
||
|
||
return am.saveMetadata(metadata)
|
||
}
|
||
|
||
// compressArchive 压缩归档文件
|
||
func (am *ArchiveManager) compressArchive(srcPath, dstPath string) error {
|
||
// 打开源文件
|
||
srcFile, err := os.Open(srcPath)
|
||
if err != nil {
|
||
return fmt.Errorf("打开源文件失败:%w", err)
|
||
}
|
||
defer srcFile.Close()
|
||
|
||
// 创建目标文件
|
||
dstFile, err := os.Create(dstPath)
|
||
if err != nil {
|
||
return fmt.Errorf("创建目标文件失败:%w", err)
|
||
}
|
||
defer dstFile.Close()
|
||
|
||
// 创建 gzip 写入器
|
||
gzWriter, err := gzip.NewWriterLevel(dstFile, am.config.CompressionLevel)
|
||
if err != nil {
|
||
return fmt.Errorf("创建 gzip 写入器失败:%w", err)
|
||
}
|
||
defer gzWriter.Close()
|
||
|
||
// 复制数据
|
||
_, err = io.Copy(gzWriter, srcFile)
|
||
if err != nil {
|
||
return fmt.Errorf("压缩数据失败:%w", err)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// saveMetadata 保存元数据
|
||
func (am *ArchiveManager) saveMetadata(metadata *ArchiveMetadata) error {
|
||
if metadata.Month == "" {
|
||
metadata.Month = time.Now().Format("2006-01")
|
||
}
|
||
|
||
// 元数据文件路径
|
||
metaPath := filepath.Join(am.archiveDir, metadata.Month,
|
||
filepath.Base(metadata.FilePath)+".meta.json")
|
||
|
||
// 写入元数据文件
|
||
data, err := json.MarshalIndent(metadata, "", " ")
|
||
if err != nil {
|
||
return fmt.Errorf("序列化元数据失败:%w", err)
|
||
}
|
||
|
||
if err := os.WriteFile(metaPath, data, 0644); err != nil {
|
||
return fmt.Errorf("写入元数据文件失败:%w", err)
|
||
}
|
||
|
||
// 更新缓存
|
||
am.cacheMutex.Lock()
|
||
key := metadata.Month + "_" + filepath.Base(metadata.FilePath)
|
||
am.metadataCache[key] = metadata
|
||
am.cacheMutex.Unlock()
|
||
|
||
return nil
|
||
}
|
||
|
||
// loadMetadata 加载元数据
|
||
func (am *ArchiveManager) loadMetadata(month, fileName string) (*ArchiveMetadata, error) {
|
||
key := month + "_" + fileName
|
||
|
||
// 先查缓存
|
||
am.cacheMutex.RLock()
|
||
if meta, ok := am.metadataCache[key]; ok {
|
||
am.cacheMutex.RUnlock()
|
||
return meta, nil
|
||
}
|
||
am.cacheMutex.RUnlock()
|
||
|
||
// 从文件加载
|
||
metaPath := filepath.Join(am.archiveDir, month, fileName+".meta.json")
|
||
data, err := os.ReadFile(metaPath)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("读取元数据文件失败:%w", err)
|
||
}
|
||
|
||
var metadata ArchiveMetadata
|
||
if err := json.Unmarshal(data, &metadata); err != nil {
|
||
return nil, fmt.Errorf("解析元数据失败:%w", err)
|
||
}
|
||
|
||
// 更新缓存
|
||
am.cacheMutex.Lock()
|
||
am.metadataCache[key] = &metadata
|
||
am.cacheMutex.Unlock()
|
||
|
||
return &metadata, nil
|
||
}
|
||
|
||
// loadAllMetadata 加载所有元数据
|
||
func (am *ArchiveManager) loadAllMetadata() error {
|
||
// 遍历归档目录
|
||
months, err := os.ReadDir(am.archiveDir)
|
||
if err != nil {
|
||
if os.IsNotExist(err) {
|
||
return nil // 目录不存在,正常
|
||
}
|
||
return fmt.Errorf("读取归档目录失败:%w", err)
|
||
}
|
||
|
||
for _, monthDir := range months {
|
||
if !monthDir.IsDir() {
|
||
continue
|
||
}
|
||
|
||
month := monthDir.Name()
|
||
files, err := os.ReadDir(filepath.Join(am.archiveDir, month))
|
||
if err != nil {
|
||
continue
|
||
}
|
||
|
||
for _, file := range files {
|
||
if filepath.Ext(file.Name()) == ".meta.json" {
|
||
fileName := file.Name()[:len(file.Name())-len(".meta.json")]
|
||
_, err := am.loadMetadata(month, fileName)
|
||
if err != nil {
|
||
logger.Warn("加载元数据失败", "file", file.Name(), "error", err)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
logger.Info("元数据加载完成", "count", len(am.metadataCache))
|
||
return nil
|
||
}
|
||
|
||
// GetArchiveList 获取所有归档文件列表
|
||
func (am *ArchiveManager) GetArchiveList() ([]ArchiveMetadata, error) {
|
||
am.cacheMutex.RLock()
|
||
defer am.cacheMutex.RUnlock()
|
||
|
||
archives := make([]ArchiveMetadata, 0, len(am.metadataCache))
|
||
for _, meta := range am.metadataCache {
|
||
archives = append(archives, *meta)
|
||
}
|
||
|
||
// 按归档日期排序
|
||
for i := 0; i < len(archives)-1; i++ {
|
||
for j := i + 1; j < len(archives); j++ {
|
||
if archives[i].ArchiveDate.Before(archives[j].ArchiveDate) {
|
||
archives[i], archives[j] = archives[j], archives[i]
|
||
}
|
||
}
|
||
}
|
||
|
||
return archives, nil
|
||
}
|
||
|
||
// GetDatabaseSize 获取当前数据库大小
|
||
func (am *ArchiveManager) GetDatabaseSize() (int64, error) {
|
||
return getFileSize(am.dbPath)
|
||
}
|
||
|
||
// getRecordCount 获取记录数量
|
||
func (am *ArchiveManager) getRecordCount() int64 {
|
||
// 这里简化处理,返回 0
|
||
// 实际实现需要查询数据库
|
||
return 0
|
||
}
|
||
|
||
// getTimeRange 获取时间范围
|
||
func (am *ArchiveManager) getTimeRange() (time.Time, time.Time) {
|
||
now := time.Now()
|
||
// 简化处理,返回当前时间
|
||
return now, now
|
||
}
|
||
|
||
// getFileSize 获取文件大小
|
||
func getFileSize(path string) (int64, error) {
|
||
info, err := os.Stat(path)
|
||
if err != nil {
|
||
return 0, err
|
||
}
|
||
return info.Size(), nil
|
||
}
|
||
|
||
// CleanupOldArchives 清理旧归档
|
||
func (am *ArchiveManager) CleanupOldArchives() (int64, error) {
|
||
am.mu.Lock()
|
||
defer am.mu.Unlock()
|
||
|
||
archives, err := am.GetArchiveList()
|
||
if err != nil {
|
||
return 0, err
|
||
}
|
||
|
||
now := time.Now()
|
||
deleted := int64(0)
|
||
|
||
for _, archive := range archives {
|
||
shouldDelete := false
|
||
|
||
// 按天数清理
|
||
if am.config.RetentionDays > 0 {
|
||
age := now.Sub(archive.ArchiveDate)
|
||
if age > time.Duration(am.config.RetentionDays)*24*time.Hour {
|
||
shouldDelete = true
|
||
}
|
||
}
|
||
|
||
// 按月数清理
|
||
if am.config.RetentionMonths > 0 {
|
||
months := int(now.Sub(archive.ArchiveDate).Hours() / 24 / 30)
|
||
if months > am.config.RetentionMonths {
|
||
shouldDelete = true
|
||
}
|
||
}
|
||
|
||
if shouldDelete {
|
||
if err := am.deleteArchive(archive); err != nil {
|
||
logger.Error("删除归档失败", "file", archive.FilePath, "error", err)
|
||
} else {
|
||
deleted++
|
||
logger.Info("删除旧归档", "file", archive.FilePath, "month", archive.Month)
|
||
}
|
||
}
|
||
}
|
||
|
||
return deleted, nil
|
||
}
|
||
|
||
// deleteArchive 删除单个归档
|
||
func (am *ArchiveManager) deleteArchive(archive ArchiveMetadata) error {
|
||
// 删除压缩文件
|
||
if err := os.Remove(archive.FilePath); err != nil && !os.IsNotExist(err) {
|
||
return fmt.Errorf("删除归档文件失败:%w", err)
|
||
}
|
||
|
||
// 删除元数据文件
|
||
metaPath := archive.FilePath + ".meta.json"
|
||
if err := os.Remove(metaPath); err != nil && !os.IsNotExist(err) {
|
||
logger.Warn("删除元数据文件失败", "file", metaPath, "error", err)
|
||
}
|
||
|
||
// 从缓存中移除
|
||
am.cacheMutex.Lock()
|
||
key := archive.Month + "_" + filepath.Base(archive.FilePath)
|
||
delete(am.metadataCache, key)
|
||
am.cacheMutex.Unlock()
|
||
|
||
return nil
|
||
}
|