Skip to content

Commit

Permalink
Merge pull request improbable-eng#166 from improbable-eng/feature/fix…
Browse files Browse the repository at this point in the history
…-exports

Change visibility of new GRPC WebSocket functions.
  • Loading branch information
Jonny Reeves authored Apr 11, 2018
2 parents 6fb683f + d843429 commit 920d812
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 104 deletions.
86 changes: 6 additions & 80 deletions go/grpcweb/DOC.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,86 +124,6 @@ bidirectional requests.

The default behaviour is false, i.e. to disallow websockets

#### type WebSocketResponseWriter

```go
type WebSocketResponseWriter struct {
}
```


#### func (*WebSocketResponseWriter) CloseNotify

```go
func (w *WebSocketResponseWriter) CloseNotify() <-chan bool
```

#### func (*WebSocketResponseWriter) Flush

```go
func (w *WebSocketResponseWriter) Flush()
```

#### func (*WebSocketResponseWriter) FlushTrailers

```go
func (w *WebSocketResponseWriter) FlushTrailers()
```

#### func (*WebSocketResponseWriter) Header

```go
func (w *WebSocketResponseWriter) Header() http.Header
```

#### func (*WebSocketResponseWriter) Write

```go
func (w *WebSocketResponseWriter) Write(b []byte) (int, error)
```

#### func (*WebSocketResponseWriter) WriteHeader

```go
func (w *WebSocketResponseWriter) WriteHeader(code int)
```

#### type WebSocketWrappedReader

```go
type WebSocketWrappedReader struct {
}
```


#### func NewWebsocketWrappedReader

```go
func NewWebsocketWrappedReader(wsConn *websocket.Conn, respWriter *WebSocketResponseWriter) *WebSocketWrappedReader
```

#### func (*WebSocketWrappedReader) Close

```go
func (w *WebSocketWrappedReader) Close() error
```

#### func (*WebSocketWrappedReader) Read

```go
func (w *WebSocketWrappedReader) Read(p []byte) (int, error)
```
First byte of a binary WebSocket frame is used for control flow: 0 = Data 1 =
End of client send

#### type WebSocketWrapper

```go
type WebSocketWrapper struct {
}
```


#### type WrappedGrpcServer

```go
Expand Down Expand Up @@ -242,6 +162,10 @@ with the gRPC-Web protocol.
```go
func (w *WrappedGrpcServer) HandleGrpcWebsocketRequest(resp http.ResponseWriter, req *http.Request)
```
HandleGrpcWebsocketRequest takes a HTTP request that is assumed to be a
gRPC-Websocket request and wraps it with a compatibility layer to transform it
to a standard gRPC request for the wrapped gRPC server and transforms the
response to comply with the gRPC-Web protocol.

#### func (*WrappedGrpcServer) IsAcceptableGrpcCorsRequest

Expand All @@ -267,6 +191,8 @@ the "content-type" is "application/grpc-web" and that the method is POST.
```go
func (w *WrappedGrpcServer) IsGrpcWebSocketRequest(req *http.Request) bool
```
IsGrpcWebSocketRequest determines if a request is a gRPC-Web request by checking
that the "Sec-Websocket-Protocol" header value is "grpc-websockets"

#### func (*WrappedGrpcServer) ServeHTTP

Expand Down
40 changes: 18 additions & 22 deletions go/grpcweb/websocket_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,16 @@ import (
"golang.org/x/net/http2"
)

type WebSocketWrapper struct {
wsConn *websocket.Conn
}

type WebSocketResponseWriter struct {
type webSocketResponseWriter struct {
writtenHeaders bool
wsConn *websocket.Conn
headers http.Header
flushedHeaders http.Header
closeNotifyChan chan bool
}

func newWebSocketResponseWriter(wsConn *websocket.Conn) *WebSocketResponseWriter {
return &WebSocketResponseWriter{
func newWebSocketResponseWriter(wsConn *websocket.Conn) *webSocketResponseWriter {
return &webSocketResponseWriter{
writtenHeaders: false,
headers: make(http.Header),
flushedHeaders: make(http.Header),
Expand All @@ -36,18 +32,18 @@ func newWebSocketResponseWriter(wsConn *websocket.Conn) *WebSocketResponseWriter
}
}

func (w *WebSocketResponseWriter) Header() http.Header {
func (w *webSocketResponseWriter) Header() http.Header {
return w.headers
}

func (w *WebSocketResponseWriter) Write(b []byte) (int, error) {
func (w *webSocketResponseWriter) Write(b []byte) (int, error) {
if !w.writtenHeaders {
w.WriteHeader(http.StatusOK)
}
return len(b), w.wsConn.WriteMessage(websocket.BinaryMessage, b)
}

func (w *WebSocketResponseWriter) writeHeaderFrame(headers http.Header) {
func (w *webSocketResponseWriter) writeHeaderFrame(headers http.Header) {
headerBuffer := new(bytes.Buffer)
headers.Write(headerBuffer)
headerGrpcDataHeader := []byte{1 << 7, 0, 0, 0, 0} // MSB=1 indicates this is a header data frame.
Expand All @@ -56,7 +52,7 @@ func (w *WebSocketResponseWriter) writeHeaderFrame(headers http.Header) {
w.wsConn.WriteMessage(websocket.BinaryMessage, headerBuffer.Bytes())
}

func (w *WebSocketResponseWriter) copyFlushedHeaders() {
func (w *webSocketResponseWriter) copyFlushedHeaders() {
for k, vv := range w.headers {
// Skip the pre-annoucement of Trailer headers. Don't add them to the response headers.
if strings.ToLower(k) == "trailer" {
Expand All @@ -68,14 +64,14 @@ func (w *WebSocketResponseWriter) copyFlushedHeaders() {
}
}

func (w *WebSocketResponseWriter) WriteHeader(code int) {
func (w *webSocketResponseWriter) WriteHeader(code int) {
w.copyFlushedHeaders()
w.writtenHeaders = true
w.writeHeaderFrame(w.headers)
return
}

func (w *WebSocketResponseWriter) extractTrailerHeaders() http.Header {
func (w *webSocketResponseWriter) extractTrailerHeaders() http.Header {
trailerHeaders := make(http.Header)
for k, vv := range w.headers {
// Skip the pre-annoucement of Trailer headers. Don't add them to the response headers.
Expand All @@ -97,34 +93,34 @@ func (w *WebSocketResponseWriter) extractTrailerHeaders() http.Header {
return trailerHeaders
}

func (w *WebSocketResponseWriter) FlushTrailers() {
func (w *webSocketResponseWriter) FlushTrailers() {
w.writeHeaderFrame(w.extractTrailerHeaders())
}

func (w *WebSocketResponseWriter) Flush() {
func (w *webSocketResponseWriter) Flush() {
// no-op
}

func (w *WebSocketResponseWriter) CloseNotify() <-chan bool {
func (w *webSocketResponseWriter) CloseNotify() <-chan bool {
return w.closeNotifyChan
}

type WebSocketWrappedReader struct {
type webSocketWrappedReader struct {
wsConn *websocket.Conn
respWriter *WebSocketResponseWriter
respWriter *webSocketResponseWriter
remainingBuffer []byte
remainingError error
}

func (w *WebSocketWrappedReader) Close() error {
func (w *webSocketWrappedReader) Close() error {
w.respWriter.FlushTrailers()
return w.wsConn.Close()
}

// First byte of a binary WebSocket frame is used for control flow:
// 0 = Data
// 1 = End of client send
func (w *WebSocketWrappedReader) Read(p []byte) (int, error) {
func (w *webSocketWrappedReader) Read(p []byte) (int, error) {
// If a buffer remains from a previous WebSocket frame read then continue reading it
if w.remainingBuffer != nil {

Expand Down Expand Up @@ -197,8 +193,8 @@ func (w *WebSocketWrappedReader) Read(p []byte) (int, error) {
return len(p), nil
}

func NewWebsocketWrappedReader(wsConn *websocket.Conn, respWriter *WebSocketResponseWriter) *WebSocketWrappedReader {
return &WebSocketWrappedReader{
func newWebsocketWrappedReader(wsConn *websocket.Conn, respWriter *webSocketResponseWriter) *webSocketWrappedReader {
return &webSocketWrappedReader{
wsConn: wsConn,
respWriter: respWriter,
remainingBuffer: nil,
Expand Down
8 changes: 6 additions & 2 deletions go/grpcweb/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ func (w *WrappedGrpcServer) ServeHTTP(resp http.ResponseWriter, req *http.Reques
w.server.ServeHTTP(resp, req)
}

// IsGrpcWebSocketRequest determines if a request is a gRPC-Web request by checking that the "Sec-Websocket-Protocol"
// header value is "grpc-websockets"
func (w *WrappedGrpcServer) IsGrpcWebSocketRequest(req *http.Request) bool {
return req.Header.Get("Upgrade") == "websocket" && req.Header.Get("Sec-Websocket-Protocol") == "grpc-websockets"
}
Expand All @@ -116,13 +118,15 @@ var websocketUpgrader = websocket.Upgrader{
Subprotocols: []string{"grpc-websockets"},
}

// HandleGrpcWebsocketRequest takes a HTTP request that is assumed to be a gRPC-Websocket request and wraps it with a
// compatibility layer to transform it to a standard gRPC request for the wrapped gRPC server and transforms the
// response to comply with the gRPC-Web protocol.
func (w *WrappedGrpcServer) HandleGrpcWebsocketRequest(resp http.ResponseWriter, req *http.Request) {
conn, err := websocketUpgrader.Upgrade(resp, req, nil)
if err != nil {
grpclog.Errorf("Unable to upgrade websocket request: %v", err)
return
}

w.handleWebSocket(conn, req)
}

Expand All @@ -145,7 +149,7 @@ func (w *WrappedGrpcServer) handleWebSocket(wsConn *websocket.Conn, req *http.Re
}

respWriter := newWebSocketResponseWriter(wsConn)
wrappedReader := NewWebsocketWrappedReader(wsConn, respWriter)
wrappedReader := newWebsocketWrappedReader(wsConn, respWriter)

req.Body = wrappedReader
req.Method = http.MethodPost
Expand Down

0 comments on commit 920d812

Please sign in to comment.