// Package dao contains the data-access helpers for the records stored in
// entity.DataCollection.
package dao

import (
	"context"
	"errors"

	"cidService/model/dto"
	"cidService/model/entity"

	"gitee.com/red-future---jilin-g/common/http"
	"gitee.com/red-future---jilin-g/common/mongo"
	"github.com/gogf/gf/v2/frame/g"
	"go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/mongo/options"
)

// Data is the shared accessor for the data collection.
var Data = &data{}

// data groups the data-collection operations; it carries no state.
type data struct{}

// Insert stores a single record in entity.DataCollection.
//
// Before inserting, it performs a best-effort XREAD on the "data_stream"
// Redis stream and logs the outcome. A failure of that read is only logged
// and never prevents the insert.
//
// It returns an error if data is nil or if the insert fails.
func (d *data) Insert(ctx context.Context, data *entity.Data) (err error) {
	if data == nil {
		return errors.New("dao: insert data: nil entity")
	}

	// Read the stream message. The read error is kept in its own variable so
	// it can never leak into the function result.
	// NOTE(review): XREAD with the "$" ID and no BLOCK option presumably only
	// reports entries added after the call starts, so this likely always
	// yields an empty reply — confirm whether this read is still needed.
	if streamMsg, readErr := g.Redis().Do(ctx, "XREAD", "STREAMS", "data_stream", "$"); readErr != nil {
		g.Log().Errorf(ctx, "获取stream消息失败: %v", readErr)
	} else {
		g.Log().Infof(ctx, "获取到stream消息: %v", streamMsg)
	}

	_, err = mongo.Insert(ctx, []any{data}, entity.DataCollection)
	return err
}

// Update applies a partial update to the record whose hex ObjectID is req.Id.
//
// Only the fields that are set on the request are written: string-like fields
// when g.IsEmpty reports them non-empty, and pointer fields when they are
// non-nil. When no field is set, nothing is sent to the database and nil is
// returned.
//
// It returns an error if req is nil, if req.Id is not a valid hex ObjectID,
// or if the update fails.
func (d *data) Update(ctx context.Context, req *dto.UpdateDataReq) (err error) {
	if req == nil {
		return errors.New("dao: update data: nil request")
	}

	objectID, err := bson.ObjectIDFromHex(req.Id)
	if err != nil {
		return err
	}

	// Collect only the fields the caller actually supplied.
	fields := bson.M{}
	if !g.IsEmpty(req.CustomerId) {
		fields["customerId"] = req.CustomerId
	}
	if !g.IsEmpty(req.CustomerServiceId) {
		fields["customerServiceId"] = req.CustomerServiceId
	}
	if req.IsInbound != nil {
		fields["isInbound"] = *req.IsInbound
	}
	if req.IsActive != nil {
		fields["isActive"] = *req.IsActive
	}
	if req.IsServed != nil {
		fields["isServed"] = *req.IsServed
	}
	if req.HasSentContactCard != nil {
		fields["hasSentContactCard"] = *req.HasSentContactCard
	}
	if req.HasSentNameCard != nil {
		fields["hasSentNameCard"] = *req.HasSentNameCard
	}
	if req.HasLeftContactInfo != nil {
		fields["hasLeftContactInfo"] = *req.HasLeftContactInfo
	}

	// Nothing to change: skip the database call entirely.
	if len(fields) == 0 {
		return nil
	}

	_, err = mongo.Update(ctx, bson.M{"_id": objectID}, bson.M{"$set": fields}, entity.DataCollection)
	return err
}

// buildListFilter builds the query filter for List. A condition is added for
// customerId and customerServiceId only when the corresponding request field
// is non-empty according to g.IsEmpty. req must be non-nil.
func (d *data) buildListFilter(req *dto.ListDataReq) bson.M {
	filter := bson.M{}
	if !g.IsEmpty(req.CustomerId) {
		filter["customerId"] = req.CustomerId
	}
	if !g.IsEmpty(req.CustomerServiceId) {
		filter["customerServiceId"] = req.CustomerServiceId
	}
	return filter
}

// checkTotalCount returns the number of records in entity.DataCollection
// that match filter.
func (d *data) checkTotalCount(ctx context.Context, filter bson.M) (total int64, err error) {
	return mongo.Count(ctx, filter, entity.DataCollection)
}

// List returns one page of records matching the request filter, sorted by
// sessionStartTime in descending order, together with the total number of
// matching records.
//
// A non-positive req.PageNum is treated as page 1 and a non-positive
// req.PageSize falls back to http.PageSize.
//
// On any error (nil request, count failure, find failure) the returned list
// is nil and total is 0.
func (d *data) List(ctx context.Context, req *dto.ListDataReq) (list []*entity.Data, total int64, err error) {
	if req == nil {
		return nil, 0, errors.New("dao: list data: nil request")
	}

	filter := d.buildListFilter(req)

	total, err = d.checkTotalCount(ctx, filter)
	if err != nil {
		return nil, 0, err
	}

	// Normalize paging parameters.
	pageNum := req.PageNum
	if pageNum <= 0 {
		pageNum = 1
	}
	pageSize := req.PageSize
	if pageSize <= 0 {
		pageSize = http.PageSize
	}

	// Widen to int64 before multiplying so a large page number cannot
	// overflow the narrower request integer type.
	limit := int64(pageSize)
	skip := int64(pageNum-1) * limit

	opts := options.Find().
		SetLimit(limit).
		SetSkip(skip).
		SetSort(bson.M{"sessionStartTime": -1})

	if err = mongo.Find(ctx, filter, &list, entity.DataCollection, opts); err != nil {
		return nil, 0, err
	}
	return list, total, nil
}