Files
common/rabbitmq/queue_setup.go
2026-03-12 08:51:13 +08:00

112 lines
3.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package rabbitmq 提供 RabbitMQ 队列初始化的封装方法
//
// 本文件包含常用队列的声明和绑定逻辑,简化业务层的队列配置代码
package rabbitmq
import (
"context"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
)
// SetupResponseQueue 初始化 RAGFlow 响应队列
//
// 功能:
// 1. 声明持久化队列(从配置文件读取队列名,默认 ragflow.response.queue
// 2. 绑定到 ragflow.response ExchangeTopic 类型)
// 3. 使用通配符 # 匹配所有 routing keyuserId
//
// 参数:
//
// ctx: 上下文
//
// 返回:
//
// err: 错误信息,成功返回 nil
//
// 配置示例config.yml
//
// rabbitmq:
// responseQueue: "ragflow.response.queue" # 可选,默认值
func SetupResponseQueue(ctx context.Context) (err error) {
// 从配置文件读取队列名(支持每个开发者配置独立队列名)
responseQueue := g.Cfg().MustGet(ctx, "rabbitmq.responseQueue", "ragflow.response.queue").String()
// 声明持久化队列(服务器重启后队列仍存在)
if err = DeclareQueue(ctx, &QueueConfig{
Name: responseQueue,
Durable: true, // 持久化,防止数据丢失
}); err != nil {
glog.Errorf(ctx, "声明响应队列失败: %v", err)
return
}
// 绑定队列到 Exchange
// Exchange 类型为 topicrouting key 使用通配符 # 匹配所有 userId
if err = BindQueue(ctx, &BindingConfig{
Queue: responseQueue,
Exchange: "ragflow.response", // RAGFlow 响应 Exchange
RoutingKey: "#", // 通配符,匹配所有消息
}); err != nil {
glog.Errorf(ctx, "绑定响应队列失败: %v", err)
return
}
glog.Infof(ctx, "响应队列已绑定: %s -> ragflow.response (routingKey=#)", responseQueue)
return
}
// SetupDelayedFlushQueue 初始化延时落库队列
//
// 功能:
// 1. 声明延时 Exchangex-delayed-message 插件)
// 2. 声明持久化队列 conversation.flush.queue
// 3. 绑定队列到延时 Exchange
//
// 用途:
//
// 对话缓存延时落库机制的兜底策略
// 当对话少于5句时10分钟后触发延时消息将缓存写入MongoDB
//
// 参数:
//
// ctx: 上下文
//
// 返回:
//
// err: 错误信息,成功返回 nil
//
// 相关:
// - service/conversation_service.go: handleResponse()
// - service/conversation_service.go: handleDelayedFlush()
func SetupDelayedFlushQueue(ctx context.Context) (err error) {
// 声明延时 Exchange需要 RabbitMQ 安装 x-delayed-message 插件)
if err = SetupDelayExchange(ctx, "conversation.flush.delayed"); err != nil {
glog.Warningf(ctx, "声明延时落库 Exchange 失败: %v", err)
return
}
// 声明持久化队列
if err = DeclareQueue(ctx, &QueueConfig{
Name: "conversation.flush.queue",
Durable: true, // 持久化,防止延时消息丢失
}); err != nil {
glog.Warningf(ctx, "声明延时落库 Queue 失败: %v", err)
return
}
// 绑定队列到延时 Exchange
if err = BindQueue(ctx, &BindingConfig{
Queue: "conversation.flush.queue",
Exchange: "conversation.flush.delayed",
RoutingKey: "flush", // 延时落库消息的 routing key
}); err != nil {
glog.Warningf(ctx, "绑定延时落库 Queue 失败: %v", err)
return
}
glog.Info(ctx, "延时落库队列已配置")
return
}