red_packet_scheduler.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. package scheduler
  2. import (
  3. "fmt"
  4. model "go_server/model/biz_modules/app"
  5. "go_server/service/app"
  6. "sync"
  7. "time"
  8. "github.com/robfig/cron/v3"
  9. "go_server/base/core"
  10. "gorm.io/gorm"
  11. )
  12. // RedPacketScheduler 红包定时任务调度器
  13. type RedPacketScheduler struct {
  14. cron *cron.Cron
  15. db *gorm.DB
  16. jobs map[int64]cron.EntryID // configId -> entryID
  17. jobsMutex sync.RWMutex
  18. service *app.TgRedPacketSendService
  19. }
  20. var (
  21. scheduler *RedPacketScheduler
  22. once sync.Once
  23. )
  24. // GetScheduler 获取调度器单例
  25. func GetScheduler(db *gorm.DB) *RedPacketScheduler {
  26. once.Do(func() {
  27. scheduler = &RedPacketScheduler{
  28. cron: cron.New(cron.WithSeconds()), // 支持秒级
  29. db: db,
  30. jobs: make(map[int64]cron.EntryID),
  31. service: &app.TgRedPacketSendService{},
  32. }
  33. })
  34. return scheduler
  35. }
  36. // Start 启动调度器
  37. func (s *RedPacketScheduler) Start() error {
  38. core.Log.Info("🕐 启动红包定时任务调度器...")
  39. // 加载所有启用的定时配置
  40. if err := s.LoadAllConfigs(); err != nil {
  41. return fmt.Errorf("加载定时配置失败: %v", err)
  42. }
  43. // 启动 cron
  44. s.cron.Start()
  45. // 启动配置监听器(每分钟检查一次配置变化)
  46. go s.watchConfigChanges()
  47. core.Log.Info("✅ 红包定时任务调度器启动成功")
  48. return nil
  49. }
  50. // Stop 停止调度器
  51. func (s *RedPacketScheduler) Stop() {
  52. core.Log.Info("🛑 停止红包定时任务调度器...")
  53. s.cron.Stop()
  54. }
  55. // LoadAllConfigs 加载所有启用的定时配置
  56. func (s *RedPacketScheduler) LoadAllConfigs() error {
  57. var configs []model.TgRedPacketConfig
  58. // 查询所有启用的定时红包配置
  59. err := s.db.Where("config_type = ? AND status = ?", 1, 1).Find(&configs).Error
  60. if err != nil {
  61. return err
  62. }
  63. core.Log.Infof("📋 加载到 %d 个定时红包配置", len(configs))
  64. // 添加到调度器
  65. for _, config := range configs {
  66. if err := s.AddJob(&config); err != nil {
  67. core.Log.Errorf("添加定时任务失败 [ID:%d]: %v", config.Id, err)
  68. continue
  69. }
  70. }
  71. return nil
  72. }
  73. // AddJob 添加定时任务
  74. func (s *RedPacketScheduler) AddJob(config *model.TgRedPacketConfig) error {
  75. s.jobsMutex.Lock()
  76. defer s.jobsMutex.Unlock()
  77. // 检查是否已存在
  78. if _, exists := s.jobs[config.Id]; exists {
  79. s.RemoveJobLocked(config.Id)
  80. }
  81. // 验证 Cron 表达式
  82. if config.CronExpr == "" {
  83. return fmt.Errorf("Cron 表达式为空")
  84. }
  85. // 添加任务
  86. entryID, err := s.cron.AddFunc(config.CronExpr, func() {
  87. s.executeJob(config.Id)
  88. })
  89. if err != nil {
  90. return fmt.Errorf("添加 Cron 任务失败: %v", err)
  91. }
  92. s.jobs[config.Id] = entryID
  93. core.Log.Infof("✅ 添加定时任务 [ID:%d, Name:%s, Cron:%s]",
  94. config.Id, config.ConfigName, config.CronExpr)
  95. return nil
  96. }
  97. // RemoveJob 移除定时任务
  98. func (s *RedPacketScheduler) RemoveJob(configId int64) {
  99. s.jobsMutex.Lock()
  100. defer s.jobsMutex.Unlock()
  101. s.RemoveJobLocked(configId)
  102. }
  103. // RemoveJobLocked 移除定时任务(需要提前加锁)
  104. func (s *RedPacketScheduler) RemoveJobLocked(configId int64) {
  105. if entryID, exists := s.jobs[configId]; exists {
  106. s.cron.Remove(entryID)
  107. delete(s.jobs, configId)
  108. core.Log.Infof("🗑️ 移除定时任务 [ID:%d]", configId)
  109. }
  110. }
  111. // executeJob 执行定时任务
  112. func (s *RedPacketScheduler) executeJob(configId int64) {
  113. core.Log.Infof("⏰ 执行定时红包任务 [ID:%d]", configId)
  114. // 查询配置
  115. var config model.TgRedPacketConfig
  116. if err := s.db.Where("id = ?", configId).First(&config).Error; err != nil {
  117. core.Log.Errorf("查询配置失败 [ID:%d]: %v", configId, err)
  118. return
  119. }
  120. // 检查状态
  121. if config.Status != 1 {
  122. core.Log.Warnf("配置已禁用,跳过执行 [ID:%d]", configId)
  123. return
  124. }
  125. // 检查时间范围
  126. now := time.Now()
  127. if config.StartDate != nil && now.Unix() < *config.StartDate {
  128. core.Log.Infof("未到开始时间,跳过执行 [ID:%d]", configId)
  129. return
  130. }
  131. if config.EndDate != nil && now.Unix() > *config.EndDate {
  132. core.Log.Infof("已过结束时间,停止任务 [ID:%d]", configId)
  133. s.RemoveJob(configId)
  134. return
  135. }
  136. // 调用发送服务
  137. s.service.SetDbAlias("app")
  138. packetNo, err := s.service.ExecuteSendRedPacket(&config)
  139. if err != nil {
  140. core.Log.Errorf("发送红包失败 [ID:%d]: %v", configId, err)
  141. return
  142. }
  143. // 更新执行统计
  144. nowUnix := now.Unix()
  145. updates := map[string]interface{}{
  146. "last_exec_time": nowUnix,
  147. "exec_count": config.ExecCount + 1,
  148. }
  149. // 计算下次执行时间(仅用于显示)
  150. schedule, _ := cron.ParseStandard(config.CronExpr)
  151. if schedule != nil {
  152. nextTime := schedule.Next(now)
  153. nextUnix := nextTime.Unix()
  154. updates["next_exec_time"] = nextUnix
  155. }
  156. s.db.Model(&model.TgRedPacketConfig{}).Where("id = ?", configId).Updates(updates)
  157. core.Log.Infof("✅ 定时红包发送成功 [ID:%d, PacketNo:%s]", configId, packetNo)
  158. }
  159. // watchConfigChanges 监听配置变化
  160. func (s *RedPacketScheduler) watchConfigChanges() {
  161. ticker := time.NewTicker(1 * time.Minute)
  162. defer ticker.Stop()
  163. for range ticker.C {
  164. s.reloadConfigs()
  165. }
  166. }
  167. // reloadConfigs 重新加载配置
  168. func (s *RedPacketScheduler) reloadConfigs() {
  169. var configs []model.TgRedPacketConfig
  170. // 查询所有启用的定时配置
  171. err := s.db.Where("config_type = ? AND status = ?", 1, 1).Find(&configs).Error
  172. if err != nil {
  173. core.Log.Errorf("重新加载配置失败: %v", err)
  174. return
  175. }
  176. s.jobsMutex.Lock()
  177. defer s.jobsMutex.Unlock()
  178. // 构建当前应该存在的任务 ID 集合
  179. activeIds := make(map[int64]bool)
  180. for _, config := range configs {
  181. activeIds[config.Id] = true
  182. // 如果任务不存在,添加
  183. if _, exists := s.jobs[config.Id]; !exists {
  184. s.jobsMutex.Unlock()
  185. s.AddJob(&config)
  186. s.jobsMutex.Lock()
  187. }
  188. }
  189. // 移除不再需要的任务
  190. for id := range s.jobs {
  191. if !activeIds[id] {
  192. s.RemoveJobLocked(id)
  193. }
  194. }
  195. }