Skip to content

Commit

Permalink
Merge pull request openshift#49 from ggreer/websocket-ping
Browse files Browse the repository at this point in the history
proxy: Add pings to websockets.
  • Loading branch information
ggreer authored May 23, 2018
2 parents 7fadf57 + 32e3e7c commit 9b354f2
Showing 1 changed file with 38 additions and 7 deletions.
45 changes: 38 additions & 7 deletions pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@ import (
"net/http/httputil"
"net/url"
"strings"
"sync"
"time"

"github.com/gorilla/websocket"
)

var websocketPingInterval = 30 * time.Second
var websocketTimeout = 30 * time.Second

type Config struct {
HeaderBlacklist []string
Endpoint *url.URL
Expand Down Expand Up @@ -223,26 +227,53 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

defer frontend.Close()
ticker := time.NewTicker(websocketPingInterval)
var writeMutex sync.Mutex // Needed because ticker & copy are writing to frontend in separate goroutines

defer func() {
ticker.Stop()
frontend.Close()
}()

errc := make(chan error, 2)

// Can't just use io.Copy here since browsers care about frame headers.
go func() { errc <- copyMsgs(frontend, backend) }()
go func() { errc <- copyMsgs(backend, frontend) }()
go func() { errc <- copyMsgs(nil, frontend, backend) }()
go func() { errc <- copyMsgs(&writeMutex, backend, frontend) }()

// Only wait for a single error and let the defers close both connections.
<-errc
for {
select {
case <-errc:
// Only wait for a single error and let the defers close both connections.
return
case <-ticker.C:
writeMutex.Lock()
// Send pings to client to prevent load balancers and other middlemen from closing the connection early
err := frontend.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(websocketTimeout))
writeMutex.Unlock()
if err != nil {
return
}
}
}
}

func copyMsgs(dest, src *websocket.Conn) error {
func copyMsgs(writeMutex *sync.Mutex, dest, src *websocket.Conn) error {
for {
messageType, msg, err := src.ReadMessage()
if err != nil {
return err
}

if err := dest.WriteMessage(messageType, msg); err != nil {
if writeMutex == nil {
err = dest.WriteMessage(messageType, msg)
} else {
writeMutex.Lock()
err = dest.WriteMessage(messageType, msg)
writeMutex.Unlock()
}

if err != nil {
return err
}
}
Expand Down

0 comments on commit 9b354f2

Please sign in to comment.