gomod引用

This commit is contained in:
2025-12-12 16:20:47 +08:00
parent 0393931e91
commit a4ba4dd715
27 changed files with 2886 additions and 87 deletions

View File

@@ -0,0 +1,462 @@
package scheduler
import (
"context"
"sync"
"time"
"order/service"
"github.com/gogf/gf/v2/frame/g"
)
// OrderStatisticsScheduler 订单统计定时任务调度器
type OrderStatisticsScheduler struct{}
var OrderStatisticsSchedulerInstance = &OrderStatisticsScheduler{}
var schedulerLock sync.Mutex
var isSchedulerRunning bool
// StartScheduler 启动定时任务调度器(分布式安全)
func (s *OrderStatisticsScheduler) StartScheduler(ctx context.Context) error {
schedulerLock.Lock()
defer schedulerLock.Unlock()
// 检查是否已经有调度器在运行(分布式部署时避免重复执行)
if isSchedulerRunning {
g.Log().Info(ctx, "订单统计定时任务调度器已在运行")
return nil
}
// 尝试获取分布式锁
if !s.acquireDistributedLock(ctx) {
g.Log().Info(ctx, "其他节点正在运行订单统计定时任务,当前节点跳过")
return nil
}
isSchedulerRunning = true
// 启动锁续期任务
go s.startLockRenewal(ctx)
// 启动日报表生成任务每天凌晨3点执行
go s.startDailyReportScheduler(ctx)
// 启动月报表生成任务每月1日凌晨4点执行
go s.startMonthlyReportScheduler(ctx)
// 启动季度报表生成任务每季度首月5日凌晨5点执行
go s.startQuarterlyReportScheduler(ctx)
// 启动年报表生成任务每年1月10日凌晨6点执行
go s.startYearlyReportScheduler(ctx)
g.Log().Info(ctx, "订单统计定时任务调度器已启动")
return nil
}
// acquireDistributedLock 获取分布式锁
func (s *OrderStatisticsScheduler) acquireDistributedLock(ctx context.Context) bool {
lockKey := "order_statistics_scheduler_lock"
// 尝试设置锁过期时间30秒
result, err := g.Redis().SetNX(ctx, lockKey, "locked", time.Second*30)
if err != nil {
g.Log().Errorf(ctx, "获取分布式锁失败: %v", err)
return false
}
return result.Bool()
}
// renewDistributedLock 续期分布式锁
func (s *OrderStatisticsScheduler) renewDistributedLock(ctx context.Context) bool {
lockKey := "order_statistics_scheduler_lock"
// 续期30秒
_, err := g.Redis().Expire(ctx, lockKey, time.Second*30)
if err != nil {
g.Log().Errorf(ctx, "续期分布式锁失败: %v", err)
return false
}
return true
}
// startLockRenewal 启动锁续期任务
func (s *OrderStatisticsScheduler) startLockRenewal(ctx context.Context) {
ticker := time.NewTicker(20 * time.Second) // 每20秒续期一次
defer ticker.Stop()
for {
select {
case <-ticker.C:
if !s.renewDistributedLock(ctx) {
g.Log().Error(ctx, "分布式锁续期失败,停止调度器")
schedulerLock.Lock()
isSchedulerRunning = false
schedulerLock.Unlock()
return
}
case <-ctx.Done():
// 释放分布式锁
s.releaseDistributedLock(ctx)
return
}
}
}
// releaseDistributedLock 释放分布式锁
func (s *OrderStatisticsScheduler) releaseDistributedLock(ctx context.Context) {
lockKey := "order_statistics_scheduler_lock"
_, err := g.Redis().Do(ctx, "DEL", lockKey)
if err != nil {
g.Log().Errorf(ctx, "释放分布式锁失败: %v", err)
}
}
// acquireTaskLock 获取任务级分布式锁
func (s *OrderStatisticsScheduler) acquireTaskLock(ctx context.Context, lockKey string) bool {
// 尝试设置锁过期时间10分钟
result, err := g.Redis().SetNX(ctx, lockKey, "locked", time.Minute*10)
if err != nil {
g.Log().Errorf(ctx, "获取任务锁失败: %v", err)
return false
}
return result.Bool()
}
// releaseTaskLock 释放任务级分布式锁
func (s *OrderStatisticsScheduler) releaseTaskLock(ctx context.Context, lockKey string) {
_, err := g.Redis().Do(ctx, "DEL", lockKey)
if err != nil {
g.Log().Errorf(ctx, "释放任务锁失败: %v", err)
}
}
// startDailyReportScheduler 日报表定时任务
func (s *OrderStatisticsScheduler) startDailyReportScheduler(ctx context.Context) {
// 计算到凌晨3点的时间
now := time.Now()
next := time.Date(now.Year(), now.Month(), now.Day()+1, 3, 0, 0, 0, time.Local)
duration := next.Sub(now)
// 等待到凌晨3点
time.Sleep(duration)
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
// 立即执行一次昨天的日报表生成
go s.generateYesterdayDailyReport(ctx)
for {
select {
case <-ticker.C:
// 生成昨天的日报表
s.generateYesterdayDailyReport(ctx)
case <-ctx.Done():
return
}
}
}
// startMonthlyReportScheduler 月报表定时任务
func (s *OrderStatisticsScheduler) startMonthlyReportScheduler(ctx context.Context) {
// 计算到下个月1日凌晨4点的时间
now := time.Now()
next := time.Date(now.Year(), now.Month()+1, 1, 4, 0, 0, 0, time.Local)
duration := next.Sub(now)
// 等待到下个月1日凌晨4点
time.Sleep(duration)
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 检查是否是每月1日如果是则生成上个月的月报表
if time.Now().Day() == 1 {
go s.generateLastMonthReport(ctx)
}
case <-ctx.Done():
return
}
}
}
// startQuarterlyReportScheduler 季度报表定时任务
func (s *OrderStatisticsScheduler) startQuarterlyReportScheduler(ctx context.Context) {
// 计算到下个季度第一天凌晨5点的时间
now := time.Now()
nextQuarter := s.getNextQuarterFirstDay(now)
next := time.Date(nextQuarter.Year(), nextQuarter.Month(), nextQuarter.Day(), 5, 0, 0, 0, time.Local)
duration := next.Sub(now)
// 等待到下个季度第一天凌晨5点
time.Sleep(duration)
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 检查是否是季度第一天,如果是则生成上个季度的季度报表
if s.isQuarterFirstDay() {
go s.generateLastQuarterReport(ctx)
}
case <-ctx.Done():
return
}
}
}
// startYearlyReportScheduler 年报表定时任务
func (s *OrderStatisticsScheduler) startYearlyReportScheduler(ctx context.Context) {
// 计算到明年1月1日凌晨6点的时间
now := time.Now()
next := time.Date(now.Year()+1, time.January, 1, 6, 0, 0, 0, time.Local)
duration := next.Sub(now)
// 等待到明年1月1日凌晨6点
time.Sleep(duration)
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 检查是否是1月1日如果是则生成上一年的年报表
if time.Now().Month() == time.January && time.Now().Day() == 1 {
go s.generateLastYearReport(ctx)
}
case <-ctx.Done():
return
}
}
}
// generateYesterdayDailyReport 生成昨天的日报表
func (s *OrderStatisticsScheduler) generateYesterdayDailyReport(ctx context.Context) {
yesterday := time.Now().AddDate(0, 0, -1)
g.Log().Infof(ctx, "开始生成所有租户的日统计数据: %s", yesterday.Format("2006-01-02"))
// 获取所有租户ID
tenantIDs, err := s.getAllTenants(ctx)
if err != nil {
g.Log().Errorf(ctx, "获取租户列表失败: %v", err)
return
}
// 并发处理每个租户的日统计
for _, tenantID := range tenantIDs {
go s.generateTenantDailyReport(ctx, tenantID, yesterday)
}
}
// generateTenantDailyReport 生成指定租户的日报表
func (s *OrderStatisticsScheduler) generateTenantDailyReport(ctx context.Context, tenantID int64, date time.Time) {
lockKey := g.NewVar()
lockKey.Setf("order_stats_daily_%d_%s", tenantID, date.Format("2006-01-02"))
// 获取任务锁
if !s.acquireTaskLock(ctx, lockKey.String()) {
g.Log().Infof(ctx, "租户 %d 的日统计任务正在执行,跳过", tenantID)
return
}
defer s.releaseTaskLock(ctx, lockKey.String())
g.Log().Infof(ctx, "开始生成租户 %d 的日统计: %s", tenantID, date.Format("2006-01-02"))
err := service.OrderStatistics.GenerateDailyStatistics(ctx, tenantID, date, false)
if err != nil {
g.Log().Errorf(ctx, "生成租户 %d 日统计失败: %v", tenantID, err)
return
}
g.Log().Infof(ctx, "租户 %d 日统计生成成功: %s", tenantID, date.Format("2006-01-02"))
}
// generateLastMonthReport 生成上个月的月报表
func (s *OrderStatisticsScheduler) generateLastMonthReport(ctx context.Context) {
lastMonth := time.Now().AddDate(0, -1, 0)
year := lastMonth.Year()
month := int(lastMonth.Month())
g.Log().Infof(ctx, "开始生成所有租户的月统计数据: %d年%d月", year, month)
// 获取所有租户ID
tenantIDs, err := s.getAllTenants(ctx)
if err != nil {
g.Log().Errorf(ctx, "获取租户列表失败: %v", err)
return
}
// 并发处理每个租户的月统计
for _, tenantID := range tenantIDs {
go s.generateTenantMonthlyReport(ctx, tenantID, year, month)
}
}
// generateTenantMonthlyReport 生成指定租户的月报表
func (s *OrderStatisticsScheduler) generateTenantMonthlyReport(ctx context.Context, tenantID int64, year int, month int) {
lockKey := g.NewVar()
lockKey.Setf("order_stats_monthly_%d_%d_%d", tenantID, year, month)
// 获取任务锁
if !s.acquireTaskLock(ctx, lockKey.String()) {
g.Log().Infof(ctx, "租户 %d 的月统计任务正在执行,跳过", tenantID)
return
}
defer s.releaseTaskLock(ctx, lockKey.String())
g.Log().Infof(ctx, "开始生成租户 %d 的月统计: %d年%d月", tenantID, year, month)
err := service.OrderStatistics.GenerateMonthlyStatistics(ctx, tenantID, year, month, false)
if err != nil {
g.Log().Errorf(ctx, "生成租户 %d 月统计失败: %v", tenantID, err)
return
}
g.Log().Infof(ctx, "租户 %d 月统计生成成功: %d年%d月", tenantID, year, month)
}
// generateLastQuarterReport 生成上个季度的季度报表
func (s *OrderStatisticsScheduler) generateLastQuarterReport(ctx context.Context) {
lastQuarter := time.Now().AddDate(0, -3, 0)
year := lastQuarter.Year()
quarter := s.getQuarter(lastQuarter.Month())
g.Log().Infof(ctx, "开始生成所有租户的季度统计数据: %d年第%d季度", year, quarter)
// 获取所有租户ID
tenantIDs, err := s.getAllTenants(ctx)
if err != nil {
g.Log().Errorf(ctx, "获取租户列表失败: %v", err)
return
}
// 并发处理每个租户的季度统计
for _, tenantID := range tenantIDs {
go s.generateTenantQuarterlyReport(ctx, tenantID, year, quarter)
}
}
// generateTenantQuarterlyReport 生成指定租户的季度报表
func (s *OrderStatisticsScheduler) generateTenantQuarterlyReport(ctx context.Context, tenantID int64, year int, quarter int) {
lockKey := g.NewVar()
lockKey.Setf("order_stats_quarterly_%d_%d_%d", tenantID, year, quarter)
// 获取任务锁
if !s.acquireTaskLock(ctx, lockKey.String()) {
g.Log().Infof(ctx, "租户 %d 的季度统计任务正在执行,跳过", tenantID)
return
}
defer s.releaseTaskLock(ctx, lockKey.String())
g.Log().Infof(ctx, "开始生成租户 %d 的季度统计: %d年第%d季度", tenantID, year, quarter)
err := service.OrderStatistics.GenerateQuarterlyStatistics(ctx, tenantID, year, quarter, false)
if err != nil {
g.Log().Errorf(ctx, "生成租户 %d 季度统计失败: %v", tenantID, err)
return
}
g.Log().Infof(ctx, "租户 %d 季度统计生成成功: %d年第%d季度", tenantID, year, quarter)
}
// generateLastYearReport 生成上一年的年报表
func (s *OrderStatisticsScheduler) generateLastYearReport(ctx context.Context) {
lastYear := time.Now().Year() - 1
g.Log().Infof(ctx, "开始生成所有租户的年统计数据: %d年", lastYear)
// 获取所有租户ID
tenantIDs, err := s.getAllTenants(ctx)
if err != nil {
g.Log().Errorf(ctx, "获取租户列表失败: %v", err)
return
}
// 并发处理每个租户的年统计
for _, tenantID := range tenantIDs {
go s.generateTenantYearlyReport(ctx, tenantID, lastYear)
}
}
// generateTenantYearlyReport 生成指定租户的年报表
func (s *OrderStatisticsScheduler) generateTenantYearlyReport(ctx context.Context, tenantID int64, year int) {
lockKey := g.NewVar()
lockKey.Setf("order_stats_yearly_%d_%d", tenantID, year)
// 获取任务锁
if !s.acquireTaskLock(ctx, lockKey.String()) {
g.Log().Infof(ctx, "租户 %d 的年统计任务正在执行,跳过", tenantID)
return
}
defer s.releaseTaskLock(ctx, lockKey.String())
g.Log().Infof(ctx, "开始生成租户 %d 的年统计: %d年", tenantID, year)
err := service.OrderStatistics.GenerateYearlyStatistics(ctx, tenantID, year, false)
if err != nil {
g.Log().Errorf(ctx, "生成租户 %d 年统计失败: %v", tenantID, err)
return
}
g.Log().Infof(ctx, "租户 %d 年统计生成成功: %d年", tenantID, year)
}
// getNextQuarterFirstDay 获取下个季度的第一天
func (s *OrderStatisticsScheduler) getNextQuarterFirstDay(now time.Time) time.Time {
quarter := s.getQuarter(now.Month())
if quarter == 4 {
return time.Date(now.Year()+1, time.January, 1, 0, 0, 0, 0, now.Location())
}
return time.Date(now.Year(), time.Month((quarter*3)+1), 1, 0, 0, 0, 0, now.Location())
}
// getQuarter 获取月份对应的季度
func (s *OrderStatisticsScheduler) getQuarter(month time.Month) int {
switch {
case month >= 1 && month <= 3:
return 1
case month >= 4 && month <= 6:
return 2
case month >= 7 && month <= 9:
return 3
default:
return 4
}
}
// isQuarterFirstDay 检查今天是否是季度的第一天
func (s *OrderStatisticsScheduler) isQuarterFirstDay() bool {
now := time.Now()
quarter := s.getQuarter(now.Month())
switch quarter {
case 1:
return now.Day() == 1 && now.Month() == time.January
case 2:
return now.Day() == 1 && now.Month() == time.April
case 3:
return now.Day() == 1 && now.Month() == time.July
case 4:
return now.Day() == 1 && now.Month() == time.October
default:
return false
}
}
// getAllTenants 获取所有租户ID这里需要根据实际业务实现
func (s *OrderStatisticsScheduler) getAllTenants(ctx context.Context) ([]int64, error) {
// 这里应该从实际的租户管理服务获取租户列表
// 暂时返回一个默认的租户ID实际使用时需要替换为真实的租户获取逻辑
return []int64{1, 2, 3}, nil
}