package dao import ( "context" "customer-server/model/entity" "time" "gitea.com/red-future/common/db/mongo" "github.com/gogf/gf/v2/frame/g" "go.mongodb.org/mongo-driver/v2/bson" ) // archive 归档 DAO type archive struct{} // Archive 归档 DAO 单例 var Archive = new(archive) // CopyToTempByRange 将指定时间范围的数据复制到临时表 // startTime: 开始时间(包含),endTime: 结束时间(不包含) func (d *archive) CopyToTempByRange(ctx context.Context, startTime, endTime time.Time) (count int64, err error) { db := mongo.GetDB() // 查询指定时间范围的数据 filter := bson.M{ "createdAt": bson.M{ "$gte": startTime, "$lt": endTime, }, "isDeleted": false, } cursor, err := db.Collection(entity.ConversationCollection).Find(ctx, filter) if err != nil { return } defer cursor.Close(ctx) // 批量插入临时表 batchSize := g.Cfg().MustGet(ctx, "archive.mongoBatchSize", 1000).Int() var docs []interface{} for cursor.Next(ctx) { var conv entity.Conversation if err = cursor.Decode(&conv); err != nil { return } // 转换为临时表结构 temp := entity.ConversationArchiveTemp{ MongoBaseDO: conv.MongoBaseDO, UserId: conv.UserId, Platform: conv.Platform, SessionId: conv.SessionId, Question: conv.Question, Answer: conv.Answer, MessageId: conv.MessageId, MsgTime: conv.MsgTime, OriginalId: conv.Id.Hex(), // 保存原始 ID } // 清空 ID,让 MongoDB 自动生成新 ID temp.Id = nil docs = append(docs, temp) // 批量插入 if len(docs) >= batchSize { if _, err = db.Collection(entity.ConversationArchiveTempCollection).InsertMany(ctx, docs); err != nil { return } count += int64(len(docs)) docs = docs[:0] } } // 插入剩余数据 if len(docs) > 0 { if _, err = db.Collection(entity.ConversationArchiveTempCollection).InsertMany(ctx, docs); err != nil { return } count += int64(len(docs)) } return } // DeleteByTempIds 根据临时表中的 originalId 删除原表数据 func (d *archive) DeleteByTempIds(ctx context.Context) (count int64, err error) { db := mongo.GetDB() // 从临时表获取所有 originalId cursor, err := db.Collection(entity.ConversationArchiveTempCollection).Find(ctx, bson.M{}) if err != nil { return } defer cursor.Close(ctx) var ids []bson.ObjectID for cursor.Next(ctx) { var temp entity.ConversationArchiveTemp if err = cursor.Decode(&temp); err != nil { return } if oid, parseErr := bson.ObjectIDFromHex(temp.OriginalId); parseErr == nil { ids = append(ids, oid) } // 每 1000 条批量删除一次 if len(ids) >= 1000 { result, delErr := db.Collection(entity.ConversationCollection).DeleteMany(ctx, bson.M{ "_id": bson.M{"$in": ids}, }) if delErr != nil { err = delErr return } count += result.DeletedCount ids = ids[:0] } } // 删除剩余数据 if len(ids) > 0 { result, delErr := db.Collection(entity.ConversationCollection).DeleteMany(ctx, bson.M{ "_id": bson.M{"$in": ids}, }) if delErr != nil { err = delErr return } count += result.DeletedCount } return } // GetTempData 获取临时表数据(用于写入 ES) func (d *archive) GetTempData(ctx context.Context) (data []*entity.ConversationArchiveTemp, err error) { db := mongo.GetDB() cursor, err := db.Collection(entity.ConversationArchiveTempCollection).Find(ctx, bson.M{}) if err != nil { return } defer cursor.Close(ctx) err = cursor.All(ctx, &data) return } // DropTempCollection 删除临时表 func (d *archive) DropTempCollection(ctx context.Context) (err error) { return mongo.GetDB().Collection(entity.ConversationArchiveTempCollection).Drop(ctx) } // CountTemp 统计临时表记录数 func (d *archive) CountTemp(ctx context.Context) (count int64, err error) { return mongo.GetDB().Collection(entity.ConversationArchiveTempCollection).CountDocuments(ctx, bson.M{}) }