forked from uber/tchannel-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream_test.go
344 lines (285 loc) · 10.6 KB
/
stream_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
// Copyright (c) 2015 Uber Technologies, Inc.
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package tchannel_test
import (
"errors"
"fmt"
"io"
"io/ioutil"
"strings"
"testing"
"time"
. "github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/testutils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
)
const (
streamRequestError = byte(255)
streamRequestClose = byte(254)
)
func makeRepeatedBytes(n byte) []byte {
data := make([]byte, int(n))
for i := byte(0); i < n; i++ {
data[i] = n
}
return data
}
func writeFlushBytes(w ArgWriter, bs []byte) error {
if _, err := w.Write(bs); err != nil {
return err
}
return w.Flush()
}
type streamHelper struct {
t testing.TB
}
// startCall starts a call to echoStream and returns the arg3 reader and writer.
func (h streamHelper) startCall(ctx context.Context, ch *Channel, hostPort, serviceName string) (ArgWriter, ArgReader) {
call, err := ch.BeginCall(ctx, hostPort, serviceName, "echoStream", nil)
require.NoError(h.t, err, "BeginCall to echoStream failed")
// Write empty headers
require.NoError(h.t, NewArgWriter(call.Arg2Writer()).Write(nil), "Write empty headers failed")
// Flush arg3 to force the call to start without any arg3.
writer, err := call.Arg3Writer()
require.NoError(h.t, err, "Arg3Writer failed")
require.NoError(h.t, writer.Flush(), "Arg3Writer flush failed")
// Read empty Headers
response := call.Response()
var arg2 []byte
require.NoError(h.t, NewArgReader(response.Arg2Reader()).Read(&arg2), "Read headers failed")
require.False(h.t, response.ApplicationError(), "echoStream failed due to application error")
reader, err := response.Arg3Reader()
require.NoError(h.t, err, "Arg3Reader failed")
return writer, reader
}
// streamPartialHandler returns a streaming handler that has the following contract:
// read a byte, write N bytes where N = the byte that was read.
// The results are be written as soon as the byte is read.
func streamPartialHandler(t testing.TB, reportErrors bool) HandlerFunc {
return func(ctx context.Context, call *InboundCall) {
response := call.Response()
onError := func(err error) {
if reportErrors {
t.Errorf("Handler error: %v", err)
}
response.SendSystemError(fmt.Errorf("failed to read arg2"))
}
var arg2 []byte
if err := NewArgReader(call.Arg2Reader()).Read(&arg2); err != nil {
onError(fmt.Errorf("failed to read arg2"))
return
}
if err := NewArgWriter(response.Arg2Writer()).Write(nil); err != nil {
onError(fmt.Errorf(""))
return
}
argReader, err := call.Arg3Reader()
if err != nil {
onError(fmt.Errorf("failed to read arg3"))
return
}
argWriter, err := response.Arg3Writer()
if err != nil {
onError(fmt.Errorf("arg3 writer failed"))
return
}
// Flush arg3 which will force a frame with just arg2 to be sent.
// The test reads arg2 before arg3 has been sent.
if err := argWriter.Flush(); err != nil {
onError(fmt.Errorf("arg3 flush failed"))
return
}
arg3 := make([]byte, 1)
for {
n, err := argReader.Read(arg3)
if err == io.EOF {
break
}
if n == 0 && err == nil {
err = fmt.Errorf("read 0 bytes")
}
if err != nil {
onError(fmt.Errorf("arg3 Read failed: %v", err))
return
}
// Magic number to cause a failure
if arg3[0] == streamRequestError {
// Make sure that the reader is closed.
if err := argReader.Close(); err != nil {
onError(fmt.Errorf("request error failed to close argReader: %v", err))
return
}
response.SendSystemError(errors.New("intentional failure"))
return
}
if arg3[0] == streamRequestClose {
if err := argWriter.Close(); err != nil {
onError(err)
}
return
}
// Write the number of bytes as specified by arg3[0]
if _, err := argWriter.Write(makeRepeatedBytes(arg3[0])); err != nil {
onError(fmt.Errorf("argWriter Write failed: %v", err))
return
}
if err := argWriter.Flush(); err != nil {
onError(fmt.Errorf("argWriter flush failed: %v", err))
return
}
}
if err := argReader.Close(); err != nil {
onError(fmt.Errorf("argReader Close failed: %v", err))
return
}
if err := argWriter.Close(); err != nil {
onError(fmt.Errorf("arg3writer Close failed: %v", err))
return
}
}
}
func testStreamArg(t *testing.T, f func(argWriter ArgWriter, argReader ArgReader)) {
defer testutils.SetTimeout(t, 2*time.Second)()
ctx, cancel := NewContext(time.Second)
defer cancel()
helper := streamHelper{t}
WithVerifiedServer(t, nil, func(ch *Channel, hostPort string) {
ch.Register(streamPartialHandler(t, true /* report errors */), "echoStream")
argWriter, argReader := helper.startCall(ctx, ch, hostPort, ch.ServiceName())
verifyBytes := func(n byte) {
require.NoError(t, writeFlushBytes(argWriter, []byte{n}), "arg3 write failed")
arg3 := make([]byte, int(n))
_, err := io.ReadFull(argReader, arg3)
require.NoError(t, err, "arg3 read failed")
assert.Equal(t, makeRepeatedBytes(n), arg3, "arg3 result mismatch")
}
verifyBytes(0)
verifyBytes(5)
verifyBytes(100)
verifyBytes(1)
f(argWriter, argReader)
})
}
func TestStreamPartialArg(t *testing.T) {
testStreamArg(t, func(argWriter ArgWriter, argReader ArgReader) {
require.NoError(t, argWriter.Close(), "arg3 close failed")
// Once closed, we expect the reader to return EOF
n, err := io.Copy(ioutil.Discard, argReader)
assert.Equal(t, int64(0), n, "arg2 reader expected to EOF after arg3 writer is closed")
assert.NoError(t, err, "Copy should not fail")
assert.NoError(t, argReader.Close(), "close arg reader failed")
})
}
func TestStreamSendError(t *testing.T) {
testStreamArg(t, func(argWriter ArgWriter, argReader ArgReader) {
// Send the magic number to request an error.
_, err := argWriter.Write([]byte{streamRequestError})
require.NoError(t, err, "arg3 write failed")
require.NoError(t, argWriter.Close(), "arg3 close failed")
// Now we expect an error on our next read.
_, err = ioutil.ReadAll(argReader)
assert.Error(t, err, "ReadAll should fail")
assert.True(t, strings.Contains(err.Error(), "intentional failure"), "err %v unexpected", err)
})
}
func TestStreamCancelled(t *testing.T) {
// Since the cancel message is unimplemented, the relay does not know that the
// call was cancelled, andwill block closing till the timeout.
opts := testutils.NewOpts().NoRelay()
testutils.WithTestServer(t, opts, func(t testing.TB, ts *testutils.TestServer) {
ts.Register(streamPartialHandler(t, false /* report errors */), "echoStream")
ctx, cancel := NewContext(testutils.Timeout(time.Second))
defer cancel()
helper := streamHelper{t}
client := ts.NewClient(nil)
cancelContext := make(chan struct{})
arg3Writer, arg3Reader := helper.startCall(ctx, client, ts.HostPort(), ts.ServiceName())
go func() {
for i := 0; i < 10; i++ {
assert.NoError(t, writeFlushBytes(arg3Writer, []byte{1}), "Write failed")
}
// Our reads and writes should fail now.
<-cancelContext
cancel()
_, err := arg3Writer.Write([]byte{1})
// The write will succeed since it's buffered.
assert.NoError(t, err, "Write after fail should be buffered")
assert.Error(t, arg3Writer.Flush(), "writer.Flush should fail after cancel")
assert.Error(t, arg3Writer.Close(), "writer.Close should fail after cancel")
}()
for i := 0; i < 10; i++ {
arg3 := make([]byte, 1)
n, err := arg3Reader.Read(arg3)
assert.Equal(t, 1, n, "Read did not correct number of bytes")
assert.NoError(t, err, "Read failed")
}
close(cancelContext)
n, err := io.Copy(ioutil.Discard, arg3Reader)
assert.EqualValues(t, 0, n, "Read should not read any bytes after cancel")
assert.Error(t, err, "Read should fail after cancel")
assert.Error(t, arg3Reader.Close(), "reader.Close should fail after cancel")
// Close the client to clear out the pending exchange. Otherwise the test
// waits for the timeout, causing a slowdown.
client.Close()
})
}
func TestResponseClosedBeforeRequest(t *testing.T) {
testutils.WithTestServer(t, nil, func(t testing.TB, ts *testutils.TestServer) {
ts.Register(streamPartialHandler(t, false /* report errors */), "echoStream")
ctx, cancel := NewContext(testutils.Timeout(time.Second))
defer cancel()
helper := streamHelper{t}
ch := ts.NewClient(nil)
responseClosed := make(chan struct{})
writerDone := make(chan struct{})
arg3Writer, arg3Reader := helper.startCall(ctx, ch, ts.HostPort(), ts.Server().ServiceName())
go func() {
defer close(writerDone)
for i := 0; i < 10; i++ {
assert.NoError(t, writeFlushBytes(arg3Writer, []byte{1}), "Write failed")
}
// Ignore the error of writeFlushBytes here since once we flush, the
// remote side could receive and close the response before we've created
// a new fragment (see fragmentingWriter.Flush). This could result
// in the Flush returning a "mex is already shutdown" error.
writeFlushBytes(arg3Writer, []byte{streamRequestClose})
// Wait until our reader gets the EOF.
<-responseClosed
// Now our writes should fail, since the stream is shutdown
err := writeFlushBytes(arg3Writer, []byte{1})
if assert.Error(t, err, "Req write should fail since response stream ended") {
assert.Contains(t, err.Error(), "mex has been shutdown")
}
}()
for i := 0; i < 10; i++ {
arg3 := make([]byte, 1)
n, err := arg3Reader.Read(arg3)
assert.Equal(t, 1, n, "Read did not correct number of bytes")
assert.NoError(t, err, "Read failed")
}
eofBuf := make([]byte, 1)
_, err := arg3Reader.Read(eofBuf)
assert.Equal(t, io.EOF, err, "Response should EOF after request close")
assert.NoError(t, arg3Reader.Close(), "Close should succeed")
close(responseClosed)
<-writerDone
})
}