forked from slack-go/slack
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfuncs.go
139 lines (125 loc) · 3.31 KB
/
funcs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package slacktest
import (
"context"
"fmt"
"log"
"time"
websocket "github.com/gorilla/websocket"
slack "github.com/rusq/slack"
)
func (sts *Server) queueForWebsocket(s, hubname string) {
channel, err := getHubForServer(hubname)
if err != nil {
log.Printf("Unable to get server's channels: %s", err.Error())
}
sts.seenOutboundMessages.Lock()
sts.seenOutboundMessages.messages = append(sts.seenOutboundMessages.messages, s)
sts.seenOutboundMessages.Unlock()
channel.sent <- s
}
func handlePendingMessages(c *websocket.Conn, hubname string) {
channel, err := getHubForServer(hubname)
if err != nil {
log.Printf("Unable to get server's channels: %s", err.Error())
return
}
for m := range channel.sent {
err := c.WriteMessage(websocket.TextMessage, []byte(m))
if err != nil {
log.Printf("error writing message to websocket: %s", err.Error())
continue
}
}
}
func (sts *Server) postProcessMessage(m, hubname string) {
channel, err := getHubForServer(hubname)
if err != nil {
log.Printf("Unable to get server's channels: %s", err.Error())
return
}
sts.seenInboundMessages.Lock()
sts.seenInboundMessages.messages = append(sts.seenInboundMessages.messages, m)
sts.seenInboundMessages.Unlock()
// send to firehose
channel.seen <- m
}
func newHub() *hub {
h := &hub{}
c := make(map[string]*messageChannels)
h.serverChannels = c
return h
}
func addServerToHub(s *Server, channels *messageChannels) error {
if s.ServerAddr == "" {
return ErrEmptyServerToHub
}
masterHub.Lock()
masterHub.serverChannels[s.ServerAddr] = channels
masterHub.Unlock()
return nil
}
func getHubForServer(serverAddr string) (*messageChannels, error) {
if serverAddr == "" {
return &messageChannels{}, ErrPassedEmptyServerAddr
}
masterHub.RLock()
defer masterHub.RUnlock()
channels, ok := masterHub.serverChannels[serverAddr]
if !ok {
return &messageChannels{}, ErrNoQueuesRegisteredForServer
}
return channels, nil
}
// BotNameFromContext returns the botname from a provided context
func BotNameFromContext(ctx context.Context) string {
botname, ok := ctx.Value(ServerBotNameContextKey).(string)
if !ok {
return defaultBotName
}
return botname
}
// BotIDFromContext returns the bot userid from a provided context
func BotIDFromContext(ctx context.Context) string {
botname, ok := ctx.Value(ServerBotIDContextKey).(string)
if !ok {
return defaultBotID
}
return botname
}
// generate a full rtminfo response for initial rtm connections
func generateRTMInfo(ctx context.Context, wsurl string) *fullInfoSlackResponse {
rtmInfo := slack.Info{
URL: wsurl,
Team: defaultTeam,
User: defaultBotInfo,
}
rtmInfo.User.ID = BotIDFromContext(ctx)
rtmInfo.User.Name = BotNameFromContext(ctx)
return &fullInfoSlackResponse{
rtmInfo,
okWebResponse,
}
}
func nowAsJSONTime() slack.JSONTime {
return slack.JSONTime(time.Now().Unix())
}
func defaultBotInfoJSON(ctx context.Context) string {
botid := BotIDFromContext(ctx)
botname := BotNameFromContext(ctx)
return fmt.Sprintf(`
{
"ok":true,
"bot":{
"id": "%s",
"app_id": "A4H1JB4AZ",
"deleted": false,
"name": "%s",
"icons": {
"image_36": "https://localhost.localdomain/img36.png",
"image_48": "https://localhost.localdomain/img48.png",
"image_72": "https://localhost.localdomain/img72.png"
}
}
}
`, botid, botname)
}