Skip to content

Commit

Permalink
broadcast: Calling video comparison to improve the security
Browse files Browse the repository at this point in the history
Calling video comparison functions on results received from different Os.
  • Loading branch information
oscar-davids authored Mar 15, 2022
1 parent 3abe796 commit 10705d2
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#### Broadcaster
- \#2296 Increase orchestrator discovery timeout from `500ms` to `1` (@leszko)
- \#2291 Calling video comparison to improve the security strength (@oscar-davids)

#### Orchestrator
- \#2284 Fix issue with not redeeming tickets by Redeemer (@leszko)
Expand Down
23 changes: 23 additions & 0 deletions server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,13 @@ func (bsm *BroadcastSessionsManager) chooseResults(ctx context.Context, submitRe
trustedResult.TranscodeResult.Segments[segmToCheckIndex].PerceptualHashUrl, err)
return nil, nil, err
}
// download trusted video segment
trustedSegm, err := drivers.GetSegmentData(ctx, trustedResult.TranscodeResult.Segments[segmToCheckIndex].Url)
if err != nil {
err = fmt.Errorf("error downloading segment from url=%s err=%w",
trustedResult.TranscodeResult.Segments[segmToCheckIndex].Url, err)
return nil, nil, err
}

// verify untrusted hashes
var sessionsToSuspend []*BroadcastSession
Expand All @@ -514,7 +521,23 @@ func (bsm *BroadcastSessionsManager) chooseResults(ctx context.Context, submitRe
clog.Infof(ctx, "Hashes from url=%s and url=%s are equal=%v",
trustedResult.TranscodeResult.Segments[segmToCheckIndex].PerceptualHashUrl,
untrustedResult.TranscodeResult.Segments[segmToCheckIndex].PerceptualHashUrl, equal)
vequal := false
if equal {
// download untrusted video segment
untrustedSegm, err := drivers.GetSegmentData(ctx, untrustedResult.TranscodeResult.Segments[segmToCheckIndex].Url)
if err != nil {
err = fmt.Errorf("error downloading segment from url=%s err=%w",
untrustedResult.TranscodeResult.Segments[segmToCheckIndex].Url, err)
return nil, nil, err
}
vequal, err = ffmpeg.CompareVideoByBuffer(trustedSegm, untrustedSegm)
if err != nil {
clog.Errorf(ctx, "error comparing video from url=%s err=%q",
untrustedResult.TranscodeResult.Segments[segmToCheckIndex].Url, err)
return nil, nil, err
}
}
if vequal && equal {
// stick to this verified orchestrator for further segments.
if untrustedResult.Err == nil {
bsm.sessionVerified(untrustedResult.Session)
Expand Down
190 changes: 177 additions & 13 deletions server/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1533,7 +1533,9 @@ func TestPush_ReuseIntmidWithDiffExtmid(t *testing.T) {

func TestPush_MultipartReturnMultiSession(t *testing.T) {
assert := assert.New(t)

// need real video data for fast verification
transcodeddata, err := ioutil.ReadFile("../core/test.ts")
assert.NoError(err)
goodHash, err := ioutil.ReadFile("../core/test.phash")
assert.NoError(err)

Expand Down Expand Up @@ -1574,7 +1576,7 @@ func TestPush_MultipartReturnMultiSession(t *testing.T) {
})
mux.HandleFunc(segPath, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("trusted transcoded binary data"))
w.Write(transcodeddata)
})
mux.HandleFunc(segPath+".phash", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
Expand All @@ -1593,7 +1595,7 @@ func TestPush_MultipartReturnMultiSession(t *testing.T) {
})
mux2.HandleFunc(segPath, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("UNtrusted transcoded binary data"))
w.Write(transcodeddata)
})
unverifiedHash := goodHash
unverifiedHashCalled := 0
Expand All @@ -1617,7 +1619,7 @@ func TestPush_MultipartReturnMultiSession(t *testing.T) {
})
mux3.HandleFunc(segPath, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("second UNtrusted transcoded binary data"))
w.Write(transcodeddata)
})
mux3.HandleFunc(segPath+".phash", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
Expand Down Expand Up @@ -1693,16 +1695,11 @@ func TestPush_MultipartReturnMultiSession(t *testing.T) {
assert.Equal("P144p25fps16x9_17.ts", params["name"])
assert.Equal(`attachment; filename="P144p25fps16x9_17.ts"`, p.Header.Get("Content-Disposition"))
assert.Equal("P144p25fps16x9", p.Header.Get("Rendition-Name"))
bodyPart, err := ioutil.ReadAll(p)
assert.NoError(err)
assert.Equal("video/mp2t", strings.ToLower(mediaType))
assert.Equal("UNtrusted transcoded binary data", string(bodyPart))

i++
}
assert.Equal(1, i)
assert.Equal(uint64(12), cxn.sourceBytes)
assert.Equal(uint64(32), cxn.transcodedBytes)

// now make unverified to respond with bad hash
unverifiedHash = []byte{0}
Expand Down Expand Up @@ -1734,17 +1731,184 @@ func TestPush_MultipartReturnMultiSession(t *testing.T) {
assert.Equal("P144p25fps16x9_18.ts", params["name"])
assert.Equal(`attachment; filename="P144p25fps16x9_18.ts"`, p.Header.Get("Content-Disposition"))
assert.Equal("P144p25fps16x9", p.Header.Get("Rendition-Name"))
bodyPart, err := ioutil.ReadAll(p)
assert.NoError(err)
assert.Equal("video/mp2t", strings.ToLower(mediaType))
assert.Equal("second UNtrusted transcoded binary data", string(bodyPart))

i++
}
assert.Equal(1, i)
assert.Equal(uint64(12*2), cxn.sourceBytes)
assert.Equal(uint64(71), cxn.transcodedBytes)
assert.Equal(2, unverifiedHashCalled)
assert.Contains(bsm.untrustedPool.sus.list, ts2.URL)
assert.Equal(0, bsm.untrustedPool.sus.count)
}
func TestPush_MultiSessionVideoCompare(t *testing.T) {
assert := assert.New(t)
// need real video data for fast verification
segmentgooddata, err := ioutil.ReadFile("../core/test.ts")
assert.NoError(err)
segmentbaddata, err := ioutil.ReadFile("../core/test2.ts")
assert.NoError(err)
goodHash, err := ioutil.ReadFile("../core/test.phash")
assert.NoError(err)

// wait for any earlier tests to complete
assert.True(wgWait(&pushResetWg), "timed out waiting for earlier tests")

s, cancel := setupServerWithCancel()
defer serverCleanup(s)
defer cancel()
reader := strings.NewReader("InsteadOf.TS")
w := httptest.NewRecorder()
req := httptest.NewRequest("POST", "/live/mani/17.ts", reader)

dummyRes := func(tSegData []*net.TranscodedSegmentData) *net.TranscodeResult {
return &net.TranscodeResult{
Result: &net.TranscodeResult_Data{
Data: &net.TranscodeData{
Segments: tSegData,
Sig: []byte("bar"),
},
},
}
}

// Create stub server
ts, mux := stubTLSServer()
defer ts.Close()

segPath := "/transcoded/segment.ts"
tSegData := []*net.TranscodedSegmentData{{Url: ts.URL + segPath, Pixels: 100, PerceptualHashUrl: ts.URL + segPath + ".phash"}}
tr := dummyRes(tSegData)
buf, err := proto.Marshal(tr)
require.Nil(t, err)

mux.HandleFunc("/segment", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write(buf)
})
mux.HandleFunc(segPath, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write(segmentgooddata)
})
mux.HandleFunc(segPath+".phash", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write(goodHash)
})

ts2, mux2 := stubTLSServer()
defer ts2.Close()
tSegData2 := []*net.TranscodedSegmentData{{Url: ts2.URL + segPath, Pixels: 100, PerceptualHashUrl: ts2.URL + segPath + ".phash"}}
tr2 := dummyRes(tSegData2)
buf2, err := proto.Marshal(tr2)
require.Nil(t, err)
mux2.HandleFunc("/segment", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write(buf2)
})
mux2.HandleFunc(segPath, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write(segmentbaddata)
})
mux2.HandleFunc(segPath+".phash", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write(goodHash)
})

ts3, mux3 := stubTLSServer()
defer ts3.Close()
tSegData3 := []*net.TranscodedSegmentData{{Url: ts3.URL + segPath, Pixels: 100, PerceptualHashUrl: ts3.URL + segPath + ".phash"}}
tr3 := dummyRes(tSegData3)
buf3, err := proto.Marshal(tr3)
require.Nil(t, err)
mux3.HandleFunc("/segment", func(w http.ResponseWriter, r *http.Request) {
// delay so it will be chosen second
time.Sleep(50 * time.Millisecond)
w.WriteHeader(http.StatusOK)
w.Write(buf3)
})
mux3.HandleFunc(segPath, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write(segmentgooddata)
})
mux3.HandleFunc(segPath+".phash", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write(goodHash)
})

sess1 := StubBroadcastSession(ts.URL)
sess1.Params.Profiles = []ffmpeg.VideoProfile{ffmpeg.P144p30fps16x9}
sess1.Params.ManifestID = "mani"

sess2 := StubBroadcastSession(ts2.URL)
sess2.Params.Profiles = []ffmpeg.VideoProfile{ffmpeg.P144p30fps16x9}
sess2.Params.ManifestID = "mani"
sess2.OrchestratorScore = common.Score_Untrusted

sess3 := StubBroadcastSession(ts3.URL)
sess3.Params.Profiles = []ffmpeg.VideoProfile{ffmpeg.P144p30fps16x9}
sess3.Params.ManifestID = "mani"
sess3.OrchestratorScore = common.Score_Untrusted

bsm := bsmWithSessListExt([]*BroadcastSession{sess1}, []*BroadcastSession{sess2, sess3}, false)
bsm.VerificationFreq = 1
assert.Equal(0, bsm.untrustedPool.sus.count)
// hack: stop pool from refreshing
bsm.untrustedPool.refreshing = true

url, _ := url.ParseRequestURI("test://some.host")
osd := drivers.NewMemoryDriver(url)
osSession := osd.NewSession("testPath")
sess1.BroadcasterOS = osSession
sess2.BroadcasterOS = osSession
sess3.BroadcasterOS = osSession

oldjpqt := core.JsonPlaylistQuitTimeout
defer func() {
core.JsonPlaylistQuitTimeout = oldjpqt
}()
core.JsonPlaylistQuitTimeout = 0 * time.Second
pl := core.NewBasicPlaylistManager("xx", osSession, nil)

cxn := &rtmpConnection{
mid: core.ManifestID("mani"),
nonce: 7,
pl: pl,
profile: &ffmpeg.P144p30fps16x9,
sessManager: bsm,
params: &core.StreamParameters{Profiles: []ffmpeg.VideoProfile{ffmpeg.P144p25fps16x9}, VerificationFreq: 1},
}

s.rtmpConnections["mani"] = cxn

req.Header.Set("Accept", "multipart/mixed")
s.HandlePush(w, req)
resp := w.Result()
defer resp.Body.Close()
assert.Equal(200, resp.StatusCode)

mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
assert.Equal("multipart/mixed", mediaType)
assert.Nil(err)
mr := multipart.NewReader(resp.Body, params["boundary"])
var i = 0
for {
p, err := mr.NextPart()
if err == io.EOF {
break
}
assert.NoError(err)
mediaType, params, err := mime.ParseMediaType(p.Header.Get("Content-Type"))
assert.Nil(err)
assert.Contains(params, "name")
assert.Len(params, 1)
assert.Equal("P144p25fps16x9_17.ts", params["name"])
assert.Equal(`attachment; filename="P144p25fps16x9_17.ts"`, p.Header.Get("Content-Disposition"))
assert.Equal("P144p25fps16x9", p.Header.Get("Rendition-Name"))
assert.Equal("video/mp2t", strings.ToLower(mediaType))
i++
}
assert.Equal(1, i)
assert.Equal(uint64(12), cxn.sourceBytes)
assert.Contains(bsm.untrustedPool.sus.list, ts2.URL)
assert.Equal(0, bsm.untrustedPool.sus.count)
}

0 comments on commit 10705d2

Please sign in to comment.