Skip to content

Commit

Permalink
Graphs and database heuristic
Browse files Browse the repository at this point in the history
  • Loading branch information
Zeyt8 committed Dec 4, 2023
1 parent 92aea05 commit 44dcce8
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 34 deletions.
64 changes: 40 additions & 24 deletions Rubiks_cube.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@
"source": [
"from pocket_cube.cube import Cube\n",
"from pocket_cube.cube import Move\n",
"from tests import test_list, test, is_solved\n",
"from heuristics import hamming, inverse_hamming, blocked_hamming, blocked_inverse_hamming, manhattan, inverse_manhattan\n",
"from tests import test_list, test, is_solved, TestCase, draw_graph, test_mtcs\n",
"from heuristics import hamming, inverse_hamming, \\\n",
" blocked_hamming, blocked_inverse_hamming, \\\n",
" manhattan, inverse_manhattan, \\\n",
" build_database, database_heuristic\n",
"from utils import get_neighbors, get_path, met_in_the_middle, FrontierItem, DiscoveredDict\n",
"\n",
"from heapq import heappush, heappop\n",
"from typing import Callable\n",
"\n",
"%matplotlib notebook\n",
"import time\n",
""
]
},
Expand All @@ -26,10 +28,10 @@
"outputs": [],
"source": [
"# A*\n",
"def a_star(cube: Cube) -> (list[Move], int):\n",
"def a_star(cube: Cube, heuristic: Callable[[Cube], int]) -> (list[Move], int):\n",
" # initialize with cube\n",
" frontier: list[FrontierItem] = []\n",
" heappush(frontier, (0 + manhattan(cube), cube.hash(), cube.clone()))\n",
" heappush(frontier, (0 + heuristic(cube), cube.hash(), cube.clone()))\n",
" discovered: DiscoveredDict = {cube.hash(): (None, None, 0)}\n",
" # search\n",
" while frontier:\n",
Expand All @@ -40,7 +42,7 @@
" score: int = discovered[currentCube.hash()][2] + 1\n",
" if neighbor.hash() not in discovered or score < discovered[neighbor.hash()][2]:\n",
" discovered[neighbor.hash()] = (currentCube.hash(), move, score)\n",
" node: FrontierItem = (score + manhattan(neighbor), neighbor.hash(), neighbor.clone())\n",
" node: FrontierItem = (score + heuristic(neighbor), neighbor.hash(), neighbor.clone())\n",
" heappush(frontier, node)\n",
" # get path\n",
" return (get_path(currentCube.hash(), discovered), len(discovered))"
Expand All @@ -53,7 +55,8 @@
"outputs": [],
"source": [
"# Test A*\n",
"test(a_star, test_list)"
"test_res_astar: list[TestCase] = test(lambda cube: a_star(cube, manhattan), test_list)\n",
"draw_graph(test_res_astar)"
]
},
{
Expand Down Expand Up @@ -99,7 +102,8 @@
"outputs": [],
"source": [
"# Test Bidirectional BFS\n",
"test(bidirectional_bfs, test_list)"
"test_res_bfs: list[TestCase] = test(bidirectional_bfs, test_list)\n",
"draw_graph(test_res_bfs)"
]
},
{
Expand Down Expand Up @@ -204,20 +208,7 @@
"outputs": [],
"source": [
"# Test MTCS\n",
"for heuristic in [inverse_manhattan, inverse_hamming]:\n",
" for c in [0.1, 0.5]:\n",
" for budget in [1000, 5000, 10000, 20000]:\n",
" print(f\"Heuristic: {heuristic.__name__} Budget: {budget}, c: {c}\")\n",
" test_results: list[list[tuple[bool, float, int, int]]] = []\n",
" for _ in range(0, 20):\n",
" test_results.append(test(lambda cube: play_mtcs(cube, budget, c, heuristic), test_list, False))\n",
" # compute average test result\n",
" for i in range(len(test_list)):\n",
" # average for test i\n",
" test_result: tuple[float, int, int] = (0, 0, 0)\n",
" for result in test_results:\n",
" test_result = (result[i][1], result[i][2], result[i][3])\n",
" print(f\"Test {i} average: Time: {test_result[0] / len(test_results)} seconds. States expanded: {test_result[1] / len(test_results)}. Path length: {test_result[2] / len(test_results)}\")"
"test_mtcs(lambda cube, budget, c, heuristic: play_mtcs(cube, budget, c, heuristic), [inverse_manhattan, inverse_hamming])"
]
},
{
Expand All @@ -226,7 +217,32 @@
"metadata": {},
"outputs": [],
"source": [
""
"# Build database\n",
"start = time.time()\n",
"database = build_database()\n",
"end = time.time()\n",
"print(f\"Database built in {end - start} seconds.\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Test A* with database\n",
"test_result_astar_database: list[TestCase] = test(lambda cube: a_star(cube, lambda cube: database_heuristic(cube, database, manhattan)), test_list)\n",
"draw_graph(test_result_astar_database)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Test MTCS with database\n",
"test_mtcs(lambda cube, budget, c, heuristic: play_mtcs(cube, budget, c, heuristic), lambda cube: database_heuristic(cube, database, inverse_manhattan))"
]
}
],
Expand Down
47 changes: 42 additions & 5 deletions heuristics.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from pocket_cube.cube import Cube
from pocket_cube.cube import Move
from utils import get_neighbors
import numpy as np
from typing import Callable

# neighbours of a certain square considering rotations as moves
square_neighbours = [[1,3,4,5], [0,2,4,5], [1,4,3,5], [0,2,4,5], [0,1,2,3], [0,1,2,3]]
Expand Down Expand Up @@ -90,14 +93,12 @@ def manhattan(cube: Cube) -> int:
Returns:
int: The sum of the distances from each square to the correct face.
"""
res: int = 0
max_distance: int = 0
for face in range(6):
max_distance: int = 0
for i in range(face * 4, face * 4 + 4):
distance: int = __distance_to_correct_face(cube, i)
max_distance = max(max_distance, distance)
res += max_distance
return res
return max_distance

def inverse_manhattan(cube: Cube) -> int:
"""
Expand All @@ -109,4 +110,40 @@ def inverse_manhattan(cube: Cube) -> int:
Returns:
int: The sum of the distances from each square to the correct face, subtracted from the maximum.
"""
return 12 - manhattan(cube)
return 2 - manhattan(cube)

def build_database(max_depth: int = 7) -> dict[str, int]:
"""
Builds a database of the distance to the solved state for each state with a depth lower than max_depth.
Args:
max_depth (int, optional): The maximum depth to search. Defaults to 7.
Returns:
dict[str, int]: The database.
"""
database: dict[str, int] = {}
frontier: list[tuple[Cube, int]] = [(Cube(), 0)]
while frontier:
(cube, depth) = frontier.pop()
if cube.hash() not in database or database[cube.hash()] > depth:
database[cube.hash()] = depth
if depth < max_depth:
for (neighbor, _) in get_neighbors(cube):
frontier.append((neighbor, depth + 1))
return database

def database_heuristic(cube: Cube, database: dict[str, int], default_heuristic: Callable[[Cube], int]) -> int:
"""
Returns a heuristic function that uses the databse heuristic if the entry exists, and the default heuristic otherwise.
Args:
default_heuristic (Callable[[Cube], int]): The default heuristic.
Returns:
int: The heuristic value.
"""
if cube.hash() in database:
return database[cube.hash()]
else:
return default_heuristic(cube)
69 changes: 64 additions & 5 deletions tests.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import Callable
import time
from pocket_cube.cube import Cube, Move
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker

case1 = "R U' R' F' U"
case2 = "F' R U R U F' U'"
Expand All @@ -13,6 +15,8 @@
[case1, case2, case3, case4])
)

TestCase = tuple[bool, float, int, int]

def is_solved(cube: Cube) -> bool:
"""
Checks if the cube is solved.
Expand All @@ -23,12 +27,12 @@ def is_solved(cube: Cube) -> bool:
Returns:
bool: True if the cube is solved, False otherwise.
"""
for i in range(len(cube.state)):
if cube.state[i] != cube.goal_state[i]:
for i in range(0, len(cube.state), 4):
if cube.state[i] != cube.state[i + 1] or cube.state[i] != cube.state[i + 2] or cube.state[i] != cube.state[i + 3]:
return False
return True

def test(algorithm: Callable[[Cube], list[Move]], tests: list[list[Move]], log: bool = True) -> list[tuple[bool, float, int, int]]:
def test(algorithm: Callable[[Cube], tuple[list[Move], int]], tests: list[list[Move]], log: bool = True) -> list[TestCase]:
"""
Tests the algorithm with the given tests.
Expand All @@ -39,7 +43,7 @@ def test(algorithm: Callable[[Cube], list[Move]], tests: list[list[Move]], log:
Returns:
list[tuple[float, int, int]]: The time taken, the number of states expanded and the length of the path for each test.
"""
res: list[tuple[bool, float, int, int]] = []
res: list[TestCase] = []
for idx, test in enumerate(tests):
success: bool = True
cube: Cube = Cube(test)
Expand All @@ -56,4 +60,59 @@ def test(algorithm: Callable[[Cube], list[Move]], tests: list[list[Move]], log:
if log:
print(f"Test {idx} passed. Time: {end - start} seconds. States expanded: {states}. Path length: {len(path)}")
res.append((success, end - start, states, len(path)))
return res
return res

def test_mtcs(algorithm: Callable[[Cube, int, float, Callable[[Cube], int]], tuple[list[Move], int]], heuristic_list: list[Callable[[Cube], int]]) -> None:
for heuristic in heuristic_list:
for c in [0.1, 0.5]:
for budget in [1000, 5000, 10000, 20000]:
print(f"Heuristic: {heuristic.__name__} Budget: {budget}, c: {c}")
test_results: list[list[TestCase]] = []
for _ in range(0, 20):
test_results.append(test(lambda cube: algorithm(cube, budget, c, heuristic), test_list, False))
# compute average test result
test_results_averaged = []
for i in range(len(test_list)):
# average for test i
test_result: tuple[float, int, int] = (0, 0, 0)
no_passed: int = 0
for result in test_results:
if result[i][0]:
no_passed += 1
test_result += (result[i][1], result[i][2], result[i][3])
if (no_passed == 0):
print(f"Test {i} failed.")
test_results_averaged.append((False, 0, 0, 0))
else:
print(f"Accuracy: {no_passed / len(test_results) * 100}%. Test {i} average: Time: {test_result[0] / no_passed} seconds. States expanded: {test_result[1] / no_passed}. Path length: {test_result[2] / no_passed}")
test_results_averaged.append((True, test_result[0] / no_passed, test_result[1] / no_passed, test_result[2] / no_passed))
draw_graph(test_results_averaged)

def draw_graph(test_cases: list[TestCase]) -> None:
# time plot
fig, ax = plt.subplots()
ax.xaxis.set_major_locator(ticker.MaxNLocator(integer=True))
bars = ax.bar(range(len(test_cases)), [test_case[1] for test_case in test_cases])
ax.bar_label(bars)
ax.set_xlabel("Test")
ax.set_ylabel("Time")
ax.set_title("Time taken by each test")
plt.show()
# states expanded plot
fig, ax = plt.subplots()
ax.xaxis.set_major_locator(ticker.MaxNLocator(integer=True))
bars = ax.bar(range(len(test_cases)), [test_case[2] for test_case in test_cases])
ax.bar_label(bars)
ax.set_xlabel("Test")
ax.set_ylabel("States expanded")
ax.set_title("States expanded by each test")
plt.show()
# path length plot
fig, ax = plt.subplots()
ax.xaxis.set_major_locator(ticker.MaxNLocator(integer=True))
bars = ax.bar(range(len(test_cases)), [test_case[3] for test_case in test_cases])
ax.bar_label(bars)
ax.set_xlabel("Test")
ax.set_ylabel("Path length")
ax.set_title("Path length of each test")
plt.show()

0 comments on commit 44dcce8

Please sign in to comment.