Skip to content

Commit

Permalink
Add RTMP support
Browse files Browse the repository at this point in the history
  • Loading branch information
fzdy1914 committed Jan 21, 2022
1 parent e8d36e4 commit 637cf74
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 20 deletions.
3 changes: 3 additions & 0 deletions configure/liveconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type ServerCfg struct {
RedisPwd string `mapstructure:"redis_pwd"`
ReadTimeout int `mapstructure:"read_timeout"`
WriteTimeout int `mapstructure:"write_timeout"`
EnableTLSVerify bool `mapstructure:"enable_tls_verify"`
GopNum int `mapstructure:"gop_num"`
JWT JWT `mapstructure:"jwt"`
Server Applications `mapstructure:"server"`
Expand All @@ -71,6 +72,7 @@ var defaultConf = ServerCfg{
APIAddr: ":8090",
WriteTimeout: 10,
ReadTimeout: 10,
EnableTLSVerify: true,
GopNum: 1,
Server: Applications{{
Appname: "live",
Expand Down Expand Up @@ -127,6 +129,7 @@ func initDefault() {
pflag.Int("read_timeout", 10, "read time out")
pflag.Int("write_timeout", 10, "write time out")
pflag.Int("gop_num", 1, "gop num")
pflag.Bool("enable_tls_verify", true, "Use system root CA to verify RTMPS connection, set this flag to false on Windows")
pflag.Parse()
Config.BindPFlags(pflag.CommandLine)

Expand Down
14 changes: 7 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package main

import (
"fmt"
"net"
"path"
"runtime"
"time"

"github.com/gwuhaolin/livego/configure"
"github.com/gwuhaolin/livego/protocol/api"
"github.com/gwuhaolin/livego/protocol/hls"
"github.com/gwuhaolin/livego/protocol/httpflv"
"github.com/gwuhaolin/livego/protocol/rtmp"
"net"
"path"
"runtime"
"time"

log "github.com/sirupsen/logrus"
)
Expand All @@ -37,10 +38,8 @@ func startHls() *hls.Server {
return hlsServer
}

var rtmpAddr string

func startRtmp(stream *rtmp.RtmpStream, hlsServer *hls.Server) {
rtmpAddr = configure.Config.GetString("rtmp_addr")
rtmpAddr := configure.Config.GetString("rtmp_addr")

rtmpListen, err := net.Listen("tcp", rtmpAddr)
if err != nil {
Expand Down Expand Up @@ -88,6 +87,7 @@ func startHTTPFlv(stream *rtmp.RtmpStream) {

func startAPI(stream *rtmp.RtmpStream) {
apiAddr := configure.Config.GetString("api_addr")
rtmpAddr := configure.Config.GetString("rtmp_addr")

if apiAddr != "" {
opListen, err := net.Listen("tcp", apiAddr)
Expand Down
7 changes: 4 additions & 3 deletions protocol/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (server *Server) GetLiveStatics(w http.ResponseWriter, req *http.Request) {
return true
})
} else {
// Warning: The room should be in the "live/stream" format!
// Warning: The room should be in the "live/stream" format!
roomInfo, exists := (rtmpStream.GetStreams()).Load(room)
if exists == false {
res.Status = 404
Expand Down Expand Up @@ -299,12 +299,13 @@ func (s *Server) handlePull(w http.ResponseWriter, req *http.Request) {
log.Debugf("rtmprelay start push %s from %s", remoteurl, localurl)
err = pullRtmprelay.Start()
if err != nil {
res.Status = 400
retString = fmt.Sprintf("push error=%v", err)
} else {
s.session[keyString] = pullRtmprelay
retString = fmt.Sprintf("<h1>push url start %s ok</h1></br>", url)
retString = fmt.Sprintf("<h1>pull url start %s ok</h1></br>", url)
}
res.Status = 400

res.Data = retString
log.Debugf("pull start return %s", retString)
}
Expand Down
5 changes: 4 additions & 1 deletion protocol/rtmp/cache/special.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,8 @@ func (specialCache *SpecialCache) Send(w av.WriteCloser) error {
if !specialCache.full {
return nil
}
return w.Write(specialCache.p)

// demux in hls will change p.Data, only send a copy here
newPacket := *specialCache.p
return w.Write(&newPacket)
}
52 changes: 44 additions & 8 deletions protocol/rtmp/core/conn_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package core

import (
"bytes"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"math/rand"
Expand All @@ -10,6 +12,7 @@ import (
"strings"

"github.com/gwuhaolin/livego/av"
"github.com/gwuhaolin/livego/configure"
"github.com/gwuhaolin/livego/protocol/amf"

log "github.com/sirupsen/logrus"
Expand All @@ -36,9 +39,9 @@ type ConnClient struct {
tcurl string
app string
title string
query string
curcmdName string
streamid uint32
isRTMPS bool
conn *Conn
encoder *amf.Encoder
decoder *amf.Decoder
Expand Down Expand Up @@ -221,9 +224,20 @@ func (connClient *ConnClient) Start(url string, method string) error {
}
connClient.app = ps[0]
connClient.title = ps[1]
connClient.query = u.RawQuery
connClient.tcurl = "rtmp://" + u.Host + "/" + connClient.app
port := ":1935"
if u.RawQuery != "" {
connClient.title += "?" + u.RawQuery
}
connClient.isRTMPS = strings.HasPrefix(url, "rtmps://")

var port string
if connClient.isRTMPS {
connClient.tcurl = "rtmps://" + u.Host + "/" + connClient.app
port = ":443"
} else {
connClient.tcurl = "rtmp://" + u.Host + "/" + connClient.app
port = ":1935"
}

host := u.Host
localIP := ":0"
var remoteIP string
Expand Down Expand Up @@ -256,10 +270,32 @@ func (connClient *ConnClient) Start(url string, method string) error {
log.Warning(err)
return err
}
conn, err := net.DialTCP("tcp", local, remote)
if err != nil {
log.Warning(err)
return err

var conn net.Conn
if connClient.isRTMPS {
var config tls.Config
if configure.Config.GetBool("enable_tls_verify") {
roots, err := x509.SystemCertPool()
if err != nil {
log.Warning(err)
return err
}
config.RootCAs = roots
} else {
config.InsecureSkipVerify = true
}

conn, err = tls.Dial("tcp", remoteIP, &config)
if err != nil {
log.Warning(err)
return err
}
} else {
conn, err = net.DialTCP("tcp", local, remote)
if err != nil {
log.Warning(err)
return err
}
}

log.Debug("connection:", "local:", conn.LocalAddr(), "remote:", conn.RemoteAddr())
Expand Down
3 changes: 2 additions & 1 deletion protocol/rtmp/rtmprelay/rtmprelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package rtmprelay
import (
"bytes"
"fmt"
"github.com/gwuhaolin/livego/av"
"io"

"github.com/gwuhaolin/livego/av"
"github.com/gwuhaolin/livego/protocol/amf"
"github.com/gwuhaolin/livego/protocol/rtmp/core"

Expand Down Expand Up @@ -62,6 +62,7 @@ func (self *RtmpRelay) rcvPlayChunkStream() {
log.Debugf("rcvPlayRtmpMediaPacket: vs=%v, err=%v", vs, err)
case 18:
log.Debug("rcvPlayRtmpMediaPacket: metadata....")
self.cs_chan <- rc
case 8, 9:
self.cs_chan <- rc
}
Expand Down

0 comments on commit 637cf74

Please sign in to comment.