gomod引用
This commit is contained in:
@@ -2,6 +2,7 @@ package scheduler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -60,13 +61,24 @@ func (s *OrderStatisticsScheduler) acquireDistributedLock(ctx context.Context) b
|
||||
lockKey := "order_statistics_scheduler_lock"
|
||||
|
||||
// 尝试设置锁,过期时间30秒
|
||||
result, err := g.Redis().SetNX(ctx, lockKey, "locked", time.Second*30)
|
||||
success, err := g.Redis().SetNX(ctx, lockKey, "locked")
|
||||
if err != nil {
|
||||
g.Log().Errorf(ctx, "获取分布式锁失败: %v", err)
|
||||
return false
|
||||
}
|
||||
|
||||
return result.Bool()
|
||||
// 设置过期时间
|
||||
if success {
|
||||
_, err = g.Redis().Do(ctx, "EXPIRE", lockKey, 30)
|
||||
if err != nil {
|
||||
g.Log().Errorf(ctx, "设置锁过期时间失败: %v", err)
|
||||
// 删除已设置的锁
|
||||
g.Redis().Do(ctx, "DEL", lockKey)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return success
|
||||
}
|
||||
|
||||
// renewDistributedLock 续期分布式锁
|
||||
@@ -74,7 +86,7 @@ func (s *OrderStatisticsScheduler) renewDistributedLock(ctx context.Context) boo
|
||||
lockKey := "order_statistics_scheduler_lock"
|
||||
|
||||
// 续期30秒
|
||||
_, err := g.Redis().Expire(ctx, lockKey, time.Second*30)
|
||||
_, err := g.Redis().Do(ctx, "EXPIRE", lockKey, 30)
|
||||
if err != nil {
|
||||
g.Log().Errorf(ctx, "续期分布式锁失败: %v", err)
|
||||
return false
|
||||
@@ -118,13 +130,24 @@ func (s *OrderStatisticsScheduler) releaseDistributedLock(ctx context.Context) {
|
||||
// acquireTaskLock 获取任务级分布式锁
|
||||
func (s *OrderStatisticsScheduler) acquireTaskLock(ctx context.Context, lockKey string) bool {
|
||||
// 尝试设置锁,过期时间10分钟
|
||||
result, err := g.Redis().SetNX(ctx, lockKey, "locked", time.Minute*10)
|
||||
success, err := g.Redis().SetNX(ctx, lockKey, "locked")
|
||||
if err != nil {
|
||||
g.Log().Errorf(ctx, "获取任务锁失败: %v", err)
|
||||
return false
|
||||
}
|
||||
|
||||
return result.Bool()
|
||||
// 设置过期时间
|
||||
if success {
|
||||
_, err = g.Redis().Do(ctx, "EXPIRE", lockKey, 600) // 10分钟=600秒
|
||||
if err != nil {
|
||||
g.Log().Errorf(ctx, "设置锁过期时间失败: %v", err)
|
||||
// 删除已设置的锁
|
||||
g.Redis().Do(ctx, "DEL", lockKey)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return success
|
||||
}
|
||||
|
||||
// releaseTaskLock 释放任务级分布式锁
|
||||
@@ -262,15 +285,14 @@ func (s *OrderStatisticsScheduler) generateYesterdayDailyReport(ctx context.Cont
|
||||
|
||||
// generateTenantDailyReport 生成指定租户的日报表
|
||||
func (s *OrderStatisticsScheduler) generateTenantDailyReport(ctx context.Context, tenantID int64, date time.Time) {
|
||||
lockKey := g.NewVar()
|
||||
lockKey.Setf("order_stats_daily_%d_%s", tenantID, date.Format("2006-01-02"))
|
||||
lockKey := fmt.Sprintf("order_stats_daily_%d_%s", tenantID, date.Format("2006-01-02"))
|
||||
|
||||
// 获取任务锁
|
||||
if !s.acquireTaskLock(ctx, lockKey.String()) {
|
||||
if !s.acquireTaskLock(ctx, lockKey) {
|
||||
g.Log().Infof(ctx, "租户 %d 的日统计任务正在执行,跳过", tenantID)
|
||||
return
|
||||
}
|
||||
defer s.releaseTaskLock(ctx, lockKey.String())
|
||||
defer s.releaseTaskLock(ctx, lockKey)
|
||||
|
||||
g.Log().Infof(ctx, "开始生成租户 %d 的日统计: %s", tenantID, date.Format("2006-01-02"))
|
||||
|
||||
@@ -306,15 +328,14 @@ func (s *OrderStatisticsScheduler) generateLastMonthReport(ctx context.Context)
|
||||
|
||||
// generateTenantMonthlyReport 生成指定租户的月报表
|
||||
func (s *OrderStatisticsScheduler) generateTenantMonthlyReport(ctx context.Context, tenantID int64, year int, month int) {
|
||||
lockKey := g.NewVar()
|
||||
lockKey.Setf("order_stats_monthly_%d_%d_%d", tenantID, year, month)
|
||||
lockKey := fmt.Sprintf("order_stats_monthly_%d_%d_%d", tenantID, year, month)
|
||||
|
||||
// 获取任务锁
|
||||
if !s.acquireTaskLock(ctx, lockKey.String()) {
|
||||
if !s.acquireTaskLock(ctx, lockKey) {
|
||||
g.Log().Infof(ctx, "租户 %d 的月统计任务正在执行,跳过", tenantID)
|
||||
return
|
||||
}
|
||||
defer s.releaseTaskLock(ctx, lockKey.String())
|
||||
defer s.releaseTaskLock(ctx, lockKey)
|
||||
|
||||
g.Log().Infof(ctx, "开始生成租户 %d 的月统计: %d年%d月", tenantID, year, month)
|
||||
|
||||
@@ -350,15 +371,14 @@ func (s *OrderStatisticsScheduler) generateLastQuarterReport(ctx context.Context
|
||||
|
||||
// generateTenantQuarterlyReport 生成指定租户的季度报表
|
||||
func (s *OrderStatisticsScheduler) generateTenantQuarterlyReport(ctx context.Context, tenantID int64, year int, quarter int) {
|
||||
lockKey := g.NewVar()
|
||||
lockKey.Setf("order_stats_quarterly_%d_%d_%d", tenantID, year, quarter)
|
||||
lockKey := fmt.Sprintf("order_stats_quarterly_%d_%d_%d", tenantID, year, quarter)
|
||||
|
||||
// 获取任务锁
|
||||
if !s.acquireTaskLock(ctx, lockKey.String()) {
|
||||
if !s.acquireTaskLock(ctx, lockKey) {
|
||||
g.Log().Infof(ctx, "租户 %d 的季度统计任务正在执行,跳过", tenantID)
|
||||
return
|
||||
}
|
||||
defer s.releaseTaskLock(ctx, lockKey.String())
|
||||
defer s.releaseTaskLock(ctx, lockKey)
|
||||
|
||||
g.Log().Infof(ctx, "开始生成租户 %d 的季度统计: %d年第%d季度", tenantID, year, quarter)
|
||||
|
||||
@@ -392,15 +412,14 @@ func (s *OrderStatisticsScheduler) generateLastYearReport(ctx context.Context) {
|
||||
|
||||
// generateTenantYearlyReport 生成指定租户的年报表
|
||||
func (s *OrderStatisticsScheduler) generateTenantYearlyReport(ctx context.Context, tenantID int64, year int) {
|
||||
lockKey := g.NewVar()
|
||||
lockKey.Setf("order_stats_yearly_%d_%d", tenantID, year)
|
||||
lockKey := fmt.Sprintf("order_stats_yearly_%d_%d", tenantID, year)
|
||||
|
||||
// 获取任务锁
|
||||
if !s.acquireTaskLock(ctx, lockKey.String()) {
|
||||
if !s.acquireTaskLock(ctx, lockKey) {
|
||||
g.Log().Infof(ctx, "租户 %d 的年统计任务正在执行,跳过", tenantID)
|
||||
return
|
||||
}
|
||||
defer s.releaseTaskLock(ctx, lockKey.String())
|
||||
defer s.releaseTaskLock(ctx, lockKey)
|
||||
|
||||
g.Log().Infof(ctx, "开始生成租户 %d 的年统计: %d年", tenantID, year)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user