1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
import colorsys
import heapq
import math
def length_haversine(p1, p2):
lat1 = p1.lat
lng1 = p1.lng
lat2 = p2.lat
lng2 = p2.lng
lat1, lng1, lat2, lng2 = map(math.radians, [lat1, lng1, lat2, lng2])
dlat = lat2 - lat1
dlng = lng2 - lng1
a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlng / 2) ** 2
c = 2 * math.asin(math.sqrt(a))
return 6372797.560856 * c # return the distance in meters
def chunks(lst, n):
""" Split lst into chunks of size n.
The final chunk will have size equal to len(lst) % n
"""
for i in range(0, len(lst), n):
yield lst[i:min(i+n, len(lst))]
def lerp_color(start_color, end_color, t):
""" Lerp between two colors by a factor t.
Colors are assumed to be in the form "#RRGGBB"
"""
def lerp(a, b, t):
return a + (b - a) * t
start = [int(c, 16)/255 for c in chunks(start_color[1:], 2)]
end = [int(c, 16)/255 for c in chunks(end_color[1:], 2)]
start = colorsys.rgb_to_hsv(*start)
end = colorsys.rgb_to_hsv(*end)
color = [lerp(c1, c2, t) for (c1, c2) in zip(start, end)]
color = colorsys.hsv_to_rgb(*color)
return "#{:02x}{:02x}{:02x}".format(
int(color[0] * 255),
int(color[1] * 255),
int(color[2] * 255),
)
def get_closest_node_id(nodes, source_node):
""" Search through all nodes and return the id of the node
that is closest to 'source_node'. """
min_node = None
min_value = None
for node_id, node in nodes.items():
length = length_haversine(source_node, node)
if min_node is None or length < min_value:
min_node = node_id
min_value = length
return min_node
def find_shortest_path(nodes, source_id, target_id):
""" Return the shortest path using Dijkstra's algorithm, as well as
the number of iterations it took."""
# Queue contains multiple tuples of the form
# (walk_dist, ((node_0, iterations_0), ... (node_n, iterations_n)))
# where (node_0, node_1, ... node_n) is a walk to node_n
# and walk_dist is the total length of the walk in meters.
queue = [(0, (source_id,))]
# Iterations is the number of iterations the algorithm took before finding
# a node and is used for coloring.
iterations = 0
# For each node, visited contains the final edge of the shortest path to it
# and the number of iterations it took to get there.
visited = {}
while queue:
# consider an unchecked walk
walk_dist, walk = heapq.heappop(queue)
iterations += 1
walk_end = walk[-1]
if walk_end == target_id:
# you have reached your destination
return walk, iterations, visited
if walk_end in visited:
# there exists a shorter walk to walk_end
continue
# otherwise this is the shortest walk to walk_end
if len(walk) == 1:
visited[walk_end] = (walk[-1], walk[-1], iterations)
else:
visited[walk_end] = (walk[-2], walk[-1], iterations)
# consider all our neighbours
for neighbour in nodes[walk_end].neighbours:
if neighbour in visited:
# there exists a shorter walk to neighbour
continue
# otherwise this MIGHT be the shortest walk to neighbour
# so put it in the queue
new_dist = walk_dist + length_haversine(nodes[walk_end], neighbour)
new_walk = walk + (neighbour.id,)
heapq.heappush(queue, (new_dist, new_walk))
# no path found
return None
|