Skip to content

Commit

Permalink
implement native recording (bluenviron#1399) (bluenviron#2255)
Browse files Browse the repository at this point in the history
* implement native recording (bluenviron#1399)

* support saving VP9 tracks

* support saving MPEG-1 audio tracks

* switch segment when codec parameters change

* allow to disable recording on a path basis

* allow disabling recording cleaner

* support recording MPEG-1/2/4 video tracks

* add microseconds to file names

* add tests
  • Loading branch information
aler9 authored Sep 16, 2023
1 parent b7e7758 commit 73ddb21
Show file tree
Hide file tree
Showing 22 changed files with 1,685 additions and 51 deletions.
27 changes: 16 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ And can be read from the server with:
* Read live streams from the server
* Streams are automatically converted from a protocol to another. For instance, it's possible to publish a stream with RTSP and read it with HLS
* Serve multiple streams at once in separate paths
* Record streams to disk
* Authenticate users; use internal or external authentication
* Redirect readers to other RTSP servers (load balancing)
* Query and control the server through the API
Expand Down Expand Up @@ -106,7 +107,7 @@ _rtsp-simple-server_ has been rebranded as _MediaMTX_. The reason is pretty obvi
* [Authentication](#authentication)
* [Encrypt the configuration](#encrypt-the-configuration)
* [Remuxing, re-encoding, compression](#remuxing-re-encoding-compression)
* [Save streams to disk](#save-streams-to-disk)
* [Record streams to disk](#record-streams-to-disk)
* [Forward streams to another server](#forward-streams-to-another-server)
* [On-demand publishing](#on-demand-publishing)
* [Start on boot](#start-on-boot)
Expand Down Expand Up @@ -1136,21 +1137,25 @@ paths:
runOnReadyRestart: yes
```
### Save streams to disk
### Record streams to disk
To save available streams to disk, use _FFmpeg_ inside the `runOnReady` parameter:
To save available streams to disk, set the `record` and the `recordPath` parameter in the configuration file:

```yml
paths:
all:
runOnReady: >
ffmpeg -i rtsp://localhost:$RTSP_PORT/$MTX_PATH
-c copy
-f segment -strftime 1 -segment_time 60 -segment_format mpegts saved_%Y-%m-%d_%H-%M-%S.ts
runOnReadyRestart: yes
# Record streams to disk.
record: yes
# Path of recording segments.
# Extension is added automatically.
# Available variables are %path (path name), %Y %m %d %H %M %S %f (time in strftime format)
recordPath: ./recordings/%path/%Y-%m-%d_%H-%M-%S-%f
```

In the configuration above, streams are saved in MPEG-TS format, that is resilient to system crashes.
All available recording parameters are listed in the [sample configuration file](/mediamtx.yml).

Currently the server supports recording tracks encoded with the following codecs:

* Video: AV1, VP9, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video
* Audio: Opus, MPEG-4 Audio (AAC), MPEG-1 Audio (MP3)

### Forward streams to another server

Expand Down
16 changes: 16 additions & 0 deletions apidocs/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,20 @@ components:
srtAddress:
type: string

# Record
record:
type: boolean
recordPath:
type: string
recordFormat:
type: string
recordPartDuration:
type: string
recordSegmentDuration:
type: string
recordDeleteAfter:
type: string

# Paths
paths:
type: object
Expand All @@ -197,6 +211,8 @@ components:
type: string
maxReaders:
type: integer
record:
type: boolean

# Authentication
publishUser:
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ require (
code.cloudfoundry.org/bytefmt v0.0.0
github.com/abema/go-mp4 v0.13.0
github.com/alecthomas/kong v0.8.0
github.com/aler9/writerseeker v1.1.0
github.com/bluenviron/gohlslib v1.0.2
github.com/bluenviron/gortsplib/v4 v4.0.2-0.20230916090332-99773e19afc9
github.com/bluenviron/mediacommon v1.2.0
github.com/bluenviron/mediacommon v1.2.1-0.20230916111859-5042498c7260
github.com/datarhei/gosrt v0.5.4
github.com/fsnotify/fsnotify v1.6.0
github.com/gin-gonic/gin v1.9.1
Expand All @@ -30,7 +31,6 @@ require (
)

require (
github.com/aler9/writerseeker v1.1.0 // indirect
github.com/asticode/go-astikit v0.30.0 // indirect
github.com/asticode/go-astits v1.13.0 // indirect
github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ github.com/bluenviron/gohlslib v1.0.2 h1:LDA/CubL525e9rLWw+G/9GbFS6iXwozmOg8KJBT
github.com/bluenviron/gohlslib v1.0.2/go.mod h1:oam0wsI2XqcHLTG6NM8HRvxAQsa3hIA0MLRiTOE7CB8=
github.com/bluenviron/gortsplib/v4 v4.0.2-0.20230916090332-99773e19afc9 h1:NIJRhT/AYhPNe1GdWqAhDsYOTo6/hvAz5pEe1Ss+NuE=
github.com/bluenviron/gortsplib/v4 v4.0.2-0.20230916090332-99773e19afc9/go.mod h1:aAHfFXD3LE19h86jUGJK7PpM1uwp9VFVReuH89ZlpuE=
github.com/bluenviron/mediacommon v1.2.0 h1:5tz92r2S4gPSiTlycepjXFZCgwGfVL2htCeVsoBac+U=
github.com/bluenviron/mediacommon v1.2.0/go.mod h1:/vlOVSebDwzdRtQONOKLua0fOSJg1tUDHpP+h9a0uqM=
github.com/bluenviron/mediacommon v1.2.1-0.20230916111859-5042498c7260 h1:s0LwQH/+cV2DdCcmqNXoIwpeoT94xnd2UtuvJEkjhiQ=
github.com/bluenviron/mediacommon v1.2.1-0.20230916111859-5042498c7260/go.mod h1:/vlOVSebDwzdRtQONOKLua0fOSJg1tUDHpP+h9a0uqM=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s=
github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
Expand Down
21 changes: 21 additions & 0 deletions internal/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ type Conf struct {
SRT bool `json:"srt"`
SRTAddress string `json:"srtAddress"`

// Record
Record bool `json:"record"`
RecordPath string `json:"recordPath"`
RecordFormat string `json:"recordFormat"`
RecordPartDuration StringDuration `json:"recordPartDuration"`
RecordSegmentDuration StringDuration `json:"recordSegmentDuration"`
RecordDeleteAfter StringDuration `json:"recordDeleteAfter"`

// Paths
Paths map[string]*PathConf `json:"paths"`
}
Expand Down Expand Up @@ -294,6 +302,12 @@ func (conf *Conf) Check() error {
}
}

// Record

if conf.RecordFormat != "fmp4" {
return fmt.Errorf("unsupported record format '%s'", conf.RecordFormat)
}

// do not add automatically "all", since user may want to
// initialize all paths through API or hot reloading.
if conf.Paths == nil {
Expand Down Expand Up @@ -379,6 +393,13 @@ func (conf *Conf) UnmarshalJSON(b []byte) error {
conf.SRT = true
conf.SRTAddress = ":8890"

// Record
conf.RecordPath = "./recordings/%path/%Y-%m-%d_%H-%M-%S"
conf.RecordFormat = "fmp4"
conf.RecordPartDuration = 100 * StringDuration(time.Millisecond)
conf.RecordSegmentDuration = 3600 * StringDuration(time.Second)
conf.RecordDeleteAfter = 24 * 3600 * StringDuration(time.Second)

type alias Conf
d := json.NewDecoder(bytes.NewReader(b))
d.DisallowUnknownFields()
Expand Down
1 change: 1 addition & 0 deletions internal/conf/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type PathConf struct {
SourceOnDemandStartTimeout StringDuration `json:"sourceOnDemandStartTimeout"`
SourceOnDemandCloseAfter StringDuration `json:"sourceOnDemandCloseAfter"`
MaxReaders int `json:"maxReaders"`
Record bool `json:"record"`

// Authentication
PublishUser Credential `json:"publishUser"`
Expand Down
31 changes: 31 additions & 0 deletions internal/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"os/signal"
"reflect"
"time"

"github.com/alecthomas/kong"
"github.com/bluenviron/gortsplib/v4"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/bluenviron/mediamtx/internal/confwatcher"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/record"
"github.com/bluenviron/mediamtx/internal/rlimit"
)

Expand All @@ -37,6 +39,7 @@ type Core struct {
externalCmdPool *externalcmd.Pool
metrics *metrics
pprof *pprof
recordCleaner *record.Cleaner
pathManager *pathManager
rtspServer *rtspServer
rtspsServer *rtspServer
Expand Down Expand Up @@ -237,6 +240,16 @@ func (p *Core) createResources(initial bool) error {
}
}

if p.conf.Record &&
p.conf.RecordDeleteAfter != 0 &&
p.recordCleaner == nil {
p.recordCleaner = record.NewCleaner(
p.conf.RecordPath,
time.Duration(p.conf.RecordDeleteAfter),
p,
)
}

if p.pathManager == nil {
p.pathManager = newPathManager(
p.conf.ExternalAuthenticationURL,
Expand All @@ -246,6 +259,10 @@ func (p *Core) createResources(initial bool) error {
p.conf.WriteTimeout,
p.conf.WriteQueueSize,
p.conf.UDPMaxPayloadSize,
p.conf.Record,
p.conf.RecordPath,
p.conf.RecordPartDuration,
p.conf.RecordSegmentDuration,
p.conf.Paths,
p.externalCmdPool,
p.metrics,
Expand Down Expand Up @@ -491,6 +508,11 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
newConf.ReadTimeout != p.conf.ReadTimeout ||
closeLogger

closeRecorderCleaner := newConf == nil ||
newConf.Record != p.conf.Record ||
newConf.RecordPath != p.conf.RecordPath ||
newConf.RecordDeleteAfter != p.conf.RecordDeleteAfter

closePathManager := newConf == nil ||
newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL ||
newConf.RTSPAddress != p.conf.RTSPAddress ||
Expand All @@ -499,6 +521,10 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
newConf.WriteTimeout != p.conf.WriteTimeout ||
newConf.WriteQueueSize != p.conf.WriteQueueSize ||
newConf.UDPMaxPayloadSize != p.conf.UDPMaxPayloadSize ||
newConf.Record != p.conf.Record ||
newConf.RecordPath != p.conf.RecordPath ||
newConf.RecordPartDuration != p.conf.RecordPartDuration ||
newConf.RecordSegmentDuration != p.conf.RecordSegmentDuration ||
closeMetrics ||
closeLogger
if !closePathManager && !reflect.DeepEqual(newConf.Paths, p.conf.Paths) {
Expand Down Expand Up @@ -692,6 +718,11 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
p.pathManager = nil
}

if closeRecorderCleaner && p.recordCleaner != nil {
p.recordCleaner.Close()
p.recordCleaner = nil
}

if closePPROF && p.pprof != nil {
p.pprof.close()
p.pprof = nil
Expand Down
66 changes: 49 additions & 17 deletions internal/core/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/bluenviron/gortsplib/v4/pkg/description"
Expand All @@ -16,6 +15,7 @@ import (
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/record"
"github.com/bluenviron/mediamtx/internal/stream"
)

Expand Down Expand Up @@ -171,26 +171,30 @@ type pathAPIPathsGetReq struct {
}

type path struct {
rtspAddress string
readTimeout conf.StringDuration
writeTimeout conf.StringDuration
writeQueueSize int
udpMaxPayloadSize int
confName string
conf *conf.PathConf
name string
matches []string
wg *sync.WaitGroup
externalCmdPool *externalcmd.Pool
parent pathParent
rtspAddress string
readTimeout conf.StringDuration
writeTimeout conf.StringDuration
writeQueueSize int
udpMaxPayloadSize int
record bool
recordPath string
recordPartDuration conf.StringDuration
recordSegmentDuration conf.StringDuration
confName string
conf *conf.PathConf
name string
matches []string
wg *sync.WaitGroup
externalCmdPool *externalcmd.Pool
parent pathParent

ctx context.Context
ctxCancel func()
confMutex sync.RWMutex
source source
stream *stream.Stream
recordAgent *record.Agent
readyTime time.Time
bytesReceived *uint64
readers map[reader]struct{}
describeRequestsOnHold []pathDescribeReq
readerAddRequestsOnHold []pathAddReaderReq
Expand Down Expand Up @@ -227,6 +231,10 @@ func newPath(
writeTimeout conf.StringDuration,
writeQueueSize int,
udpMaxPayloadSize int,
record bool,
recordPath string,
recordPartDuration conf.StringDuration,
recordSegmentDuration conf.StringDuration,
confName string,
cnf *conf.PathConf,
name string,
Expand All @@ -243,6 +251,10 @@ func newPath(
writeTimeout: writeTimeout,
writeQueueSize: writeQueueSize,
udpMaxPayloadSize: udpMaxPayloadSize,
record: record,
recordPath: recordPath,
recordPartDuration: recordPartDuration,
recordSegmentDuration: recordSegmentDuration,
confName: confName,
conf: cnf,
name: name,
Expand All @@ -252,7 +264,6 @@ func newPath(
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
bytesReceived: new(uint64),
readers: make(map[reader]struct{}),
onDemandStaticSourceReadyTimer: newEmptyTimer(),
onDemandStaticSourceCloseTimer: newEmptyTimer(),
Expand Down Expand Up @@ -754,7 +765,12 @@ func (pa *path) doAPIPathsGet(req pathAPIPathsGetReq) {
}
return mediasDescription(pa.stream.Desc().Medias)
}(),
BytesReceived: atomic.LoadUint64(pa.bytesReceived),
BytesReceived: func() uint64 {
if pa.stream == nil {
return 0
}
return pa.stream.BytesReceived()
}(),
Readers: func() []interface{} {
ret := []interface{}{}
for r := range pa.readers {
Expand Down Expand Up @@ -868,13 +884,24 @@ func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error
pa.udpMaxPayloadSize,
desc,
allocateEncoder,
pa.bytesReceived,
logger.NewLimitedLogger(pa.source),
)
if err != nil {
return err
}

if pa.record && pa.conf.Record {
pa.recordAgent = record.NewAgent(
pa.writeQueueSize,
pa.recordPath,
time.Duration(pa.recordPartDuration),
time.Duration(pa.recordSegmentDuration),
pa.name,
pa.stream,
pa,
)
}

pa.readyTime = time.Now()

if pa.conf.RunOnReady != "" {
Expand Down Expand Up @@ -908,6 +935,11 @@ func (pa *path) setNotReady() {
pa.Log(logger.Info, "runOnReady command stopped")
}

if pa.recordAgent != nil {
pa.recordAgent.Close()
pa.recordAgent = nil
}

if pa.stream != nil {
pa.stream.Close()
pa.stream = nil
Expand Down
Loading

0 comments on commit 73ddb21

Please sign in to comment.