-
Notifications
You must be signed in to change notification settings - Fork 9
/
server.go
216 lines (182 loc) · 5.34 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
package eventbus
import (
"encoding/json"
"fmt"
"github.com/aeden/eventbus/middleware"
"io"
"io/ioutil"
"log"
"net/http"
"time"
)
// option is a functional configuration option for a Server. NewServer calls
// each option with the server under construction, in the order given.
type option func(*Server)
// Server is the EventBus server. It holds the underlying HTTP server plus
// the settings that the options passed to NewServer fill in.
type Server struct {
// Underlying HTTP server; its Addr is set by the HostAndPort option.
httpServer *http.Server
// Value set by the CorsHostAndPort option; NewServer passes it to the CORS
// middleware and to the WebSocket handler.
corsHostAndPort string
// Store handed to the request handler; NewServer defaults it to
// NewNullEventStore().
eventStore EventStore
// Service definitions loaded by the Services option.
servicesConfig []ServiceConfig
}
// NewServer builds a Server that is ready to be started.
//
// The server starts from these defaults: 2 second read and write timeouts,
// the event store returned by NewNullEventStore, and no services. The given
// options are then applied in order, and only afterwards are the handlers
// created, so option values are what the handlers see:
//
//   - "/" is served by the event bus request handler wrapped in the CORS
//     middleware.
//   - "/ws" is served by the WebSocket handler.
func NewServer(opts ...option) *Server {
	const ioTimeout = 2 * time.Second

	srv := &Server{
		httpServer: &http.Server{
			ReadTimeout:  ioTimeout,
			WriteTimeout: ioTimeout,
		},
		eventStore:     NewNullEventStore(),
		servicesConfig: []ServiceConfig{},
	}

	for i := range opts {
		opts[i](srv)
	}

	eventHandler := newEventBusRequestHandler(srv.servicesConfig, srv.eventStore)

	router := http.NewServeMux()
	router.Handle("/", middleware.NewCorsHandler(srv.corsHostAndPort, eventHandler))
	router.Handle("/ws", newWebSocketHandler(srv.corsHostAndPort))
	srv.httpServer.Handler = router

	return srv
}
// Start runs the event bus server for handling JSON events over HTTP.
//
// It first starts the WebSocket hub via startWebsocketHub and then serves
// HTTP on the configured address using the handlers that NewServer
// registered (the root handler for event requests and the WebSocket handler
// on /ws). ListenAndServe blocks while the server is running, so Start only
// returns once the HTTP server has stopped; the reason it stopped (for
// example a failure to bind the address) is logged rather than discarded.
func (server *Server) Start() {
	startWebsocketHub()
	log.Printf("Starting EventBus server %s", server.httpServer.Addr)

	// ListenAndServe always returns a non-nil error when it returns.
	if err := server.httpServer.ListenAndServe(); err != nil {
		log.Printf("EventBus server stopped: %s", err)
	}
}
// HostAndPort returns an option that sets the address (host and port) the
// EventBus HTTP server listens on.
func HostAndPort(v string) option {
	setAddr := func(s *Server) {
		s.httpServer.Addr = v
	}
	return setAddr
}
// CorsHostAndPort returns an option that sets the CORS host and port for the
// EventBus server, i.e. the host and port that JavaScript client calls
// originate from.
func CorsHostAndPort(v string) option {
	setCors := func(s *Server) {
		s.corsHostAndPort = v
	}
	return setCors
}
// Services returns an option that loads the services that will attach to the
// EventBus server.
//
// The reader is consumed in full when the option is applied and must contain
// a JSON array of service configurations. If reading or parsing fails, the
// error is logged and the server's existing services configuration is left
// untouched instead of being replaced by a partial or empty result.
func Services(in io.Reader) option {
	return func(server *Server) {
		data, err := ioutil.ReadAll(in)
		if err != nil {
			log.Printf("Error reading services config: %s", err)
			return
		}

		var servicesConfig []ServiceConfig
		if err := json.Unmarshal(data, &servicesConfig); err != nil {
			log.Printf("Error parsing services config: %s", err)
			return
		}

		server.servicesConfig = servicesConfig
	}
}
// internal
// eventBusRequestHandler is the http.Handler that NewServer mounts on the
// root path. It accepts new events and passes them to the event store.
type eventBusRequestHandler struct {
// Service definitions whose Token is compared against the request's
// Authorization header.
servicesConfig []ServiceConfig
// Store that accepted events are written to.
eventStore EventStore
}
// newEventBusRequestHandler returns an http.Handler for event requests that
// authorizes against servicesConfig and persists events to eventStore.
func newEventBusRequestHandler(servicesConfig []ServiceConfig, eventStore EventStore) http.Handler {
	h := new(eventBusRequestHandler)
	h.servicesConfig = servicesConfig
	h.eventStore = eventStore
	return h
}
// ServeHTTP dispatches a request by HTTP method:
//
//   - POST submits a new event (see handlePost).
//   - OPTIONS answers 200 OK with no body.
//   - GET returns the stored events as JSON. This is only possible when the
//     configured event store is an *InMemoryEventStore; for any other store
//     the response is 501 Not Implemented.
//   - Any other method answers 404 Not Found.
func (handler *eventBusRequestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	log.Printf("Received request from %s", r.RemoteAddr)

	switch r.Method {
	case "POST":
		handler.handlePost(w, r)
	case "OPTIONS":
		w.WriteHeader(http.StatusOK)
	case "GET":
		// eventStore is an interface and is not guaranteed to be the
		// in-memory implementation, so the assertion must be checked: an
		// unchecked one panics for every other store.
		store, ok := handler.eventStore.(*InMemoryEventStore)
		if !ok || store == nil {
			http.Error(w, "Event listing is not supported by the configured event store", http.StatusNotImplemented)
			return
		}

		body, err := json.Marshal(store.Events)
		if err != nil {
			log.Printf("Error marshaling events JSON: %s", err)
			// Report the failure instead of sending an empty 200 response.
			http.Error(w, "Failed to encode events", http.StatusInternalServerError)
			return
		}

		w.Header().Set("Content-Type", "application/json")
		if _, err := w.Write(body); err != nil {
			log.Printf("Error writing events response: %s", err)
		}
	default:
		http.Error(w, "Not Found", http.StatusNotFound)
	}
}
// handlePost processes a POST that submits a new event.
//
// Steps, in order:
//  1. Resolve the authorization context from the Authorization header.
//  2. Decode the request body into an event; a body that cannot be parsed
//     is a client error and answers 400 Bad Request.
//  3. Require either a client access token (the "identifier" entry of the
//     event context) or an authorization context; otherwise answer 401.
//  4. Persist the event; a store failure answers 500.
//  5. Answer 200 OK and route the event to its recipients asynchronously.
func (handler *eventBusRequestHandler) handlePost(w http.ResponseWriter, r *http.Request) {
	authContext := handler.prepareAuthContext(w, r)
	// NOTE(review): authContext is the matched ServiceConfig, which carries
	// the service token; confirm that writing it to the log is acceptable.
	log.Printf("Authorization context: %v", authContext)

	event, err := handler.decodeEvent(r)
	if err != nil {
		log.Printf("Parser error: %s", err)
		http.Error(w, fmt.Sprintf("Parser error: %s", err), http.StatusBadRequest)
		return
	}
	if event == nil {
		// A JSON body of "null" can decode to a nil event without an error;
		// reject it here rather than dereferencing it below.
		log.Printf("Parser error: empty event")
		http.Error(w, "Parser error: empty event", http.StatusBadRequest)
		return
	}

	// If client access token isn't present, and auth context is nil, then 401.
	clientAccessToken := event.Context["identifier"]
	if clientAccessToken == "" && authContext == nil {
		http.Error(w, "Authorization required", http.StatusUnauthorized)
		return
	}

	// The event is persisted here.
	if err := handler.eventStore.WriteEvent(event); err != nil {
		log.Printf("Error writing event: %s", err)
		http.Error(w, "Failed to write event", http.StatusInternalServerError)
		return
	}

	// The event was successfully persisted: acknowledge, then route it in
	// the background so the response is not held up by delivery.
	w.WriteHeader(http.StatusOK)
	go routeEvent(event)
}
// decodeEvent parses the JSON request body into an event created by
// NewEvent.
//
// It returns the decoded event, or a nil event together with the decoding
// error when the body is not valid JSON for an event.
func (handler *eventBusRequestHandler) decodeEvent(r *http.Request) (event *Event, err error) {
	event = NewEvent()

	// Decode into the event itself, not into a pointer to the local
	// variable: with &event, a JSON body of "null" sets the pointer to nil
	// and the caller then dereferences a nil event.
	if err = json.NewDecoder(r.Body).Decode(event); err != nil {
		return nil, err
	}
	return event, nil
}
// prepareAuthContext resolves the authorization context for a request.
//
// It reads the Authorization header and returns the first configured
// service whose Token equals it. The result is nil when the header is empty
// or no service matches. The response writer is not used.
//
// NOTE(review): tokens are compared with ==, which is not a constant-time
// comparison; consider crypto/subtle.ConstantTimeCompare.
func (handler *eventBusRequestHandler) prepareAuthContext(w http.ResponseWriter, r *http.Request) (authContext interface{}) {
	token := r.Header.Get("Authorization")
	if token == "" {
		return nil
	}

	for _, candidate := range handler.servicesConfig {
		if candidate.Token != token {
			continue
		}
		return candidate
	}
	return nil
}
// routing and sending events
// routeEvent delivers an event to its recipients. When the event context
// carries a non-empty "identifier" (the client access token), the event is
// sent to that client first; it is then broadcast to services in every case.
func routeEvent(event *Event) {
	if token := event.Context["identifier"]; token != "" {
		notifyClient(token, event)
	}

	notifyServices(event)
}
// notify serializes the event to JSON and hands it to websocketHub.send.
// If the event cannot be marshaled, the error is logged and nothing is sent.
func notify(event *Event) {
	payload, marshalErr := json.Marshal(event)
	if marshalErr != nil {
		log.Printf("Error marshaling event JSON: %s", marshalErr)
		return
	}

	websocketHub.send(payload)
}
// notifyClient serializes the event to JSON and passes it, together with the
// client access token, to websocketHub.sendToClient. If the event cannot be
// marshaled, the error is logged and nothing is sent.
func notifyClient(clientAccessToken string, event *Event) {
	payload, marshalErr := json.Marshal(event)
	if marshalErr != nil {
		log.Printf("Error marshaling event JSON: %s", marshalErr)
		return
	}

	websocketHub.sendToClient(clientAccessToken, payload)
}
// notifyServices serializes the event to JSON and passes it to
// websocketHub.sendToServices. If the event cannot be marshaled, the error
// is logged and nothing is sent.
func notifyServices(event *Event) {
	payload, marshalErr := json.Marshal(event)
	if marshalErr != nil {
		log.Printf("Error marshaling event JSON: %s", marshalErr)
		return
	}

	websocketHub.sendToServices(payload)
}