import heapq import math import store def length_haversine(p1, p2): """ The distance in meters between two coordinates on a sphere. Assumes p1=(lat1, lng1) and p2=(lat2, lng2) in degrees. """ lat1 = p1.lat lng1 = p1.lng lat2 = p2.lat lng2 = p2.lng lat1, lng1, lat2, lng2 = map(math.radians, [lat1, lng1, lat2, lng2]) dlat = lat2 - lat1 dlng = lng2 - lng1 a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlng / 2) ** 2 c = 2 * math.asin(math.sqrt(a)) return 6372797.560856 * c # return the distance in meters def grid_search(grid, lat, lng): """ Finds closest node to source node by comparing distance to nodes within nearest tiles in the grid. """ source_node = store.Node(-1, lat, lng) source_key = ( int(round(source_node.lat, 3) * 1000), int(round(source_node.lng, 3) * 1000), ) closest_tiles = find_nodes(grid, source_key) closest_nodes = [get_closest_node(tile_nodes, source_node) for tile_nodes in closest_tiles] closest_node_id = get_closest_node(closest_nodes, source_node).id return closest_node_id def find_nodes(grid, source_key, offset=0): """ Searches a grid in an outward "spiral" from source node fetching all tiles which contain nodes. """ tiles = None # search further out until we find nodes while not tiles: tiles = look_for_nodes(grid, source_key, offset) offset += 1 # include another layer since they might contain closer nodes return tiles + look_for_nodes(grid, source_key, offset + 1) def look_for_nodes(grid, source_key, offset): """Search for nearest tile containing nodes in a spiral.""" tiles = [] for i in range(-offset, offset + 1): for j in range(-offset, offset + 1): if i in (-offset, offset) or j in (-offset, offset): key = (source_key[0] + j, source_key[1] + i) if key in grid.keys(): tiles.append(grid[key]) return tiles def get_closest_node(nodes, source_node): """ Search through all nodes in a specified grid and return the node closest to source_node. """ min_node = None min_value = None for node in nodes: length = length_haversine(source_node, node) if min_node is None or length < min_value: min_node = node min_value = length return min_node def find_shortest_path(nodes, source_id, target_id): """ Return the shortest path using Dijkstra's algortihm. """ # queue contains multiple (walk_dist, (node_0, node_1, ... node_n))-tuples # where (node_0, node_1, ... node_n) is a walk to node_n # and walk_dist is the total length of the walk in meters queue = [(0, (source_id,))] visited = set() while queue: # consider an unchecked walk walk_dist, walk = heapq.heappop(queue) walk_end = walk[-1] if walk_end == target_id: # you have reached your destination return walk if walk_end in visited: # there exists a shorter walk to walk_end continue # otherwise this is the shortest walk to walk_end visited.add(walk_end) # consider all our neighbours for neighbour in nodes[walk_end].neighbours: if neighbour in visited: # there exists a shorter walk to neighbour continue # otherwise this MIGHT be the shortest walk to neighbour # so put it in the queue new_dist = walk_dist + length_haversine(nodes[walk_end], neighbour) new_walk = walk + (neighbour.id,) heapq.heappush(queue, (new_dist, new_walk)) # no path found return None