Skip to content

Commit

Permalink
Add sync server.
Browse files Browse the repository at this point in the history
  • Loading branch information
wubenqi committed Mar 10, 2018
1 parent 0504561 commit bb3d4ef
Show file tree
Hide file tree
Showing 15 changed files with 1,857 additions and 282 deletions.
7 changes: 6 additions & 1 deletion biz_server/biz_server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@ tTL = "10s"
#serviceName = "sync"
#addr = "localhost:10002"

[syncRpcClient]
[syncRpcClient1]
serviceName = "sync"
etcdAddrs = ["http://127.0.0.1:2379"]
balancer = "round_robin"

[syncRpcClient2]
serviceName = "sync2"
etcdAddrs = ["http://127.0.0.1:2379"]
balancer = "round_robin"

[[redis]]
name = "cache"
addr = "127.0.0.1:6379"
Expand Down
134 changes: 66 additions & 68 deletions biz_server/messages/rpc/messages.sendMessage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@ import (
"time"
"github.com/nebulaim/telegramd/biz_model/base"
"github.com/nebulaim/telegramd/biz_model/model"
"github.com/nebulaim/telegramd/biz_server/delivery"
"github.com/nebulaim/telegramd/zproto"
base2 "github.com/nebulaim/telegramd/base/base"
"fmt"
"github.com/nebulaim/telegramd/biz_server/sync"
)

// 流程:
Expand All @@ -54,7 +53,7 @@ func (s *MessagesServiceImpl) MessagesSendMessage(ctx context.Context, request *
case mtproto.TLConstructor_CRC32_inputPeerChat:
sentMessage, err = sendPeerChatMessage(md, request)
case mtproto.TLConstructor_CRC32_inputPeerChannel:
sentMessage, err = sendPeerChatMessage(md, request)
sentMessage, err = sendPeerChannelMessage(md, request)
default:
err = fmt.Errorf("MessagesSendMessage - Invalid inpuet peer {%v}", request.GetPeer())
glog.Error(err)
Expand Down Expand Up @@ -285,51 +284,54 @@ func sendPeerSelfMessage(md *zproto.RpcMetadata, request *mtproto.TLMessagesSend
peer := &base.PeerUtil{PeerType: base.PEER_USER, PeerId: md.UserId}
message.SetToId(peer.ToPeer())


sentMessage := mtproto.NewTLUpdateShortSentMessage()

// message
ids := model.GetMessageModel().SendMessage(md.UserId, base.PEER_USER, md.UserId, request.GetRandomId(), message.To_Message())

//// 1. SaveMessage
//messageId := model.GetMessageModel().CreateHistoryMessage2(md.UserId, peer, request.RandomId, message.GetDate(), message.To_Message())
//// 2. MessageBoxes
//pts := model.GetMessageModel().CreateMessageBoxes(md.UserId, message.GetFromId(), peer.PeerType, md.UserId, false, messageId)
//// 3. dialog
//model.GetDialogModel().CreateOrUpdateByLastMessage(md.UserId, peer.PeerType, md.UserId, messageId, message.GetMentioned(), false)

// 推送给sync
pts := int32(model.GetSequenceModel().NextPtsId(base2.Int32ToString(md.UserId)))
model.GetUpdatesModel().AddPtsToUpdatesQueue(md.UserId, pts, base.PEER_USER, md.UserId, model.PTS_MESSAGE_INBOX, ids[0].MessageBoxId, 0)

// model.GetUpdatesModel().AddPtsToUpdatesQueue(md.UserId, )
// 推给客户端的updates
updates := mtproto.NewTLUpdateShortMessage()
sentMessage.SetOut(true)
// sentMessage.SetOut(true)
updates.SetId(ids[0].MessageBoxId)
updates.SetUserId(md.UserId)
// TODO(@benqi): 暂时这样实现验证发消息是否有问题,有问题的
updates.SetPts(pts)
updates.SetPtsCount(1)
// updates.SetPts(pts)
// updates.SetPtsCount(1)
updates.SetMessage(request.Message)
updates.SetDate(message.GetDate())
updatesData := updates.To_Updates().Encode()
delivery.GetDeliveryInstance().DeliveryUpdatesNotMe(
md.AuthId,
md.SessionId,
md.NetlibSessionId,
[]int32{md.UserId},
updatesData)
// 返回给客户端
state, err := sync.GetSyncClient().SyncUpdateShortMessage(md.AuthId, md.SessionId, md.NetlibSessionId, md.UserId, md.UserId, updates)
if err != nil {
return nil, err
}

sentMessage := mtproto.NewTLUpdateShortSentMessage()
// sentMessage := &mtproto.TLUpdateShortSentMessage{}
sentMessage.SetOut(true)
sentMessage.SetId(int32(ids[0].MessageBoxId))
// TODO(@benqi): 暂时这样实现验证发消息是否有问题,有问题的
sentMessage.SetPts(pts)
sentMessage.SetPtsCount(1)
sentMessage.SetPts(state.Pts)
sentMessage.SetPtsCount(state.PtsCount)
sentMessage.SetDate(message.GetDate())
sentMessage.SetMedia(mtproto.NewTLMessageMediaEmpty().To_MessageMedia())

////// 1. SaveMessage
////messageId := model.GetMessageModel().CreateHistoryMessage2(md.UserId, peer, request.RandomId, message.GetDate(), message.To_Message())
////// 2. MessageBoxes
////pts := model.GetMessageModel().CreateMessageBoxes(md.UserId, message.GetFromId(), peer.PeerType, md.UserId, false, messageId)
////// 3. dialog
////model.GetDialogModel().CreateOrUpdateByLastMessage(md.UserId, peer.PeerType, md.UserId, messageId, message.GetMentioned(), false)
//
//// 推送给sync
//pts := int32(model.GetSequenceModel().NextPtsId(base2.Int32ToString(md.UserId)))
//model.GetUpdatesModel().AddPtsToUpdatesQueue(md.UserId, pts, base.PEER_USER, md.UserId, model.PTS_MESSAGE_INBOX, ids[0].MessageBoxId, 0)
//
//
//delivery.GetDeliveryInstance().DeliveryUpdatesNotMe(
// md.AuthId,
// md.SessionId,
// md.NetlibSessionId,
// []int32{md.UserId},
// updatesData)
//// 返回给客户端
glog.Infof("MessagesSendMessage - reply: %s", logger.JsonDebugData(sentMessage))
// reply = sentMessage.ToUpdates()
return sentMessage, nil
Expand All @@ -343,11 +345,35 @@ func sendPeerUserMessage(md *zproto.RpcMetadata, request *mtproto.TLMessagesSend
peer := &base.PeerUtil{PeerType: base.PEER_USER, PeerId: request.GetPeer().GetData2().GetUserId()}
message.SetToId(peer.ToPeer())

sentMessage := mtproto.NewTLUpdateShortSentMessage()

// message
ids := model.GetMessageModel().SendMessage(md.UserId, base.PEER_USER, peer.PeerId, request.GetRandomId(), message.To_Message())


// model.GetUpdatesModel().AddPtsToUpdatesQueue(md.UserId, )
// 推给客户端的updates
updates := mtproto.NewTLUpdateShortMessage()
// sentMessage.SetOut(true)
updates.SetId(ids[0].MessageBoxId)
updates.SetUserId(md.UserId)
// TODO(@benqi): 暂时这样实现验证发消息是否有问题,有问题的
// updates.SetPts(pts)
// updates.SetPtsCount(1)
updates.SetMessage(request.Message)
updates.SetDate(message.GetDate())
state, err := sync.GetSyncClient().SyncUpdateShortMessage(md.AuthId, md.SessionId, md.NetlibSessionId, md.UserId, md.UserId, updates)
if err != nil {
return nil, err
}

sentMessage := mtproto.NewTLUpdateShortSentMessage()
sentMessage.SetOut(true)
sentMessage.SetId(int32(ids[0].MessageBoxId))
// TODO(@benqi): 暂时这样实现验证发消息是否有问题,有问题的
sentMessage.SetPts(state.Pts)
sentMessage.SetPtsCount(state.PtsCount)
sentMessage.SetDate(message.GetDate())
sentMessage.SetMedia(mtproto.NewTLMessageMediaEmpty().To_MessageMedia())

//// 1. SaveMessage
//messageId := model.GetMessageModel().CreateHistoryMessage2(md.UserId, peer, request.RandomId, message.GetDate(), message.To_Message())
//// 2. MessageBoxes
Expand All @@ -359,43 +385,15 @@ func sendPeerUserMessage(md *zproto.RpcMetadata, request *mtproto.TLMessagesSend
for _, idPair := range ids {
// model.GetUpdatesModel().AddPtsToUpdatesQueue(md.UserId, )
// 推给客户端的updates
updates := mtproto.NewTLUpdateShortMessage()
// 推送给sync
pts := int32(model.GetSequenceModel().NextPtsId(base2.Int32ToString(idPair.UserId)))
if idPair.UserId == md.UserId {
updates.SetOut(true)
model.GetUpdatesModel().AddPtsToUpdatesQueue(idPair.UserId, pts, base.PEER_USER, peer.PeerId, model.PTS_MESSAGE_OUTBOX, idPair.MessageBoxId, 0)
} else {
updates.SetOut(false)
model.GetUpdatesModel().AddPtsToUpdatesQueue(idPair.UserId, pts, base.PEER_USER, peer.PeerId, model.PTS_MESSAGE_INBOX, idPair.MessageBoxId, 0)
}

updates.SetId(idPair.MessageBoxId)
updates.SetUserId(md.UserId)
updates2 := mtproto.NewTLUpdateShortMessage()
updates2.SetId(idPair.MessageBoxId)
updates2.SetUserId(md.UserId)
// TODO(@benqi): 暂时这样实现验证发消息是否有问题,有问题的
updates.SetPts(pts)
updates.SetPtsCount(1)
updates.SetMessage(request.Message)
updates.SetDate(message.GetDate())
updatesData := updates.To_Updates().Encode()
delivery.GetDeliveryInstance().DeliveryUpdatesNotMe(
md.AuthId,
md.SessionId,
md.NetlibSessionId,
[]int32{idPair.UserId},
updatesData)
// 返回给客户端

if idPair.UserId == md.UserId {
// sentMessage := &mtproto.TLUpdateShortSentMessage{}
sentMessage.SetOut(true)
sentMessage.SetId(idPair.MessageBoxId)
// TODO(@benqi): 暂时这样实现验证发消息是否有问题,有问题的
sentMessage.SetPts(pts)
sentMessage.SetPtsCount(1)
sentMessage.SetDate(message.GetDate())
sentMessage.SetMedia(mtproto.NewTLMessageMediaEmpty().To_MessageMedia())
}
//updates.SetPts(pts)
//updates.SetPtsCount(1)
updates2.SetMessage(request.Message)
updates2.SetDate(message.GetDate())
sync.GetSyncClient().PushUpdateShortMessage(md.UserId, idPair.UserId, updates2)
}

glog.Infof("MessagesSendMessage - reply: %s", logger.JsonDebugData(sentMessage))
Expand Down
8 changes: 5 additions & 3 deletions biz_server/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/nebulaim/telegramd/grpc_util"
"github.com/nebulaim/telegramd/grpc_util/service_discovery"
"google.golang.org/grpc"
"github.com/nebulaim/telegramd/biz_server/sync"
)

func init() {
Expand All @@ -69,7 +70,8 @@ type BizServerConfig struct{
// RpcClient *RpcClientConfig
Mysql []mysql_client.MySQLConfig
Redis []redis_client.RedisConfig
SyncRpcClient *service_discovery.ServiceDiscoveryClientConfig
SyncRpcClient1 *service_discovery.ServiceDiscoveryClientConfig
SyncRpcClient2 *service_discovery.ServiceDiscoveryClientConfig
}

// 整合各服务,方便开发调试
Expand Down Expand Up @@ -98,8 +100,8 @@ func main() {
// glog.Fatalf("failed to listen: %v", err)
//}

delivery.InstallDeliveryInstance(bizServerConfig.SyncRpcClient)

delivery.InstallDeliveryInstance(bizServerConfig.SyncRpcClient1)
sync.InstallSyncClient(bizServerConfig.SyncRpcClient2)
// Start server
grpcServer := grpc_util.NewRpcServer(bizServerConfig.Server.Addr, &bizServerConfig.Discovery)
grpcServer.Serve(func(s *grpc.Server) {
Expand Down
104 changes: 104 additions & 0 deletions biz_server/sync/sync_rpc_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright (c) 2017, https://github.com/nebulaim
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sync

import (
"context"
"github.com/golang/glog"
"github.com/nebulaim/telegramd/grpc_util/service_discovery"
"github.com/nebulaim/telegramd/grpc_util"
"github.com/nebulaim/telegramd/mtproto"
)

type syncClient struct {
client mtproto.RPCSyncClient
}

var (
syncInstance = &syncClient{}
)

func GetSyncClient() *syncClient {
return syncInstance
}

func InstallSyncClient(discovery *service_discovery.ServiceDiscoveryClientConfig) {
conn, err := grpc_util.NewRPCClientByServiceDiscovery(discovery)

if err != nil {
glog.Error(err)
panic(err)
}

syncInstance.client = mtproto.NewRPCSyncClient(conn)
}

func (c *syncClient) SyncUpdateShortMessage(authKeyId, sessionId, netlibSessionId int64, senderId, peerId int32, update *mtproto.TLUpdateShortMessage) (reply *mtproto.ClientUpdatesState, err error) {
m := &mtproto.SyncShortMessageRequest{
ClientId: &mtproto.PushClientID{
AuthKeyId: authKeyId,
SessionId: sessionId,
NetlibSessionId: netlibSessionId,
},
SenderUserId: senderId,
// PeerUserId: peerId,
PushData: &mtproto.PushShortMessage{
PushType: mtproto.SyncType_SYNC_TYPE_USER_NOTME,
PushUserId: peerId,
PushData: update,
},
}
reply, err = c.client.SyncUpdateShortMessage(context.Background(), m)
return
}

func (c *syncClient) PushUpdateShortMessage(senderId, peerId int32, update *mtproto.TLUpdateShortMessage) (reply *mtproto.VoidRsp, err error) {
m := &mtproto.UpdateShortMessageRequest{
PeerUserId: peerId,
PushData: &mtproto.PushShortMessage{
PushType: mtproto.SyncType_SYNC_TYPE_USER,
PushUserId: peerId,
PushData: update,
},
}
reply, err = c.client.PushUpdateShortMessage(context.Background(), m)
return
}

func (c *syncClient) SyncUpdateShortChatMessage(authKeyId, sessionId, netlibSessionId int64, senderId, peerId int32, update *mtproto.TLUpdateShortChatMessage) (reply *mtproto.ClientUpdatesState, err error) {
m := &mtproto.SyncShortChatMessageRequest{
ClientId: &mtproto.PushClientID{
AuthKeyId: authKeyId,
SessionId: sessionId,
NetlibSessionId: netlibSessionId,
},
SenderUserId: senderId,
// PeerUserId: peerId,
PushData: &mtproto.PushShortChatMessage{
PushType: mtproto.SyncType_SYNC_TYPE_USER_NOTME,
PushUserId: peerId,
PushData: update,
},
}
reply, err = c.client.SyncUpdateShortChatMessage(context.Background(), m)
return
}

func (c *syncClient) PushUpdateShortChatMessage(senderId, peerId int32, update *mtproto.TLUpdateShortChatMessage) (reply *mtproto.VoidRsp, err error) {
return
}
4 changes: 4 additions & 0 deletions grpc_util/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,7 @@ func (s *RPCServer) Serve(regFunc RegisterRPCServerFunc) {
glog.Fatalf("failed to serve: %s", err)
}
}

func (s *RPCServer) Stop() {
s.s.GracefulStop()
}
Loading

0 comments on commit bb3d4ef

Please sign in to comment.