-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy patha_star.py
79 lines (59 loc) · 2.79 KB
/
a_star.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# I M P O R T S & D E P E N D E N C I E S ----------------------
import heapq
from typing import Dict, Set, Tuple, Optional, Callable
# A* A L G O R I T H M -------------------------------------------
def a_star(graph, start_node: int, end_node: int, heuristic: Callable[[int, int], float], positions: Dict[int, Tuple[float, float]]) -> Tuple[Dict[int, float], Dict[int, Optional[int]], Set[int]]:
# Validate input nodes
graph_nodes = set(graph.nodes())
if start_node not in graph_nodes or end_node not in graph_nodes:
raise ValueError("Start or end node not in graph")
# Initialize data structures
g_score = {node: float('infinity') for node in graph_nodes}
f_score = {node: float('infinity') for node in graph_nodes}
g_score[start_node] = 0
f_score[start_node] = heuristic(start_node, end_node)
# Track previous nodes for path reconstruction
previous: Dict[int, Optional[int]] = {node: None for node in graph_nodes}
# Priority queue and visited tracking
open_set = []
heapq.heappush(open_set, (f_score[start_node], start_node))
# Keep track of nodes in open set to avoid duplicates
in_open_set = {start_node}
# Efficiently track visited nodes
visited: Set[int] = set()
while open_set:
# Get the node with lowest f_score
_, current_node = heapq.heappop(open_set)
in_open_set.remove(current_node)
# if same node don't calculate
if current_node == end_node:
break
# Mark node as visited
visited.add(current_node)
# Explore neighbors
for neighbor in graph.neighbors(current_node):
# Skip already visited nodes
if neighbor in visited:
continue
# Get edge weight using length from OSMMX
try:
edge_data = graph.get_edge_data(current_node, neighbor)[0]
edge_length = edge_data.get('length', 1) # in meters
except Exception as e:
print(f"Error getting edge data: {e}")
continue
# Calculate tentative g_score
tentative_g_score = g_score[current_node] + edge_length
# Potentially better path
if tentative_g_score < g_score[neighbor]:
# Update tracking
previous[neighbor] = current_node
g_score[neighbor] = tentative_g_score
# Calculate f_score
f_new = tentative_g_score + heuristic(neighbor, end_node)
f_score[neighbor] = f_new
# Add to open set if not already there
if neighbor not in in_open_set:
heapq.heappush(open_set, (f_new, neighbor))
in_open_set.add(neighbor)
return g_score, previous, visited