Skip to content

Commit

Permalink
Merge pull request #73 from joestarzxh/master
Browse files Browse the repository at this point in the history
gb 兼容ssrc为0的情况
  • Loading branch information
ZSC714725 authored Apr 16, 2024
2 parents 382cde9 + a4f6ad7 commit 9eecf3c
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 13 deletions.
6 changes: 3 additions & 3 deletions gb28181/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,15 @@ func (channel *Channel) Invite(opt *InviteOptions, conf config.GB28181Config, st
}
opt.CreateSSRC(channel.serial, channel.number)

protocol := ""
nazalog.Info("networkType:", playInfo.NetWork)
var mediaserver *mediaserver.GB28181MediaServer
if channel.observer != nil {
mediaserver = channel.observer.OnStartMediaServer(playInfo.NetWork, playInfo.SinglePort, channel.device.ID, channel.ChannelId)
}
if mediaserver == nil {
return http.StatusNotFound, err
}

protocol := ""
if playInfo.NetWork == "tcp" {
opt.MediaPort = mediaserver.GetListenerPort()
protocol = "TCP/"
Expand Down Expand Up @@ -220,14 +220,14 @@ func (channel *Channel) Invite(opt *InviteOptions, conf config.GB28181Config, st
nazalog.Info("Device support tcp")
} else {
nazalog.Info("Device not support tcp")
playInfo.NetWork = "udp"
}
}
}
}
channel.MediaInfo.IsInvite = true
channel.MediaInfo.Ssrc = opt.SSRC
channel.MediaInfo.StreamName = streamName
channel.MediaInfo.MediaKey = fmt.Sprintf("%s%d", playInfo.NetWork, mediaserver.GetListenerPort())

ackReq := sip.NewAckRequest("", invite, inviteRes, "", nil)
//保存一下播放信息
Expand Down
24 changes: 18 additions & 6 deletions gb28181/mediaserver/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Conn struct {
psDumpFile *base.DumpFile

buffer *bytes.Buffer
key string
}

func NewConn(conn net.Conn, observer IGbObserver, lal logic.ILalServer) *Conn {
Expand All @@ -63,7 +64,9 @@ func NewConn(conn net.Conn, observer IGbObserver, lal logic.ILalServer) *Conn {

return c
}

func (c *Conn) SetKey(key string) {
c.key = key
}
func (c *Conn) Serve() (err error) {
defer func() {
nazalog.Info("conn close, err:", err)
Expand Down Expand Up @@ -116,12 +119,21 @@ func (c *Conn) Serve() (err error) {
}

if !c.check && c.observer != nil {
mediaInfo, ok := c.observer.CheckSsrc(pkt.SSRC)
if !ok {
nazalog.Error("invalid ssrc:", pkt.SSRC)
return fmt.Errorf("invalid ssrc:%d", pkt.SSRC)
var mediaInfo *MediaInfo
var ok bool
if pkt.SSRC != 0 {
mediaInfo, ok = c.observer.CheckSsrc(pkt.SSRC)
if !ok {
nazalog.Error("invalid ssrc:", pkt.SSRC)
return fmt.Errorf("invalid ssrc:%d", pkt.SSRC)
}
} else {
mediaInfo, ok = c.observer.GetMediaInfoByKey(c.key)
if !ok {
nazalog.Error("get mediaInfo :", c.key)
return fmt.Errorf("get mediaInfo:%d", c.key)
}
}

c.check = true
c.streamName = mediaInfo.StreamName
if len(mediaInfo.DumpFileName) > 0 {
Expand Down
1 change: 1 addition & 0 deletions gb28181/mediaserver/mediaserver_t.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type MediaInfo struct {
StreamName string
SinglePort bool
DumpFileName string
MediaKey string
}

func (m *MediaInfo) Clear() (err error) {
Expand Down
6 changes: 5 additions & 1 deletion gb28181/mediaserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

type IGbObserver interface {
CheckSsrc(ssrc uint32) (*MediaInfo, bool)
GetMediaInfoByKey(key string) (*MediaInfo, bool)
NotifyClose(streamName string)
}

Expand All @@ -23,13 +24,15 @@ type GB28181MediaServer struct {

disposeOnce sync.Once
observer IGbObserver
mediaKey string
}

func NewGB28181MediaServer(listenPort int, observer IGbObserver, lal logic.ILalServer) *GB28181MediaServer {
func NewGB28181MediaServer(listenPort int, mediaKey string, observer IGbObserver, lal logic.ILalServer) *GB28181MediaServer {
return &GB28181MediaServer{
listenPort: listenPort,
lalServer: lal,
observer: observer,
mediaKey: mediaKey,
}
}
func (s *GB28181MediaServer) GetListenerPort() uint16 {
Expand All @@ -55,6 +58,7 @@ func (s *GB28181MediaServer) Start(listener net.Listener) (err error) {
}

c := NewConn(conn, s.observer, s.lalServer)
c.SetKey(s.mediaKey)
go c.Serve()
}
}()
Expand Down
44 changes: 41 additions & 3 deletions gb28181/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,16 @@ func (s *GB28181Server) OnStartMediaServer(netWork string, singlePort bool, devi
var port uint16
if mediasvr == nil {
if singlePort {
mediasvr = mediaserver.NewGB28181MediaServer(int(s.conf.MediaConfig.ListenPort), s, s.lalServer)
if isTcpFlag {
mediasvr = mediaserver.NewGB28181MediaServer(int(s.conf.MediaConfig.ListenPort), fmt.Sprintf("%s%d", "tcp", s.conf.MediaConfig.ListenPort), s, s.lalServer)
listener, err = s.tcpAvailConnPool.ListenWithPort(s.conf.MediaConfig.ListenPort)
if err != nil {
nazalog.Error("gb28181 media server tcp Listen failed:%s", err.Error())
return nil
}
s.MediaServerMap.Store(fmt.Sprintf("%s%d", "tcp", s.conf.MediaConfig.ListenPort), mediasvr)
} else {
mediasvr = mediaserver.NewGB28181MediaServer(int(s.conf.MediaConfig.ListenPort), fmt.Sprintf("%s%d", "udp", s.conf.MediaConfig.ListenPort), s, s.lalServer)
listener, err = s.udpAvailConnPool.ListenWithPort(s.conf.MediaConfig.ListenPort)
if err != nil {
nazalog.Error("gb28181 media server udp Listen failed:%s", err.Error())
Expand All @@ -193,20 +194,23 @@ func (s *GB28181Server) OnStartMediaServer(netWork string, singlePort bool, devi
s.MediaServerMap.Store(fmt.Sprintf("%s%d", "udp", s.conf.MediaConfig.ListenPort), mediasvr)
}
} else {
mediaKey := ""
if isTcpFlag {
listener, port, err = s.tcpAvailConnPool.Acquire()
if err != nil {
nazalog.Error("gb28181 media server tcp acquire failed:%s", err.Error())
return nil
}
mediaKey = fmt.Sprintf("%s%d", "tcp", port)
} else {
listener, port, err = s.udpAvailConnPool.Acquire()
if err != nil {
nazalog.Error("gb28181 media server udp acquire failed:%s", err.Error())
return nil
}
mediaKey = fmt.Sprintf("%s%d", "tcp", port)
}
mediasvr = mediaserver.NewGB28181MediaServer(int(port), s, s.lalServer)
mediasvr = mediaserver.NewGB28181MediaServer(int(port), mediaKey, s, s.lalServer)
s.MediaServerMap.Store(fmt.Sprintf("%s%s", deviceId, channelId), mediasvr)
}
go mediasvr.Start(listener)
Expand Down Expand Up @@ -264,6 +268,9 @@ func (s *GB28181Server) CheckSsrc(ssrc uint32) (*mediaserver.MediaInfo, bool) {
}
return true
})
if isValidSsrc {
return false
}
return true
})

Expand All @@ -273,8 +280,36 @@ func (s *GB28181Server) CheckSsrc(ssrc uint32) (*mediaserver.MediaInfo, bool) {

return nil, false
}
func (s *GB28181Server) GetMediaInfoByKey(key string) (*mediaserver.MediaInfo, bool) {
var isValidMediaInfo bool
var mediaInfo *mediaserver.MediaInfo

Devices.Range(func(_, value any) bool {
d := value.(*Device)
d.channelMap.Range(func(_, value any) bool {
ch := value.(*Channel)
if ch.MediaInfo.MediaKey == key {
isValidMediaInfo = true
mediaInfo = &ch.MediaInfo
return false
}
return true
})
if isValidMediaInfo {
return false
}
return true
})

if isValidMediaInfo {
return mediaInfo, true
}

return nil, false
}

func (s *GB28181Server) NotifyClose(streamName string) {
var ok bool
Devices.Range(func(_, value any) bool {
d := value.(*Device)
d.channelMap.Range(func(key, value any) bool {
Expand All @@ -284,11 +319,14 @@ func (s *GB28181Server) NotifyClose(streamName string) {
ch.Bye(streamName)
}
ch.MediaInfo.Clear()
ok = true
return false
}
return true
})

if ok {
return false
}
return true
})
}
Expand Down

0 comments on commit 9eecf3c

Please sign in to comment.