Skip to content

Commit 13af91f

Browse files
committed
Improve matchmaker and match registry memory reuse.
1 parent 75cf6ca commit 13af91f

File tree

4 files changed

+133
-92
lines changed

4 files changed

+133
-92
lines changed

server/config.go

+11-6
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,9 @@ func CheckConfig(logger *zap.Logger, config Config) map[string]string {
265265
if config.GetMatchmaker().MaxIntervals < 1 {
266266
logger.Fatal("Matchmaker max intervals must be >= 1", zap.Int("matchmaker.max_intervals", config.GetMatchmaker().MaxIntervals))
267267
}
268+
if config.GetMatchmaker().BatchPoolSize < 1 {
269+
logger.Fatal("Matchmaker batch pool size must be >= 1", zap.Int("matchmaker.batch_pool_size", config.GetMatchmaker().BatchPoolSize))
270+
}
268271

269272
// If the runtime path is not overridden, set it to `datadir/modules`.
270273
if config.GetRuntime().Path == "" {
@@ -874,15 +877,17 @@ func NewLeaderboardConfig() *LeaderboardConfig {
874877
}
875878

876879
type MatchmakerConfig struct {
877-
MaxTickets int `yaml:"max_tickets" json:"max_tickets" usage:"Maximum number of concurrent matchmaking tickets allowed per session or party. Default 3."`
878-
IntervalSec int `yaml:"interval_sec" json:"interval_sec" usage:"How quickly the matchmaker attempts to form matches, in seconds. Default 15."`
879-
MaxIntervals int `yaml:"max_intervals" json:"max_intervals" usage:"How many intervals the matchmaker attempts to find matches at the max player count, before allowing min count. Default 2."`
880+
MaxTickets int `yaml:"max_tickets" json:"max_tickets" usage:"Maximum number of concurrent matchmaking tickets allowed per session or party. Default 3."`
881+
IntervalSec int `yaml:"interval_sec" json:"interval_sec" usage:"How quickly the matchmaker attempts to form matches, in seconds. Default 15."`
882+
MaxIntervals int `yaml:"max_intervals" json:"max_intervals" usage:"How many intervals the matchmaker attempts to find matches at the max player count, before allowing min count. Default 2."`
883+
BatchPoolSize int `yaml:"batch_pool_size" json:"batch_pool_size" usage:"Number of concurrent indexing batches that will be allocated."`
880884
}
881885

882886
func NewMatchmakerConfig() *MatchmakerConfig {
883887
return &MatchmakerConfig{
884-
MaxTickets: 3,
885-
IntervalSec: 15,
886-
MaxIntervals: 2,
888+
MaxTickets: 3,
889+
IntervalSec: 15,
890+
MaxIntervals: 2,
891+
BatchPoolSize: 32,
887892
}
888893
}

server/leaderboard_rank_cache.go

+80-66
Original file line numberDiff line numberDiff line change
@@ -119,89 +119,103 @@ func NewLocalLeaderboardRankCache(startupLogger *zap.Logger, db *sql.DB, config
119119

120120
nowTime := time.Now().UTC()
121121

122-
skippedLeaderboards := make([]string, 0, 10)
123-
leaderboards := leaderboardCache.GetAllLeaderboards()
124-
cachedLeaderboards := make([]string, 0, len(leaderboards))
125-
for _, leaderboard := range leaderboards {
126-
if _, ok := cache.blacklistIds[leaderboard.Id]; ok {
127-
startupLogger.Debug("Skip caching leaderboard ranks", zap.String("leaderboard_id", leaderboard.Id))
128-
skippedLeaderboards = append(skippedLeaderboards, leaderboard.Id)
129-
continue
130-
}
122+
go func() {
123+
skippedLeaderboards := make([]string, 0, 10)
124+
leaderboards := leaderboardCache.GetAllLeaderboards()
125+
cachedLeaderboards := make([]string, 0, len(leaderboards))
126+
for _, leaderboard := range leaderboards {
127+
if _, ok := cache.blacklistIds[leaderboard.Id]; ok {
128+
startupLogger.Debug("Skip caching leaderboard ranks", zap.String("leaderboard_id", leaderboard.Id))
129+
skippedLeaderboards = append(skippedLeaderboards, leaderboard.Id)
130+
continue
131+
}
131132

132-
cachedLeaderboards = append(cachedLeaderboards, leaderboard.Id)
133-
startupLogger.Debug("Caching leaderboard ranks", zap.String("leaderboard_id", leaderboard.Id))
133+
cachedLeaderboards = append(cachedLeaderboards, leaderboard.Id)
134+
startupLogger.Debug("Caching leaderboard ranks", zap.String("leaderboard_id", leaderboard.Id))
134135

135-
// Current expiry for this leaderboard.
136-
// This matches calculateTournamentDeadlines
137-
var expiryUnix int64
138-
if leaderboard.ResetSchedule != nil {
139-
expiryUnix = leaderboard.ResetSchedule.Next(nowTime).UTC().Unix()
140-
if leaderboard.EndTime > 0 && expiryUnix > leaderboard.EndTime {
136+
// Current expiry for this leaderboard.
137+
// This matches calculateTournamentDeadlines
138+
var expiryUnix int64
139+
if leaderboard.ResetSchedule != nil {
140+
expiryUnix = leaderboard.ResetSchedule.Next(nowTime).UTC().Unix()
141+
if leaderboard.EndTime > 0 && expiryUnix > leaderboard.EndTime {
142+
expiryUnix = leaderboard.EndTime
143+
}
144+
} else {
141145
expiryUnix = leaderboard.EndTime
142146
}
143-
} else {
144-
expiryUnix = leaderboard.EndTime
145-
}
146147

147-
// Prepare structure to receive rank data.
148-
rankCache := &RankCache{
149-
owners: make(map[uuid.UUID]skiplist.Interface),
150-
cache: skiplist.New(),
151-
}
152-
key := LeaderboardWithExpiry{LeaderboardId: leaderboard.Id, Expiry: expiryUnix}
153-
cache.cache[key] = rankCache
148+
// Prepare structure to receive rank data.
149+
key := LeaderboardWithExpiry{LeaderboardId: leaderboard.Id, Expiry: expiryUnix}
150+
cache.Lock()
151+
rankCache, found := cache.cache[key]
152+
if !found {
153+
rankCache = &RankCache{
154+
owners: make(map[uuid.UUID]skiplist.Interface),
155+
cache: skiplist.New(),
156+
}
157+
cache.cache[key] = rankCache
158+
}
159+
cache.Unlock()
154160

155-
// Look up all active records for this leaderboard.
156-
query := `
161+
// Look up all active records for this leaderboard.
162+
query := `
157163
SELECT owner_id, score, subscore
158164
FROM leaderboard_record
159165
WHERE leaderboard_id = $1 AND expiry_time = $2`
160-
rows, err := db.Query(query, leaderboard.Id, time.Unix(expiryUnix, 0).UTC())
161-
if err != nil {
162-
startupLogger.Fatal("Failed to caching leaderboard ranks", zap.String("leaderboard_id", leaderboard.Id), zap.Error(err))
163-
return nil
164-
}
165-
166-
// Process the records.
167-
for rows.Next() {
168-
var ownerIDStr string
169-
var score int64
170-
var subscore int64
171-
172-
if err = rows.Scan(&ownerIDStr, &score, &subscore); err != nil {
173-
startupLogger.Fatal("Failed to scan leaderboard rank data", zap.String("leaderboard_id", leaderboard.Id), zap.Error(err))
174-
return nil
175-
}
176-
ownerID, err := uuid.FromString(ownerIDStr)
166+
rows, err := db.Query(query, leaderboard.Id, time.Unix(expiryUnix, 0).UTC())
177167
if err != nil {
178-
startupLogger.Fatal("Failed to parse scanned leaderboard rank data", zap.String("leaderboard_id", leaderboard.Id), zap.String("owner_id", ownerIDStr), zap.Error(err))
179-
return nil
168+
startupLogger.Error("Failed to caching leaderboard ranks", zap.String("leaderboard_id", leaderboard.Id), zap.Error(err))
169+
continue
180170
}
181171

182-
// Prepare new rank data for this leaderboard entry.
183-
var rankData skiplist.Interface
184-
if leaderboard.SortOrder == LeaderboardSortOrderDescending {
185-
rankData = &RankDesc{
186-
OwnerId: ownerID,
187-
Score: score,
188-
Subscore: subscore,
172+
// Process the records.
173+
for rows.Next() {
174+
var ownerIDStr string
175+
var score int64
176+
var subscore int64
177+
178+
if err = rows.Scan(&ownerIDStr, &score, &subscore); err != nil {
179+
startupLogger.Error("Failed to scan leaderboard rank data", zap.String("leaderboard_id", leaderboard.Id), zap.Error(err))
180+
break
189181
}
190-
} else {
191-
rankData = &RankAsc{
192-
OwnerId: ownerID,
193-
Score: score,
194-
Subscore: subscore,
182+
ownerID, err := uuid.FromString(ownerIDStr)
183+
if err != nil {
184+
startupLogger.Error("Failed to parse scanned leaderboard rank data", zap.String("leaderboard_id", leaderboard.Id), zap.String("owner_id", ownerIDStr), zap.Error(err))
185+
break
195186
}
196-
}
197187

198-
rankCache.owners[ownerID] = rankData
199-
rankCache.cache.Insert(rankData)
188+
// Prepare new rank data for this leaderboard entry.
189+
var rankData skiplist.Interface
190+
if leaderboard.SortOrder == LeaderboardSortOrderDescending {
191+
rankData = &RankDesc{
192+
OwnerId: ownerID,
193+
Score: score,
194+
Subscore: subscore,
195+
}
196+
} else {
197+
rankData = &RankAsc{
198+
OwnerId: ownerID,
199+
Score: score,
200+
Subscore: subscore,
201+
}
202+
}
203+
204+
rankCache.Lock()
205+
if _, alreadyInserted := rankCache.owners[ownerID]; alreadyInserted {
206+
rankCache.Unlock()
207+
continue
208+
}
209+
rankCache.owners[ownerID] = rankData
210+
rankCache.cache.Insert(rankData)
211+
rankCache.Unlock()
212+
}
213+
_ = rows.Close()
200214
}
201-
_ = rows.Close()
202-
}
203215

204-
startupLogger.Info("Leaderboard rank cache initialization completed successfully", zap.Strings("cached", cachedLeaderboards), zap.Strings("skipped", skippedLeaderboards))
216+
startupLogger.Info("Leaderboard rank cache initialization completed successfully", zap.Strings("cached", cachedLeaderboards), zap.Strings("skipped", skippedLeaderboards))
217+
}()
218+
205219
return cache
206220
}
207221

server/match_registry.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -181,21 +181,22 @@ func NewLocalMatchRegistry(logger, startupLogger *zap.Logger, config Config, ses
181181

182182
go func() {
183183
ticker := time.NewTicker(time.Duration(config.GetMatch().LabelUpdateIntervalMs) * time.Millisecond)
184+
batch := r.index.NewBatch()
184185
for {
185186
select {
186187
case <-ctx.Done():
187188
ticker.Stop()
188189
return
189190
case <-ticker.C:
190-
r.processLabelUpdates()
191+
r.processLabelUpdates(batch)
191192
}
192193
}
193194
}()
194195

195196
return r
196197
}
197198

198-
func (r *LocalMatchRegistry) processLabelUpdates() {
199+
func (r *LocalMatchRegistry) processLabelUpdates(batch *bleve.Batch) {
199200
r.pendingUpdatesMutex.Lock()
200201
if len(r.pendingUpdates) == 0 {
201202
r.pendingUpdatesMutex.Unlock()
@@ -205,7 +206,6 @@ func (r *LocalMatchRegistry) processLabelUpdates() {
205206
r.pendingUpdates = make(map[string]*MatchIndexEntry, len(pendingUpdates)+10)
206207
r.pendingUpdatesMutex.Unlock()
207208

208-
batch := r.index.NewBatch()
209209
for id, op := range pendingUpdates {
210210
if op == nil {
211211
batch.Delete(id)
@@ -219,6 +219,7 @@ func (r *LocalMatchRegistry) processLabelUpdates() {
219219
if err := r.index.Batch(batch); err != nil {
220220
r.logger.Error("error processing match label updates", zap.Error(err))
221221
}
222+
batch.Reset()
222223
}
223224

224225
func (r *LocalMatchRegistry) CreateMatch(ctx context.Context, logger *zap.Logger, createFn RuntimeMatchCreateFunction, module string, params map[string]interface{}) (string, error) {

0 commit comments

Comments
 (0)