Skip to content


Add shortest path and pipe by pipe
Browse files Browse the repository at this point in the history
-and testing and test data
  • Loading branch information
Dobson committed Feb 1, 2024
1 parent e45fd74 commit c7075fa
Show file tree
Hide file tree
Showing 4 changed files with 344 additions and 7 deletions.
263 changes: 262 additions & 1 deletion swmmanywhere/
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@
import json
import tempfile
from heapq import heappop, heappush
from pathlib import Path
from typing import Callable
from typing import Callable, Hashable

import geopandas as gpd
import networkx as nx
import numpy as np
import osmnx as ox
import pandas as pd
from shapely import geometry as sgeom
from tqdm import tqdm

from swmmanywhere import geospatial_utilities as go
from swmmanywhere import parameters
Expand Down Expand Up @@ -162,6 +165,10 @@ def double_directed(G: nx.Graph, **kwargs):
G (nx.Graph): A graph
#TODO the geometry is left as is currently - should be reveresed, however
# in original osmnx geometry there are some incorrectly directed ones
# someone with more patience might check start and end Points to check
# which direction the line should be going in...
G_new = G.copy()
for u, v, data in G.edges(data=True):
if (v, u) not in G.edges:
Expand Down Expand Up @@ -315,6 +322,9 @@ def calculate_contributing_area(G: nx.Graph,
Adds the edge attributes:
- 'contributing_area' (float)
Adds the node attributes:
- 'contributing_area' (float)
G (nx.Graph): A graph
subcatchment_derivation (parameters.SubcatchmentDerivation): A
Expand Down Expand Up @@ -350,13 +360,18 @@ def calculate_contributing_area(G: nx.Graph,

# Assign contributing area
imperv_lookup = subs_rc.set_index('id').impervious_area.to_dict()
nx.set_node_attributes(G, imperv_lookup, 'contributing_area')
for u, d in G.nodes(data=True):
if 'contributing_area' not in d.keys():
d['contributing_area'] = 0.0
for u,v,d in G.edges(data=True):
if u in imperv_lookup.keys():
d['contributing_area'] = imperv_lookup[u]
d['contributing_area'] = 0.0
return G

def set_elevation(G: nx.Graph,
addresses: parameters.Addresses,
**kwargs) -> nx.Graph:
Expand Down Expand Up @@ -390,6 +405,7 @@ def set_elevation(G: nx.Graph,
nx.set_node_attributes(G, elevations_dict, 'elevation')
return G

def set_surface_slope(G: nx.Graph,
**kwargs) -> nx.Graph:
"""Set the surface slope for each edge.
Expand All @@ -416,6 +432,7 @@ def set_surface_slope(G: nx.Graph,
d['surface_slope'] = slope
return G

def set_chahinan_angle(G: nx.Graph,
**kwargs) -> nx.Graph:
"""Set the Chahinan angle for each edge.
Expand Down Expand Up @@ -459,6 +476,7 @@ def set_chahinan_angle(G: nx.Graph,
d['chahinan_angle'] = min_weight
return G

def calculate_weights(G: nx.Graph,
topo_derivation: parameters.TopologyDerivation,
**kwargs) -> nx.Graph:
Expand Down Expand Up @@ -508,6 +526,7 @@ def calculate_weights(G: nx.Graph,
d['weight'] = total_weight
return G

def identify_outlets(G: nx.Graph,
outlet_derivation: parameters.OutletDerivation,
**kwargs) -> nx.Graph:
Expand Down Expand Up @@ -595,4 +614,246 @@ def identify_outlets(G: nx.Graph,
if (d['edge_type'] == 'outlet') & (v != 'waste'):

return G

def derive_topology(G: nx.Graph,
**kwargs) -> nx.Graph:
"""Derive the topology of a graph.
Runs a djiikstra-based algorithm to identify the shortest path from each
node to its nearest outlet (weighted by the 'weight' edge value). The
returned graph is one that only contains the edges that feature on the
shortest paths.
Requires a graph with edges that have:
- 'edge_type' ('river' or 'street')
- 'weight' (float)
Adds the node attributes:
- 'outlet' (str)
- 'shortest_path' (float)
G (nx.Graph): A graph
**kwargs: Additional keyword arguments are ignored.
G (nx.Graph): A graph
G = G.copy()

# Identify outlets
outlets = [u for u,v,d in G.edges(data=True) if d['edge_type'] == 'outlet']

# Remove non-street edges/nodes and unconnected nodes
nodes_to_remove = []
for u, v, d in G.edges(data=True):
if d['edge_type'] != 'street':
if d['edge_type'] == 'outlet':

isolated_nodes = list(nx.isolates(G))

for u in set(nodes_to_remove).union(isolated_nodes):

# Initialize the dictionary with infinity for all nodes
shortest_paths = {node: float('inf') for node in G.nodes}

# Initialize the dictionary to store the paths
paths: dict[Hashable,list] = {node: [] for node in G.nodes}

# Set the shortest path length to 0 for outlets
for outlet in outlets:
shortest_paths[outlet] = 0
paths[outlet] = [outlet]

# Initialize a min-heap with (distance, node) tuples
heap = [(0, outlet) for outlet in outlets]
while heap:
# Pop the node with the smallest distance
dist, node = heappop(heap)

# For each neighbor of the current node
for neighbor, edge_data in G[node].items():
# Calculate the distance through the current node
alt_dist = dist + edge_data[0]['weight']
# If the alternative distance is shorter

if alt_dist < shortest_paths[neighbor]:
# Update the shortest path length
shortest_paths[neighbor] = alt_dist
# Update the path
paths[neighbor] = paths[node] + [neighbor]
# Push the neighbor to the heap
heappush(heap, (alt_dist, neighbor))

edges_to_keep = set()
for path in paths.values():
# Assign outlet
outlet = path[0]
for node in path:
G.nodes[node]['outlet'] = outlet
G.nodes[node]['shortest_path'] = shortest_paths[node]

# Store path
for i in range(len(path) - 1):
edges_to_keep.add((path[i+1], path[i]))

# Remvoe edges not on paths
new_graph = G.copy()
for u,v in G.edges():
if (u,v) not in edges_to_keep:
return new_graph

def pipe_by_pipe(G: nx.Graph,
pipe_design: parameters.HydraulicDesign,
"""Pipe by pipe hydraulic design.
Starting from the most upstream node, design a pipe to the downstream node
specifying a diameter and downstream invert level. A range of diameters and
invert levels are tested (ranging between conditions defined in
pipe_design). From the tested diameters/inverts, a selection is made based
on each pipe's satisfying feasibility constraints on: surcharge velocity,
filling ratio, (and shear stress - not currently implemented). Prioritising
feasibility in this order it identifies pipes with the preferred feasibility
level. If multiple pipes are feasible, it picks the lowest cost pipe. Once
the feasible pipe is identified, the diameter and downstream invert are set
and then the next downstream pipe can be designed.
This approach is based on the pipe-by-pipe design proposed in:
Requires a graph with edges that have:
- 'length' (float)
- 'elevation' (float)
Requires a graph with nodes that have:
- 'contributing_area' (float)
- 'elevation' (float)
Adds the edge attributes:
- 'diameter' (float)
Adds the node attributes:
- 'chamber_floor_elevation' (float)
G (nx.Graph): A graph
pipe_design (parameters.HydraulicDesign): A HydraulicDesign parameter
**kwargs: Additional keyword arguments are ignored.
G (nx.Graph): A graph
# TODO obviously needs a refactor and better testing

G = G.copy()
surface_elevations = {n : d['elevation'] for n, d in G.nodes(data=True)}
topological_order = list(nx.topological_sort(G))
chamber_floor = {}
edge_diams = {}
# Iterate over nodes in topological order
for node in tqdm(topological_order):
# Check if there's any nodes upstream, if not set the depth to min_depth
if len(nx.ancestors(G,node)) == 0:
chamber_floor[node] = surface_elevations[node] - pipe_design.min_depth
for ix, ds_node in enumerate(G.successors(node)):
edge = G.get_edge_data(node,ds_node,0)
# Find contributing area with ancestors
# TODO - could do timearea here if i hated myself enough
anc = nx.ancestors(G,node).union([node])
tot = sum([G.nodes[anc_node]['contributing_area'] for anc_node in anc])

M3_PER_HR_TO_M3_PER_S = 1 / 60 / 60
Q = tot * pipe_design.precipitation * M3_PER_HR_TO_M3_PER_S

# Within all allowable slopes and diams, calculate the parameters
# of all feasible pipes
feasible_pipes = []
for diam in pipe_design.diameters:
A = (np.pi * diam ** 2 / 4)
n = 0.012 # mannings n
R = A / (np.pi * diam) # hydraulic radius
for depth in np.linspace(pipe_design.min_depth,
# TODO... presumably need to check depth > (diam + min_depth)

elev_diff = chamber_floor[node] - \
(surface_elevations[ds_node] - depth)
slope = elev_diff / edge['length']
# Always pick a pipe that is feasible without surcharging
# if available
surcharge_feasibility = 0.0
# Use surcharged elevation
while slope <= 0:
surcharge_feasibility += 0.05
slope = (chamber_floor[node] + surcharge_feasibility - \
(surface_elevations[ds_node] - depth)) / edge['length']
# TODO could make the feasibility penalisation increase
# when you get above surface_elevation[node]... but
# then you'd need a feasibility tracker and an offset
# tracker
v = (slope ** 0.5) * (R ** (2/3)) / n
filling_ratio = Q / (v * A)
# buffers from:
average_depth = (depth + chamber_floor[node]) / 2
V = edge['length'] * (diam + 0.3) * (average_depth + 0.1)
cost = 1.32 / 2000 * (9579.31 * diam ** 0.5737 + 1153.77 * V**1.31)
v_feasibility = max(pipe_design.min_v - v, 0) + \
max(v - pipe_design.max_v, 0)
fr_feasibility = max(filling_ratio - pipe_design.max_fr, 0)
TODO shear stress... got confused here
density = 1000
dyn_visc = 0.001
hydraulic_diameter = 4 * (A * filling_ratio**2) / \
(np.pi * diam * filling_ratio)
Re = density * v * 2 * (diam / 4) * (filling_ratio ** 2) / dyn_visc
fd = 64 / Re
shear_stress = fd * density * v**2 / fd
shear_feasibility = max(min_shear - shear_stress, 0)
slope = (chamber_floor[node] - (surface_elevations[ds_node] -\
depth)) / edge['length']
feasible_pipes.append({'diam' : diam,
'depth' : depth,
'slope' : slope,
'v' : v,
'fr' : filling_ratio,
# 'tau' : shear_stress,
'cost' : cost,
'v_feasibility' : v_feasibility,
'fr_feasibility' : fr_feasibility,
'surcharge_feasibility' : surcharge_feasibility,
# 'shear_feasibility' : shear_feasibility
feasible_pipes_df = pd.DataFrame(feasible_pipes).dropna()
if feasible_pipes_df.shape[0] > 0:
ideal_pipe = feasible_pipes_df.sort_values(by=['surcharge_feasibility',
# 'shear_feasibility',
ascending = True).iloc[0]
edge_diams[(node,ds_node,0)] = ideal_pipe.diam
chamber_floor[ds_node] = surface_elevations[ds_node] - ideal_pipe.depth
print('something odd - no non nan pipes')
if ix > 0:
print('''a node has multiple successors,
not sure how that can happen if using shortest path
to derive topology''')
nx.function.set_edge_attributes(G, edge_diams, "diameter")
nx.function.set_node_attributes(G, chamber_floor, "chamber_floor_elevation")
return G
46 changes: 45 additions & 1 deletion swmmanywhere/
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from pathlib import Path

import numpy as np
from pydantic import BaseModel, Field, model_validator

Expand Down Expand Up @@ -130,7 +131,50 @@ class NewTopo(TopologyDerivation):
min_items = 1,
unit = "-",
description = "Weights for topo derivation")

class HydraulicDesign(BaseModel):
"""Parameters for hydraulic design."""
diameters: list = Field(default = np.linspace(0.15,3,int((3-0.15)/0.075) + 1),
min_items = 1,
unit = "m",
description = """Diameters to consider in
pipe by pipe method""")
max_fr: float = Field(default = 0.8,
upper_limit = 1,
lower_limit = 0,
unit = "-",
description = "Maximum filling ratio in pipe by pipe method")
min_shear: float = Field(default = 2,
upper_limit = 3,
lower_limit = 0,
unit = "Pa",
description = "Minimum wall shear stress in pipe by pipe method")
min_v: float = Field(default = 0.75,
upper_limit = 2,
lower_limit = 0,
unit = "m/s",
description = "Minimum velocity in pipe by pipe method")
max_v: float = Field(default = 5,
upper_limit = 10,
lower_limit = 3,
unit = "m/s",
description = "Maximum velocity in pipe by pipe method")
min_depth: float = Field(default = 0.5,
upper_limit = 1,
lower_limit = 0,
unit = "m",
description = "Minimum excavation depth in pipe by pipe method")
max_depth: float = Field(default = 5,
upper_limit = 10,
lower_limit = 2,
unit = "m",
description = "Maximum excavation depth in pipe by pipe method")
precipitation: float = Field(default = 0.006,
upper_limit = 0.010,
lower_limit = 0.001,
description = "Depth of design storm in pipe by pipe method",
unit = "m")

class Addresses:
"""Parameters for address lookup.
Expand Down

0 comments on commit c7075fa

Please sign in to comment.