Skip to content

Commit

Permalink
record: normalize decoding and encoding of segment paths (bluenviron#…
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 authored Dec 2, 2023
1 parent 5da2ded commit 7c8e593
Show file tree
Hide file tree
Showing 16 changed files with 199 additions and 210 deletions.
21 changes: 11 additions & 10 deletions internal/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ func gatherCleanerEntries(paths map[string]*conf.Path) []record.CleanerEntry {
for _, pa := range paths {
if pa.Record && pa.RecordDeleteAfter != 0 {
entry := record.CleanerEntry{
RecordPath: pa.RecordPath,
RecordFormat: pa.RecordFormat,
RecordDeleteAfter: time.Duration(pa.RecordDeleteAfter),
SegmentPathFormat: pa.RecordPath,
Format: pa.RecordFormat,
DeleteAfter: time.Duration(pa.RecordDeleteAfter),
}
out[entry] = struct{}{}
}
Expand All @@ -57,10 +57,10 @@ func gatherCleanerEntries(paths map[string]*conf.Path) []record.CleanerEntry {
}

sort.Slice(out2, func(i, j int) bool {
if out2[i].RecordPath != out2[j].RecordPath {
return out2[i].RecordPath < out2[j].RecordPath
if out2[i].SegmentPathFormat != out2[j].SegmentPathFormat {
return out2[i].SegmentPathFormat < out2[j].SegmentPathFormat
}
return out2[i].RecordDeleteAfter < out2[j].RecordDeleteAfter
return out2[i].DeleteAfter < out2[j].DeleteAfter
})

return out2
Expand Down Expand Up @@ -295,10 +295,11 @@ func (p *Core) createResources(initial bool) error {
cleanerEntries := gatherCleanerEntries(p.conf.Paths)
if len(cleanerEntries) != 0 &&
p.recordCleaner == nil {
p.recordCleaner = record.NewCleaner(
cleanerEntries,
p,
)
p.recordCleaner = &record.Cleaner{
Entries: cleanerEntries,
Parent: p,
}
p.recordCleaner.Initialize()
}

if p.pathManager == nil {
Expand Down
14 changes: 7 additions & 7 deletions internal/core/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,13 +897,13 @@ func (pa *path) setNotReady() {

func (pa *path) startRecording() {
pa.recordAgent = &record.Agent{
WriteQueueSize: pa.writeQueueSize,
RecordPath: pa.conf.RecordPath,
Format: pa.conf.RecordFormat,
PartDuration: time.Duration(pa.conf.RecordPartDuration),
SegmentDuration: time.Duration(pa.conf.RecordSegmentDuration),
PathName: pa.name,
Stream: pa.stream,
WriteQueueSize: pa.writeQueueSize,
SegmentPathFormat: pa.conf.RecordPath,
Format: pa.conf.RecordFormat,
PartDuration: time.Duration(pa.conf.RecordPartDuration),
SegmentDuration: time.Duration(pa.conf.RecordSegmentDuration),
PathName: pa.name,
Stream: pa.stream,
OnSegmentCreate: func(segmentPath string) {
if pa.conf.RunOnRecordSegmentCreate != "" {
env := pa.externalCmdEnv()
Expand Down
8 changes: 4 additions & 4 deletions internal/record/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"github.com/bluenviron/mediamtx/internal/stream"
)

// Agent is a record agent.
// Agent writes recordings to disk.
type Agent struct {
WriteQueueSize int
RecordPath string
SegmentPathFormat string
Format conf.RecordFormat
PartDuration time.Duration
SegmentDuration time.Duration
Expand Down Expand Up @@ -47,7 +47,7 @@ func (w *Agent) Initialize() {
w.done = make(chan struct{})

w.currentInstance = &agentInstance{
wrapper: w,
agent: w,
}
w.currentInstance.initialize()

Expand Down Expand Up @@ -85,7 +85,7 @@ func (w *Agent) run() {
}

w.currentInstance = &agentInstance{
wrapper: w,
agent: w,
}
w.currentInstance.initialize()
}
Expand Down
28 changes: 15 additions & 13 deletions internal/record/agent_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,35 @@ type sample struct {
}

type agentInstance struct {
wrapper *Agent
agent *Agent

resolvedPath string
writer *asyncwriter.Writer
format format
segmentPathFormat string
writer *asyncwriter.Writer
format format

terminate chan struct{}
done chan struct{}
}

func (a *agentInstance) initialize() {
a.resolvedPath = strings.ReplaceAll(a.wrapper.RecordPath, "%path", a.wrapper.PathName)
a.segmentPathFormat = a.agent.SegmentPathFormat

switch a.wrapper.Format {
a.segmentPathFormat = strings.ReplaceAll(a.segmentPathFormat, "%path", a.agent.PathName)

switch a.agent.Format {
case conf.RecordFormatMPEGTS:
a.resolvedPath += ".ts"
a.segmentPathFormat += ".ts"

default:
a.resolvedPath += ".mp4"
a.segmentPathFormat += ".mp4"
}

a.terminate = make(chan struct{})
a.done = make(chan struct{})

a.writer = asyncwriter.New(a.wrapper.WriteQueueSize, a.wrapper)
a.writer = asyncwriter.New(a.agent.WriteQueueSize, a.agent)

switch a.wrapper.Format {
switch a.agent.Format {
case conf.RecordFormatMPEGTS:
a.format = &formatMPEGTS{
a: a,
Expand Down Expand Up @@ -75,11 +77,11 @@ func (a *agentInstance) run() {

select {
case err := <-a.writer.Error():
a.wrapper.Log(logger.Error, err.Error())
a.wrapper.Stream.RemoveReader(a.writer)
a.agent.Log(logger.Error, err.Error())
a.agent.Stream.RemoveReader(a.writer)

case <-a.terminate:
a.wrapper.Stream.RemoveReader(a.writer)
a.agent.Stream.RemoveReader(a.writer)
a.writer.Stop()
}

Expand Down
30 changes: 15 additions & 15 deletions internal/record/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,13 @@ func TestAgent(t *testing.T) {
}

w := &Agent{
WriteQueueSize: 1024,
RecordPath: recordPath,
Format: f,
PartDuration: 100 * time.Millisecond,
SegmentDuration: 1 * time.Second,
PathName: "mypath",
Stream: stream,
WriteQueueSize: 1024,
SegmentPathFormat: recordPath,
Format: f,
PartDuration: 100 * time.Millisecond,
SegmentDuration: 1 * time.Second,
PathName: "mypath",
Stream: stream,
OnSegmentCreate: func(fpath string) {
segCreated <- struct{}{}
},
Expand Down Expand Up @@ -266,14 +266,14 @@ func TestAgentFMP4NegativeDTS(t *testing.T) {
recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")

w := &Agent{
WriteQueueSize: 1024,
RecordPath: recordPath,
Format: conf.RecordFormatFMP4,
PartDuration: 100 * time.Millisecond,
SegmentDuration: 1 * time.Second,
PathName: "mypath",
Stream: stream,
Parent: &nilLogger{},
WriteQueueSize: 1024,
SegmentPathFormat: recordPath,
Format: conf.RecordFormatFMP4,
PartDuration: 100 * time.Millisecond,
SegmentDuration: 1 * time.Second,
PathName: "mypath",
Stream: stream,
Parent: &nilLogger{},
}
w.Initialize()

Expand Down
66 changes: 28 additions & 38 deletions internal/record/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,39 +41,28 @@ func commonPath(v string) string {

// CleanerEntry is a cleaner entry.
type CleanerEntry struct {
RecordPath string
RecordFormat conf.RecordFormat
RecordDeleteAfter time.Duration
SegmentPathFormat string
Format conf.RecordFormat
DeleteAfter time.Duration
}

// Cleaner removes expired recording segments from disk.
type Cleaner struct {
Entries []CleanerEntry
Parent logger.Writer

ctx context.Context
ctxCancel func()
entries []CleanerEntry
parent logger.Writer

done chan struct{}
}

// NewCleaner allocates a Cleaner.
func NewCleaner(
entries []CleanerEntry,
parent logger.Writer,
) *Cleaner {
ctx, ctxCancel := context.WithCancel(context.Background())

c := &Cleaner{
ctx: ctx,
ctxCancel: ctxCancel,
entries: entries,
parent: parent,
done: make(chan struct{}),
}
// Initialize initializes a Cleaner.
func (c *Cleaner) Initialize() {
c.ctx, c.ctxCancel = context.WithCancel(context.Background())
c.done = make(chan struct{})

go c.run()

return c
}

// Close closes the Cleaner.
Expand All @@ -84,16 +73,16 @@ func (c *Cleaner) Close() {

// Log is the main logging function.
func (c *Cleaner) Log(level logger.Level, format string, args ...interface{}) {
c.parent.Log(level, "[record cleaner]"+format, args...)
c.Parent.Log(level, "[record cleaner]"+format, args...)
}

func (c *Cleaner) run() {
defer close(c.done)

interval := 30 * 60 * time.Second
for _, e := range c.entries {
if interval > (e.RecordDeleteAfter / 2) {
interval = e.RecordDeleteAfter / 2
for _, e := range c.Entries {
if interval > (e.DeleteAfter / 2) {
interval = e.DeleteAfter / 2
}
}

Expand All @@ -111,27 +100,27 @@ func (c *Cleaner) run() {
}

func (c *Cleaner) doRun() {
for _, e := range c.entries {
for _, e := range c.Entries {
c.doRunEntry(&e) //nolint:errcheck
}
}

func (c *Cleaner) doRunEntry(e *CleanerEntry) error {
recordPath := e.RecordPath
segmentPathFormat := e.SegmentPathFormat

// we have to convert to absolute paths
// otherwise, commonPath and fpath inside Walk() won't have common elements
recordPath, _ = filepath.Abs(recordPath)

switch e.RecordFormat {
switch e.Format {
case conf.RecordFormatMPEGTS:
recordPath += ".ts"
segmentPathFormat += ".ts"

default:
recordPath += ".mp4"
segmentPathFormat += ".mp4"
}

commonPath := commonPath(recordPath)
// we have to convert to absolute paths
// otherwise, commonPath and fpath inside Walk() won't have common elements
segmentPathFormat, _ = filepath.Abs(segmentPathFormat)

commonPath := commonPath(segmentPathFormat)
now := timeNow()

filepath.Walk(commonPath, func(fpath string, info fs.FileInfo, err error) error { //nolint:errcheck
Expand All @@ -140,9 +129,10 @@ func (c *Cleaner) doRunEntry(e *CleanerEntry) error {
}

if !info.IsDir() {
params := decodeRecordPath(recordPath, fpath)
if params != nil {
if now.Sub(params.time) > e.RecordDeleteAfter {
var pa segmentPath
ok := pa.decode(segmentPathFormat, fpath)
if ok {
if now.Sub(pa.time) > e.DeleteAfter {
c.Log(logger.Debug, "removing %s", fpath)
os.Remove(fpath)
}
Expand Down
15 changes: 8 additions & 7 deletions internal/record/cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ func TestCleaner(t *testing.T) {
err = os.WriteFile(filepath.Join(dir, "_-+*?^$()[]{}|_mypath", "2009-05-20_22-15-25-000427.mp4"), []byte{1}, 0o644)
require.NoError(t, err)

c := NewCleaner(
[]CleanerEntry{{
RecordPath: recordPath,
RecordFormat: conf.RecordFormatFMP4,
RecordDeleteAfter: 10 * time.Second,
c := &Cleaner{
Entries: []CleanerEntry{{
SegmentPathFormat: recordPath,
Format: conf.RecordFormatFMP4,
DeleteAfter: 10 * time.Second,
}},
nilLogger{},
)
Parent: nilLogger{},
}
c.Initialize()
defer c.Close()

time.Sleep(500 * time.Millisecond)
Expand Down
Loading

0 comments on commit 7c8e593

Please sign in to comment.