Skip to content

Commit 7917a9b

Browse files
committed
Merge branch '107-restart-user-session' into 'master'
feat: dump user's session data in json file during shutdown to restore active sessions Closes #107 See merge request postgres-ai/joe!158
2 parents c5a4dee + 22436ff commit 7917a9b

File tree

13 files changed

+475
-50
lines changed

13 files changed

+475
-50
lines changed

.golangci.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
run:
2-
timeout: 1m
2+
timeout: 90s
33
issues-exit-code: 1
44
tests: true
55
skip-dirs:
@@ -48,6 +48,8 @@ linters-settings:
4848
- hugeParam
4949
enabled-tags:
5050
- performance
51+
godot:
52+
scope: declarations
5153

5254
linters:
5355
enable:
@@ -76,6 +78,7 @@ linters:
7678
- unused
7779
- unparam
7880
- wsl
81+
- godot
7982
enable-all: false
8083
disable:
8184
- gosec

cmd/joe/main.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,14 @@ import (
2525
"gitlab.com/postgres-ai/joe/pkg/bot"
2626
"gitlab.com/postgres-ai/joe/pkg/config"
2727
"gitlab.com/postgres-ai/joe/pkg/services/platform"
28+
"gitlab.com/postgres-ai/joe/pkg/services/storage"
2829
)
2930

3031
const (
3132
shutdownTimeout = 60 * time.Second
33+
34+
configFilePath = "config/config.yml"
35+
sessionsFilePath = "config/sessions.json"
3236
)
3337

3438
// ldflag variables.
@@ -37,7 +41,7 @@ var buildTime, version string
3741
func main() {
3842
version := formatBotVersion()
3943

40-
botCfg, err := loadConfig("config/config.yml")
44+
botCfg, err := loadConfig(configFilePath)
4145
if err != nil {
4246
log.Fatal("failed to load config: %v", err)
4347
}
@@ -56,7 +60,14 @@ func main() {
5660
ctx, cancel := context.WithCancel(context.Background())
5761
shutdownCh := setShutdownListener()
5862

59-
joeBot := bot.NewApp(botCfg, platformClient, features.NewPack())
63+
sessionsStorage := storage.NewJSONSessionData(sessionsFilePath)
64+
if err := sessionsStorage.Load(); err != nil {
65+
log.Fatal("unable to load sessions data: ", err)
66+
}
67+
68+
joeBot := bot.NewApp(botCfg, platformClient, features.NewPack(), sessionsStorage)
69+
70+
go setSighupListener(ctx, joeBot)
6071

6172
go func() {
6273
if err := joeBot.RunServer(ctx); err != nil && err != http.ErrServerClosed {
@@ -65,6 +76,7 @@ func main() {
6576
}()
6677

6778
<-shutdownCh
79+
log.Dbg("shutdown request received")
6880
cancel()
6981

7082
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout)
@@ -111,3 +123,20 @@ func setShutdownListener() chan os.Signal {
111123

112124
return c
113125
}
126+
127+
// setSighupListener allows to dump active sessions.
128+
func setSighupListener(ctx context.Context, app *bot.App) {
129+
c := make(chan os.Signal, 1)
130+
signal.Notify(c, syscall.SIGHUP)
131+
132+
for {
133+
select {
134+
case <-ctx.Done():
135+
return
136+
case <-c:
137+
if err := app.SaveSessions(); err != nil {
138+
log.Err("failed to save user session data: ", err)
139+
}
140+
}
141+
}
142+
}

pkg/bot/bot.go

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"gitlab.com/postgres-ai/joe/pkg/connection/slackrtm"
2828
"gitlab.com/postgres-ai/joe/pkg/connection/webui"
2929
"gitlab.com/postgres-ai/joe/pkg/services/dblab"
30+
"gitlab.com/postgres-ai/joe/pkg/services/storage"
3031
"gitlab.com/postgres-ai/joe/pkg/util"
3132
)
3233

@@ -43,6 +44,8 @@ type App struct {
4344

4445
dblabMu *sync.RWMutex
4546
dblabInstances map[string]*dblab.Instance
47+
48+
sessionStorage storage.PersistentSessionStorage
4649
}
4750

4851
// HealthResponse represents a response for heath-check requests.
@@ -53,13 +56,15 @@ type HealthResponse struct {
5356
}
5457

5558
// Creates a new application.
56-
func NewApp(cfg *config.Config, platformClient *platform.Client, enterprise *features.Pack) *App {
59+
func NewApp(cfg *config.Config, platformClient *platform.Client,
60+
enterprise *features.Pack, sessions storage.PersistentSessionStorage) *App {
5761
bot := App{
5862
Config: cfg,
5963
dblabMu: &sync.RWMutex{},
6064
dblabInstances: make(map[string]*dblab.Instance, len(cfg.ChannelMapping.DBLabInstances)),
6165
featurePack: enterprise,
6266
platformClient: platformClient,
67+
sessionStorage: sessions,
6368
}
6469

6570
return &bot
@@ -95,12 +100,25 @@ func (a *App) Shutdown(ctx context.Context) error {
95100
}
96101

97102
if len(a.assistants) > 0 {
103+
if err := a.SaveSessions(); err != nil {
104+
log.Err("unable to dump sessionStorage data: ", err)
105+
}
106+
98107
a.deregisterAssistants(ctx)
99108
}
100109

101110
return nil
102111
}
103112

113+
// SaveSessions dumps session data to the disk.
114+
func (a *App) SaveSessions() error {
115+
for _, assistantSvc := range a.assistants {
116+
assistantSvc.DumpSessions()
117+
}
118+
119+
return a.sessionStorage.Save()
120+
}
121+
104122
func (a *App) deregisterAssistants(ctx context.Context) {
105123
wg := sync.WaitGroup{}
106124

@@ -185,13 +203,13 @@ func (a *App) getAssistant(communicationType string, workspaceCfg config.Workspa
185203

186204
switch communicationType {
187205
case slack.CommunicationType:
188-
return slack.NewAssistant(&workspaceCfg.Credentials, a.Config, handlerPrefix, a.featurePack, a.platformClient), nil
206+
return slack.NewAssistant(&workspaceCfg.Credentials, a.Config, handlerPrefix, a.featurePack, a.platformClient, a.sessionStorage), nil
189207

190208
case slackrtm.CommunicationType:
191-
return slackrtm.NewAssistant(&workspaceCfg.Credentials, a.Config, a.featurePack, a.platformClient), nil
209+
return slackrtm.NewAssistant(&workspaceCfg.Credentials, a.Config, a.featurePack, a.platformClient, a.sessionStorage), nil
192210

193211
case webui.CommunicationType:
194-
return webui.NewAssistant(&workspaceCfg.Credentials, a.Config, handlerPrefix, a.featurePack, a.platformClient), nil
212+
return webui.NewAssistant(&workspaceCfg.Credentials, a.Config, handlerPrefix, a.featurePack, a.platformClient, a.sessionStorage), nil
195213

196214
default:
197215
return nil, errors.New("unknown workspace type given")
@@ -222,6 +240,10 @@ func (a *App) setupChannels(ctx context.Context, assistant connection.Assistant,
222240
return errors.Wrapf(err, "failed to register the %q assistant", channel.ChannelID)
223241
}
224242

243+
if err := assistant.RestoreSessions(ctx); err != nil {
244+
return errors.Wrapf(err, "failed to restore active sessions for the %q assistant", channel.ChannelID)
245+
}
246+
225247
_ = util.RunInterval(InactiveCloneCheckInterval, func() {
226248
assistant.CheckIdleSessions(ctx)
227249
})

pkg/connection/assistant.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"gitlab.com/postgres-ai/joe/pkg/models"
1111
"gitlab.com/postgres-ai/joe/pkg/services/dblab"
12+
"gitlab.com/postgres-ai/joe/pkg/services/usermanager"
1213
)
1314

1415
// Assistant defines the interface of a Query Optimization assistant.
@@ -22,11 +23,17 @@ type Assistant interface {
2223
// Deregister defines the method to deregister the assistant.
2324
Deregister(ctx context.Context) error
2425

26+
// RestoreSessions checks sessions after restart and establishes DB connection
27+
RestoreSessions(context.Context) error
28+
2529
// CheckIdleSessions defines the method for checking user idle sessions and notification about them.
2630
CheckIdleSessions(context.Context)
2731

2832
// AddChannel adds a new Database Lab instance to communication via the assistant.
2933
AddChannel(channelID, project string, dbLabInstance *dblab.Instance)
34+
35+
// DumpSessions iterates over channels and collects user's sessions to storage
36+
DumpSessions()
3037
}
3138

3239
// MessageProcessor defines the interface of a message processor.
@@ -37,6 +44,12 @@ type MessageProcessor interface {
3744
// ProcessAppMentionEvent defines the method for replying to an application mention event.
3845
ProcessAppMentionEvent(incomingMessage models.IncomingMessage)
3946

47+
// RestoreSessions checks sessions after restart and establishes DB connection
48+
RestoreSessions(ctx context.Context) error
49+
50+
// Users returns user's session data
51+
Users() usermanager.UserList
52+
4053
// CheckIdleSessions defines the method of check idleness sessions.
4154
CheckIdleSessions(ctx context.Context)
4255
}

pkg/connection/slack/assistant.go

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"gitlab.com/postgres-ai/joe/pkg/services/dblab"
3131
"gitlab.com/postgres-ai/joe/pkg/services/msgproc"
3232
"gitlab.com/postgres-ai/joe/pkg/services/platform"
33+
"gitlab.com/postgres-ai/joe/pkg/services/storage"
3334
"gitlab.com/postgres-ai/joe/pkg/services/usermanager"
3435
)
3536

@@ -50,19 +51,19 @@ type Assistant struct {
5051
appCfg *config.Config
5152
featurePack *features.Pack
5253
messenger *Messenger
53-
userManager *usermanager.UserManager
54+
userInformer usermanager.UserInformer
5455
platformClient *platform.Client
56+
sessionStorage storage.SessionStorage
5557
}
5658

5759
// NewAssistant returns a new assistant service.
5860
func NewAssistant(cfg *config.Credentials, appCfg *config.Config, handlerPrefix string, pack *features.Pack,
59-
platformClient *platform.Client) *Assistant {
61+
platformClient *platform.Client, sessionStorage storage.SessionStorage) *Assistant {
6062
prefix := fmt.Sprintf("/%s", strings.Trim(handlerPrefix, "/"))
6163

6264
chatAPI := slack.New(cfg.AccessToken)
6365
messenger := NewMessenger(chatAPI, &MessengerConfig{AccessToken: cfg.AccessToken})
6466
userInformer := NewUserInformer(chatAPI)
65-
userManager := usermanager.NewUserManager(userInformer, appCfg.Enterprise.Quota)
6667

6768
assistant := &Assistant{
6869
credentialsCfg: cfg,
@@ -71,8 +72,9 @@ func NewAssistant(cfg *config.Credentials, appCfg *config.Config, handlerPrefix
7172
prefix: prefix,
7273
featurePack: pack,
7374
messenger: messenger,
74-
userManager: userManager,
75+
userInformer: userInformer,
7576
platformClient: platformClient,
77+
sessionStorage: sessionStorage,
7678
}
7779

7880
return assistant
@@ -117,12 +119,12 @@ func (a *Assistant) Deregister(_ context.Context) error {
117119

118120
// AddChannel sets a message processor for a specific channel.
119121
func (a *Assistant) AddChannel(channelID, project string, dbLabInstance *dblab.Instance) {
120-
messageProcessor := a.buildMessageProcessor(project, dbLabInstance)
122+
messageProcessor := a.buildMessageProcessor(channelID, project, dbLabInstance)
121123

122124
a.addProcessingService(channelID, messageProcessor)
123125
}
124126

125-
func (a *Assistant) buildMessageProcessor(project string, dbLabInstance *dblab.Instance) *msgproc.ProcessingService {
127+
func (a *Assistant) buildMessageProcessor(channelID, project string, dbLabInstance *dblab.Instance) *msgproc.ProcessingService {
126128
processingCfg := msgproc.ProcessingConfig{
127129
App: a.appCfg.App,
128130
Platform: a.appCfg.Platform,
@@ -132,7 +134,10 @@ func (a *Assistant) buildMessageProcessor(project string, dbLabInstance *dblab.I
132134
Project: project,
133135
}
134136

135-
return msgproc.NewProcessingService(a.messenger, MessageValidator{}, dbLabInstance.Client(), a.userManager, a.platformClient,
137+
users := a.sessionStorage.GetUsers(CommunicationType, channelID)
138+
um := usermanager.NewUserManager(a.userInformer, a.appCfg.Enterprise.Quota, users)
139+
140+
return msgproc.NewProcessingService(a.messenger, MessageValidator{}, dbLabInstance.Client(), um, a.platformClient,
136141
processingCfg, a.featurePack)
137142
}
138143

@@ -156,6 +161,22 @@ func (a *Assistant) getProcessingService(channelID string) (connection.MessagePr
156161
return messageProcessor, nil
157162
}
158163

164+
// RestoreSessions checks sessions after restart and establishes DB connection.
165+
func (a *Assistant) RestoreSessions(ctx context.Context) error {
166+
log.Dbg("Restore sessions", a.prefix)
167+
168+
a.procMu.RLock()
169+
defer a.procMu.RUnlock()
170+
171+
for _, proc := range a.msgProcessors {
172+
if err := proc.RestoreSessions(ctx); err != nil {
173+
return err
174+
}
175+
}
176+
177+
return nil
178+
}
179+
159180
// CheckIdleSessions check the running user sessions for idleness.
160181
func (a *Assistant) CheckIdleSessions(ctx context.Context) {
161182
log.Dbg("Check idle sessions", a.prefix)
@@ -358,3 +379,15 @@ func (a *Assistant) verifyRequest(r *http.Request) error {
358379

359380
return nil
360381
}
382+
383+
// DumpSessions collects user's data from every message processor to sessionStorage.
384+
func (a *Assistant) DumpSessions() {
385+
log.Dbg("dump sessions", a.prefix)
386+
387+
a.procMu.RLock()
388+
defer a.procMu.RUnlock()
389+
390+
for channelID, proc := range a.msgProcessors {
391+
a.sessionStorage.SetUsers(CommunicationType, channelID, proc.Users())
392+
}
393+
}

0 commit comments

Comments
 (0)