Skip to content

Commit

Permalink
opt preheat for big grp
Browse files Browse the repository at this point in the history
  • Loading branch information
yuwnloyblog committed Oct 16, 2024
1 parent 37d1571 commit 2370838
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 31 deletions.
16 changes: 13 additions & 3 deletions services/message/services/converservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ func GetConversation(ctx context.Context, userId, targetId string, channelType p
}
}

func CacheUserConver(appkey, userId, targetId string, channelType pbobjs.ChannelType, conver *UserConversationItem) {
key := fmt.Sprintf("%s_%s_%s_%d", appkey, userId, targetId, channelType)
l := userLocks.GetLocks(key)
l.Lock()
defer l.Unlock()
if !UserConverCacheContains(appkey, userId, targetId, channelType) {
converCache.Add(key, conver)
}
}

func BatchInitUserConvers(ctx context.Context, targetId string, channelType pbobjs.ChannelType, userIds []string) {
appkey := bases.GetAppKeyFromCtx(ctx)
groups := bases.GroupTargets("qry_conver", userIds)
Expand All @@ -90,12 +100,12 @@ func BatchInitUserConvers(ctx context.Context, targetId string, channelType pbob
if ok && convers != nil {
for _, conver := range convers.Conversations {
key := fmt.Sprintf("%s_%s_%s_%d", appkey, conver.UserId, targetId, channelType)
item := &UserConversationItem{
CacheUserConver(appkey, conver.UserId, targetId, channelType, &UserConversationItem{
key: key,
UndisturbType: conver.UndisturbType,
UnreadIndex: conver.LatestUnreadIndex,
ConverTags: conver.ConverTags,
}
converCache.Add(key, item)
})
}
}
}
Expand Down
66 changes: 40 additions & 26 deletions services/message/services/dispatchservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
)

var MsgSinglePools *tools.SinglePools
var GrpSinglePools *tools.SinglePools

func init() {
MsgSinglePools = tools.NewSinglePools(8192)
GrpSinglePools = tools.NewSinglePools(256)
}

func DispatchMsg(ctx context.Context, downMsg *pbobjs.DownMsg) {
Expand All @@ -31,36 +33,48 @@ func DispatchMsg(ctx context.Context, downMsg *pbobjs.DownMsg) {
if exist && appinfo != nil {
threadhold = appinfo.BigGrpThreshold
}
closeOffline := false
if downMsg.MemberCount < int32(threadhold) {
closeOffline = true
//preheat user status
noStatusCacheUids := []string{}
noConverCacheUids := []string{}
for _, receiverId := range memberIds {
if !UserStatusCacheContains(appkey, receiverId) {
noStatusCacheUids = append(noStatusCacheUids, receiverId)
}
if !UserConverCacheContains(appkey, receiverId, downMsg.TargetId, downMsg.ChannelType) {
noConverCacheUids = append(noConverCacheUids, receiverId)
if downMsg.MemberCount > int32(threadhold) {
GrpSinglePools.GetPool(strings.Join([]string{appkey, downMsg.TargetId}, "_")).Submit(func() {
preheat(ctx, appkey, memberIds, downMsg)
for _, receiverId := range memberIds {
newDownMsg := copyDownMsg(downMsg)
receId := receiverId
userStatus := GetUserStatus(appkey, receId)
closeOffline := (!userStatus.IsOnline())
MsgSinglePools.GetPool(strings.Join([]string{appkey, receId}, "_")).Submit(func() {
doDispatch(ctx, receId, newDownMsg, closeOffline)
})
}
}
if len(noStatusCacheUids) > 0 {
BatchInitUserStatus(ctx, appkey, noStatusCacheUids)
}
if len(noConverCacheUids) > 0 {
BatchInitUserConvers(ctx, downMsg.TargetId, downMsg.ChannelType, noConverCacheUids)
})
} else {
for _, receiverId := range memberIds {
newDownMsg := copyDownMsg(downMsg)
receId := receiverId
MsgSinglePools.GetPool(strings.Join([]string{appkey, receId}, "_")).Submit(func() {
doDispatch(ctx, receId, newDownMsg, false)
})
}
}
for _, receiverId := range memberIds {
newDownMsg := copyDownMsg(downMsg)
receId := receiverId
userStatus := GetUserStatus(appkey, receId)
closeOffline = closeOffline && (!userStatus.IsOnline())
MsgSinglePools.GetPool(strings.Join([]string{appkey, receId}, "_")).Submit(func() {
doDispatch(ctx, receId, newDownMsg, closeOffline)
})
}
}

func preheat(ctx context.Context, appkey string, memberIds []string, downMsg *pbobjs.DownMsg) {
//preheat user status
noStatusCacheUids := []string{}
noConverCacheUids := []string{}
for _, receiverId := range memberIds {
if !UserStatusCacheContains(appkey, receiverId) {
noStatusCacheUids = append(noStatusCacheUids, receiverId)
}
if !UserConverCacheContains(appkey, receiverId, downMsg.TargetId, downMsg.ChannelType) {
noConverCacheUids = append(noConverCacheUids, receiverId)
}
}
if len(noStatusCacheUids) > 0 {
BatchInitUserStatus(ctx, appkey, noStatusCacheUids)
}
if len(noConverCacheUids) > 0 {
BatchInitUserConvers(ctx, downMsg.TargetId, downMsg.ChannelType, noConverCacheUids)
}
}

Expand Down
14 changes: 12 additions & 2 deletions services/message/services/userstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,16 @@ func GetUserStatus(appKey, userId string) *UserStatus {
}
}

func CacheUserStatus(appkey, userId string, status *UserStatus) {
key := getKey(appkey, userId)
l := userLocks.GetLocks(key)
l.Lock()
defer l.Unlock()
if !UserStatusCacheContains(appkey, userId) {
userOnlineStatusCache.Add(key, status)
}
}

func BatchInitUserStatus(ctx context.Context, appkey string, userIds []string) {
//check status from connect manager
groups := bases.GroupTargets("qry_online_status", userIds)
Expand All @@ -168,11 +178,11 @@ func BatchInitUserStatus(ctx context.Context, appkey string, userIds []string) {
onlineResp, ok := resp.(*pbobjs.UserOnlineStatusResp)
if ok && len(onlineResp.Items) > 0 {
for _, item := range onlineResp.Items {
cacheKey := getKey(appkey, item.UserId)
userOnlineStatusCache.Add(cacheKey, &UserStatus{
CacheUserStatus(appkey, item.UserId, &UserStatus{
appkey: appkey,
userId: item.UserId,
OnlineStatus: item.IsOnline,
CanPush: 1,
})
}
}
Expand Down

0 comments on commit 2370838

Please sign in to comment.