Files
common/message/store.go
qhd 55a6ec0374 重构消息队列连接管理,支持多数据源配置
主要变更:
1. 重构NATS、RabbitMQ和Redis连接管理模块,支持多数据源配置
2. 统一连接管理接口,增加数据源名称参数
3. 优化连接状态检查和错误处理
4. 增加连接池管理和资源清理机制
5. 改进日志输出格式和内容
2026-03-12 08:51:45 +08:00

126 lines
3.3 KiB
Go

// Copyright 2019-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package message
import "fmt"
type RetentionPolicy int
const (
// LimitsPolicy (default) means that messages are retained until any given limit is reached.
// This could be one of MaxMsgs, MaxBytes, or MaxAge.
LimitsPolicy RetentionPolicy = iota
// InterestPolicy specifies that when all known consumers have acknowledged a message it can be removed.
InterestPolicy
// WorkQueuePolicy specifies that when the first worker or subscriber acknowledges the message it can be removed.
WorkQueuePolicy
)
// MarshalJSON 将 RetentionPolicy 序列化为字符串
func (rp RetentionPolicy) MarshalJSON() ([]byte, error) {
switch rp {
case LimitsPolicy:
return []byte(`"limits"`), nil
case InterestPolicy:
return []byte(`"interest"`), nil
case WorkQueuePolicy:
return []byte(`"workqueue"`), nil
default:
return nil, fmt.Errorf("can not marshal %v", rp)
}
}
// UnmarshalJSON 将字符串反序列化为 RetentionPolicy
func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error {
switch string(data) {
case `"limits"`:
*rp = LimitsPolicy
case `"interest"`:
*rp = InterestPolicy
case `"workqueue"`:
*rp = WorkQueuePolicy
default:
return fmt.Errorf("unknown retention policy: %s", string(data))
}
return nil
}
type DiscardPolicy int
const (
// DiscardOld will remove older messages to return to the limits.
DiscardOld = iota
// DiscardNew will error on a StoreMsg call
DiscardNew
)
// MarshalJSON 将 DiscardPolicy 序列化为字符串
func (dp DiscardPolicy) MarshalJSON() ([]byte, error) {
switch dp {
case DiscardOld:
return []byte(`"old"`), nil
case DiscardNew:
return []byte(`"new"`), nil
default:
return nil, fmt.Errorf("can not marshal %v", dp)
}
}
// UnmarshalJSON 将字符串反序列化为 DiscardPolicy
func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error {
switch string(data) {
case `"old"`:
*dp = DiscardOld
case `"new"`:
*dp = DiscardNew
default:
return fmt.Errorf("unknown discard policy: %s", string(data))
}
return nil
}
type StorageType int
const (
// FileStorage specifies on disk, designated by the JetStream config StoreDir.
FileStorage = StorageType(22)
// MemoryStorage specifies in memory only.
MemoryStorage = StorageType(33)
)
// MarshalJSON 将 StorageType 序列化为字符串
func (st StorageType) MarshalJSON() ([]byte, error) {
switch st {
case MemoryStorage:
return []byte(`"memory"`), nil
case FileStorage:
return []byte(`"file"`), nil
default:
return nil, fmt.Errorf("can not marshal %v", st)
}
}
// UnmarshalJSON 将字符串反序列化为 StorageType
func (st *StorageType) UnmarshalJSON(data []byte) error {
switch string(data) {
case `"memory"`:
*st = MemoryStorage
case `"file"`:
*st = FileStorage
default:
return fmt.Errorf("unknown storage type: %s", string(data))
}
return nil
}