import heapq
import math

# Sphere radius, in meters, used by the haversine distance below.
EARTH_RADIUS_M = 6372797.560856


def length_haversine(p1, p2):
    """Return the great-circle distance in meters between two points.

    Both arguments must expose ``lat`` and ``lng`` attributes, in degrees.
    """
    lat1, lng1, lat2, lng2 = map(
        math.radians, (p1.lat, p1.lng, p2.lat, p2.lng)
    )
    dlat = lat2 - lat1
    dlng = lng2 - lng1
    a = (
        math.sin(dlat / 2) ** 2
        + math.cos(lat1) * math.cos(lat2) * math.sin(dlng / 2) ** 2
    )
    # Rounding can push ``a`` marginally above 1 for (near-)antipodal points,
    # which would make asin() raise a domain error, so clamp it.
    c = 2 * math.asin(math.sqrt(min(1.0, a)))
    return EARTH_RADIUS_M * c


def neighbours_on_edge(p, steps=1):
    """Return the grid squares lying on the square ring around ``p``.

    The ring consists of every square whose larger axis offset from ``p``
    is exactly ``steps``. With ``steps=0`` this is just ``p`` itself; with
    the default ``steps=1`` it is the eight squares surrounding ``p``
    (``p`` excluded). A negative ``steps`` yields an empty list.

    Returns a sorted list of ``(x, y)`` tuples.
    """
    offsets = set()
    for d in range(-steps, steps + 1):
        # One cell on each of the four sides; corners are deduplicated
        # by the set.
        offsets.update(((d, -steps), (d, steps), (-steps, d), (steps, d)))
    return sorted((p[0] + dx, p[1] + dy) for dx, dy in offsets)


def neighbours_containing_nodes(nodes, grid, p):
    """Return the squares of the innermost ring around ``p`` present in ``grid``.

    Rings are tried with growing distance (starting at ``p`` itself) until
    one contains at least one key of ``grid``. An empty grid yields an
    empty list instead of searching forever.

    ``nodes`` is unused; it is kept for interface compatibility.
    """
    if not grid:
        return []
    steps = 0
    while True:
        found = [sq for sq in neighbours_on_edge(p, steps) if sq in grid]
        if found:
            return found
        steps += 1


def get_closest_in_square(nodes, node_ids, source_node):
    """Return ``(node_id, distance)`` of the node closest to ``source_node``.

    Only the ids in ``node_ids`` are considered; ``nodes`` maps an id to
    its node. On ties the first id encountered wins. Returns
    ``(None, None)`` when ``node_ids`` is empty.
    """
    min_node = None
    min_value = None
    for node_id in node_ids:
        length = length_haversine(source_node, nodes[node_id])
        if min_node is None or length < min_value:
            min_node = node_id
            min_value = length
    return min_node, min_value


def get_closest_node_id(nodes, grid, source_node):
    """Find the closest node using a grid search and return its id.

    ``grid`` maps a grid square to the ids of the nodes inside it and
    ``nodes`` maps an id to its node. The search first takes the best node
    from the innermost ring around the source's square that has grid
    entries, then also checks the ring of squares around that candidate's
    own square for something closer.

    Returns ``None`` when no node can be found (e.g. an empty grid).
    """
    first_squares = neighbours_containing_nodes(
        nodes, grid, source_node.grid_tuple()
    )
    min_node = None
    min_value = None
    for sq in first_squares:
        node_id, length = get_closest_in_square(nodes, grid[sq], source_node)
        if node_id is not None and (min_node is None or length < min_value):
            min_node, min_value = node_id, length

    if min_node is None:
        return None

    for sq in neighbours_on_edge(nodes[min_node].grid_tuple()):
        # Ring squares are not guaranteed to exist in the grid; indexing
        # them blindly would raise KeyError (or grow a defaultdict).
        if sq not in grid:
            continue
        node_id, length = get_closest_in_square(nodes, grid[sq], source_node)
        if node_id is not None and length < min_value:
            min_node, min_value = node_id, length
    return min_node


def find_shortest_path(nodes, source_id, target_id):
    """Return the shortest path using Dijkstra's algorithm.

    ``nodes`` maps a node id to a node exposing ``lat``, ``lng``, ``id``
    and ``neighbours`` (an iterable of adjacent node objects).

    Returns a tuple of node ids from ``source_id`` to ``target_id``
    (inclusive), or ``None`` if the target cannot be reached.
    """
    # queue contains multiple (walk_dist, (node_0, node_1, ... node_n))-tuples
    # where (node_0, node_1, ... node_n) is a walk to node_n
    # and walk_dist is the total length of the walk in meters
    queue = [(0, (source_id,))]
    visited = set()
    while queue:
        # consider an unchecked walk
        walk_dist, walk = heapq.heappop(queue)
        walk_end = walk[-1]
        if walk_end == target_id:
            # you have reached your destination
            return walk
        if walk_end in visited:
            # there exists a shorter walk to walk_end
            continue
        # otherwise this is the shortest walk to walk_end
        visited.add(walk_end)
        end_node = nodes[walk_end]
        # consider all our neighbours
        for neighbour in end_node.neighbours:
            # ``visited`` holds ids, so compare by id, not by node object
            if neighbour.id in visited:
                # there exists a shorter walk to neighbour
                continue
            # otherwise this MIGHT be the shortest walk to neighbour
            # so put it in the queue
            new_dist = walk_dist + length_haversine(end_node, neighbour)
            heapq.heappush(queue, (new_dist, walk + (neighbour.id,)))
    # no path found
    return None