Skip to content

Commit

Permalink
rtmp: add HandleConn / OnPlayOrPublish / rxbytes,txbytes
Browse files Browse the repository at this point in the history
  • Loading branch information
nareix committed Aug 31, 2016
1 parent 3d9cae6 commit 8802fb5
Showing 1 changed file with 83 additions and 21 deletions.
104 changes: 83 additions & 21 deletions format/rtmp/rtmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,25 +58,26 @@ type Server struct {
Addr string
HandlePublish func(*Conn)
HandlePlay func(*Conn)
HandleConn func(*Conn)
}

func (self *Server) handleConn(conn *Conn) (err error) {
if err = conn.prepare(stageCommandDone, 0); err != nil {
return
}

if conn.playing {
if self.HandlePlay != nil {
self.HandlePlay(conn)
}
} else if conn.publishing {
if self.HandlePublish != nil {
self.HandlePublish(conn)
if self.HandleConn != nil {
self.HandleConn(conn)
} else {
if err = conn.prepare(stageCommandDone, 0); err != nil {
return
}
}

if err = conn.Close(); err != nil {
return
if conn.playing {
if self.HandlePlay != nil {
self.HandlePlay(conn)
}
} else if conn.publishing {
if self.HandlePublish != nil {
self.HandlePublish(conn)
}
}
}

return
Expand Down Expand Up @@ -136,17 +137,22 @@ const (

type Conn struct {
URL *url.URL
OnPlayOrPublish func(string,flvio.AMFMap) error

prober *flv.Prober
streams []av.CodecData

txbytes uint64
rxbytes uint64

bufr *bufio.Reader
bufw *bufio.Writer
readn uint32
ackn uint32
writebuf []byte
readbuf []byte

netconn net.Conn
txrxcount *txrxcount

writeMaxChunkSize int
readMaxChunkSize int
Expand Down Expand Up @@ -177,6 +183,24 @@ type Conn struct {
eventtype uint16
}

type txrxcount struct {
io.ReadWriter
txbytes uint64
rxbytes uint64
}

func (self *txrxcount) Read(p []byte) (int, error) {
n, err := self.ReadWriter.Read(p)
self.rxbytes += uint64(n)
return n, err
}

func (self *txrxcount) Write(p []byte) (int, error) {
n, err := self.ReadWriter.Write(p)
self.txbytes += uint64(n)
return n, err
}

func NewConn(netconn net.Conn) *Conn {
conn := &Conn{}
conn.prober = &flv.Prober{}
Expand All @@ -186,6 +210,7 @@ func NewConn(netconn net.Conn) *Conn {
conn.writeMaxChunkSize = 128
conn.bufr = bufio.NewReaderSize(netconn, pio.RecommendBufioSize)
conn.bufw = bufio.NewWriterSize(netconn, pio.RecommendBufioSize)
conn.txrxcount = &txrxcount{ReadWriter: netconn}
conn.writebuf = make([]byte, 4096)
conn.readbuf = make([]byte, 4096)
return conn
Expand Down Expand Up @@ -228,6 +253,18 @@ const (
eventtypeStreamIsRecorded = 4
)

func (self *Conn) NetConn() net.Conn {
return self.netconn
}

func (self *Conn) TxBytes() uint64 {
return self.txrxcount.txbytes
}

func (self *Conn) RxBytes() uint64 {
return self.txrxcount.rxbytes
}

func (self *Conn) Close() (err error) {
return self.netconn.Close()
}
Expand Down Expand Up @@ -366,6 +403,7 @@ func (self *Conn) readConnect() (err error) {
if ok {
tcurl, _ = _tcurl.(string)
}
connectparams := self.commandobj

if err = self.writeBasicConf(); err != nil {
return
Expand Down Expand Up @@ -421,13 +459,28 @@ func (self *Conn) readConnect() (err error) {
}
publishpath, _ := self.commandparams[0].(string)

var cberr error
if self.OnPlayOrPublish != nil {
cberr = self.OnPlayOrPublish(self.commandname, connectparams)
}

var code string
var description string
if cberr != nil {
code = "NetStream.Publish.Failed"
description = cberr.Error()
} else {
code = "NetStream.Publish.Start"
description = "Start publishing"
}

// > onStatus()
if err = self.writeCommandMsg(5, self.avmsgsid,
"onStatus", self.commandtransid, nil,
flvio.AMFMap{
"level": "status",
"code": "NetStream.Publish.Start",
"description": "Start publishing",
"code": code,
"description": description,
},
); err != nil {
return
Expand All @@ -436,6 +489,11 @@ func (self *Conn) readConnect() (err error) {
return
}

if cberr != nil {
err = fmt.Errorf("rtmp: OnPlayOrPublish check failed")
return
}

self.URL = createURL(tcurl, connectpath, publishpath)
self.publishing = true
self.reading = true
Expand Down Expand Up @@ -752,6 +810,10 @@ func (self *Conn) ReadPacket() (pkt av.Packet, err error) {
return
}

func (self *Conn) Prepare() (err error) {
return self.prepare(stageCommandDone, 0)
}

func (self *Conn) prepare(stage int, flags int) (err error) {
for self.stage < stage {
switch self.stage {
Expand Down Expand Up @@ -1244,12 +1306,12 @@ func (self *Conn) readChunk() (err error) {
}
}

self.readn += uint32(n)
if self.readAckSize != 0 && self.readn > self.readAckSize {
if err = self.writeAck(self.readn); err != nil {
self.ackn += uint32(n)
if self.readAckSize != 0 && self.ackn > self.readAckSize {
if err = self.writeAck(self.ackn); err != nil {
return
}
self.readn = 0
self.ackn = 0
}

return
Expand Down

0 comments on commit 8802fb5

Please sign in to comment.