Skip to content

Commit

Permalink
Add LengthBasedFrame codec; testing TcpClientGroupManager
Browse files Browse the repository at this point in the history
  • Loading branch information
wubenqi committed Jan 31, 2018
1 parent 1a61869 commit 5b65a6d
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 58 deletions.
92 changes: 92 additions & 0 deletions baselib/net2/codec/length_based_frame.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (c) 2017, https://github.com/nebulaim
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Test codec
package codec

import (
"github.com/nebulaim/telegramd/baselib/net2"
"io"
"bufio"
)

func init() {
net2.RegisterPtotocol("length_based_frame", NewLengthBasedFrame(kDedaultReadBufferSize))
}

const (
kDedaultReadBufferSize = 1024
)
func NewLengthBasedFrame(readBuf int) net2.Protocol {
if readBuf <=0 {
readBuf = kDedaultReadBufferSize
}

return &LengthBasedFrame{
readBuf: readBuf,
}
}

type LengthBasedFrame struct {
readBuf int
}

func (b *LengthBasedFrame) NewCodec(rw io.ReadWriter) (cc net2.Codec, err error) {
codec := new(LengthBasedFrameCodec)

codec.stream.w = rw.(io.Writer)
codec.stream.r = bufio.NewReaderSize(rw, b.readBuf)
codec.stream.c = rw.(io.Closer)

return codec, nil
}

type LengthBasedFrameStream struct {
w io.Writer
r *bufio.Reader
c io.Closer
}

func (s *LengthBasedFrameStream) close() error {
if s.c != nil {
return s.c.Close()
}
return nil
}

type LengthBasedFrameCodec struct {
stream LengthBasedFrameStream
}

func (c *LengthBasedFrameCodec) Send(msg interface{}) error {
buf := []byte(msg.(string))

if _, err := c.stream.w.Write(buf); err != nil {
return err
}

return nil
}

func (c *LengthBasedFrameCodec) Receive() (interface{}, error) {
line, err := c.stream.r.ReadString('\n');
return line, err
}

func (c *LengthBasedFrameCodec) Close() error {
return c.stream.close()
}
68 changes: 11 additions & 57 deletions baselib/net2/examples/echo/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,62 +21,12 @@ import (
"github.com/nebulaim/telegramd/baselib/net2"
"github.com/golang/glog"
"net"
"bufio"
"io"
"github.com/nebulaim/telegramd/baselib/app"
)

const (
kDedaultReadBufferSize = 1024
"github.com/nebulaim/telegramd/baselib/net2/codec"
)

func init() {
net2.RegisterPtotocol("echo", NewEcho(kDedaultReadBufferSize))
}

func NewEcho(bufLen int) net2.Protocol {
if bufLen <= 0 {
bufLen = kDedaultReadBufferSize
}

return &Echo{
readBuf: bufLen,
}
}

type Echo struct {
readBuf int
}

func (b *Echo) NewCodec(rw io.ReadWriter) (net2.Codec, error) {
codec := new(EchoCodec)
codec.conn = rw.(*net.TCPConn)
codec.r = bufio.NewReaderSize(rw, b.readBuf)
return codec, nil
}

type EchoCodec struct {
conn *net.TCPConn
r *bufio.Reader
}

func (c *EchoCodec) Send(msg interface{}) error {
buf := []byte(msg.(string))

if _, err := c.conn.Write(buf); err != nil {
return err
}

return nil
}

func (c *EchoCodec) Receive() (interface{}, error) {
line, err := c.r.ReadString('\n');
return line, err
}

func (c *EchoCodec) Close() error {
return c.conn.Close()
net2.RegisterPtotocol("echo", codec.NewLengthBasedFrame(1024))
}

type EchoServer struct {
Expand All @@ -103,7 +53,7 @@ func (s *EchoServer) OnNewConnection(conn *net2.TcpConnection) {
}

func (s *EchoServer) OnDataArrived(conn *net2.TcpConnection, msg interface{}) error {
glog.Infof("OnDataArrived - %v", msg)
glog.Infof("echo_server recv peer(%v) data: %v", conn.RemoteAddr(), msg)
conn.Send(msg)
return nil
}
Expand All @@ -113,17 +63,17 @@ func (s *EchoServer) OnConnectionClosed(conn *net2.TcpConnection) {
}

type EchoClient struct {
client* net2.TcpClient
client *net2.TcpClientGroupManager
}

func NewEchoClient(protoName string) *EchoClient {
func NewEchoClient(protoName string, clients map[string][]string) *EchoClient {
//listener, err := net.Listen("tcp", "0.0.0.0:12345")
//if err != nil {
// glog.Fatalf("listen error: %v", err)
// // return
//}
c := &EchoClient{}
c.client = net2.NewTcpClient("", 1, protoName, "127.0.0.1:22345", c)
c.client = net2.NewTcpClientGroupManager(protoName, clients, c)
return c
}

Expand Down Expand Up @@ -162,7 +112,11 @@ func (this *EchoInsance) Initialize() error {
}

this.server = NewEchoServer(listener, "echo")
this.client = NewEchoClient("echo")

clients := map[string][]string{
"echo":[]string{"127.0.0.1:22345", "192.168.1.101:22345"},
}
this.client = NewEchoClient("echo", clients)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion baselib/net2/tcp_client_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewTcpClientGroupManager(protoName string, clients map[string][]string, cb
m := make(map[string]*TcpClient)

for _, address := range v {
client := NewTcpClient(k, 10 * 1024, group.protoName, address, group.callback)
client := NewTcpClient(k, 10*1024, group.protoName, address, group.callback)
if client != nil {
m[address] = client
}
Expand Down

0 comments on commit 5b65a6d

Please sign in to comment.