import colorsys
import heapq
import math

# Sphere radius, in meters, that the haversine result is scaled by.
_EARTH_RADIUS_M = 6372797.560856


def length_haversine(p1, p2):
    """Return the great-circle distance between two points in meters.

    Both arguments must expose ``lat`` and ``lng`` attributes, which are
    converted from degrees to radians before the haversine formula is applied.
    """
    lat1, lng1, lat2, lng2 = map(
        math.radians, (p1.lat, p1.lng, p2.lat, p2.lng)
    )
    dlat = lat2 - lat1
    dlng = lng2 - lng1
    a = (
        math.sin(dlat / 2) ** 2
        + math.cos(lat1) * math.cos(lat2) * math.sin(dlng / 2) ** 2
    )
    # Floating-point rounding can push `a` marginally outside [0, 1] for
    # (near-)antipodal points, which would make sqrt/asin raise ValueError.
    a = min(1.0, max(0.0, a))
    c = 2 * math.asin(math.sqrt(a))
    return _EARTH_RADIUS_M * c


def chunks(lst, n):
    """Yield successive slices of ``lst`` of length ``n``.

    Every chunk has exactly ``n`` items except possibly the last one, which
    holds the remaining ``len(lst) % n`` items when ``len(lst)`` is not a
    multiple of ``n``. An empty ``lst`` yields nothing.
    """
    for i in range(0, len(lst), n):
        # Slicing already stops at the end of the sequence.
        yield lst[i:i + n]


def lerp_color(start_color, end_color, t):
    """Linearly interpolate between two colors by a factor ``t``.

    Colors are given and returned in the form "#RRGGBB" (the result uses
    lowercase hex digits). The interpolation is done component-wise in HSV
    space; ``t = 0`` gives ``start_color`` and ``t = 1`` gives ``end_color``.
    """
    def lerp(a, b, t):
        return a + (b - a) * t

    def to_hsv(color):
        # Drop the leading '#', then read three two-digit hex channels.
        rgb = [int(c, 16) / 255 for c in chunks(color[1:], 2)]
        return colorsys.rgb_to_hsv(*rgb)

    def to_byte(channel):
        # Round to nearest instead of truncating: the RGB -> HSV -> RGB round
        # trip can land a hair below the exact value, and truncation would
        # then shift the channel down by one. Clamp so the formatted result
        # is always two hex digits per channel.
        return min(255, max(0, round(channel * 255)))

    start = to_hsv(start_color)
    end = to_hsv(end_color)
    hsv = [lerp(c1, c2, t) for c1, c2 in zip(start, end)]
    red, green, blue = colorsys.hsv_to_rgb(*hsv)
    return "#{:02x}{:02x}{:02x}".format(
        to_byte(red),
        to_byte(green),
        to_byte(blue),
    )


def get_closest_node_id(nodes, source_node):
    """Return the id of the node in ``nodes`` closest to ``source_node``.

    ``nodes`` maps node ids to nodes. Distance is measured with
    ``length_haversine``. Returns ``None`` when ``nodes`` is empty; on ties
    the first id in iteration order wins.
    """
    return min(
        nodes,
        key=lambda node_id: length_haversine(source_node, nodes[node_id]),
        default=None,
    )


def find_shortest_path(nodes, source_id, target_id):
    """Find the shortest path from ``source_id`` to ``target_id`` (Dijkstra).

    ``nodes`` maps node ids to nodes. Each node must expose ``lat`` and
    ``lng`` plus ``neighbours``, an iterable of adjacent node objects that
    each carry an ``id``.

    Returns a tuple ``(walk, iterations, visited)`` where ``walk`` is the
    tuple of node ids from source to target, ``iterations`` is the number of
    queue entries popped, and ``visited`` maps each settled node id to
    ``(previous_id, node_id, iteration)``. Returns ``None`` when the target
    cannot be reached.
    """
    # The queue holds tuples of the form
    #   (walk_dist, (node_id_0, node_id_1, ..., node_id_n))
    # where the second item is a walk from the source to node_id_n and
    # walk_dist is the total length of that walk in meters.
    queue = [(0, (source_id,))]

    # Number of queue entries popped so far; recorded per node in `visited`
    # and used by callers for coloring.
    iterations = 0

    # For each settled node id: the final edge (previous_id, node_id) of the
    # shortest walk to it and the iteration at which it was settled. The
    # source maps to (source_id, source_id, iteration).
    visited = {}

    while queue:
        # Consider the shortest unchecked walk.
        walk_dist, walk = heapq.heappop(queue)
        iterations += 1
        walk_end = walk[-1]

        if walk_end == target_id:
            # Destination reached; it is intentionally not added to visited.
            return walk, iterations, visited

        if walk_end in visited:
            # A shorter walk to walk_end was already settled.
            continue

        # Otherwise this is the shortest walk to walk_end.
        previous = walk[-2] if len(walk) > 1 else walk_end
        visited[walk_end] = (previous, walk_end, iterations)

        current = nodes[walk_end]
        for neighbour in current.neighbours:
            # `visited` is keyed by node id, so the lookup must use the id,
            # not the node object (which would never match and fails outright
            # for unhashable node types).
            if neighbour.id in visited:
                continue
            # This MIGHT be the shortest walk to neighbour; queue it.
            new_dist = walk_dist + length_haversine(current, neighbour)
            heapq.heappush(queue, (new_dist, walk + (neighbour.id,)))

    # No path found.
    return None