The full ``ZhuEtal2017`` code is listed below.
.. code-block:: python
:linenos:
# Zhu et al. (2017) code
# -----------------------------------------------------------
class ZhuEtal2017(Liquefaction):
"""
A map-based procedure to quantify liquefaction at a given location using the logistic models by Zhu et al. (2017). Two models are provided:
1. For distance to the nearest water body < cutoff (coastal model), **prob_liq** = f(**pgv**, **vs30**, **precip**, **dist_coast**, **dist_river**)
2. For distance to the nearest water body >= cutoff (global model), **prob_liq** = f(**pgv**, **vs30**, **precip**, **dist_water**, **gw_depth**)
Parameters
----------
From upstream PBEE:
pgv: float, np.ndarray or list
[cm/s] peak ground velocity
mag: float, np.ndarray or list
moment magnitude
pga: float, np.ndarray or list
[g] peak ground acceleration, only to check threshold where prob_liq(pga<0.1g)=0
stations: list
a list of dicts containing the site information. Keys in each dict are 'ID',
'lon', 'lat', 'vs30', 'z1pt0', 'z2pt5', 'vsInferred', 'rRup', 'rJB', 'rX'
Geotechnical/geologic:
vs30: float, np.ndarray or list
[m/s] time-averaged shear wave velocity in the upper 30-meters
precip: float, np.ndarray or list
[mm] mean annual precipitation
dist_coast: float, np.ndarray or list
[km] distance to nearest coast
dist_river: float, np.ndarray or list
[km] distance to nearest river
dist_water: float, np.ndarray or list
[km] distance to nearest river, lake, or coast
gw_depth: float, np.ndarray or list
[m] groundwater table depth
Fixed:
# dist_water_cutoff: float, optional
# [km] distance to water cutoff for switching between global and coastal model, default = 20 km
Returns
-------
prob_liq : float, np.ndarray
probability of liquefaction
liq_susc_val : str, np.ndarray
liquefaction susceptibility category value
References
----------
.. [1] Zhu, J., Baise, L.G., and Thompson, E.M., 2017, An Updated Geospatial Liquefaction Model for Global Application, Bulletin of the Seismological Society of America, vol. 107, no. 3, pp. 1365-1385.
"""
def __init__(self, parameters, stations) -> None:
self.stations = stations
self.parameters = parameters
self.dist_to_water = None #(km)
self.dist_to_river = None #(km)
self.dist_to_coast = None #(km)
self.gw_depth = None #(m)
self.precip = None # (mm)
self.vs30 = None #(m/s)
self.interpolate_spatial_parameters(parameters)
def interpolate_spatial_parameters(self, parameters):
# site coordinate in CRS 4326
lat_station = [site['lat'] for site in self.stations]
lon_station = [site['lon'] for site in self.stations]
# dist_to_water
if parameters["DistWater"] == "Defined (\"distWater\") in Site File (.csv)":
self.dist_to_water = np.array([site['distWater'] for site in self.stations])
else:
self.dist_to_water = sampleRaster(parameters["DistWater"], parameters["inputCRS"],\
lon_station, lat_station)
# dist_to_river
if parameters["DistRiver"] == "Defined (\"distRiver\") in Site File (.csv)":
self.dist_to_river = np.array([site['distRiver'] for site in self.stations])
else:
self.dist_to_river = sampleRaster(parameters["DistRiver"], parameters["inputCRS"],\
lon_station, lat_station)
# dist_to_coast
if parameters["DistCoast"] == "Defined (\"distCoast\") in Site File (.csv)":
self.dist_to_coast = np.array([site['distCoast'] for site in self.stations])
else:
self.dist_to_coast = sampleRaster(parameters["DistCoast"], parameters["inputCRS"],\
lon_station, lat_station)
# gw_depth
if parameters["GwDepth"] == "Defined (\"gwDepth\") in Site File (.csv)":
self.gw_depth = np.array([site['gwDepth'] for site in self.stations])
else:
self.gw_depth = sampleRaster(parameters["GwDepth"], parameters["inputCRS"],\
lon_station, lat_station)
# precipitation
if parameters["Precipitation"] == "Defined (\"precipitation\") in Site File (.csv)":
self.precip = np.array([site['precipitation'] for site in self.stations])
else:
self.precip = sampleRaster(parameters["Precipitation"], parameters["inputCRS"],\
lon_station, lat_station)
self.vs30 = np.array([site['vs30'] for site in self.stations])
print("Sampling finished")
def run(self, ln_im_data, eq_data, im_list, output_keys, additional_output_keys):
if ('PGA' in im_list) and ('PGV' in im_list):
num_stations = len(self.stations)
num_scenarios = len(eq_data)
PGV_col_id = [i for i, x in enumerate(im_list) if x == 'PGV'][0]
PGA_col_id = [i for i, x in enumerate(im_list) if x == 'PGA'][0]
for scenario_id in range(num_scenarios):
num_rlzs = ln_im_data[scenario_id].shape[2]
im_data_scen = np.zeros([num_stations,\
len(im_list)+len(output_keys), num_rlzs])
im_data_scen[:,0:len(im_list),:] = ln_im_data[scenario_id]
for rlz_id in range(num_rlzs):
pgv = np.exp(ln_im_data[scenario_id][:,PGV_col_id,rlz_id])
pga = np.exp(ln_im_data[scenario_id][:,PGA_col_id,rlz_id])
mag = float(eq_data[scenario_id][0])
model_output = self.model(pgv, pga, mag)
for i, key in enumerate(output_keys):
im_data_scen[:,len(im_list)+i,rlz_id] = model_output[key]
ln_im_data[scenario_id] = im_data_scen
im_list = im_list + output_keys
additional_output = dict()
for key in additional_output_keys:
item = getattr(self, key, None)
if item is None:
warnings.warn(f"Additional output {key} is not avaliable in the liquefaction trigging model 'ZhuEtal2017'.")
else:
additional_output.update({key:item})
else:
sys.exit(f"At least one of 'PGA' and 'PGV' is missing in the selected intensity measures and the liquefaction trigging model 'ZhuEtal2017' can not be computed.")
# print(f"At least one of 'PGA' and 'PGV' is missing in the selected intensity measures and the liquefaction trigging model 'ZhuEtal2017' can not be computed."\
# , file=sys.stderr)
# sys.stderr.write("test")
# sys.exit(-1)
return ln_im_data, eq_data, im_list, additional_output
def model(self, pgv, pga, mag):
"""Model"""
# zero prob_liq
zero_prob_liq = 1e-5 # decimal
# distance cutoff for model
model_transition = 20 # km
# initialize arrays
x_logistic = np.empty(pgv.shape)
prob_liq = np.empty(pgv.shape)
liq_susc_val = np.ones(pgv.shape)*-99
liq_susc = np.empty(pgv.shape, dtype=int)
# magnitude correction, from Baise & Rashidian (2020) and Allstadt et al. (2022)
pgv_mag = pgv/(1+np.exp(-2*(mag-6)))
pga_mag = pga/(10**2.24/mag**2.56)
# find where dist_water <= cutoff for model of 20 km
# coastal model
ind_coastal = self.dist_to_water<=model_transition
# global model
ind_global = ~ind_coastal
# set cap of precip to 1700 mm
self.precip[self.precip>1700] = 1700
# x = b0 + b1*var1 + ...
# liquefaction susceptibility value, disregard pgv term
liq_susc_val[ind_global] = \
8.801 + \
-1.918 * np.log(self.vs30[ind_global]) + \
5.408e-4 * self.precip[ind_global] + \
-0.2054 * self.dist_to_water[ind_global] + \
-0.0333 * self.gw_depth[ind_global]
# liquefaction susceptibility value, disregard pgv term
liq_susc_val[ind_coastal] = \
12.435 + \
-2.615 * np.log(self.vs30[ind_coastal]) + \
5.556e-4 * self.precip[ind_coastal] + \
-0.0287 * np.sqrt(self.dist_to_coast[ind_coastal]) + \
0.0666 * self.dist_to_river[ind_coastal] + \
-0.0369 * self.dist_to_river[ind_coastal]*np.sqrt(self.dist_to_coast[ind_coastal])
# catch nan values
liq_susc_val[np.isnan(liq_susc_val)] = -99.
# x-term for logistic model = liq susc val + pgv term
x_logistic[ind_global] = liq_susc_val[ind_global] + 0.334*np.log(pgv_mag[ind_global])
# x-term for logistic model = liq susc val + pgv term
x_logistic[ind_coastal] = liq_susc_val[ind_coastal] + 0.301*np.log(pgv_mag[ind_coastal])
# probability of liquefaction
prob_liq = 1/(1+np.exp(-x_logistic)) # decimal
prob_liq = np.maximum(prob_liq,zero_prob_liq) # set prob to > "0" to avoid 0% in log
# for pgv_mag < 3 cm/s, set prob to "0"
prob_liq[pgv_mag<3] = zero_prob_liq
# for pga_mag < 0.1 g, set prob to "0"
prob_liq[pga_mag<0.1] = zero_prob_liq
# for vs30 > 620 m/s, set prob to "0"
prob_liq[self.vs30>620] = zero_prob_liq
# calculate sigma_mu
sigma_mu = (np.exp(0.25)-1) * prob_liq
# determine liquefaction susceptibility category
liq_susc[liq_susc_val>-1.15] = liq_susc_enum['very_high'].value
liq_susc[liq_susc_val<=-1.15] = liq_susc_enum['high'].value
liq_susc[liq_susc_val<=-1.95] = liq_susc_enum['moderate'].value
liq_susc[liq_susc_val<=-3.15] = liq_susc_enum['low'].value
liq_susc[liq_susc_val<=-3.20] = liq_susc_enum['very_low'].value
liq_susc[liq_susc_val<=-38.1] = liq_susc_enum['none'].value
# liq_susc[prob_liq==zero_prob_liq] = 'none'
return {"liq_prob":prob_liq, "liq_susc":liq_susc}
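
To make the logistic form above concrete, the following minimal sketch evaluates the global-model branch of ``model()`` for a single hypothetical site. All input values are illustrative placeholders, not values used by the module:

.. code-block:: python

   import numpy as np

   # hypothetical single-site inputs (illustrative values only)
   pgv, mag = 25.0, 7.0              # [cm/s] PGV, moment magnitude
   vs30, precip = 250.0, 800.0       # [m/s] vs30, [mm] mean annual precipitation
   dist_water, gw_depth = 45.0, 3.0  # [km] distance to water (> 20 km cutoff), [m] groundwater depth

   # magnitude correction of PGV, as in model() above
   pgv_mag = pgv / (1 + np.exp(-2 * (mag - 6)))

   # global-model susceptibility value (PGV term excluded)
   liq_susc_val = (8.801
                   - 1.918 * np.log(vs30)
                   + 5.408e-4 * precip
                   - 0.2054 * dist_water
                   - 0.0333 * gw_depth)

   # logistic model: add the PGV term, then transform to a probability
   x_logistic = liq_susc_val + 0.334 * np.log(pgv_mag)
   prob_liq = 1.0 / (1.0 + np.exp(-x_logistic))
   print(f"prob_liq = {prob_liq:.3f}")

The coastal branch follows the same pattern with its own coefficients and a PGV factor of 0.301.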
The full ``SangerEtal2024`` code is listed below.
.. code-block:: python
:linenos:
# Sanger et al. (2024) code
# -----------------------------------------------------------
class SangerEtal2024(Liquefaction):
"""
A map-based procedure to quantify liquefaction at a given location using the geospatial liquefaction model by Sanger et al. (2024).
Parameters
----------
From upstream PBEE:
mag: float, np.ndarray or list
moment magnitude
pga: float, np.ndarray or list
[g] peak ground acceleration, only to check threshold where prob_liq(pga<0.1g)=0
stations: list
a list of dicts containing the site information. Keys in each dict are 'ID',
'lon', 'lat', 'vs30', 'z1pt0', 'z2pt5', 'vsInferred', 'rRup', 'rJB', 'rX'
Geotechnical:
LPI A: float, np.ndarray or list
    geospatial LPI model coefficient A, sampled from a raster
LPI B: float, np.ndarray or list
    geospatial LPI model coefficient B, sampled from a raster
Returns
-------
prob_liq : float, np.ndarray
probability of liquefaction (surface manifestation)
"""
def __init__(self, parameters, stations) -> None:
self.stations = stations
self.parameters = parameters
self.LPI_A = None #
self.LPI_B = None #
self.interpolate_spatial_parameters(parameters)
def interpolate_spatial_parameters(self, parameters):
# site coordinate in CRS 4326
lat_station = [site['lat'] for site in self.stations]
lon_station = [site['lon'] for site in self.stations]
# LPI_A (raster supplied through the "DistWater" parameter slot)
self.LPI_A = sampleRaster(parameters["DistWater"], parameters["inputCRS"],\
lon_station, lat_station)
# LPI_B (raster supplied through the "DistCoast" parameter slot)
self.LPI_B = sampleRaster(parameters["DistCoast"], parameters["inputCRS"],\
lon_station, lat_station)
print("Sampling finished")
def run(self, ln_im_data, eq_data, im_list, output_keys, additional_output_keys):
if ('PGA' in im_list):
num_stations = len(self.stations)
num_scenarios = len(eq_data)
PGA_col_id = [i for i, x in enumerate(im_list) if x == 'PGA'][0]
for scenario_id in range(num_scenarios):
num_rlzs = ln_im_data[scenario_id].shape[2]
im_data_scen = np.zeros([num_stations,\
len(im_list)+len(output_keys), num_rlzs])
im_data_scen[:,0:len(im_list),:] = ln_im_data[scenario_id]
for rlz_id in range(num_rlzs):
pga = np.exp(ln_im_data[scenario_id][:,PGA_col_id,rlz_id])
mag = float(eq_data[scenario_id][0])
model_output = self.model(pga, mag)
for i, key in enumerate(output_keys):
im_data_scen[:,len(im_list)+i,rlz_id] = model_output[key]
ln_im_data[scenario_id] = im_data_scen
im_list = im_list + output_keys
additional_output = dict()
for key in additional_output_keys:
item = getattr(self, key, None)
if item is None:
warnings.warn(f"Additional output {key} is not avaliable in the liquefaction trigging model 'SangerEtal2024'.")
else:
additional_output.update({key:item})
else:
sys.exit(f"'PGA' is missing in the selected intensity measures and the liquefaction trigging model 'SangerEtal2024' can not be computed.")
return ln_im_data, eq_data, im_list, additional_output
def model(self, pga, mag):
"""Model"""
# magnitude scaling factor (MSF) for sand, after Idriss and Boulanger (2008)
MSF = 6.9*np.exp(-mag/4) - 0.058
if MSF > 1.8:
MSF = 1.8
pga_mag = pga / MSF
# Geospatial LPI A and LPI B
# Initialize an array for the calculated LPI
LPI = np.zeros_like(pga_mag)
# Calculate LPI for each element of the arrays
mask_low = pga_mag < 0.1
mask_high = pga_mag >= 0.1
LPI[mask_low] = 0
LPI[mask_high] = self.LPI_A[mask_high] * np.arctan(self.LPI_B[mask_high] * (pga_mag[mask_high] - self.LPI_A[mask_high] / self.LPI_B[mask_high]) ** 2) * 100
from scipy.stats import norm
# Probability of liquefaction (manifestation at the surface) according to Geyin et al. (2020) (minor manifestation)
LPI_beta= 1.774
LPI_theta= 4.095
prob_liq = norm.cdf(np.log(LPI/LPI_theta)/LPI_beta)
# note: the LPI value is returned through the "liq_susc" output slot
return {"liq_prob":prob_liq, "liq_susc":LPI}
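
The last two steps of ``model()`` can likewise be evaluated in isolation. The sketch below follows the listing above for one hypothetical site; ``lpi_a`` and ``lpi_b`` are placeholder values standing in for the coefficients sampled from the rasters:

.. code-block:: python

   import numpy as np
   from scipy.stats import norm

   # hypothetical inputs (placeholder values, not from the Sanger et al. rasters)
   pga, mag = 0.35, 7.0        # [g] PGA, moment magnitude
   lpi_a, lpi_b = 0.05, 60.0   # placeholder geospatial LPI coefficients

   # magnitude scaling factor for sand, capped at 1.8 (Idriss and Boulanger, 2008)
   msf = min(6.9 * np.exp(-mag / 4) - 0.058, 1.8)
   pga_mag = pga / msf

   # geospatial LPI, set to zero below the 0.1 g threshold (as in model() above)
   lpi = 0.0
   if pga_mag >= 0.1:
       lpi = lpi_a * np.arctan(lpi_b * (pga_mag - lpi_a / lpi_b) ** 2) * 100

   # lognormal fragility for minor surface manifestation (Geyin et al., 2020)
   lpi_beta, lpi_theta = 1.774, 4.095
   prob_liq = norm.cdf(np.log(lpi / lpi_theta) / lpi_beta) if lpi > 0 else 0.0
   print(f"LPI = {lpi:.2f}, prob_liq = {prob_liq:.3f}")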