Skip to content

Commit

Permalink
Fix, clean up, test and optimize http.batch()
Browse files Browse the repository at this point in the history
This adds a minor breaking change - if some of the requests in the http.batch() fail, now the first error would be returned instead of the last one.
  • Loading branch information
na-- committed Dec 10, 2019
1 parent 9ed9f77 commit de82f21
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 63 deletions.
1 change: 1 addition & 0 deletions core/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ func TestSentReceivedMetrics(t *testing.T) {
Hosts: tb.Dialer.Hosts,
InsecureSkipTLSVerify: null.BoolFrom(true),
NoVUConnectionReuse: null.BoolFrom(noConnReuse),
Batch: null.IntFrom(20),
}

r.SetOptions(options)
Expand Down
123 changes: 75 additions & 48 deletions js/modules/k6/http/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"net/url"
"reflect"
"strings"
"sync"
"time"

"github.com/dop251/goja"
Expand Down Expand Up @@ -357,70 +356,98 @@ func (h *HTTP) parseRequest(
return result, nil
}

func (h *HTTP) prepareBatchArray(
ctx context.Context, requests []interface{},
) ([]httpext.BatchParsedHTTPRequest, []*Response, error) {
reqCount := len(requests)
batchReqs := make([]httpext.BatchParsedHTTPRequest, reqCount)
results := make([]*Response, reqCount)

for i, req := range requests {
parsedReq, err := h.parseBatchRequest(ctx, i, req)
if err != nil {
return nil, nil, err
}
response := new(httpext.Response)
batchReqs[i] = httpext.BatchParsedHTTPRequest{
ParsedHTTPRequest: parsedReq,
Response: response,
}
results[i] = &Response{response}
}

return batchReqs, results, nil
}

func (h *HTTP) prepareBatchObject(
ctx context.Context, requests map[string]interface{},
) ([]httpext.BatchParsedHTTPRequest, map[string]*Response, error) {
reqCount := len(requests)
batchReqs := make([]httpext.BatchParsedHTTPRequest, reqCount)
results := make(map[string]*Response, reqCount)

i := 0
for key, req := range requests {
parsedReq, err := h.parseBatchRequest(ctx, key, req)
if err != nil {
return nil, nil, err
}
response := new(httpext.Response)
batchReqs[i] = httpext.BatchParsedHTTPRequest{
ParsedHTTPRequest: parsedReq,
Response: response,
}
results[key] = &Response{response}
i++
}

return batchReqs, results, nil
}

// Batch makes multiple simultaneous HTTP requests. The provideds reqsV should be an array of request
// objects. Batch returns an array of responses and/or error
func (h *HTTP) Batch(ctx context.Context, reqsV goja.Value) (goja.Value, error) {
state := lib.GetState(ctx)
if state == nil {
return nil, ErrBatchForbiddenInInitContext
}
rt := common.GetRuntime(ctx)

reqs := reqsV.ToObject(rt)
keys := reqs.Keys()
parsedReqs := map[string]*httpext.ParsedHTTPRequest{}
for _, key := range keys {
parsedReq, err := h.parseBatchRequest(ctx, key, reqs.Get(key))
if err != nil {
return nil, err
}
parsedReqs[key] = parsedReq
}

var (
// Return values; retval must be guarded by the mutex.
mutex sync.Mutex
retval = rt.NewObject()
errs = make(chan error)

// Concurrency limits.
globalLimiter = NewSlotLimiter(int(state.Options.Batch.Int64))
perHostLimiter = NewMultiSlotLimiter(int(state.Options.BatchPerHost.Int64))
err error
batchReqs []httpext.BatchParsedHTTPRequest
results interface{} // either []*Response or map[string]*Response
)
for k, pr := range parsedReqs {
go func(key string, parsedReq *httpext.ParsedHTTPRequest) {
globalLimiter.Begin()
defer globalLimiter.End()

if hl := perHostLimiter.Slot(parsedReq.URL.GetURL().Host); hl != nil {
hl.Begin()
defer hl.End()
}

res, err := httpext.MakeRequest(ctx, parsedReq)
if err != nil {
errs <- err
return
}

mutex.Lock()
_ = retval.Set(key, responseFromHttpext(res))
mutex.Unlock()
switch v := reqsV.Export().(type) {
case []interface{}:
batchReqs, results, err = h.prepareBatchArray(ctx, v)
case map[string]interface{}:
batchReqs, results, err = h.prepareBatchObject(ctx, v)
default:
return nil, fmt.Errorf("invalid http.batch() argument type %T", v)
}

errs <- nil
}(k, pr)
if err != nil {
return nil, err
}

var err error
for range keys {
if e := <-errs; e != nil {
reqCount := len(batchReqs)
errs := httpext.MakeBatchRequests(
ctx, batchReqs, reqCount,
int(state.Options.Batch.Int64), int(state.Options.BatchPerHost.Int64),
)

for i := 0; i < reqCount; i++ {
if e := <-errs; e != nil && err == nil { // Save only the first error
err = e
}
}
return retval, err
return common.GetRuntime(ctx).ToValue(results), err
}

func (h *HTTP) parseBatchRequest(ctx context.Context, key string, val goja.Value) (*httpext.ParsedHTTPRequest, error) {
func (h *HTTP) parseBatchRequest(
ctx context.Context, key interface{}, val interface{},
) (*httpext.ParsedHTTPRequest, error) {
var (
method = HTTP_METHOD_GET
ok bool
Expand All @@ -431,7 +458,7 @@ func (h *HTTP) parseBatchRequest(ctx context.Context, key string, val goja.Value
rt = common.GetRuntime(ctx)
)

switch data := val.Export().(type) {
switch data := val.(type) {
case []interface{}:
// Handling of ["GET", "http://example.com/"]
dataLen := len(data)
Expand All @@ -456,7 +483,7 @@ func (h *HTTP) parseBatchRequest(ctx context.Context, key string, val goja.Value
case map[string]interface{}:
// Handling of {method: "GET", url: "http://test.loadimpact.com"}
if murl, ok := data["url"]; !ok {
return nil, fmt.Errorf("batch request %s doesn't have an url key", key)
return nil, fmt.Errorf("batch request %q doesn't have an url key", key)
} else if reqURL, err = ToURL(murl); err != nil {
return nil, err
}
Expand Down
8 changes: 7 additions & 1 deletion js/modules/k6/http/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func newRuntime(
UserAgent: null.StringFrom("TestUserAgent"),
Throw: null.BoolFrom(true),
SystemTags: &stats.DefaultSystemTagSet,
Batch: null.IntFrom(20),
BatchPerHost: null.IntFrom(20),
//HTTPDebug: null.StringFrom("full"),
}
samples := make(chan stats.SampleContainer, 1000)
Expand Down Expand Up @@ -1037,6 +1039,10 @@ func TestRequestAndBatch(t *testing.T) {
}

t.Run("Batch", func(t *testing.T) {
t.Run("error", func(t *testing.T) {
_, err := common.RunString(rt, `let res = http.batch("https://somevalidurl.com");`)
require.Error(t, err)
})
t.Run("GET", func(t *testing.T) {
_, err := common.RunString(rt, sr(`
let reqs = [
Expand All @@ -1048,7 +1054,7 @@ func TestRequestAndBatch(t *testing.T) {
if (res[key].status != 200) { throw new Error("wrong status: " + res[key].status); }
if (res[key].url != reqs[key][1]) { throw new Error("wrong url: " + res[key].url); }
}`))
assert.NoError(t, err)
require.NoError(t, err)
bufSamples := stats.GetBufferedSamples(samples)
assertRequestMetricsEmitted(t, bufSamples, "GET", sr("HTTPBIN_URL/"), "", 200, "")
assertRequestMetricsEmitted(t, bufSamples, "GET", sr("HTTPBIN_IP_URL/"), "", 200, "")
Expand Down
13 changes: 4 additions & 9 deletions js/modules/k6/http/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package http

import (
"context"
"errors"
"fmt"
"net/url"
Expand All @@ -34,22 +33,18 @@ import (
)

// Response is a representation of an HTTP response to be returned to the goja VM
// TODO: refactor after https://github.com/dop251/goja/issues/84
type Response httpext.Response

// GetCtx returns the Context of the httpext.Response
func (res *Response) GetCtx() context.Context {
return ((*httpext.Response)(res)).GetCtx()
type Response struct {
*httpext.Response `js:"-"`
}

func responseFromHttpext(resp *httpext.Response) *Response {
res := Response(*resp)
res := Response{resp}
return &res
}

// JSON parses the body of a response as json and returns it to the goja VM
func (res *Response) JSON(selector ...string) goja.Value {
v, err := ((*httpext.Response)(res)).JSON(selector...)
v, err := res.Response.JSON(selector...)
if err != nil {
common.Throw(common.GetRuntime(res.GetCtx()), err)
}
Expand Down
2 changes: 1 addition & 1 deletion js/modules/k6/http/limiter.go → lib/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*
*/

package http
package lib

import (
"sync"
Expand Down
9 changes: 5 additions & 4 deletions js/modules/k6/http/limiter_test.go → lib/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package http
package lib

import (
"fmt"
Expand Down Expand Up @@ -69,11 +69,12 @@ func TestSlotLimiters(t *testing.T) {
l := NewSlotLimiter(tc.limit)
wg := sync.WaitGroup{}

if tc.limit == 0 {
switch {
case tc.limit == 0:
wg.Add(tc.launches)
} else if tc.launches < tc.limit {
case tc.launches < tc.limit:
wg.Add(tc.launches)
} else {
default:
wg.Add(tc.limit)
}

Expand Down
84 changes: 84 additions & 0 deletions lib/netext/httpext/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
*
* k6 - a next-generation load testing tool
* Copyright (C) 2019 Load Impact
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

package httpext

import (
"context"
"sync/atomic"

"github.com/loadimpact/k6/lib"
)

// BatchParsedHTTPRequest extends the normal parsed HTTP request with a pointer
// to a Response object, so that the batch goroutines can concurrently store the
// responses they receive, without any locking.
type BatchParsedHTTPRequest struct {
*ParsedHTTPRequest
Response *Response // this is modified by MakeBatchRequests()
}

// MakeBatchRequests concurrently makes multiple requests. It spawns
// min(reqCount, globalLimit) goroutines that asynchronously process all
// requests coming from the requests channel. Responses are recorded in the
// pointers contained in each BatchParsedHTTPRequest object, so they need to be
// pre-initialized. In addition, each processed request would emit either a nil
// value, or an error, via the returned errors channel. The goroutines exit when
// the requests channel is closed.
func MakeBatchRequests(
ctx context.Context,
requests []BatchParsedHTTPRequest,
reqCount, globalLimit, perHostLimit int,
) <-chan error {
workers := globalLimit
if reqCount < workers {
workers = reqCount
}
result := make(chan error, reqCount)
perHostLimiter := lib.NewMultiSlotLimiter(perHostLimit)

makeRequest := func(req BatchParsedHTTPRequest) {
if hl := perHostLimiter.Slot(req.URL.GetURL().Host); hl != nil {
hl.Begin()
defer hl.End()
}

resp, err := MakeRequest(ctx, req.ParsedHTTPRequest)
if resp != nil {
*req.Response = *resp
}
result <- err
}

counter, i32reqCount := int32(-1), int32(reqCount)
for i := 0; i < workers; i++ {
go func() {
for {
reqNum := atomic.AddInt32(&counter, 1)
if reqNum >= i32reqCount {
return
}
makeRequest(requests[reqNum])
}
}()
}

return result
}

0 comments on commit de82f21

Please sign in to comment.