Skip to content

Commit

Permalink
http_server i/o no longer treat all paths as prefixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Jul 31, 2023
1 parent 27f03a1 commit fecd1cb
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 21 deletions.
9 changes: 7 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ All notable changes to this project will be documented in this file.

## Unreleased

### Changed
### Added

- The `snowflake_put` output is now beta.
- Field `topics_pattern` added to the `pulsar` input.

### Fixed
Expand All @@ -16,6 +15,12 @@ All notable changes to this project will be documented in this file.
- Upgraded `kafka` input and output underlying sarama client library to v1.40.0 at new module path github.com/IBM/sarama
- The CUE schema for `switch` processor now correctly reflects that it takes a list of clauses.
- Fixed the CUE schema for fields that take a 2d-array such as `workflow.order`.
- The `http_server` input and output now follow the same multiplexer rules regardless of whether the general `http` server block is used or a custom endpoint.

### Changed

- The `snowflake_put` output is now beta.
- Endpoints specified by `http_server` components using both the general `http` server block or their own custom server addresses should no longer be treated as path prefixes unless the path ends with a slash (`/`), in which case all extensions of the path will match. This corrects a behavioural change introduced in v4.14.0.

## 4.18.0 - 2023-07-02

Expand Down
4 changes: 2 additions & 2 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ func (t *Type) RegisterEndpoint(path, desc string, handlerFunc http.HandlerFunc)
h(w, r)
})

t.mux.PathPrefix(path).Handler(wrapHandler)
t.mux.PathPrefix(t.conf.RootPath + path).Handler(wrapHandler)
GetMuxRoute(t.mux, path).Handler(wrapHandler)
GetMuxRoute(t.mux, t.conf.RootPath+path).Handler(wrapHandler)
}
t.handlers[path] = handlerFunc
}
Expand Down
15 changes: 15 additions & 0 deletions internal/api/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,18 @@ http:

return buf.Bytes(), err
}

// EndpointCaveats is a documentation section for HTTP components that explains
// some of the caveats in registering endpoints due to their non-deterministic
// ordering and lack of explicit path terminators.
func EndpointCaveats() string {
return `:::caution Endpoint Caveats
Components within a Benthos config will register their respective endpoints in a non-deterministic order. This means that establishing precedence of endpoints that are registered via multiple ` + "`http_server`" + ` inputs or outputs (either within brokers or from cohabiting streams) is not possible in a predictable way.
This ambiguity makes it difficult to ensure that paths which are both a subset of a path registered by a separate component, and end in a slash (` + "`/`" + `) and will therefore match against all extensions of that path, do not prevent the more specific path from matching against requests.
It is therefore recommended that you ensure paths of separate components do not collide unless they are explicitly non-competing.
For example, if you were to deploy two separate ` + "`http_server`" + ` inputs, one with a path ` + "`/foo/`" + ` and the other with a path ` + "`/foo/bar`" + `, it would not be possible to ensure that the path ` + "`/foo/`" + ` does not swallow requests made to ` + "`/foo/bar`" + `.
:::`
}
19 changes: 19 additions & 0 deletions internal/api/package.go
Original file line number Diff line number Diff line change
@@ -1,2 +1,21 @@
// Package api implements a type used for creating the Benthos HTTP API.
package api

import (
"github.com/gorilla/mux"
)

// GetMuxRoute returns a *mux.Route (the result of calling .Path or .PathPrefix
// on the provided router), where in cases where the path ends in a slash it
// will be treated as a prefix. This isn't ideal but it's as close as we can
// realistically get to the http.ServeMux behaviour with added path variables.
//
// NOTE: Eventually we can move back to http.ServeMux once
// https://github.com/golang/go/issues/61410 is available, and that'll allow us
// to make all paths explicit.
func GetMuxRoute(gMux *mux.Router, path string) *mux.Route {
if len(path) > 0 && path[len(path)-1] == '/' {
return gMux.PathPrefix(path)
}
return gMux.Path(path)
}
9 changes: 6 additions & 3 deletions internal/impl/io/input_http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/gorilla/websocket"
"github.com/klauspost/compress/gzip"

"github.com/benthosdev/benthos/v4/internal/api"
"github.com/benthosdev/benthos/v4/internal/bundle"
"github.com/benthosdev/benthos/v4/internal/component/input"
"github.com/benthosdev/benthos/v4/internal/component/interop"
Expand Down Expand Up @@ -172,7 +173,7 @@ It's possible to return a response for each message received using [synchronous
### Endpoints
The following fields specify endpoints that are registered for sending messages, and support path parameters of the form `+"`/{foo}`"+`, which are added to ingested messages as metadata:
The following fields specify endpoints that are registered for sending messages, and support path parameters of the form `+"`/{foo}`"+`, which are added to ingested messages as metadata. A path ending in `+"`/`"+` will match against all extensions of that path:
#### `+"`path` (defaults to `/post`)"+`
Expand All @@ -184,6 +185,8 @@ If the request contains a multipart `+"`content-type`"+` header as per [rfc1341]
Creates a websocket connection, where payloads received on the socket are passed through the pipeline as a batch of one message.
`+api.EndpointCaveats()+`
You may specify an optional `+"`ws_welcome_message`"+`, which is a static payload to be sent to all clients once a websocket connection is first established.
It's also possible to specify a `+"`ws_rate_limit_message`"+`, which is a static payload to be sent to clients that have triggered the servers rate limit.
Expand Down Expand Up @@ -343,10 +346,10 @@ func newHTTPServerInput(conf hsiConfig, mgr bundle.NewManagement) (input.Streame
wsHdlr := gzipHandler(h.wsHandler)
if gMux != nil {
if len(h.conf.Path) > 0 {
gMux.PathPrefix(h.conf.Path).Handler(postHdlr)
api.GetMuxRoute(gMux, h.conf.Path).Handler(postHdlr)
}
if len(h.conf.WSPath) > 0 {
gMux.PathPrefix(h.conf.WSPath).Handler(wsHdlr)
api.GetMuxRoute(gMux, h.conf.WSPath).Handler(wsHdlr)
}
} else {
if len(h.conf.Path) > 0 {
Expand Down
6 changes: 3 additions & 3 deletions internal/impl/io/input_http_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type apiRegGorillaMutWrapper struct {
}

func (a apiRegGorillaMutWrapper) RegisterEndpoint(path, desc string, h http.HandlerFunc) {
a.mut.PathPrefix(path).Handler(h)
api.GetMuxRoute(a.mut, path).Handler(h)
}

func TestHTTPBasic(t *testing.T) {
Expand Down Expand Up @@ -434,7 +434,7 @@ func TestHTTPServerPathIsPrefix(t *testing.T) {

conf := parseYAMLInputConf(t, `
http_server:
path: /test/{foo}/{bar}
path: /test/{foo}/{bar}/
allowed_verbs: [ "POST", "PUT" ]
`)
server, err := mgr.NewInput(conf)
Expand Down Expand Up @@ -561,7 +561,7 @@ func TestHTTPServerPathParametersCustomServerPathIsPrefix(t *testing.T) {
conf := parseYAMLInputConf(t, `
http_server:
address: 0.0.0.0:%v
path: /test/{foo}/{bar}
path: /test/{foo}/{bar}/
`, freePort)

server, err := mock.NewManager().NewInput(conf)
Expand Down
25 changes: 15 additions & 10 deletions internal/impl/io/output_http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"sync"
"time"

"github.com/gorilla/mux"
"github.com/gorilla/websocket"

"github.com/benthosdev/benthos/v4/internal/api"
"github.com/benthosdev/benthos/v4/internal/batch"
"github.com/benthosdev/benthos/v4/internal/bundle"
"github.com/benthosdev/benthos/v4/internal/component"
Expand Down Expand Up @@ -109,7 +111,10 @@ Three endpoints will be registered at the paths specified by the fields `+"`path
When messages are batched the `+"`path`"+` endpoint encodes the batch according to [RFC1341](https://www.w3.org/Protocols/rfc1341/7_2_Multipart.html). This behaviour can be overridden by [archiving your batches](/docs/configuration/batching#post-batch-processing).
Please note, messages are considered delivered as soon as the data is written to the client. There is no concept of at least once delivery on this output.`).
Please note, messages are considered delivered as soon as the data is written to the client. There is no concept of at least once delivery on this output.
`+api.EndpointCaveats()+`
`).
Fields(
service.NewStringField(hsoFieldAddress).
Description("An alternative address to host from. If left empty the service wide address is used.").
Expand Down Expand Up @@ -175,7 +180,7 @@ type httpServerOutput struct {
conf hsoConfig
log log.Modular

mux *http.ServeMux
mux *mux.Router
server *http.Server

transactions <-chan message.Transaction
Expand All @@ -197,14 +202,14 @@ type httpServerOutput struct {
}

func newHTTPServerOutput(conf hsoConfig, mgr bundle.NewManagement) (output.Streamed, error) {
var mux *http.ServeMux
var gMux *mux.Router
var server *http.Server

var err error
if len(conf.Address) > 0 {
mux = http.NewServeMux()
gMux = mux.NewRouter()
server = &http.Server{Addr: conf.Address}
if server.Handler, err = conf.CORS.WrapHandler(mux); err != nil {
if server.Handler, err = conf.CORS.WrapHandler(gMux); err != nil {
return nil, fmt.Errorf("bad CORS configuration: %w", err)
}
}
Expand All @@ -219,7 +224,7 @@ func newHTTPServerOutput(conf hsoConfig, mgr bundle.NewManagement) (output.Strea
shutSig: shutdown.NewSignaller(),
conf: conf,
log: mgr.Logger(),
mux: mux,
mux: gMux,
server: server,

mGetSent: mSent,
Expand All @@ -235,15 +240,15 @@ func newHTTPServerOutput(conf hsoConfig, mgr bundle.NewManagement) (output.Strea
mStreamError: mError,
}

if mux != nil {
if gMux != nil {
if len(h.conf.Path) > 0 {
h.mux.HandleFunc(h.conf.Path, h.getHandler)
api.GetMuxRoute(gMux, h.conf.Path).HandlerFunc(h.getHandler)
}
if len(h.conf.StreamPath) > 0 {
h.mux.HandleFunc(h.conf.StreamPath, h.streamHandler)
api.GetMuxRoute(gMux, h.conf.StreamPath).HandlerFunc(h.streamHandler)
}
if len(h.conf.WSPath) > 0 {
h.mux.HandleFunc(h.conf.WSPath, h.wsHandler)
api.GetMuxRoute(gMux, h.conf.WSPath).HandlerFunc(h.wsHandler)
}
} else {
if len(h.conf.Path) > 0 {
Expand Down
12 changes: 11 additions & 1 deletion website/docs/components/inputs/http_server.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ It's possible to return a response for each message received using [synchronous

### Endpoints

The following fields specify endpoints that are registered for sending messages, and support path parameters of the form `/{foo}`, which are added to ingested messages as metadata:
The following fields specify endpoints that are registered for sending messages, and support path parameters of the form `/{foo}`, which are added to ingested messages as metadata. A path ending in `/` will match against all extensions of that path:

#### `path` (defaults to `/post`)

Expand All @@ -96,6 +96,16 @@ If the request contains a multipart `content-type` header as per [rfc1341](https

Creates a websocket connection, where payloads received on the socket are passed through the pipeline as a batch of one message.

:::caution Endpoint Caveats
Components within a Benthos config will register their respective endpoints in a non-deterministic order. This means that establishing precedence of endpoints that are registered via multiple `http_server` inputs or outputs (either within brokers or from cohabiting streams) is not possible in a predictable way.

This ambiguity makes it difficult to ensure that paths which are both a subset of a path registered by a separate component, and end in a slash (`/`) and will therefore match against all extensions of that path, do not prevent the more specific path from matching against requests.

It is therefore recommended that you ensure paths of separate components do not collide unless they are explicitly non-competing.

For example, if you were to deploy two separate `http_server` inputs, one with a path `/foo/` and the other with a path `/foo/bar`, it would not be possible to ensure that the path `/foo/` does not swallow requests made to `/foo/bar`.
:::

You may specify an optional `ws_welcome_message`, which is a static payload to be sent to all clients once a websocket connection is first established.

It's also possible to specify a `ws_rate_limit_message`, which is a static payload to be sent to clients that have triggered the servers rate limit.
Expand Down
11 changes: 11 additions & 0 deletions website/docs/components/outputs/http_server.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,17 @@ When messages are batched the `path` endpoint encodes the batch according to [RF

Please note, messages are considered delivered as soon as the data is written to the client. There is no concept of at least once delivery on this output.

:::caution Endpoint Caveats
Components within a Benthos config will register their respective endpoints in a non-deterministic order. This means that establishing precedence of endpoints that are registered via multiple `http_server` inputs or outputs (either within brokers or from cohabiting streams) is not possible in a predictable way.

This ambiguity makes it difficult to ensure that paths which are both a subset of a path registered by a separate component, and end in a slash (`/`) and will therefore match against all extensions of that path, do not prevent the more specific path from matching against requests.

It is therefore recommended that you ensure paths of separate components do not collide unless they are explicitly non-competing.

For example, if you were to deploy two separate `http_server` inputs, one with a path `/foo/` and the other with a path `/foo/bar`, it would not be possible to ensure that the path `/foo/` does not swallow requests made to `/foo/bar`.
:::


## Fields

### `address`
Expand Down

0 comments on commit fecd1cb

Please sign in to comment.