Skip to content

Commit

Permalink
Add push update in sync server.
Browse files Browse the repository at this point in the history
  • Loading branch information
wubenqi committed Mar 12, 2018
1 parent 46fd4a6 commit 0aa5ede
Show file tree
Hide file tree
Showing 14 changed files with 674 additions and 307 deletions.
2 changes: 1 addition & 1 deletion access/session/server/session_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func (c *sessionClient) onUserOnline(serverId int32) {
ServerId: serverId,
UserId: c.authUserId,
AuthKeyId: c.authKeyId,
SessionId: int64(c.clientSession.clientSessionId),
SessionId: int64(c.sessionId),
NetlibSessionId: int64(c.clientSession.conn.GetConnID()),
Now: time.Now().Unix(),
}
Expand Down
Binary file modified access/session/session
Binary file not shown.
81 changes: 81 additions & 0 deletions biz_model/model/message_object_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2018, https://github.com/nebulaim
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package model

import (
"github.com/nebulaim/telegramd/mtproto"
"github.com/golang/glog"
"time"
)


//messages.sendMedia#b8d1262b flags:# silent:flags.5?true background:flags.6?true clear_draft:flags.7?true peer:InputPeer reply_to_msg_id:flags.0?int media:InputMedia message:string random_id:long reply_markup:flags.2?ReplyMarkup entities:flags.3?Vector<MessageEntity> = Updates;
//messages.forwardMessages#708e0195 flags:# silent:flags.5?true background:flags.6?true with_my_score:flags.8?true grouped:flags.9?true from_peer:InputPeer id:Vector<int> random_id:Vector<long> to_peer:InputPeer = Updates;
func sendMessageToMessageData(m *mtproto.TLMessagesSendMessage) *mtproto.TLMessage {
//messages.sendMessage#fa88427a flags:# no_webpage:flags.1?true silent:flags.5?true background:flags.6?true clear_draft:flags.7?true peer:InputPeer reply_to_msg_id:flags.0?int message:string random_id:long reply_markup:flags.2?ReplyMarkup entities:flags.3?Vector<MessageEntity> = Updates;

//// TODO(@benqi): ???
//// request.Background
//// request.NoWebpage
//// request.ClearDraft
//message.SetFromId(md.UserId)
//if peer.PeerType == base.PEER_SELF {
// to := &mtproto.TLPeerUser{ Data2: &mtproto.Peer_Data{
// UserId: md.UserId,
// }}
// message.SetToId(to.To_Peer())
//} else {
// message.SetToId(peer.ToPeer())
//}

return &mtproto.TLMessage{ Data2: &mtproto.Message_Data{
Silent: m.GetSilent(),
ReplyToMsgId: m.GetReplyToMsgId(),
Message: m.GetMessage(),
ReplyMarkup: m.GetReplyMarkup(),
Entities: m.GetEntities(),
Date: int32(time.Now().Unix()),
}}
}

func sendMediaToMessageData(m *mtproto.TLMessagesSendMedia) *mtproto.TLMessage {
return &mtproto.TLMessage{ Data2: &mtproto.Message_Data{
Silent: m.GetSilent(),
ReplyToMsgId: m.GetReplyToMsgId(),
// Media: m.GetMedia(),
// Message: m.GetMessage(),
ReplyMarkup: m.GetReplyMarkup(),
// Entities: m.GetEntities(),
Date: int32(time.Now().Unix()),
}}
}

func MakeMessageBySendMessage(m mtproto.TLObject) (message *mtproto.TLMessage, err error) {
switch m.(type) {
case *mtproto.TLMessagesSendMessage:
message = sendMessageToMessageData(m.(*mtproto.TLMessagesSendMessage))
case *mtproto.TLMessagesSendMedia:
message = sendMediaToMessageData(m.(*mtproto.TLMessagesSendMedia))
default:
err = mtproto.NewRpcError(int32(mtproto.TLRpcErrorCodes_INTERNAL_SERVER_ERROR), "internal server error")
glog.Error(err)
return
}
return
}

4 changes: 2 additions & 2 deletions biz_model/model/online_status_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ import (
// - ......
// auth_key_id ->
const (
ONLINE_TIMEOUT = 15 // 15秒
CHECK_ONLINE_TIMEOUT = 17 // 17秒, 15+2秒的误差
ONLINE_TIMEOUT = 60 // 15秒
CHECK_ONLINE_TIMEOUT = 70 // 17秒, 15+2秒的误差
onlineKeyPrefix = "online" //
)

Expand Down
29 changes: 15 additions & 14 deletions biz_server/account/rpc/account.updateStatus_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,30 @@ import (
"github.com/nebulaim/telegramd/grpc_util"
"github.com/nebulaim/telegramd/mtproto"
"golang.org/x/net/context"
"time"
"github.com/nebulaim/telegramd/biz_model/model"
// "time"
// "github.com/nebulaim/telegramd/biz_model/model"
)

// account.updateStatus#6628562c offline:Bool = Bool;
func (s *AccountServiceImpl) AccountUpdateStatus(ctx context.Context, request *mtproto.TLAccountUpdateStatus) (*mtproto.Bool, error) {
md := grpc_util.RpcMetadataFromIncoming(ctx)
glog.Infof("AccountUpdateStatus - metadata: %s, request: %s", logger.JsonDebugData(md), logger.JsonDebugData(request))

status := &model.SessionStatus{}
status.UserId = md.UserId
status.AuthKeyId = md.AuthId
status.SessionId = md.SessionId
status.ServerId = md.ServerId
status.Now = time.Now().Unix()

//status := &model.SessionStatus{}
//status.UserId = md.UserId
//status.AuthKeyId = md.AuthId
//status.SessionId = md.SessionId
//status.ServerId = md.ServerId
//status.Now = time.Now().Unix()
//
// 不是这个逻辑
// Offline可能为nil,由grpc中间件保证Offline必须设置值
// offline := request.GetOffline()
if mtproto.FromBool(request.GetOffline()) {
model.GetOnlineStatusModel().SetOnline(status)
} else {
model.GetOnlineStatusModel().SetOffline(status)
}
//if mtproto.FromBool(request.GetOffline()) {
// model.GetOnlineStatusModel().SetOnline(status)
//} else {
// model.GetOnlineStatusModel().SetOffline(status)
//}

// TODO(@benqi): broadcast online status???

Expand Down
1 change: 1 addition & 0 deletions biz_server/auth/rpc/auth.checkPhone_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (s *AuthServiceImpl) AuthCheckPhone(ctx context.Context, request *mtproto.T

usersDAO := dao.GetUsersDAO(dao.DB_SLAVE)
usersDO := usersDAO.SelectByPhoneNumber(phoneNumber)
glog.Infof("phoneNumber: %d, usersDO: {%v}", phoneNumber, usersDO)
checkedPhone := mtproto.TLAuthCheckedPhone{Data2: &mtproto.Auth_CheckedPhone_Data{
PhoneRegistered: mtproto.ToBool(usersDO != nil),
}}
Expand Down
91 changes: 51 additions & 40 deletions biz_server/messages/rpc/messages.readHistory_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ import (
"golang.org/x/net/context"
"github.com/nebulaim/telegramd/biz_model/base"
"github.com/nebulaim/telegramd/biz_model/dal/dao"
"time"
"github.com/nebulaim/telegramd/biz_server/delivery"
"github.com/nebulaim/telegramd/biz_model/model"
base2 "github.com/nebulaim/telegramd/baselib/base"
"github.com/nebulaim/telegramd/biz_server/sync_client"
)

/*
Expand All @@ -52,63 +49,77 @@ func (s *MessagesServiceImpl) MessagesReadHistory(ctx context.Context, request *
// 1. inbox,设置unread_count为0以及read_inbox_max_id
// inBoxDO := dao.GetUserDialogsDAO(dao.DB_SLAVE).SelectByPeer(md.UserId, int8(peer.PeerType), peer.PeerId)
dao.GetUserDialogsDAO(dao.DB_MASTER).UpdateUnreadByPeer(request.GetMaxId(), md.UserId, int8(peer.PeerType), peer.PeerId)
// return me
pts := int32(model.GetSequenceModel().NextPtsId(base2.Int32ToString(md.UserId)))
model.GetUpdatesModel().AddPtsToUpdatesQueue(md.UserId, pts, base.PEER_USER, peer.PeerId, model.PTS_READ_HISTORY_INBOX, 0, request.GetMaxId())

updateReadHistoryInbox := mtproto.NewTLUpdateReadHistoryInbox()
updateReadHistoryInbox.SetPeer(peer.ToPeer())
//updateReadHistoryInbox.SetPts(pts)
//updateReadHistoryInbox.SetPtsCount(1)
updateReadHistoryInbox.SetMaxId(request.MaxId)

state, err := sync_client.GetSyncClient().SyncUpdateData(md.AuthId, md.SessionId, md.NetlibSessionId, md.UserId, base.PEER_USER, md.UserId, updateReadHistoryInbox.To_Update())
if err != nil {
return nil, err
}

//// return me
//pts := int32(model.GetSequenceModel().NextPtsId(base2.Int32ToString(md.UserId)))
//model.GetUpdatesModel().AddPtsToUpdatesQueue(md.UserId, pts, base.PEER_USER, peer.PeerId, model.PTS_READ_HISTORY_INBOX, 0, request.GetMaxId())

affected := mtproto.NewTLMessagesAffectedMessages()
// pts = model.GetSequenceModel().NextPtsId(base2.Int32ToString(peer.PeerId))
affected.SetPts(int32(pts))
affected.SetPtsCount(1)
affected.SetPts(int32(state.Pts))
affected.SetPtsCount(state.PtsCount)

// outboxPeer := &mtproto.TLPeerUser{Data2: &mtproto.Peer_Data{
// UserId: md.UserId,
// }}
// 消息漫游
updateReadHistoryInbox := mtproto.NewTLUpdateReadHistoryInbox()
updateReadHistoryInbox.SetPeer(peer.ToPeer())
updateReadHistoryInbox.SetPts(pts)
updateReadHistoryInbox.SetPtsCount(1)
updateReadHistoryInbox.SetMaxId(request.MaxId)

updates := mtproto.NewTLUpdates()
updates.SetSeq(0)
updates.SetDate(int32(time.Now().Unix()))
updates.SetUpdates([]*mtproto.Update{updateReadHistoryInbox.To_Update()})

delivery.GetDeliveryInstance().DeliveryUpdatesNotMe(
md.AuthId,
md.SessionId,
md.NetlibSessionId,
[]int32{md.UserId},
updates.To_Updates().Encode())
//updateReadHistoryInbox := mtproto.NewTLUpdateReadHistoryInbox()
//updateReadHistoryInbox.SetPeer(peer.ToPeer())
//updateReadHistoryInbox.SetPts(pts)
//updateReadHistoryInbox.SetPtsCount(1)
//updateReadHistoryInbox.SetMaxId(request.MaxId)
//
//updates := mtproto.NewTLUpdates()
//updates.SetSeq(0)
//updates.SetDate(int32(time.Now().Unix()))
//updates.SetUpdates([]*mtproto.Update{updateReadHistoryInbox.To_Update()})
//
//delivery.GetDeliveryInstance().DeliveryUpdatesNotMe(
// md.AuthId,
// md.SessionId,
// md.NetlibSessionId,
// []int32{md.UserId},
// updates.To_Updates().Encode())

// 2. outbox, 设置read_outbox_max_id
outboxDO := dao.GetUserDialogsDAO(dao.DB_SLAVE).SelectByPeer(peer.PeerId, int8(peer.PeerType), md.UserId)
dao.GetUserDialogsDAO(dao.DB_MASTER).UpdateReadOutboxMaxIdByPeer(outboxDO.TopMessage, peer.PeerId, int8(peer.PeerType), md.UserId)
pts = int32(model.GetSequenceModel().NextPtsId(base2.Int32ToString(peer.PeerId)))
model.GetUpdatesModel().AddPtsToUpdatesQueue(peer.PeerId, pts, base.PEER_USER, md.UserId, model.PTS_READ_HISTORY_OUTBOX, 0, outboxDO.TopMessage)
// pts = int32(model.GetSequenceModel().NextPtsId(base2.Int32ToString(peer.PeerId)))
// model.GetUpdatesModel().AddPtsToUpdatesQueue(peer.PeerId, pts, base.PEER_USER, md.UserId, model.PTS_READ_HISTORY_OUTBOX, 0, outboxDO.TopMessage)

updateReadHistoryOutbox := mtproto.NewTLUpdateReadHistoryOutbox()
// oudboxDO := dao.GetUserDialogsDAO(dao.DB_SLAVE).SelectByPeer(peer.PeerId, int8(peer.PeerType), md.UserId)
outboxPeer := &mtproto.TLPeerUser{Data2: &mtproto.Peer_Data{
UserId: md.UserId,
}}
updateReadHistoryOutbox.SetPeer(outboxPeer.To_Peer())
updateReadHistoryOutbox.SetPts(pts)
updateReadHistoryOutbox.SetPtsCount(1)
// updateReadHistoryOutbox.SetPts(pts)
// updateReadHistoryOutbox.SetPtsCount(1)
updateReadHistoryOutbox.SetMaxId(outboxDO.TopMessage)

updates = mtproto.NewTLUpdates()
updates.SetSeq(0)
updates.SetDate(int32(time.Now().Unix()))
updates.SetUpdates([]*mtproto.Update{updateReadHistoryOutbox.To_Update()})
delivery.GetDeliveryInstance().DeliveryUpdatesNotMe(
md.AuthId,
md.SessionId,
md.NetlibSessionId,
[]int32{peer.PeerId},
updates.To_Updates().Encode())
sync_client.GetSyncClient().PushUpdateData(peer.PeerId, base.PEER_USER, peer.PeerId, updateReadHistoryOutbox.To_Update())

//updates = mtproto.NewTLUpdates()
//updates.SetSeq(0)
//updates.SetDate(int32(time.Now().Unix()))
//updates.SetUpdates([]*mtproto.Update{updateReadHistoryOutbox.To_Update()})
//delivery.GetDeliveryInstance().DeliveryUpdatesNotMe(
// md.AuthId,
// md.SessionId,
// md.NetlibSessionId,
// []int32{peer.PeerId},
// updates.To_Updates().Encode())

glog.Infof("MessagesReadHistory - reply: %s", logger.JsonDebugData(affected))
return affected.To_Messages_AffectedMessages(), nil
Expand Down
13 changes: 8 additions & 5 deletions biz_server/messages/rpc/messages.sendMessage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ import (
"time"
"github.com/nebulaim/telegramd/biz_model/base"
"github.com/nebulaim/telegramd/biz_model/model"
// "github.com/nebulaim/telegramd/zproto"
"fmt"
"github.com/nebulaim/telegramd/biz_server/sync"
"github.com/nebulaim/telegramd/biz_server/sync_client"
)

// 流程:
Expand Down Expand Up @@ -298,7 +297,7 @@ func sendPeerSelfMessage(md *grpc_util.RpcMetadata, request *mtproto.TLMessagesS
// updates.SetPtsCount(1)
updates.SetMessage(request.Message)
updates.SetDate(message.GetDate())
state, err := sync.GetSyncClient().SyncUpdateShortMessage(md.AuthId, md.SessionId, md.NetlibSessionId, md.UserId, md.UserId, updates)
state, err := sync_client.GetSyncClient().SyncUpdateShortMessage(md.AuthId, md.SessionId, md.NetlibSessionId, md.UserId, md.UserId, updates)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -360,7 +359,7 @@ func sendPeerUserMessage(md *grpc_util.RpcMetadata, request *mtproto.TLMessagesS
// updates.SetPtsCount(1)
updates.SetMessage(request.Message)
updates.SetDate(message.GetDate())
state, err := sync.GetSyncClient().SyncUpdateShortMessage(md.AuthId, md.SessionId, md.NetlibSessionId, md.UserId, md.UserId, updates)
state, err := sync_client.GetSyncClient().SyncUpdateShortMessage(md.AuthId, md.SessionId, md.NetlibSessionId, md.UserId, md.UserId, updates)
if err != nil {
return nil, err
}
Expand All @@ -383,6 +382,10 @@ func sendPeerUserMessage(md *grpc_util.RpcMetadata, request *mtproto.TLMessagesS

// var myPts int = 0
for _, idPair := range ids {
if idPair.UserId == md.UserId {
continue
}

// model.GetUpdatesModel().AddPtsToUpdatesQueue(md.UserId, )
// 推给客户端的updates
updates2 := mtproto.NewTLUpdateShortMessage()
Expand All @@ -393,7 +396,7 @@ func sendPeerUserMessage(md *grpc_util.RpcMetadata, request *mtproto.TLMessagesS
//updates.SetPtsCount(1)
updates2.SetMessage(request.Message)
updates2.SetDate(message.GetDate())
sync.GetSyncClient().PushUpdateShortMessage(md.UserId, idPair.UserId, updates2)
sync_client.GetSyncClient().PushUpdateShortMessage(idPair.UserId, md.UserId, updates2)
}

glog.Infof("MessagesSendMessage - reply: %s", logger.JsonDebugData(sentMessage))
Expand Down
4 changes: 2 additions & 2 deletions biz_server/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
"github.com/nebulaim/telegramd/grpc_util"
"github.com/nebulaim/telegramd/grpc_util/service_discovery"
"google.golang.org/grpc"
"github.com/nebulaim/telegramd/biz_server/sync"
"github.com/nebulaim/telegramd/biz_server/sync_client"
)

func init() {
Expand Down Expand Up @@ -101,7 +101,7 @@ func main() {
//}

delivery.InstallDeliveryInstance(bizServerConfig.SyncRpcClient1)
sync.InstallSyncClient(bizServerConfig.SyncRpcClient2)
sync_client.InstallSyncClient(bizServerConfig.SyncRpcClient2)
// Start server
grpcServer := grpc_util.NewRpcServer(bizServerConfig.Server.Addr, &bizServerConfig.Discovery)
grpcServer.Serve(func(s *grpc.Server) {
Expand Down
Loading

0 comments on commit 0aa5ede

Please sign in to comment.