From 561eedeb982427ab5dae39d38089a273ceb30fab Mon Sep 17 00:00:00 2001 From: nareix Date: Fri, 29 Jul 2016 16:16:59 +0800 Subject: [PATCH] example add http_flv_and_rtmp_server --- examples/http_flv_and_rtmp_server/main.go | 104 ++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 examples/http_flv_and_rtmp_server/main.go diff --git a/examples/http_flv_and_rtmp_server/main.go b/examples/http_flv_and_rtmp_server/main.go new file mode 100644 index 00000000..dabb33af --- /dev/null +++ b/examples/http_flv_and_rtmp_server/main.go @@ -0,0 +1,104 @@ +package main + +import ( + "sync" + "io" + "time" + "net/http" + "github.com/nareix/joy4/format" + "github.com/nareix/joy4/av/avutil" + "github.com/nareix/joy4/av/pubsub" + "github.com/nareix/joy4/format/rtmp" + "github.com/nareix/joy4/format/flv" +) + +func init() { + format.RegisterAll() +} + +type writeFlusher struct { + httpflusher http.Flusher + io.Writer +} + +func (self writeFlusher) Flush() error { + self.httpflusher.Flush() + return nil +} + +func main() { + server := &rtmp.Server{} + + l := &sync.RWMutex{} + type Channel struct { + que *pubsub.Queue + } + channels := map[string]*Channel{} + + server.HandlePlay = func(conn *rtmp.Conn) { + l.RLock() + ch := channels[conn.URL.Path] + l.RUnlock() + + if ch != nil { + cursor := ch.que.Latest() + avutil.CopyFile(conn, cursor) + } + } + + server.HandlePublish = func(conn *rtmp.Conn) { + streams, _ := conn.Streams() + + l.Lock() + ch := channels[conn.URL.Path] + if ch == nil { + ch = &Channel{} + ch.que = pubsub.NewQueue(streams) + ch.que.SetMaxDuration(time.Minute) + channels[conn.URL.Path] = ch + } else { + ch = nil + } + l.Unlock() + if ch == nil { + return + } + + avutil.CopyPackets(ch.que, conn) + + l.Lock() + delete(channels, conn.URL.Path) + l.Unlock() + ch.que.Close() + } + + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + l.RLock() + ch := channels[r.URL.Path] + l.RUnlock() + + if ch != nil { + w.Header().Set("Content-Type", "video/x-flv") + w.Header().Set("Transfer-Encoding", "chunked") + w.WriteHeader(200) + flusher := w.(http.Flusher) + flusher.Flush() + + muxer := flv.NewMuxerWriteFlusher(writeFlusher{httpflusher: flusher, Writer: w}) + cursor := ch.que.Latest() + + avutil.CopyFile(muxer, cursor) + } else { + http.NotFound(w, r) + } + }) + + go http.ListenAndServe(":8089", nil) + + server.ListenAndServe() + + // ffmpeg -re -i movie.flv -c copy -f flv rtmp://localhost/movie + // ffmpeg -f avfoundation -i "0:0" .... -f flv rtmp://localhost/screen + // ffplay http://localhost:8089/movie + // ffplay http://localhost:8089/screen +}