Skip to content

Commit

Permalink
Create new function for parsing engine output
Browse files Browse the repository at this point in the history
The function next_token() extracts the next word from a string and
removes the leading and trailing whitespace around that word. It
also returns the remainder of the string unchanged. Using this function
instead of split() gets rid of extraneous whitespace when searching
for keywords and preserves whitespace in strings generated by
users and their engines.
  • Loading branch information
MarkZH committed Sep 6, 2023
1 parent b6a04e8 commit e9997d5
Showing 1 changed file with 110 additions and 59 deletions.
169 changes: 110 additions & 59 deletions chess/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1303,14 +1303,15 @@ def start(self, engine: UciProtocol) -> None:
engine.send_line("uci")

def line_received(self, engine: UciProtocol, line: str) -> None:
if line == "uciok" and not self.result.done():
token, remaining = _next_token(line)
if line.strip() == "uciok" and not self.result.done():
engine.initialized = True
self.result.set_result(None)
self.set_finished()
elif line.startswith("option "):
self._option(engine, line.split(" ", 1)[1])
elif line.startswith("id "):
self._id(engine, line.split(" ", 1)[1])
elif token == "option":
self._option(engine, remaining)
elif token == "id":
self._id(engine, remaining)

def _option(self, engine: UciProtocol, arg: str) -> None:
current_parameter = None
Expand Down Expand Up @@ -1371,8 +1372,8 @@ def _option(self, engine: UciProtocol, arg: str) -> None:
engine.target_config[option.name] = option.default

def _id(self, engine: UciProtocol, arg: str) -> None:
key, value = arg.split(" ", 1)
engine.id[key] = value
key, value = _next_token(arg)
engine.id[key] = value.strip()

return await self.communicate(UciInitializeCommand)

Expand Down Expand Up @@ -1587,11 +1588,12 @@ def start(self, engine: UciProtocol) -> None:
self._readyok(engine)

def line_received(self, engine: UciProtocol, line: str) -> None:
if line.startswith("info "):
self._info(engine, line.split(" ", 1)[1])
elif line.startswith("bestmove "):
self._bestmove(engine, line.split(" ", 1)[1])
elif line == "readyok" and self.sent_isready:
token, remaining = _next_token(line)
if token == "info":
self._info(engine, remaining)
elif token == "bestmove":
self._bestmove(engine, remaining)
elif line.strip() == "readyok" and self.sent_isready:
self._readyok(engine)
else:
LOGGER.warning("%s: Unexpected engine output: %r", engine, line)
Expand Down Expand Up @@ -1679,11 +1681,12 @@ def start(self, engine: UciProtocol) -> None:
self._readyok(engine)

def line_received(self, engine: UciProtocol, line: str) -> None:
if line.startswith("info "):
self._info(engine, line.split(" ", 1)[1])
elif line.startswith("bestmove "):
self._bestmove(engine, line.split(" ", 1)[1])
elif line == "readyok" and self.sent_isready:
token, remaining = _next_token(line)
if token == "info":
self._info(engine, remaining)
elif token == "bestmove":
self._bestmove(engine, remaining)
elif line.strip() == "readyok" and self.sent_isready:
self._readyok(engine)
else:
LOGGER.warning("%s: Unexpected engine output: %r", engine, line)
Expand Down Expand Up @@ -1733,34 +1736,39 @@ def _parse_uci_info(arg: str, root_board: chess.Board, selector: Info = INFO_ALL
if not selector:
return info

tokens = arg.split(" ")
while tokens:
parameter = tokens.pop(0)
remaining_line = arg
while remaining_line:
parameter, remaining_line = _next_token(remaining_line)

if parameter == "string":
info["string"] = " ".join(tokens)
info["string"] = remaining_line
break
elif parameter in ["depth", "seldepth", "nodes", "multipv", "currmovenumber", "hashfull", "nps", "tbhits", "cpuload"]:
try:
info[parameter] = int(tokens.pop(0)) # type: ignore
number, remaining_line = _next_token(remaining_line)
info[parameter] = int(number) # type: ignore
except (ValueError, IndexError):
LOGGER.error("Exception parsing %s from info: %r", parameter, arg)
elif parameter == "time":
try:
info["time"] = int(tokens.pop(0)) / 1000.0
time_ms, remaining_line = _next_token(remaining_line)
info["time"] = int(time_ms) / 1000.0
except (ValueError, IndexError):
LOGGER.error("Exception parsing %s from info: %r", parameter, arg)
elif parameter == "ebf":
try:
info["ebf"] = float(tokens.pop(0))
number, remaining_line = _next_token(remaining_line)
info["ebf"] = float(number)
except (ValueError, IndexError):
LOGGER.error("Exception parsing %s from info: %r", parameter, arg)
elif parameter == "score" and selector & INFO_SCORE:
try:
kind = tokens.pop(0)
value = tokens.pop(0)
if tokens and tokens[0] in ["lowerbound", "upperbound"]:
info[tokens.pop(0)] = True # type: ignore
kind, remaining_line = _next_token(remaining_line)
value, remaining_line = _next_token(remaining_line)
token, remaining_after_token = _next_token(remaining_line)
if token in ["lowerbound", "upperbound"]:
info[token] = True # type: ignore
remaining_line = remaining_after_token
if kind == "cp":
info["score"] = PovScore(Cp(int(value)), root_board.turn)
elif kind == "mate":
Expand All @@ -1771,21 +1779,28 @@ def _parse_uci_info(arg: str, root_board: chess.Board, selector: Info = INFO_ALL
LOGGER.error("Exception parsing score from info: %r", arg)
elif parameter == "currmove":
try:
info["currmove"] = chess.Move.from_uci(tokens.pop(0))
current_move, remaining_line = _next_token(remaining_line)
info["currmove"] = chess.Move.from_uci(current_move)
except (ValueError, IndexError):
LOGGER.error("Exception parsing currmove from info: %r", arg)
elif parameter == "currline" and selector & INFO_CURRLINE:
try:
if "currline" not in info:
info["currline"] = {}

cpunr = int(tokens.pop(0))
cpunr_text, remaining_line = _next_token(remaining_line)
cpunr = int(cpunr_text)
currline: List[chess.Move] = []
info["currline"][cpunr] = currline

board = root_board.copy(stack=False)
while tokens and UCI_REGEX.match(tokens[0]):
currline.append(board.push_uci(tokens.pop(0)))
while True:
next_move, remaining_line_after_move = _next_token(remaining_line)
if UCI_REGEX.match(next_move):
currline.append(board.push_uci(next_move))
remaining_line = remaining_line_after_move
else:
break
except (ValueError, IndexError):
LOGGER.error("Exception parsing currline from info: %r, position at root: %s", arg, root_board.fen())
elif parameter == "refutation" and selector & INFO_REFUTATION:
Expand All @@ -1794,27 +1809,41 @@ def _parse_uci_info(arg: str, root_board: chess.Board, selector: Info = INFO_ALL
info["refutation"] = {}

board = root_board.copy(stack=False)
refuted = board.push_uci(tokens.pop(0))
refuted_text, remaining_line = _next_token(remaining_line)
refuted = board.push_uci(refuted_text)

refuted_by: List[chess.Move] = []
info["refutation"][refuted] = refuted_by

while tokens and UCI_REGEX.match(tokens[0]):
refuted_by.append(board.push_uci(tokens.pop(0)))
while True:
token, remaining_line_after_move = _next_token(remaining_line)
if UCI_REGEX.match(token):
refuted_by.append(board.push_uci(token))
remaining_line = remaining_line_after_move
else:
break
except (ValueError, IndexError):
LOGGER.error("Exception parsing refutation from info: %r, position at root: %s", arg, root_board.fen())
elif parameter == "pv" and selector & INFO_PV:
try:
pv: List[chess.Move] = []
info["pv"] = pv
board = root_board.copy(stack=False)
while tokens and UCI_REGEX.match(tokens[0]):
pv.append(board.push_uci(tokens.pop(0)))
while True:
token, remaining_line_after_pv = _next_token(remaining_line)
if UCI_REGEX.match(token):
pv.append(board.push_uci(token))
remaining_line = remaining_line_after_pv
else:
break
except (ValueError, IndexError):
LOGGER.error("Exception parsing pv from info: %r, position at root: %s", arg, root_board.fen())
elif parameter == "wdl":
try:
info["wdl"] = PovWdl(Wdl(int(tokens.pop(0)), int(tokens.pop(0)), int(tokens.pop(0))), root_board.turn)
wins, remaining_line = _next_token(remaining_line)
draws, remaining_line = _next_token(remaining_line)
losses, remaining_line = _next_token(remaining_line)
info["wdl"] = PovWdl(Wdl(int(wins), int(draws), int(losses)), root_board.turn)
except (ValueError, IndexError):
LOGGER.error("Exception parsing wdl from info: %r", arg)

Expand Down Expand Up @@ -1944,10 +1973,11 @@ def timeout(self, engine: XBoardProtocol) -> None:
self.end(engine)

def line_received(self, engine: XBoardProtocol, line: str) -> None:
if line.startswith("#"):
token, remaining = _next_token(line)
if token.startswith("#"):
pass
elif line.startswith("feature "):
self._feature(engine, line.split(" ", 1)[1])
elif token == "feature":
self._feature(engine, remaining)
elif XBOARD_ERROR_REGEX.match(line):
raise EngineError(line)

Expand Down Expand Up @@ -2159,32 +2189,35 @@ def start(self, engine: XBoardProtocol) -> None:
engine.send_line("go")

def line_received(self, engine: XBoardProtocol, line: str) -> None:
if line.startswith("move "):
self._move(engine, line.split(" ", 1)[1])
elif line.startswith("Hint: "):
self._hint(engine, line.split(" ", 1)[1])
elif line == self.pong_after_move:
if not self.result.done():
self.result.set_result(self.play_result)
if not ponder:
token, remaining = _next_token(line)
if token == "move":
self._move(engine, remaining.strip())
elif token == "Hint:":
self._hint(engine, remaining.strip())
elif token == "pong":
pong_line = f"{token} {remaining.strip()}"
if pong_line == self.pong_after_move:
if not self.result.done():
self.result.set_result(self.play_result)
if not ponder:
self.set_finished()
elif pong_line == self.pong_after_ponder:
if not self.result.done():
self.result.set_result(self.play_result)
self.set_finished()
elif line == self.pong_after_ponder:
if not self.result.done():
self.result.set_result(self.play_result)
self.set_finished()
elif line == "offer draw":
elif f"{token} {remaining.strip()}" == "offer draw":
if not self.result.done():
self.play_result.draw_offered = True
self._ping_after_move(engine)
elif line == "resign":
elif line.strip() == "resign":
if not self.result.done():
self.play_result.resigned = True
self._ping_after_move(engine)
elif line.startswith("1-0") or line.startswith("0-1") or line.startswith("1/2-1/2"):
elif token in ["1-0", "0-1", "1/2-1/2"]:
if "resign" in line and not self.result.done():
self.play_result.resigned = True
self._ping_after_move(engine)
elif line.startswith("#"):
elif token.startswith("#"):
pass
elif XBOARD_ERROR_REGEX.match(line):
engine.first_game = True # Board state might no longer be in sync
Expand Down Expand Up @@ -2291,11 +2324,12 @@ def start(self, engine: XBoardProtocol) -> None:
self.time_limit_handle = None

def line_received(self, engine: XBoardProtocol, line: str) -> None:
if line.startswith("#"):
token, remaining = _next_token(line)
if token.startswith("#"):
pass
elif len(line.split()) >= 4 and line.lstrip()[0].isdigit():
self._post(engine, line)
elif line == self.final_pong:
elif f"{token} {remaining.strip()}" == self.final_pong:
self.end(engine)
elif XBOARD_ERROR_REGEX.match(line):
engine.first_game = True # Board state might no longer be in sync
Expand Down Expand Up @@ -2551,6 +2585,23 @@ def _parse_xboard_post(line: str, root_board: chess.Board, selector: Info = INFO
return info


def _next_token(line: str) -> tuple[str, str]:
"""Get the next token in a whitespace-delimited line of text.
The result is returned as a 2-part tuple of strings.
If the input line is empty or all whitespace, then the result is two
empty strings.
If the input line is not empty and not completely whitespace, then
the first element of the returned tuple is a single word with
leading and trailing whitespace removed. The second element is the
unchanged rest of the line."""

parts = line.split(maxsplit=1)
return (parts[0] if parts else "", parts[1] if len(parts) == 2 else "")


class BestMove:
"""Returned by :func:`chess.engine.AnalysisResult.wait()`."""

Expand Down

0 comments on commit e9997d5

Please sign in to comment.