Skip to content

Commit

Permalink
Factor out non-Go-specific parts of elo.py
Browse files Browse the repository at this point in the history
  • Loading branch information
lightvector committed Aug 26, 2023
1 parent 749ea21 commit 99f990c
Show file tree
Hide file tree
Showing 2 changed files with 324 additions and 287 deletions.
293 changes: 293 additions & 0 deletions python/elo.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import copy
import math
import numpy as np
import os
import scipy.stats
import scipy.special
import itertools
from abc import ABC, abstractmethod
from typing import List, Dict, Tuple, Set, Sequence
from dataclasses import dataclass

Player = str
PlayerIdx = int
Expand Down Expand Up @@ -545,9 +549,284 @@ def line_search_ascend(strengths: np.array, cur_loglikelihood: float) -> Tuple[n
)
return info

def has_only_factors_of_2_and_3(n: int) -> bool:
while n > 1:
if n % 2 == 0:
n //= 2
elif n % 3 == 0:
n //= 3
else:
return False
return True

@dataclass
class GameRecord:
player1: str
player2: str
win: int = 0
loss: int = 0
draw: int = 0

class GameResultSummary:

def __init__(
self,
elo_prior_games: float,
estimate_first_player_advantage: bool,
):
self.results = {} # dict of { (player1_name, player2_name) : GameRecord }

self._all_game_files = set()
self._elo_prior_games = elo_prior_games # number of games for bayesian prior around Elo 0
self._estimate_first_player_advantage = estimate_first_player_advantage
self._elo_info = None
self._game_count = 0

def add_games_from_file_or_dir(self, input_file_or_dir: str, recursive=False):
"""Add games found in input_file_or_dir into the results. Repeated paths to the same file across multiple calls will be ignored."""
new_files = self._add_files(input_file_or_dir, recursive)

def add_game_record(self, record: GameRecord):
"""Add game record to results."""
if (record.player1, record.player2) not in self.results:
self.results[(record.player1, record.player2)] = GameRecord(player1=record.player1,player2=record.player2)
self.results[(record.player1, record.player2)].win += record.win
self.results[(record.player1, record.player2)].loss += record.loss
self.results[(record.player1, record.player2)].draw += record.draw
self._game_count += record.win + record.loss + record.draw

def clear(self):
"""Clear all data added."""
self.results = {}
self._all_game_files = set()
self._elo_info = None

def print_game_results(self):
"""Print tables of wins and win percentage."""
pla_names = set(itertools.chain(*(name_pair for name_pair in self.results.keys())))
self._print_result_matrix(pla_names)

def print_elos(self):
"""Print game results and maximum likelihood posterior Elos."""
elo_info = self._compute_elos_if_needed()
real_players = [player for player in elo_info.players if player != P1_ADVANTAGE_NAME]
self._print_result_matrix(real_players)
print("Elos (+/- one approx standard error):")
print(elo_info)

print("Pairwise approx % likelihood of superiority of row over column:")
los_matrix = []
for player in real_players:
los_row = []
for player2 in real_players:
los = elo_info.get_approx_likelihood_of_superiority(player,player2)
los_row.append(f"{los*100:.2f}")
los_matrix.append(los_row)
self._print_matrix(real_players,los_matrix)

surprise_matrix = []
for pla1 in real_players:
row = []
for pla2 in real_players:
if (pla1 == pla2):
row.append("-")
continue
else:
pla1_pla2 = self.results[(pla1, pla2)] if (pla1, pla2) in self.results else GameRecord(pla1,pla2)
pla2_pla1 = self.results[(pla2, pla1)] if (pla2, pla1) in self.results else GameRecord(pla2,pla1)
win = pla1_pla2.win + pla2_pla1.loss + 0.5 * (pla1_pla2.draw + pla2_pla1.draw)
total = pla1_pla2.win + pla2_pla1.win + pla1_pla2.loss + pla2_pla1.loss + pla1_pla2.draw + pla2_pla1.draw
surprise = elo_info.get_log10_odds_surprise_max_likelihood(pla1, pla2, win, total)
if total <= 0:
row.append("-")
else:
row.append(f"{surprise:.1f}")
surprise_matrix.append(row)
print("Log10odds surprise matrix given the maximum-likelihood Elos:")
print("E.g. +3.0 means a 1:1000 unexpected good performance by row vs column.")
print("E.g. -4.0 means a 1:10000 unexpected bad performance by row vs column.")
print("Extreme numbers may indicate rock/paper/scissors, or Elo is not a good model for the data.")
self._print_matrix(real_players,surprise_matrix)

print(f"Used a prior of {self._elo_prior_games} games worth that each player is near Elo 0.")

def get_elos(self) -> EloInfo:
return self._compute_elos_if_needed()

def get_game_results(self) -> Dict:
"""Return a dictionary of game results as { (player1_name, player2_name) : GameRecord }
You can retrieve results by player's name like:
results[(player1_name, player2_name)].win
results[(player1_name, player2_name)].loss
results[(player1_name, player2_name)].draw
"""
return self.results

# Functions that can be implemented by subclasses -----------------------------------------------------
# You can override these methods if you want add_games_from_file_or_dir to work.
# Otherwise, you can just add game records yourself via add_game_record.
@abstractmethod
def is_game_file(self, input_file: str) -> bool:
"""Returns true if this file or directory is one that should have game results in it somewhere"""
raise NotImplementedError()

@abstractmethod
def get_game_records(self, input_file: str) -> List[GameRecord]:
"""Return all game records contained in this file"""
raise NotImplementedError()

# Private functions ------------------------------------------------------------------------------------

def _compute_elos_if_needed(self):
if self._elo_info is None:
self._elo_info = self._estimate_elo()
return self._elo_info

def _add_files(self, input_file_or_dir, recursive):
print(f"Searching and adding files in {input_file_or_dir}, please wait...")

if not os.path.exists(input_file_or_dir):
raise Exception(f"There is no file or directory with name: {input_file_or_dir}")

files = []
if os.path.isdir(input_file_or_dir):
if recursive:
for (dirpath, dirnames, filenames) in os.walk(input_file_or_dir):
files += [os.path.join(dirpath, file) for file in filenames if self.is_game_file(os.path.join(dirpath, file))]
else:
files = [os.path.join(input_file_or_dir, file) for file in os.listdir(input_file_or_dir) if self.is_game_file(os.path.join(dirpath, file))]
else:
if self.is_game_file(input_file_or_dir):
files.append(input_file_or_dir)

# Remove duplicates
new_game_files = set(files)
new_game_files = new_game_files.difference(self._all_game_files)
self._all_game_files = self._all_game_files.union(new_game_files)
self._add_new_games_to_result_dict(new_game_files, input_file_or_dir)

print(f"Added {len(new_game_files)} new game files from {input_file_or_dir}")

def _add_new_games_to_result_dict(self, new_game_files, source):
idx = 0
for game_file in new_game_files:
records = self.get_game_records(game_file)
idx += 1
if idx % 10 == 0 and has_only_factors_of_2_and_3(idx // 10):
print(f"Added {idx}/{len(new_game_files)} game files for {source}")

for record in records:
self.add_game_record(record)

def _estimate_elo(self) -> EloInfo:
"""Estimate and print elo values. This function must be called after adding all games"""
pla_names = set(itertools.chain(*(name_pair for name_pair in self.results.keys())))
data = []
for pla_first in pla_names:
for pla_second in pla_names:
if (pla_first == pla_second):
continue
else:
if (pla_first, pla_second) not in self.results:
continue
record = self.results[(pla_first, pla_second)]
total = record.win + record.loss + record.draw
assert total >= 0
if total == 0:
continue

win = record.win + 0.5 * record.draw
winrate = win / total
data.extend(likelihood_of_games(
pla_first,
pla_second,
total,
winrate,
include_first_player_advantage=self._estimate_first_player_advantage
))

for pla in pla_names:
data.extend(make_single_player_prior(pla, self._elo_prior_games,0))
data.extend(make_center_elos_prior(list(pla_names),0)) # Add this in case user put elo_prior_games = 0
if self._estimate_first_player_advantage:
data.extend(make_single_player_prior(P1_ADVANTAGE_NAME, (1.0 + self._elo_prior_games) * 2.0, 0))

info = compute_elos(data, verbose=True)
return info

def _print_matrix(self,pla_names,results_matrix):
per_elt_space = 2
for sublist in results_matrix:
for elt in sublist:
per_elt_space = max(per_elt_space, len(str(elt)))
per_elt_space += 2

per_name_space = 1 if len(pla_names) == 0 else max(len(name) for name in pla_names)
per_name_space += 1
if per_name_space > per_elt_space:
per_elt_space += 1

row_format = f"{{:>{per_name_space}}}" + f"{{:>{per_elt_space}}}" * len(results_matrix)
print(row_format.format("", *[name[:per_elt_space-2] for name in pla_names]))
for name, row in zip(pla_names, results_matrix):
print(row_format.format(name, *row))

def _print_result_matrix(self, pla_names):
print(f"Total games: {self._game_count}")
print("Games by player:")
for pla1 in pla_names:
total = 0
for pla2 in pla_names:
if (pla1 == pla2):
continue
else:
pla1_pla2 = self.results[(pla1, pla2)] if (pla1, pla2) in self.results else GameRecord(pla1,pla2)
pla2_pla1 = self.results[(pla2, pla1)] if (pla2, pla1) in self.results else GameRecord(pla2,pla1)
total += pla1_pla2.win + pla2_pla1.win + pla1_pla2.loss + pla2_pla1.loss + pla1_pla2.draw + pla2_pla1.draw
print(f"{pla1}: {total:.1f}")

print("Wins by row player against column player:")
result_matrix = []
for pla1 in pla_names:
row = []
for pla2 in pla_names:
if (pla1 == pla2):
row.append("-")
continue
else:
pla1_pla2 = self.results[(pla1, pla2)] if (pla1, pla2) in self.results else GameRecord(pla1,pla2)
pla2_pla1 = self.results[(pla2, pla1)] if (pla2, pla1) in self.results else GameRecord(pla2,pla1)
win = pla1_pla2.win + pla2_pla1.loss + 0.5 * (pla1_pla2.draw + pla2_pla1.draw)
total = pla1_pla2.win + pla2_pla1.win + pla1_pla2.loss + pla2_pla1.loss + pla1_pla2.draw + pla2_pla1.draw
row.append(f"{win:.1f}/{total:.1f}")
result_matrix.append(row)
self._print_matrix(pla_names,result_matrix)

print("Win% by row player against column player:")
result_matrix = []
for pla1 in pla_names:
row = []
for pla2 in pla_names:
if (pla1 == pla2):
row.append("-")
continue
else:
pla1_pla2 = self.results[(pla1, pla2)] if (pla1, pla2) in self.results else GameRecord(pla1,pla2)
pla2_pla1 = self.results[(pla2, pla1)] if (pla2, pla1) in self.results else GameRecord(pla2,pla1)
win = pla1_pla2.win + pla2_pla1.loss + 0.5 * (pla1_pla2.draw + pla2_pla1.draw)
total = pla1_pla2.win + pla2_pla1.win + pla1_pla2.loss + pla2_pla1.loss + pla1_pla2.draw + pla2_pla1.draw
if total <= 0:
row.append("-")
else:
row.append(f"{win/total*100.0:.1f}%")
result_matrix.append(row)

self._print_matrix(pla_names,result_matrix)

# Testing code
if __name__ == "__main__":
# Example 1
data = []
data.extend(likelihood_of_games("Alice","Bob", 18, 12/18, False))
data.extend(likelihood_of_games("Bob","Carol", 18, 12/18, False))
Expand All @@ -560,3 +839,17 @@ def line_search_ascend(strengths: np.array, cur_loglikelihood: float) -> Tuple[n
for player2 in info.players:
print(info.get_approx_likelihood_of_superiority(player,player2),end=" ")
print()

# Example 2
summary = GameResultSummary(
elo_prior_games=1.0,
estimate_first_player_advantage=False,
)
summary.add_game_record(GameRecord("Alice","Bob",win=12,loss=6,draw=0))
summary.add_game_record(GameRecord("Bob","Carol",win=12,loss=6,draw=0))
summary.add_game_record(GameRecord("Carol","Dan",win=36,loss=18,draw=0))
summary.add_game_record(GameRecord("Dan","Eve",win=48,loss=24,draw=0))
summary.print_elos()



Loading

0 comments on commit 99f990c

Please sign in to comment.