pubsub.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package redisclient
  2. import (
  3. "app/commons/core"
  4. "context"
  5. "encoding/json"
  6. "time"
  7. )
  8. // Publish 发布消息
  9. func Publish(channel string, message interface{}) error {
  10. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
  11. defer cancel()
  12. _, err := DefaultClient().Publish(ctx, channel, ObjToJson(message)).Result()
  13. return err
  14. }
  15. func ObjToJson(obj interface{}) string {
  16. if obj == nil {
  17. return ""
  18. }
  19. b, err := json.Marshal(obj)
  20. if err != nil {
  21. return ""
  22. }
  23. return string(b)
  24. }
  25. // Subscribe 订阅消息(支持 Redis 断线重连)
  26. func Subscribe(channel string, handler func(data string) error) error {
  27. ctx, cancel := context.WithCancel(context.Background())
  28. defer cancel()
  29. for {
  30. pubsub := DefaultClient().Subscribe(ctx, channel)
  31. ch := pubsub.Channel()
  32. core.Log.Info("Redis 订阅成功, channel: ", channel)
  33. // 监听消息
  34. for {
  35. select {
  36. case <-ctx.Done(): // 监听外部取消信号,安全退出
  37. _ = pubsub.Close() // 确保关闭 Redis 订阅连接
  38. core.Log.Warn("Redis 订阅退出, channel: ", channel)
  39. return nil
  40. case msg, ok := <-ch: // 监听 Redis 消息
  41. if !ok {
  42. core.Log.Warn("Redis 订阅通道关闭, 5秒后重连, channel: ", channel)
  43. _ = pubsub.Close() // 关闭当前连接,防止遗留连接
  44. time.Sleep(5 * time.Second) // 休眠后重新订阅
  45. break // 退出当前 for 循环,进入重连
  46. }
  47. // 处理收到的消息
  48. if err := handler(msg.Payload); err != nil {
  49. core.Log.Error("消息处理失败, channel: ", channel, ", error: ", err)
  50. } else {
  51. core.Log.Debug("收到 Redis 消息, channel: ", channel, ", message: ", msg.Payload)
  52. }
  53. }
  54. }
  55. }
  56. }