Skip to content

Commit

Permalink
http2: fix compat stream read handling, add tests
Browse files Browse the repository at this point in the history
Handle edge case where stream pause is called between resume being
called and actually evaluated. Other minor adjustments to avoid
various edge cases around stream events. Add new tests that cover
all changes.

Fixes: nodejs#15491
PR-URL: nodejs#15503
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: James M Snell <[email protected]>
  • Loading branch information
apapirovski authored and jasnell committed Sep 25, 2017
1 parent ebc58d7 commit c705f10
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 22 deletions.
47 changes: 25 additions & 22 deletions lib/internal/http2/compat.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,46 +97,50 @@ function onStreamError(error) {
}

function onRequestPause() {
this[kStream].pause();
const stream = this[kStream];
if (stream)
stream.pause();
}

function onRequestResume() {
this[kStream].resume();
}

function onRequestDrain() {
if (this.isPaused())
this.resume();
const stream = this[kStream];
if (stream)
stream.resume();
}

function onStreamResponseDrain() {
function onStreamDrain() {
this[kResponse].emit('drain');
}

// TODO Http2Stream does not emit 'close'
function onStreamClosedRequest() {
this[kRequest].push(null);
}

// TODO Http2Stream does not emit 'close'
function onStreamClosedResponse() {
const res = this[kResponse];
res.writable = false;
res.emit('finish');
this[kResponse].emit('finish');
}

function onStreamAbortedRequest(hadError, code) {
if ((this.writable) ||
(this._readableState && !this._readableState.ended)) {
this.emit('aborted', hadError, code);
this.emit('close');
const request = this[kRequest];
if (request[kState].closed === false) {
request.emit('aborted', hadError, code);
request.emit('close');
}
}

function onStreamAbortedResponse() {
if (this.writable) {
this.emit('close');
const response = this[kResponse];
if (response[kState].closed === false) {
response.emit('close');
}
}

function resumeStream(stream) {
stream.resume();
}

class Http2ServerRequest extends Readable {
constructor(stream, headers, options, rawHeaders) {
super(options);
Expand All @@ -158,13 +162,12 @@ class Http2ServerRequest extends Readable {
stream.on('end', onStreamEnd);
stream.on('error', onStreamError);
stream.on('close', onStreamClosedRequest);
stream.on('aborted', onStreamAbortedRequest.bind(this));
stream.on('aborted', onStreamAbortedRequest);
const onfinish = this[kFinish].bind(this);
stream.on('streamClosed', onfinish);
stream.on('finish', onfinish);
this.on('pause', onRequestPause);
this.on('resume', onRequestResume);
this.on('drain', onRequestDrain);
}

get closed() {
Expand Down Expand Up @@ -221,7 +224,7 @@ class Http2ServerRequest extends Readable {
_read(nread) {
const stream = this[kStream];
if (stream !== undefined) {
stream.resume();
process.nextTick(resumeStream, stream);
} else {
this.emit('error', new errors.Error('ERR_HTTP2_STREAM_CLOSED'));
}
Expand Down Expand Up @@ -279,9 +282,9 @@ class Http2ServerResponse extends Stream {
this[kStream] = stream;
stream[kResponse] = this;
this.writable = true;
stream.on('drain', onStreamResponseDrain);
stream.on('drain', onStreamDrain);
stream.on('close', onStreamClosedResponse);
stream.on('aborted', onStreamAbortedResponse.bind(this));
stream.on('aborted', onStreamAbortedResponse);
const onfinish = this[kFinish].bind(this);
stream.on('streamClosed', onfinish);
stream.on('finish', onfinish);
Expand Down
53 changes: 53 additions & 0 deletions test/parallel/test-http2-compat-serverrequest-pause.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Flags: --expose-http2
'use strict';

const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const h2 = require('http2');

// Check that pause & resume work as expected with Http2ServerRequest

const testStr = 'Request Body from Client';

const server = h2.createServer();

server.on('request', common.mustCall((req, res) => {
let data = '';
req.pause();
req.setEncoding('utf8');
req.on('data', common.mustCall((chunk) => (data += chunk)));
setTimeout(common.mustCall(() => {
assert.strictEqual(data, '');
req.resume();
}), common.platformTimeout(100));
req.on('end', common.mustCall(() => {
assert.strictEqual(data, testStr);
res.end();
}));

// shouldn't throw if underlying Http2Stream no longer exists
res.on('finish', common.mustCall(() => process.nextTick(() => {
assert.doesNotThrow(() => req.pause());
assert.doesNotThrow(() => req.resume());
})));
}));

server.listen(0, common.mustCall(() => {
const port = server.address().port;

const client = h2.connect(`http://localhost:${port}`);
const request = client.request({
':path': '/foobar',
':method': 'POST',
':scheme': 'http',
':authority': `localhost:${port}`
});
request.resume();
request.end(testStr);
request.on('end', common.mustCall(function() {
client.destroy();
server.close();
}));
}));
48 changes: 48 additions & 0 deletions test/parallel/test-http2-compat-serverrequest-pipe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Flags: --expose-http2
'use strict';

const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const http2 = require('http2');
const fs = require('fs');
const path = require('path');

// piping should work as expected with createWriteStream

const loc = path.join(common.fixturesDir, 'person.jpg');
const fn = path.join(common.tmpDir, 'http2pipe.jpg');
common.refreshTmpDir();

const server = http2.createServer();

server.on('request', common.mustCall((req, res) => {
const dest = req.pipe(fs.createWriteStream(fn));
dest.on('finish', common.mustCall(() => {
assert.deepStrictEqual(fs.readFileSync(loc), fs.readFileSync(fn));
fs.unlinkSync(fn);
res.end();
}));
}));

server.listen(0, common.mustCall(() => {
const port = server.address().port;
const client = http2.connect(`http://localhost:${port}`);

let remaining = 2;
function maybeClose() {
if (--remaining === 0) {
server.close();
client.destroy();
}
}

const req = client.request({ ':method': 'POST' });
req.on('response', common.mustCall());
req.resume();
req.on('end', common.mustCall(maybeClose));
const str = fs.createReadStream(loc);
str.on('end', common.mustCall(maybeClose));
str.pipe(req);
}));
44 changes: 44 additions & 0 deletions test/parallel/test-http2-compat-serverresponse-drain.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Flags: --expose-http2
'use strict';

const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const h2 = require('http2');

// Check that drain event is passed from Http2Stream

const testString = 'tests';

const server = h2.createServer();

server.on('request', common.mustCall((req, res) => {
res.stream._writableState.highWaterMark = testString.length;
assert.strictEqual(res.write(testString), false);
res.on('drain', common.mustCall(() => res.end(testString)));
}));

server.listen(0, common.mustCall(() => {
const port = server.address().port;

const client = h2.connect(`http://localhost:${port}`);
const request = client.request({
':path': '/foobar',
':method': 'POST',
':scheme': 'http',
':authority': `localhost:${port}`
});
request.resume();
request.end();

let data = '';
request.setEncoding('utf8');
request.on('data', (chunk) => (data += chunk));

request.on('end', common.mustCall(function() {
assert.strictEqual(data, testString.repeat(2));
client.destroy();
server.close();
}));
}));

0 comments on commit c705f10

Please sign in to comment.