Skip to content

Commit

Permalink
Use "-rw_timeout" option to control the maximum time of network opera…
Browse files Browse the repository at this point in the history
…tions (#96)

* added flag `-rw_timeout` to ffprobe command

* change timeout naming

* fix tests

* update description and examples

* add test for StreamsInfo::_runShowStreamsProcess

* fix remarks
  • Loading branch information
oleh-poberezhets authored and WoZ committed Dec 17, 2019
1 parent 21f4723 commit 1f18d49
Show file tree
Hide file tree
Showing 16 changed files with 148 additions and 61 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ const {StreamsInfo} = require('video-quality-tools');

const streamsInfoOptions = {
ffprobePath: '/usr/local/bin/ffprobe',
timeoutInSec: 5
timeoutInMs: 2000
};
const streamsInfo = new StreamsInfo(streamsInfoOptions, 'rtmp://host:port/appInstance/name');
```
Expand Down Expand Up @@ -179,7 +179,7 @@ const {FramesMonitor} = require('video-quality-tools');

const framesMonitorOptions = {
ffprobePath: '/usr/local/bin/ffprobe',
timeoutInSec: 5,
timeoutInMs: 2000,
bufferMaxLengthInBytes: 100000,
errorLevel: 'error',
exitProcessGuardTimeoutInMs: 1000
Expand All @@ -197,7 +197,8 @@ Constructor throws:
The first argument of `FramesMonitor` must be an `options` object. All `options` object's fields are mandatory:

* `ffprobePath` - string, path to ffprobe executable;
* `timeoutInSec` - integer, greater than 0, specifies the waiting time of a live stream’s first frame;
* `timeoutInMs` - integer, greater than 0, specifies the maximum time to wait for (network) read/write operations
to complete;
* `bufferMaxLengthInBytes` - integer, greater than 0, specifies the buffer length for ffprobe frames. This setting
prevents from hanging and receiving incorrect data from the stream, usually 1-2 KB is enough;
* `errorLevel` - specifies log level for debugging purposes, must be equal to ffprobe's
Expand Down
2 changes: 1 addition & 1 deletion examples/realtimeStats.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const STREAM_URI = 'rtmp://host:port/path';

const framesMonitorOptions = {
ffprobePath: '/usr/local/bin/ffprobe',
timeoutInSec: 5,
timeoutInMs: 2000,
bufferMaxLengthInBytes: 100000,
errorLevel: 'error',
exitProcessGuardTimeoutInMs: 1000
Expand Down
25 changes: 16 additions & 9 deletions src/FramesMonitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class FramesMonitor extends EventEmitter {

const {
ffprobePath,
timeoutInSec,
timeoutInMs,
bufferMaxLengthInBytes,
errorLevel,
exitProcessGuardTimeoutInMs
Expand All @@ -52,7 +52,7 @@ class FramesMonitor extends EventEmitter {
throw new Errors.ConfigError('You should provide a correct path to ffprobe, bastard.');
}

if (!_.isSafeInteger(timeoutInSec) || timeoutInSec <= 0) {
if (!_.isSafeInteger(timeoutInMs) || timeoutInMs <= 0) {
throw new Errors.ConfigError('You should provide a correct timeout, bastard.');
}

Expand All @@ -72,8 +72,15 @@ class FramesMonitor extends EventEmitter {

FramesMonitor._assertExecutable(ffprobePath);

this._config = _.cloneDeep(config);
this._url = url;
this._config = {
ffprobePath,
bufferMaxLengthInBytes,
errorLevel,
exitProcessGuardTimeoutInMs,
timeout: timeoutInMs * 1000
};

this._url = url;

this._cp = null;
this._chunkRemainder = '';
Expand Down Expand Up @@ -260,26 +267,26 @@ class FramesMonitor extends EventEmitter {
}

_runShowFramesProcess() {
const {ffprobePath, timeoutInSec, errorLevel} = this._config;
const {ffprobePath, timeout, errorLevel} = this._config;

try {
const exec = spawn(
return spawn(
ffprobePath,
[
'-hide_banner',
'-v',
errorLevel,
'-fflags',
'nobuffer',
'-rw_timeout',
timeout,
'-show_frames',
'-show_entries',
'frame=pkt_size,pkt_pts_time,media_type,pict_type,key_frame,width,height',
'-i',
`${this._url} timeout=${timeoutInSec}`
this._url
]
);

return exec;
} catch (err) {
if (err instanceof TypeError) {
// spawn method throws TypeError if some argument is invalid
Expand Down
19 changes: 12 additions & 7 deletions src/StreamsInfo.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,24 @@ class StreamsInfo {
throw new TypeError('You should provide a correct url, bastard.');
}

const {ffprobePath, timeoutInSec} = config;
const {ffprobePath, timeoutInMs} = config;

if (!_.isString(ffprobePath) || _.isEmpty(ffprobePath)) {
throw new Errors.ConfigError('You should provide a correct path to ffprobe, bastard.');
}

if (!_.isInteger(timeoutInSec) || timeoutInSec <= 0) {
if (!_.isInteger(timeoutInMs) || timeoutInMs <= 0) {
throw new Errors.ConfigError('You should provide a correct timeout, bastard.');
}

this._assertExecutable(ffprobePath);

this._config = config;
this._url = url;
this._config = {
ffprobePath,
timeout: timeoutInMs * 1000
};

this._url = url;
}

async fetch() {
Expand Down Expand Up @@ -79,15 +83,16 @@ class StreamsInfo {
}

_runShowStreamsProcess() {
const {ffprobePath, timeoutInSec} = this._config;
const {ffprobePath, timeout} = this._config;

const command = `
const command = `\
${ffprobePath}\
-hide_banner\
-v error\
-show_streams\
-print_format json\
'${this._url} timeout=${timeoutInSec}'
-rw_timeout ${timeout}\
${this._url}\
`;

return promisify(exec)(command);
Expand Down
10 changes: 5 additions & 5 deletions tests/Functional/FramesMonitor/listen.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ describe('FramesMonitor::listen, fetch frames from inactive stream', () => {

framesMonitor = new FramesMonitor({
ffprobePath : process.env.FFPROBE,
timeoutInSec : 1,
timeoutInMs : 1000,
bufferMaxLengthInBytes : bufferMaxLengthInBytes,
errorLevel : errorLevel,
exitProcessGuardTimeoutInMs: exitProcessGuardTimeoutInMs
Expand Down Expand Up @@ -82,7 +82,7 @@ describe('FramesMonitor::listen, fetch frames from active stream', () => {

framesMonitor = new FramesMonitor({
ffprobePath : process.env.FFPROBE,
timeoutInSec : 1,
timeoutInMs : 1000,
bufferMaxLengthInBytes : bufferMaxLengthInBytes,
errorLevel : errorLevel,
exitProcessGuardTimeoutInMs: exitProcessGuardTimeoutInMs
Expand All @@ -102,7 +102,7 @@ describe('FramesMonitor::listen, fetch frames from active stream', () => {
});

it('must receive all stream frames', done => {
const expectedReturnCode = 0;
const expectedReturnCode = 0;

const onFrame = {I: spyOnIFrame, P: spyOnPFrame};

Expand Down Expand Up @@ -146,7 +146,7 @@ describe('FramesMonitor::listen, stop ffprobe process', () => {

framesMonitor = new FramesMonitor({
ffprobePath : process.env.FFPROBE,
timeoutInSec : 1,
timeoutInMs : 1000,
bufferMaxLengthInBytes : bufferMaxLengthInBytes,
errorLevel : errorLevel,
exitProcessGuardTimeoutInMs: exitProcessGuardTimeoutInMs
Expand Down Expand Up @@ -188,7 +188,7 @@ describe('FramesMonitor::listen, exit with correct code after stream has been fi

framesMonitor = new FramesMonitor({
ffprobePath : process.env.FFPROBE,
timeoutInSec : 1,
timeoutInMs : 1000,
bufferMaxLengthInBytes : bufferMaxLengthInBytes,
errorLevel : errorLevel,
exitProcessGuardTimeoutInMs: exitProcessGuardTimeoutInMs
Expand Down
8 changes: 4 additions & 4 deletions tests/Functional/StreamsInfo/fetch.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ describe('StreamsInfo::fetch, fetch streams info from inactive stream', () => {
streamUrl = `http://localhost:${port}`;

streamsInfo = new StreamsInfo({
ffprobePath : process.env.FFPROBE,
timeoutInSec: 1,
ffprobePath: process.env.FFPROBE,
timeoutInMs: 1000,
}, streamUrl);
});

Expand Down Expand Up @@ -54,8 +54,8 @@ describe('StreamsInfo::fetch, fetch streams info from active stream', () => {
streamUrl = `http://localhost:${port}`;

streamsInfo = new StreamsInfo({
ffprobePath : process.env.FFPROBE,
timeoutInSec: 1,
ffprobePath: process.env.FFPROBE,
timeoutInMs: 1000,
}, streamUrl);

stream = await startStream(testFile, streamUrl);
Expand Down
4 changes: 2 additions & 2 deletions tests/Unit/FramesMonitor/Helpers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const proxyquire = require('proxyquire');

const ffprobePath = '/correct/path';
const bufferMaxLengthInBytes = 2 ** 20;
const timeoutInSec = 1;
const timeoutInMs = 1000;
const url = 'rtmp://localhost:1935/myapp/mystream';
const errorLevel = 'fatal'; // https://ffmpeg.org/ffprobe.html
const exitProcessGuardTimeoutInMs = 2000;
Expand Down Expand Up @@ -38,7 +38,7 @@ function makeChildProcess() {
module.exports = {
config: {
ffprobePath,
timeoutInSec,
timeoutInMs,
bufferMaxLengthInBytes,
errorLevel,
exitProcessGuardTimeoutInMs
Expand Down
8 changes: 5 additions & 3 deletions tests/Unit/FramesMonitor/_runShowFramesProcess.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,26 @@ const {assert} = require('chai');

const {config, url} = require('./Helpers');

function getSpawnArguments(url, timeoutInSec, errorLevel) {
function getSpawnArguments(url, timeoutInMs, errorLevel) {
return [
'-hide_banner',
'-v',
errorLevel,
'-fflags',
'nobuffer',
'-rw_timeout',
timeoutInMs * 1000,
'-show_frames',
'-show_entries',
'frame=pkt_size,pkt_pts_time,media_type,pict_type,key_frame,width,height',
'-i',
`${url} timeout=${timeoutInSec}`
url
];
}

describe('FramesMonitor::_handleProcessingError', () => {
const expectedFfprobePath = config.ffprobePath;
const expectedFfprobeArguments = getSpawnArguments(url, config.timeoutInSec, config.errorLevel);
const expectedFfprobeArguments = getSpawnArguments(url, config.timeoutInMs, config.errorLevel);

it('must returns child process object just fine', () => {
const expectedOutput = {cp: true};
Expand Down
12 changes: 6 additions & 6 deletions tests/Unit/FramesMonitor/constructor.data.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const incorrectFfprobePath = [
new Error('bastard')
];

const incorrectTimeoutInSec = [
const incorrectTimeoutInMs = [
undefined,
null,
false,
Expand Down Expand Up @@ -93,13 +93,13 @@ const incorrectExitProcessGuardTimeoutInMs = [

const incorrectConfigObject = [
{
description: 'config.timeoutInSec param must be a positive integer, float is passed',
config : {timeoutInSec: 1.1},
description: 'config.timeoutInMs param must be a positive integer, float is passed',
config : {timeoutInMs: 1.1},
errorMsg : 'You should provide a correct timeout, bastard.'
},
{
description: 'config.timeoutInSec param must be a positive integer, negative is passed',
config : {timeoutInSec: -1},
description: 'config.timeoutInMs param must be a positive integer, negative is passed',
config : {timeoutInMs: -1},
errorMsg : 'You should provide a correct timeout, bastard.'
},
{
Expand Down Expand Up @@ -133,7 +133,7 @@ module.exports = {
incorrectConfig,
incorrectUrl,
incorrectFfprobePath,
incorrectTimeoutInSec,
incorrectTimeoutInMs,
incorrectBufferMaxLengthInBytes,
incorrectErrorLevel,
incorrectExitProcessGuardTimeoutInMs,
Expand Down
16 changes: 12 additions & 4 deletions tests/Unit/FramesMonitor/constructor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ describe('FramesMonitor::constructor', () => {
);

dataDriven(
testData.incorrectTimeoutInSec.map(item => ({type: typeOf(item), timeoutInSec: item})),
testData.incorrectTimeoutInMs.map(item => ({type: typeOf(item), timeoutInMs: item})),
() => {
it('config.timeoutInSec param has invalid ({type}) type', ctx => {
it('config.timeoutInMs param has invalid ({type}) type', ctx => {
const incorrectConfig = Object.assign({}, config, {
timeoutInSec: ctx.timeoutInSec
timeoutInMs: ctx.timeoutInMs
});

assert.throws(() => {
Expand Down Expand Up @@ -172,13 +172,21 @@ describe('FramesMonitor::constructor', () => {
const expectedChildProcessDefaultValue = null;
const expectedChunkRemainderDefaultValue = '';
const expectedStderrOutputs = [];
const expectedConfig = {
ffprobePath : config.ffprobePath,
bufferMaxLengthInBytes : config.bufferMaxLengthInBytes,
errorLevel : config.errorLevel,
exitProcessGuardTimeoutInMs: config.exitProcessGuardTimeoutInMs,
timeout : config.timeoutInMs * 1000
};

const framesMonitor = new FramesMonitor(config, url);

assert.isTrue(spyAssertExecutable.calledOnce);
assert.isTrue(spyAssertExecutable.calledWithExactly(config.ffprobePath));

assert.deepEqual(framesMonitor._config, config);

assert.deepEqual(framesMonitor._config, expectedConfig);
assert.strictEqual(framesMonitor._url, url);

assert.strictEqual(framesMonitor._cp, expectedChildProcessDefaultValue);
Expand Down
4 changes: 2 additions & 2 deletions tests/Unit/StreamsInfo/_adjustAspectRatio.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ const {invalidParams, validParams} = require('./_adjustAspectRatio.data');
describe('StreamsInfo::_adjustAspectRatio', () => {

const streamsInfo = new StreamsInfo({
ffprobePath : correctPath,
timeoutInSec: 1
ffprobePath: correctPath,
timeoutInMs: 1
}, correctUrl);

dataDriven(invalidParams, function () {
Expand Down
4 changes: 2 additions & 2 deletions tests/Unit/StreamsInfo/_parseStreamsInfo.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ function typeOf(obj) {
describe('StreamsInfo::_parseStreamsInfo', () => {

const streamsInfo = new StreamsInfo({
ffprobePath : correctPath,
timeoutInSec: 1
ffprobePath: correctPath,
timeoutInMs: 1
}, correctUrl);

it('method awaits for stringified json', () => {
Expand Down
Loading

0 comments on commit 1f18d49

Please sign in to comment.