red_packet_scheduler.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. package scheduler
  2. import (
  3. "fmt"
  4. model "go_server/model/biz_modules/app"
  5. "go_server/service/app"
  6. "sync"
  7. "time"
  8. "github.com/robfig/cron/v3"
  9. "go_server/base/core"
  10. "gorm.io/gorm"
  11. )
  12. // 秒级 cron 解析器(与 cron.WithSeconds() 一致)
  13. var secondParser = cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
  14. // RedPacketScheduler 红包定时任务调度器
  15. type RedPacketScheduler struct {
  16. cron *cron.Cron
  17. db *gorm.DB
  18. jobs map[int64]cron.EntryID // configId -> entryID
  19. jobCrons map[int64]string // configId -> cronExpr(用于检测变化)
  20. jobsMutex sync.RWMutex
  21. service *app.TgRedPacketSendService
  22. }
  23. var (
  24. scheduler *RedPacketScheduler
  25. once sync.Once
  26. )
  27. // GetScheduler 获取调度器单例
  28. func GetScheduler(db *gorm.DB) *RedPacketScheduler {
  29. once.Do(func() {
  30. scheduler = &RedPacketScheduler{
  31. cron: cron.New(cron.WithSeconds()), // 支持秒级
  32. db: db,
  33. jobs: make(map[int64]cron.EntryID),
  34. jobCrons: make(map[int64]string),
  35. service: &app.TgRedPacketSendService{},
  36. }
  37. })
  38. return scheduler
  39. }
  40. // Start 启动调度器
  41. func (s *RedPacketScheduler) Start() error {
  42. core.Log.Info("🕐 启动红包定时任务调度器...")
  43. // 加载所有启用的定时配置
  44. if err := s.LoadAllConfigs(); err != nil {
  45. return fmt.Errorf("加载定时配置失败: %v", err)
  46. }
  47. // 启动 cron
  48. s.cron.Start()
  49. // 启动配置监听器(每分钟检查一次配置变化)
  50. go s.watchConfigChanges()
  51. core.Log.Info("✅ 红包定时任务调度器启动成功")
  52. return nil
  53. }
  54. // Stop 停止调度器
  55. func (s *RedPacketScheduler) Stop() {
  56. core.Log.Info("🛑 停止红包定时任务调度器...")
  57. s.cron.Stop()
  58. }
  59. // LoadAllConfigs 加载所有启用的定时配置
  60. func (s *RedPacketScheduler) LoadAllConfigs() error {
  61. var configs []model.TgRedPacketConfig
  62. // 查询所有启用的定时红包配置
  63. err := s.db.Where("config_type = ? AND status = ?", 1, 1).Find(&configs).Error
  64. if err != nil {
  65. return err
  66. }
  67. core.Log.Infof("📋 加载到 %d 个定时红包配置", len(configs))
  68. // 添加到调度器
  69. for _, config := range configs {
  70. if err := s.AddJob(&config); err != nil {
  71. core.Log.Errorf("添加定时任务失败 [ID:%d]: %v", config.Id, err)
  72. continue
  73. }
  74. }
  75. return nil
  76. }
  77. // AddJob 添加定时任务
  78. func (s *RedPacketScheduler) AddJob(config *model.TgRedPacketConfig) error {
  79. s.jobsMutex.Lock()
  80. defer s.jobsMutex.Unlock()
  81. return s.addJobLocked(config)
  82. }
  83. // addJobLocked 添加定时任务(需要提前加锁)
  84. func (s *RedPacketScheduler) addJobLocked(config *model.TgRedPacketConfig) error {
  85. // 检查是否已存在
  86. if _, exists := s.jobs[config.Id]; exists {
  87. s.removeJobLocked(config.Id)
  88. }
  89. // 验证 Cron 表达式
  90. if config.CronExpr == "" {
  91. return fmt.Errorf("Cron 表达式为空")
  92. }
  93. // 添加任务
  94. entryID, err := s.cron.AddFunc(config.CronExpr, func() {
  95. s.executeJob(config.Id)
  96. })
  97. if err != nil {
  98. return fmt.Errorf("添加 Cron 任务失败: %v", err)
  99. }
  100. s.jobs[config.Id] = entryID
  101. s.jobCrons[config.Id] = config.CronExpr
  102. core.Log.Infof("✅ 添加定时任务 [ID:%d, Name:%s, Cron:%s]",
  103. config.Id, config.ConfigName, config.CronExpr)
  104. return nil
  105. }
  106. // RemoveJob 移除定时任务
  107. func (s *RedPacketScheduler) RemoveJob(configId int64) {
  108. s.jobsMutex.Lock()
  109. defer s.jobsMutex.Unlock()
  110. s.removeJobLocked(configId)
  111. }
  112. // removeJobLocked 移除定时任务(需要提前加锁)
  113. func (s *RedPacketScheduler) removeJobLocked(configId int64) {
  114. if entryID, exists := s.jobs[configId]; exists {
  115. s.cron.Remove(entryID)
  116. delete(s.jobs, configId)
  117. delete(s.jobCrons, configId)
  118. core.Log.Infof("🗑️ 移除定时任务 [ID:%d]", configId)
  119. }
  120. }
  121. // executeJob 执行定时任务
  122. func (s *RedPacketScheduler) executeJob(configId int64) {
  123. core.Log.Infof("⏰ 执行定时红包任务 [ID:%d]", configId)
  124. // 查询配置
  125. var config model.TgRedPacketConfig
  126. if err := s.db.Where("id = ?", configId).First(&config).Error; err != nil {
  127. core.Log.Errorf("查询配置失败 [ID:%d]: %v", configId, err)
  128. return
  129. }
  130. // 检查状态
  131. if config.Status != 1 {
  132. core.Log.Warnf("配置已禁用,跳过执行 [ID:%d]", configId)
  133. return
  134. }
  135. // 使用配置的时区做时间比较
  136. now := time.Now()
  137. if config.TimeZone != "" {
  138. if loc, err := time.LoadLocation(config.TimeZone); err == nil {
  139. now = now.In(loc)
  140. }
  141. }
  142. if config.StartDate != nil && !config.StartDate.IsZero() && now.Before(*config.StartDate) {
  143. core.Log.Infof("未到开始时间,跳过执行 [ID:%d, StartDate:%v, Now:%v]", configId, config.StartDate, now)
  144. return
  145. }
  146. if config.EndDate != nil && !config.EndDate.IsZero() && now.After(*config.EndDate) {
  147. core.Log.Infof("已过结束时间,停止任务 [ID:%d, EndDate:%v, Now:%v]", configId, config.EndDate, now)
  148. s.RemoveJob(configId)
  149. return
  150. }
  151. // 调用发送服务
  152. s.service.SetDbAlias("app")
  153. packetNo, err := s.service.ExecuteSendRedPacket(&config)
  154. if err != nil {
  155. core.Log.Errorf("发送红包失败 [ID:%d]: %v", configId, err)
  156. return
  157. }
  158. // 更新执行统计
  159. updates := map[string]interface{}{
  160. "last_exec_time": time.Now(),
  161. "exec_count": config.ExecCount + 1,
  162. }
  163. // 计算下次执行时间(使用秒级解析器,与 cron.WithSeconds() 一致)
  164. schedule, err := secondParser.Parse(config.CronExpr)
  165. if err == nil {
  166. nextTime := schedule.Next(time.Now())
  167. updates["next_exec_time"] = nextTime
  168. } else {
  169. core.Log.Warnf("解析 Cron 表达式失败 [ID:%d, Cron:%s]: %v", configId, config.CronExpr, err)
  170. }
  171. s.db.Model(&model.TgRedPacketConfig{}).Where("id = ?", configId).Updates(updates)
  172. core.Log.Infof("✅ 定时红包发送成功 [ID:%d, PacketNo:%s]", configId, packetNo)
  173. }
  174. // watchConfigChanges 监听配置变化
  175. func (s *RedPacketScheduler) watchConfigChanges() {
  176. ticker := time.NewTicker(1 * time.Minute)
  177. defer ticker.Stop()
  178. for range ticker.C {
  179. s.reloadConfigs()
  180. }
  181. }
  182. // reloadConfigs 重新加载配置
  183. func (s *RedPacketScheduler) reloadConfigs() {
  184. var configs []model.TgRedPacketConfig
  185. // 查询所有启用的定时配置
  186. err := s.db.Where("config_type = ? AND status = ?", 1, 1).Find(&configs).Error
  187. if err != nil {
  188. core.Log.Errorf("重新加载配置失败: %v", err)
  189. return
  190. }
  191. s.jobsMutex.Lock()
  192. defer s.jobsMutex.Unlock()
  193. // 构建当前应该存在的任务 ID 集合
  194. activeIds := make(map[int64]bool)
  195. for _, config := range configs {
  196. activeIds[config.Id] = true
  197. existingCron, exists := s.jobCrons[config.Id]
  198. if !exists {
  199. // 任务不存在,添加
  200. if err := s.addJobLocked(&config); err != nil {
  201. core.Log.Errorf("重新加载-添加定时任务失败 [ID:%d]: %v", config.Id, err)
  202. }
  203. } else if existingCron != config.CronExpr {
  204. // Cron 表达式变了,重新注册
  205. core.Log.Infof("🔄 检测到 Cron 变化 [ID:%d]: %s -> %s", config.Id, existingCron, config.CronExpr)
  206. if err := s.addJobLocked(&config); err != nil {
  207. core.Log.Errorf("重新加载-更新定时任务失败 [ID:%d]: %v", config.Id, err)
  208. }
  209. }
  210. }
  211. // 移除不再需要的任务
  212. for id := range s.jobs {
  213. if !activeIds[id] {
  214. s.removeJobLocked(id)
  215. }
  216. }
  217. }