Skip to content

Commit

Permalink
Update flow control test to have multiple concurrent streams. (grpc#2170
Browse files Browse the repository at this point in the history
)
  • Loading branch information
MakMukhi authored Jun 22, 2018
1 parent 92d38b0 commit 8e18752
Showing 1 changed file with 71 additions and 43 deletions.
114 changes: 71 additions & 43 deletions transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1812,47 +1812,69 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig)
}
server.mu.Unlock()
ct := client.(*http2Client)
cstream, err := client.NewStream(context.Background(), &CallHdr{})
if err != nil {
t.Fatalf("Failed to create stream. Err: %v", err)
}
msg := make([]byte, msgSize)
buf := make([]byte, msgSize+5)
buf[0] = byte(0)
binary.BigEndian.PutUint32(buf[1:], uint32(msgSize))
copy(buf[5:], msg)
opts := Options{}
header := make([]byte, 5)
for i := 1; i <= 10; i++ {
if err := ct.Write(cstream, nil, buf, &opts); err != nil {
t.Fatalf("Error on client while writing message: %v", err)
}
if _, err := cstream.Read(header); err != nil {
t.Fatalf("Error on client while reading data frame header: %v", err)
}
sz := binary.BigEndian.Uint32(header[1:])
recvMsg := make([]byte, int(sz))
if _, err := cstream.Read(recvMsg); err != nil {
t.Fatalf("Error on client while reading data: %v", err)
}
if len(recvMsg) != len(msg) {
t.Fatalf("Length of message received by client: %v, want: %v", len(recvMsg), len(msg))
const numStreams = 10
clientStreams := make([]*Stream, numStreams)
for i := 0; i < numStreams; i++ {
var err error
clientStreams[i], err = client.NewStream(context.Background(), &CallHdr{})
if err != nil {
t.Fatalf("Failed to create stream. Err: %v", err)
}
}
var sstream *Stream
var wg sync.WaitGroup
// For each stream send pingpong messages to the server.
for _, stream := range clientStreams {
wg.Add(1)
go func(stream *Stream) {
defer wg.Done()
buf := make([]byte, msgSize+5)
buf[0] = byte(0)
binary.BigEndian.PutUint32(buf[1:], uint32(msgSize))
opts := Options{}
header := make([]byte, 5)
for i := 1; i <= 10; i++ {
if err := ct.Write(stream, nil, buf, &opts); err != nil {
t.Errorf("Error on client while writing message: %v", err)
return
}
if _, err := stream.Read(header); err != nil {
t.Errorf("Error on client while reading data frame header: %v", err)
return
}
sz := binary.BigEndian.Uint32(header[1:])
recvMsg := make([]byte, int(sz))
if _, err := stream.Read(recvMsg); err != nil {
t.Errorf("Error on client while reading data: %v", err)
return
}
if len(recvMsg) != msgSize {
t.Errorf("Length of message received by client: %v, want: %v", len(recvMsg), msgSize)
return
}
}
}(stream)
}
wg.Wait()
serverStreams := map[uint32]*Stream{}
loopyClientStreams := map[uint32]*outStream{}
loopyServerStreams := map[uint32]*outStream{}
// Get all the streams from server reader and writer and client writer.
st.mu.Lock()
for _, v := range st.activeStreams {
sstream = v
for _, stream := range clientStreams {
id := stream.id
serverStreams[id] = st.activeStreams[id]
loopyServerStreams[id] = st.loopy.estdStreams[id]
loopyClientStreams[id] = ct.loopy.estdStreams[id]

}
st.mu.Unlock()
loopyServerStream := st.loopy.estdStreams[sstream.id]
loopyClientStream := ct.loopy.estdStreams[cstream.id]
ct.Write(cstream, nil, nil, &Options{Last: true}) // Close the stream.
if _, err := cstream.Read(header); err != io.EOF {
t.Fatalf("Client expected an EOF from the server. Got: %v", err)
}
// Sleep for a little to make sure both sides flush out their buffers.
time.Sleep(time.Millisecond * 500)
// Close all streams
for _, stream := range clientStreams {
ct.Write(stream, nil, nil, &Options{Last: true})
if _, err := stream.Read(make([]byte, 5)); err != io.EOF {
t.Fatalf("Client expected an EOF from the server. Got: %v", err)
}
}
// Close down both server and client so that their internals can be read without data
// races.
ct.Close()
Expand All @@ -1861,20 +1883,26 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig)
<-st.writerDone
<-ct.readerDone
<-ct.writerDone
for _, cstream := range clientStreams {
id := cstream.id
sstream := serverStreams[id]
loopyServerStream := loopyServerStreams[id]
loopyClientStream := loopyClientStreams[id]
// Check stream flow control.
if int(cstream.fc.limit+cstream.fc.delta-cstream.fc.pendingData-cstream.fc.pendingUpdate) != int(st.loopy.oiws)-loopyServerStream.bytesOutStanding {
t.Fatalf("Account mismatch: client stream inflow limit(%d) + delta(%d) - pendingData(%d) - pendingUpdate(%d) != server outgoing InitialWindowSize(%d) - outgoingStream.bytesOutStanding(%d)", cstream.fc.limit, cstream.fc.delta, cstream.fc.pendingData, cstream.fc.pendingUpdate, st.loopy.oiws, loopyServerStream.bytesOutStanding)
}
if int(sstream.fc.limit+sstream.fc.delta-sstream.fc.pendingData-sstream.fc.pendingUpdate) != int(ct.loopy.oiws)-loopyClientStream.bytesOutStanding {
t.Fatalf("Account mismatch: server stream inflow limit(%d) + delta(%d) - pendingData(%d) - pendingUpdate(%d) != client outgoing InitialWindowSize(%d) - outgoingStream.bytesOutStanding(%d)", sstream.fc.limit, sstream.fc.delta, sstream.fc.pendingData, sstream.fc.pendingUpdate, ct.loopy.oiws, loopyClientStream.bytesOutStanding)
}
}
// Check transport flow control.
if ct.fc.limit != ct.fc.unacked+st.loopy.sendQuota {
t.Fatalf("Account mismatch: client transport inflow(%d) != client unacked(%d) + server sendQuota(%d)", ct.fc.limit, ct.fc.unacked, st.loopy.sendQuota)
}
if st.fc.limit != st.fc.unacked+ct.loopy.sendQuota {
t.Fatalf("Account mismatch: server transport inflow(%d) != server unacked(%d) + client sendQuota(%d)", st.fc.limit, st.fc.unacked, ct.loopy.sendQuota)
}
// Check stream flow control.
if int(cstream.fc.limit+cstream.fc.delta-cstream.fc.pendingData-cstream.fc.pendingUpdate) != int(st.loopy.oiws)-loopyServerStream.bytesOutStanding {
t.Fatalf("Account mismatch: client stream inflow limit(%d) + delta(%d) - pendingData(%d) - pendingUpdate(%d) != server outgoing InitialWindowSize(%d) - outgoingStream.bytesOutStanding(%d)", cstream.fc.limit, cstream.fc.delta, cstream.fc.pendingData, cstream.fc.pendingUpdate, st.loopy.oiws, loopyServerStream.bytesOutStanding)
}
if int(sstream.fc.limit+sstream.fc.delta-sstream.fc.pendingData-sstream.fc.pendingUpdate) != int(ct.loopy.oiws)-loopyClientStream.bytesOutStanding {
t.Fatalf("Account mismatch: server stream inflow limit(%d) + delta(%d) - pendingData(%d) - pendingUpdate(%d) != client outgoing InitialWindowSize(%d) - outgoingStream.bytesOutStanding(%d)", sstream.fc.limit, sstream.fc.delta, sstream.fc.pendingData, sstream.fc.pendingUpdate, ct.loopy.oiws, loopyClientStream.bytesOutStanding)
}
}

func waitWhileTrue(t *testing.T, condition func() (bool, error)) {
Expand Down

0 comments on commit 8e18752

Please sign in to comment.