From 18bf2f6a59eff63000aed8ef452f947c64df974f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Gomez?= Date: Wed, 30 Oct 2024 03:22:11 +0100 Subject: [PATCH] Make livereloading of watched dashboards more reliable Instead of using Grafana's websocket to trigger a dashboard refresh, let's use a more crude approach and refresh the entire page. The server-side implementation of the livereload process is taken from https://github.com/gohugoio/hugo/tree/62567d38205a61134a6822d37a534520772419f1/livereload It has been adjusted to ensure that only dashboards that changed will be reloaded. --- go.mod | 2 +- internal/livereload/connection.go | 66 +++++ internal/livereload/hub.go | 56 +++++ internal/livereload/livereload.go | 47 ++++ .../embed/templates/proxy/iframe.html.tmpl | 46 +++- pkg/grizzly/livereload/connection.go | 232 ------------------ pkg/grizzly/livereload/livereload.go | 69 ------ pkg/grizzly/resources.go | 10 - pkg/grizzly/server.go | 19 +- pkg/grizzly/watch.go | 6 +- 10 files changed, 221 insertions(+), 332 deletions(-) create mode 100644 internal/livereload/connection.go create mode 100644 internal/livereload/hub.go create mode 100644 internal/livereload/livereload.go delete mode 100644 pkg/grizzly/livereload/connection.go delete mode 100644 pkg/grizzly/livereload/livereload.go diff --git a/go.mod b/go.mod index 8e970e65..dfcb1f89 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,6 @@ require ( github.com/go-openapi/runtime v0.28.0 github.com/gobwas/glob v0.2.3 github.com/google/go-jsonnet v0.20.0 - github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.1 github.com/grafana/grafana-openapi-client-go v0.0.0-20240325012504-4958bdd139e7 github.com/grafana/synthetic-monitoring-agent v0.23.1 @@ -51,6 +50,7 @@ require ( github.com/go-openapi/validate v0.24.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/josharian/intern v1.0.0 // indirect diff --git a/internal/livereload/connection.go b/internal/livereload/connection.go new file mode 100644 index 00000000..e0fb648c --- /dev/null +++ b/internal/livereload/connection.go @@ -0,0 +1,66 @@ +// Copyright 2015 The Hugo Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package livereload + +import ( + "bytes" + "sync" + + "github.com/gorilla/websocket" +) + +type connection struct { + // The websocket connection. + ws *websocket.Conn + + // Buffered channel of outbound messages. + send chan []byte + + // There is a potential data race, especially visible with large files. + // This is protected by synchronization of the send channel's close. + closer sync.Once +} + +func (c *connection) close() { + c.closer.Do(func() { + close(c.send) + }) +} + +func (c *connection) reader() { + for { + _, message, err := c.ws.ReadMessage() + if err != nil { + break + } + if bytes.Contains(message, []byte(`"command":"hello"`)) { + c.send <- []byte(`{ + "command": "hello", + "protocols": ["http://livereload.com/protocols/official-7"], + "serverName": "Grizzly" + }`) + } + } + c.ws.Close() +} + +func (c *connection) writer() { + for message := range c.send { + err := c.ws.WriteMessage(websocket.TextMessage, message) + if err != nil { + break + } + } + c.ws.Close() +} diff --git a/internal/livereload/hub.go b/internal/livereload/hub.go new file mode 100644 index 00000000..8ab6083a --- /dev/null +++ b/internal/livereload/hub.go @@ -0,0 +1,56 @@ +// Copyright 2015 The Hugo Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package livereload + +type hub struct { + // Registered connections. + connections map[*connection]bool + + // Inbound messages from the connections. + broadcast chan []byte + + // Register requests from the connections. + register chan *connection + + // Unregister requests from connections. + unregister chan *connection +} + +var wsHub = hub{ + broadcast: make(chan []byte), + register: make(chan *connection), + unregister: make(chan *connection), + connections: make(map[*connection]bool), +} + +func (h *hub) run() { + for { + select { + case c := <-h.register: + h.connections[c] = true + case c := <-h.unregister: + delete(h.connections, c) + c.close() + case m := <-h.broadcast: + for c := range h.connections { + select { + case c.send <- m: + default: + delete(h.connections, c) + c.close() + } + } + } + } +} diff --git a/internal/livereload/livereload.go b/internal/livereload/livereload.go new file mode 100644 index 00000000..f808afa0 --- /dev/null +++ b/internal/livereload/livereload.go @@ -0,0 +1,47 @@ +// Copyright 2015 The Hugo Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package livereload + +import ( + "fmt" + "net/http" + + "github.com/gorilla/websocket" +) + +// Initialize starts the Websocket Hub handling live reloads. +func Initialize() { + go wsHub.run() +} + +// Handler is a HandlerFunc handling the livereload +// Websocket interaction. +func Handler(upgrader *websocket.Upgrader) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + ws, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + c := &connection{send: make(chan []byte, 256), ws: ws} + wsHub.register <- c + defer func() { wsHub.unregister <- c }() + go c.writer() + c.reader() + } +} + +func ReloadDashboard(uid string) { + msg := fmt.Sprintf(`{"command": "reload", "path": "/grizzly/Dashboard/%s"}`, uid) + wsHub.broadcast <- []byte(msg) +} diff --git a/pkg/grizzly/embed/templates/proxy/iframe.html.tmpl b/pkg/grizzly/embed/templates/proxy/iframe.html.tmpl index d3b14f80..61b7afbf 100644 --- a/pkg/grizzly/embed/templates/proxy/iframe.html.tmpl +++ b/pkg/grizzly/embed/templates/proxy/iframe.html.tmpl @@ -5,10 +5,44 @@ Grizzly + +{{ template "proxy/header.html.tmpl" . }} + - - - {{ template "proxy/header.html.tmpl" . }} - - - \ No newline at end of file + + + + + diff --git a/pkg/grizzly/livereload/connection.go b/pkg/grizzly/livereload/connection.go deleted file mode 100644 index 609c35b6..00000000 --- a/pkg/grizzly/livereload/connection.go +++ /dev/null @@ -1,232 +0,0 @@ -package livereload - -import ( - "encoding/json" - "fmt" - "log" - "strings" - "sync" - - "github.com/google/uuid" - "github.com/gorilla/websocket" -) - -type connection struct { - ws *websocket.Conn - send chan []byte - closer sync.Once - clientID string -} - -func NewConnection(send chan []byte, ws *websocket.Conn) *connection { - return &connection{ - send: send, - ws: ws, - } -} - -func (c *connection) close() { - c.closer.Do(func() { - close(c.send) - }) -} - -type connectRequest struct { - ID int `json:"id"` -} - -type connectResponseInner struct { - Client string `json:"client"` - Ping int `json:"ping"` - Pong bool `json:"pong"` -} -type connectResponse struct { - ID int `json:"id"` - Connect connectResponseInner `json:"connect"` -} - -func (c *connection) handleConnectRequest(line string) ([]byte, error) { - // {"connect":{"name":"js"},"id":1} - request := connectRequest{} - err := json.Unmarshal([]byte(line), &request) - if err != nil { - return nil, err - } - c.clientID = uuid.New().String() - // {"id":1,"connect":{"client":"5a6674c9-2450-46e4-bfff-beaa84966493","ping":25,"pong":true}} - response := connectResponse{ - ID: request.ID, - Connect: connectResponseInner{ - Client: c.clientID, - Ping: 25, - Pong: true, - }, - } - j, err := json.Marshal(response) - if err != nil { - return nil, err - } - return j, nil -} - -type subscribeRequestInner struct { - Channel string `json:"channel"` -} -type subscribeRequest struct { - ID int `json:"id"` - Subscribe subscribeRequestInner `json:"subscribe"` -} - -type joinResponseInfo struct { - User string `json:"user"` - Client string `json:"client"` -} -type joinResponseJoin struct { - Info joinResponseInfo `json:"info"` -} -type joinResponsePush struct { - Channel string `json:"channel"` - Join joinResponseJoin `json:"join"` -} -type joinResponse struct { - Push joinResponsePush `json:"push"` -} - -func (c *connection) handleSubscribeRequest(line string) ([]byte, error) { - // {"subscribe":{"channel":"1/grafana/dashboard/uid/no-folder"},"id":2} - request := subscribeRequest{} - err := json.Unmarshal([]byte(line), &request) - if err != nil { - return nil, err - } - // {"id":2,"subscribe":{}} - subResp := map[string]any{} - subResp["id"] = request.ID - subResp["subscribe"] = map[string]any{} - j, err := json.Marshal(subResp) - if err != nil { - return nil, err - } - c.send <- j - // {"push":{"channel":"1/grafana/dashboard/uid/no-folder","join":{"info":{"user":"1","client":"5a6674c9-2450-46e4-bfff-beaa84966493"}}}} - joinResp := joinResponse{ - Push: joinResponsePush{ - Channel: request.Subscribe.Channel, - Join: joinResponseJoin{ - Info: joinResponseInfo{ - User: "1", - Client: c.clientID, - }, - }, - }, - } - j, err = json.Marshal(joinResp) - if err != nil { - return nil, err - } - return j, nil -} - -func (c *connection) reader() { - for { - _, message, err := c.ws.ReadMessage() - if err != nil { - break - } - lines := strings.Split(string(message), "\n") - for _, line := range lines { - msg := map[string]any{} - err := json.Unmarshal([]byte(line), &msg) - if err != nil { - log.Printf("Error parsing websocket message: %v", err) - continue - } - if _, ok := msg["connect"]; ok { - j, err := c.handleConnectRequest(line) - if err != nil { - log.Printf("Error handling connection request: %s", err) - continue - } - c.send <- j - } else if _, ok := msg["subscribe"]; ok { - j, err := c.handleSubscribeRequest(line) - if err != nil { - log.Printf("Error handling subscribe request: %s", err) - continue - } - c.send <- j - } - } - } - c.ws.Close() -} - -type pushResponseUser struct { - ID int `json:"id"` - Login string `json:"login"` -} -type pushResponseDashboard struct { - UID string `json:"uid"` - FolderID int `json:"folderID"` - IsFolder bool `json:"IsFolder"` - Data map[string]any `json:"data"` -} - -type pushResponseData struct { - UID string `json:"uid"` - Action string `json:"action"` - User pushResponseUser `json:"user"` - Dashboard pushResponseDashboard `json:"dashboard"` -} -type pushResponsePub struct { - Data pushResponseData `json:"data"` -} - -type pushResponsePush struct { - Channel string `json:"channel"` - Pub pushResponsePub `json:"pub"` -} - -type pushResponse struct { - Push pushResponsePush `json:"push"` -} - -func (c *connection) NotifyDashboard(uid string, spec map[string]any) error { - response := pushResponse{ - Push: pushResponsePush{ - Channel: fmt.Sprintf("1/grafana/dashboard/uid/%s", uid), - Pub: pushResponsePub{ - Data: pushResponseData{ - UID: uid, - Action: "saved", - User: pushResponseUser{ - ID: 1, - Login: "admin", - }, - Dashboard: pushResponseDashboard{ - UID: uid, - FolderID: 0, - IsFolder: false, - Data: spec, - }, - }, - }, - }, - } - j, err := json.Marshal(response) - if err != nil { - return err - } - c.send <- j - return nil -} - -func (c *connection) writer() { - for message := range c.send { - err := c.ws.WriteMessage(websocket.TextMessage, message) - if err != nil { - break - } - } - c.ws.Close() -} diff --git a/pkg/grizzly/livereload/livereload.go b/pkg/grizzly/livereload/livereload.go deleted file mode 100644 index e6d0de56..00000000 --- a/pkg/grizzly/livereload/livereload.go +++ /dev/null @@ -1,69 +0,0 @@ -package livereload - -import ( - _ "embed" - "fmt" - "net/http" - - "github.com/gorilla/websocket" - log "github.com/sirupsen/logrus" -) - -type hub struct { - connections map[*connection]bool - register chan *connection - unregister chan *connection -} - -var wsHub = hub{ - register: make(chan *connection), - unregister: make(chan *connection), - connections: make(map[*connection]bool), -} - -func Initialize() { - go func() { - for { - select { - case c := <-wsHub.register: - wsHub.connections[c] = true - case c := <-wsHub.unregister: - delete(wsHub.connections, c) - c.close() - } - } - }() -} - -func LiveReloadHandlerFunc(upgrader websocket.Upgrader) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - ws, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.Errorf("Error upgrading websocket: %s", err) - return - } - c := &connection{send: make(chan []byte, 256), ws: ws} - wsHub.register <- c - defer func() { wsHub.unregister <- c }() - go c.writer() - c.reader() - } -} - -func Reload(kind, name string, spec map[string]any) error { - log.Printf("Reloading %s/%s", kind, name) - if kind != "Dashboard" { - return fmt.Errorf("only dashboards supported for live reload at present") - } - if len(wsHub.connections) == 0 { - log.Warn("No connections to notify") - } - - for c := range wsHub.connections { - err := c.NotifyDashboard(name, spec) - if err != nil { - log.Errorf("Error notifying %s: %s", c.clientID, err) - } - } - return nil -} diff --git a/pkg/grizzly/resources.go b/pkg/grizzly/resources.go index 872c04d4..978868ff 100644 --- a/pkg/grizzly/resources.go +++ b/pkg/grizzly/resources.go @@ -227,16 +227,6 @@ func (r Resources) Find(ref ResourceRef) (Resource, bool) { return r.collection.Get(ref) } -func (r Resources) FindByFilename(path string) (Resource, bool) { - for _, resource := range r.AsList() { - if resource.Source.Path == path { - return resource, true - } - } - - return Resource{}, false -} - func (r Resources) Filter(predicate func(Resource) bool) Resources { filtered := make([]Resource, 0) diff --git a/pkg/grizzly/server.go b/pkg/grizzly/server.go index 49429ba6..5885417d 100644 --- a/pkg/grizzly/server.go +++ b/pkg/grizzly/server.go @@ -14,8 +14,8 @@ import ( "github.com/go-chi/chi" "github.com/go-chi/chi/middleware" "github.com/gorilla/websocket" + "github.com/grafana/grizzly/internal/livereload" "github.com/grafana/grizzly/internal/logger" - "github.com/grafana/grizzly/pkg/grizzly/livereload" "github.com/hashicorp/go-multierror" log "github.com/sirupsen/logrus" ) @@ -41,7 +41,7 @@ type Server struct { watch bool } -var upgrader = websocket.Upgrader{ +var upgrader = &websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, CheckOrigin: func(r *http.Request) bool { return true }, @@ -193,7 +193,7 @@ func (s *Server) Start() error { } r.Get("/", s.RootHandler) r.Get("/grizzly/{kind}/{name}", s.IframeHandler) - r.Get("/api/live/ws", livereload.LiveReloadHandlerFunc(upgrader)) + r.Get("/livereload", livereload.Handler(upgrader)) if s.watchScript != "" { var b []byte @@ -289,7 +289,7 @@ func (s *Server) updateWatchedResource(name string) error { } resources, err = s.ParseBytes(b) } else { - resources, err = s.ParseResources(s.ResourcePath) + resources, err = s.ParseResources(name) } if errors.As(err, &UnrecognisedFormatError{}) { uerr := err.(UnrecognisedFormatError) @@ -309,12 +309,8 @@ func (s *Server) updateWatchedResource(name string) error { } _, ok := handler.(ProxyHandler) if ok { - log.Infof("[watcher] Changes detected. Reloading %s", resource.Name()) - err = livereload.Reload(resource.Kind(), resource.Name(), resource.Spec()) - if err != nil { - log.Errorf("[watcher] Error reloading %s: %s", resource.Name(), err) - return err - } + log.Infof("[watcher] Changes detected. Reloading %s", resource.Ref()) + livereload.ReloadDashboard(resource.Name()) } } return nil @@ -368,7 +364,8 @@ func (s *Server) IframeHandler(w http.ResponseWriter, r *http.Request) { return } url := proxyHandler.ProxyURL(name) - templateVars := map[string]string{ + templateVars := map[string]any{ + "Port": s.port, "IframeURL": url, "CurrentContext": s.CurrentContext, } diff --git a/pkg/grizzly/watch.go b/pkg/grizzly/watch.go index 474688fc..0f83c4f4 100644 --- a/pkg/grizzly/watch.go +++ b/pkg/grizzly/watch.go @@ -35,8 +35,6 @@ func NewWatcher(watcherFunc func(path string) error) (*Watcher, error) { } func (w *Watcher) Add(path string) error { - log.WithField("path", path).Info("[watcher] Adding path to watch list") - stat, err := os.Stat(path) if err != nil { return err @@ -45,6 +43,7 @@ func (w *Watcher) Add(path string) error { if !stat.IsDir() { // `vim` renames and replaces, doesn't create a WRITE event. So we need to watch the whole dir and filter for our file parent := filepath.Dir(path) + "/" + log.WithField("path", parent).Debug("[watcher] Adding path to watch list") w.watches = append(w.watches, watch{path: path, parent: parent, isDir: false}) err := w.watcher.Add(parent) if err != nil { @@ -59,6 +58,7 @@ func (w *Watcher) Add(path string) error { if !strings.HasSuffix(path, "/") { path += "/" } + log.WithField("path", path).Debug("[watcher] Adding path to watch list") w.watches = append(w.watches, watch{path: path, parent: path, isDir: true}) return w.watcher.Add(path) } @@ -81,7 +81,7 @@ func (w *Watcher) Watch() error { } if event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create { if w.isWatched(event.Name) { - log.Info("[watcher] Changes detected. Parsing") + log.Debugf("[watcher] Changes detected: %s %s ", event.Op.String(), event.Name) err := w.watcherFunc(event.Name) if err != nil { log.Warn("[watcher] error: ", err)