Skip to content

Commit

Permalink
pubsub.Queue: support WriteHeader()
Browse files Browse the repository at this point in the history
  • Loading branch information
nareix committed Aug 31, 2016
1 parent 46d5df0 commit 3d9cae6
Showing 1 changed file with 25 additions and 10 deletions.
35 changes: 25 additions & 10 deletions av/pubsub/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,13 @@ type Queue struct {
closed bool
}

func NewQueue(streams []av.CodecData) *Queue {
func NewQueue() *Queue {
q := &Queue{}
q.buf = pktque.NewBuf()
q.streams = streams
q.maxdur = time.Second*10
q.lock = &sync.RWMutex{}
q.cond = sync.NewCond(q.lock.RLocker())
q.videoidx = -1
for i, stream := range streams {
if stream.Type().IsVideo() {
q.videoidx = i
}
}
return q
}

Expand All @@ -55,6 +49,20 @@ func (self *Queue) SetMaxSize(size int) {
return
}

func (self *Queue) WriteHeader(streams []av.CodecData) {
self.lock.Lock()

self.streams = streams
for i, stream := range streams {
if stream.Type().IsVideo() {
self.videoidx = i
}
}
self.cond.Broadcast()

self.lock.Unlock()
}

// After Close() called, all QueueCursor's ReadPacket will return io.EOF.
func (self *Queue) Close() (err error) {
self.lock.Lock()
Expand Down Expand Up @@ -146,9 +154,16 @@ func (self *Queue) DelayedGopCount(n int) *QueueCursor {
}

func (self *QueueCursor) Streams() (streams []av.CodecData, err error) {
self.que.lock.RLock()
streams = self.que.streams
self.que.lock.RUnlock()
self.que.cond.L.Lock()
for self.que.streams == nil && !self.que.closed {
self.que.cond.Wait()
}
if self.que.streams != nil {
streams = self.que.streams
} else {
err = io.EOF
}
self.que.cond.L.Unlock()
return
}

Expand Down

0 comments on commit 3d9cae6

Please sign in to comment.