Skip to content

Commit

Permalink
Ensure matchmaker stats behave correctly if matchmaker becomes fully …
Browse files Browse the repository at this point in the history
…empty and idle. (heroiclabs#1254)
  • Loading branch information
zyro authored Jul 30, 2024
1 parent 1811efb commit 84d47fa
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 72 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ All notable changes to this project are documented below.
The format is based on [keep a changelog](http://keepachangelog.com) and this project uses [semantic versioning](http://semver.org).

## [Unreleased]
### Fixed
- Ensure matchmaker stats behave correctly if matchmaker becomes fully empty and idle.

## [3.23.0] - 2024-07-27
### Added
Expand Down
113 changes: 49 additions & 64 deletions server/matchmaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package server
import (
"context"
"fmt"
"google.golang.org/protobuf/types/known/timestamppb"
"sync"
"time"

Expand All @@ -28,7 +29,6 @@ import (
"github.com/heroiclabs/nakama-common/runtime"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
)

type MatchmakerPresence struct {
Expand Down Expand Up @@ -188,23 +188,9 @@ type Matchmaker interface {
SetStats(*api.MatchmakerStats)
}

type Stats struct {
TicketCount *atomic.Int32
OldestTicketCreateTimeSeconds *atomic.Int64
Completions FifoQueue[StatsEntry]
}

func NewStats(snapshotSize int) *Stats {
return &Stats{
TicketCount: atomic.NewInt32(0),
OldestTicketCreateTimeSeconds: atomic.NewInt64(0),
Completions: NewBuffer(snapshotSize),
}
}

type StatsEntry struct {
CreatedAt int64 // Unix nano
CompletedAt int64 // Unix nano
type MatchmakerStatsEntry struct {
CreatedAt int64 // Unix nanoseconds.
CompletedAt int64 // Unix nanoseconds.
}

type FifoQueue[T any] interface {
Expand All @@ -217,10 +203,10 @@ type Buffer[T any] struct {
values []*T
}

func NewBuffer(size int) *Buffer[StatsEntry] {
return &Buffer[StatsEntry]{
func NewBuffer(size int) *Buffer[MatchmakerStatsEntry] {
return &Buffer[MatchmakerStatsEntry]{
mutex: sync.RWMutex{},
values: make([]*StatsEntry, 0, size),
values: make([]*MatchmakerStatsEntry, 0, size),
}
}

Expand Down Expand Up @@ -267,7 +253,7 @@ type LocalMatchmaker struct {

indexWriter *bluge.Writer
// Running tally of matchmaker stats.
stats *Stats
statsCompletions FifoQueue[MatchmakerStatsEntry]
// Stats snapshot.
statsSnapshot *atomic.Pointer[api.MatchmakerStats]
// All tickets for a session ID.
Expand Down Expand Up @@ -305,14 +291,14 @@ func NewLocalMatchmaker(logger, startupLogger *zap.Logger, config Config, router
ctx: ctx,
ctxCancelFn: ctxCancelFn,

indexWriter: indexWriter,
stats: NewStats(10), // Only keep 10 samples in memory
statsSnapshot: atomic.NewPointer[api.MatchmakerStats](&api.MatchmakerStats{}),
sessionTickets: make(map[string]map[string]struct{}),
partyTickets: make(map[string]map[string]struct{}),
indexes: make(map[string]*MatchmakerIndex),
activeIndexes: make(map[string]*MatchmakerIndex),
revCache: &MapOf[string, map[string]bool]{},
indexWriter: indexWriter,
statsCompletions: NewBuffer(10), // Only keep 10 samples in memory.
statsSnapshot: atomic.NewPointer[api.MatchmakerStats](&api.MatchmakerStats{}),
sessionTickets: make(map[string]map[string]struct{}),
partyTickets: make(map[string]map[string]struct{}),
indexes: make(map[string]*MatchmakerIndex),
activeIndexes: make(map[string]*MatchmakerIndex),
revCache: &MapOf[string, map[string]bool]{},
}

if revThreshold := m.config.GetMatchmaker().RevThreshold; revThreshold > 0 && m.config.GetMatchmaker().RevPrecision {
Expand Down Expand Up @@ -369,12 +355,6 @@ func (m *LocalMatchmaker) Process() {
activeIndexCount = len(m.activeIndexes)
indexCount = len(m.indexes)

// No active matchmaking tickets, the pool may be non-empty but there are no new tickets to check/query with.
if activeIndexCount == 0 {
m.Unlock()
return
}

activeIndexesCopy := make(map[string]*MatchmakerIndex, activeIndexCount)
for ticket, activeIndex := range m.activeIndexes {
activeIndexesCopy[ticket] = activeIndex
Expand All @@ -388,10 +368,38 @@ func (m *LocalMatchmaker) Process() {
}
}

m.Unlock()
defer func() {
completions := m.statsCompletions.Clone()

m.stats.TicketCount.Store(int32(indexCount))
m.stats.OldestTicketCreateTimeSeconds.Store(oldestTicketCreatedAt)
compStats := make([]*api.MatchmakerCompletionStats, 0, len(completions))
for _, c := range completions {
stats := &api.MatchmakerCompletionStats{
CreateTime: timestamppb.New(time.Unix(0, c.CreatedAt)),
CompleteTime: timestamppb.New(time.Unix(0, c.CompletedAt)),
}
compStats = append(compStats, stats)
}

stats := &api.MatchmakerStats{
TicketCount: int32(indexCount),
Completions: compStats,
}
if oldestTicketCreatedAt != 0 {
stats.OldestTicketCreateTime = timestamppb.New(time.Unix(0, oldestTicketCreatedAt))
}
m.statsSnapshot.Store(stats)
if m.statsUpdateFn != nil {
m.statsUpdateFn(stats)
}
}()

// No active matchmaking tickets, the pool may be non-empty but there are no new tickets to check/query with.
if activeIndexCount == 0 {
m.Unlock()
return
}

m.Unlock()

// Run the custom matching function if one is registered in the runtime, otherwise use the default process function.
var matchedEntries [][]*MatchmakerEntry
Expand Down Expand Up @@ -510,11 +518,11 @@ func (m *LocalMatchmaker) Process() {
}

for i, entry := range entries {
statsEntry := StatsEntry{
statsEntry := MatchmakerStatsEntry{
CreatedAt: entry.CreateTime,
CompletedAt: ts,
}
m.stats.Completions.Insert(statsEntry)
m.statsCompletions.Insert(statsEntry)

// Set per-recipient fields.
outgoing.GetMatchmakerMatched().Self = users[i]
Expand All @@ -532,29 +540,6 @@ func (m *LocalMatchmaker) Process() {
go m.matchedEntriesFn(matchedEntries)
}
}

completions := m.stats.Completions.Clone()

compStats := make([]*api.MatchmakerCompletionStats, 0, len(completions))
for _, c := range completions {
stats := &api.MatchmakerCompletionStats{
CreateTime: timestamppb.New(time.Unix(0, c.CreatedAt)),
CompleteTime: timestamppb.New(time.Unix(0, c.CompletedAt)),
}
compStats = append(compStats, stats)
}

stats := &api.MatchmakerStats{
TicketCount: m.stats.TicketCount.Load(),
Completions: compStats,
}
if t := m.stats.OldestTicketCreateTimeSeconds.Load(); t != 0 {
stats.OldestTicketCreateTime = timestamppb.New(time.Unix(t, 0))
}
m.statsSnapshot.Store(stats)
if m.statsUpdateFn != nil {
m.statsUpdateFn(stats)
}
}

func (m *LocalMatchmaker) Add(ctx context.Context, presences []*MatchmakerPresence, sessionID, partyId, query string, minCount, maxCount, countMultiple int, stringProperties map[string]string, numericProperties map[string]float64) (string, int64, error) {
Expand Down
16 changes: 8 additions & 8 deletions server/matchmaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1715,14 +1715,14 @@ func NewLocalBenchMatchmaker(logger, startupLogger *zap.Logger, config Config, r
ctx: ctx,
ctxCancelFn: ctxCancelFn,

indexWriter: indexWriter,
stats: NewStats(10),
statsSnapshot: atomic.NewPointer[api.MatchmakerStats](&api.MatchmakerStats{}),
sessionTickets: make(map[string]map[string]struct{}),
partyTickets: make(map[string]map[string]struct{}),
indexes: make(map[string]*MatchmakerIndex),
activeIndexes: make(map[string]*MatchmakerIndex),
revCache: &MapOf[string, map[string]bool]{},
indexWriter: indexWriter,
statsCompletions: NewBuffer(10),
statsSnapshot: atomic.NewPointer[api.MatchmakerStats](&api.MatchmakerStats{}),
sessionTickets: make(map[string]map[string]struct{}),
partyTickets: make(map[string]map[string]struct{}),
indexes: make(map[string]*MatchmakerIndex),
activeIndexes: make(map[string]*MatchmakerIndex),
revCache: &MapOf[string, map[string]bool]{},
}

if tickerActive {
Expand Down

0 comments on commit 84d47fa

Please sign in to comment.