Module pyearthquake
Introduction
pyearthquake
is a light-weight module with basic earthquake engineering utilities.
The aim of this module is to help engineers play with earthquakes by python programming.
Features
- Convert .AT2 file downloaded from PEER database to nicely aligned form.
- Select and match ground motions to the target spectrum with Monte Carlo simulation and Least Square optimization.
- Beautify the ground motions by truncating, trimming, changing dt, and renaming.
- Store a suite of motions in a single file or separate files.
- Generate displacement spectrums and pesudo acceleration spectrums.
- Inspect a suite or a motion with nice html interactive plots, thanks to plotly.
- Create Code Spectrums. Currently support Chinese Code.
Fact Sheet
Version: 1.0
Author: Hanlin Dong
License: MIT
Last Update: 2020-04-08
Content Briefing
Currently, it has three main classes: Spectrum
, Motion
, and Suite
Spectrum
: a class storing seismic spectrum.
Some static methods are created confining to Code Spectrums.
Motion
: a class storing acceleration timehistory.
Motion has utility functions to nicely parse PEER motion file.
Motion also generates spectrums.
Suite
: a class storing a suite of motions and their target spectrum.
Suite has utilities to load a PEER file folder.
Suite has optimization functions to select a bunch of earthquakes out of a large number of earthquakes
with Monte Carlo simulation and bounded least square optimization.
The advantage is that the shape of the accels is not changed unlike wavelet methods,
but the average match result is still good.
Suite can write all the acceleration files seperately or integrally.
Suite can plot all the accelerations and spectrums interactively for users to inspect.
Suite can generate .tcl file to be used in OpenSees.
All the classes mentioned above can be stored as string file. Currently they are json files with ext: .spectrum, .motion, .suite. These files can be read by static methods in every class.
Change log:
2020-04-08 13:53:32 v1.0
- Join the three classes to one single file, refactor some functions.
- Unify some methods, and create good documentation.
- Add plotly interactive plots.
Expand source code
'''
Introduction
============
`pyearthquake` is a light-weight module with basic earthquake engineering utilities.
The aim of this module is to help engineers play with earthquakes by python programming.
*Features*
--------
1. Convert .AT2 file downloaded from PEER database to nicely aligned form.
2. Select and match ground motions to the target spectrum with Monte Carlo simulation and Least Square optimization.
3. Beautify the ground motions by truncating, trimming, changing dt, and renaming.
4. Store a suite of motions in a single file or separate files.
5. Generate displacement spectrums and pesudo acceleration spectrums.
6. Inspect a suite or a motion with nice html interactive plots, thanks to [plotly](https://plot.ly).
7. Create Code Spectrums. Currently support Chinese Code.
Fact Sheet
----------
Version: 1.0
Author: [Hanlin Dong](http://www.hanlindong.com)
License: MIT
Last Update: 2020-04-08
Content Briefing
----------------
Currently, it has three main classes: `Spectrum`, `Motion`, and `Suite`
`Spectrum` : a class storing seismic spectrum.
Some static methods are created confining to Code Spectrums.
`Motion` : a class storing acceleration timehistory.
Motion has utility functions to nicely parse PEER motion file.
Motion also generates spectrums.
`Suite` : a class storing a suite of motions and their target spectrum.
Suite has utilities to load a PEER file folder.
Suite has optimization functions to select a bunch of earthquakes out of a large number of earthquakes
with Monte Carlo simulation and bounded least square optimization.
The advantage is that the shape of the accels is not changed unlike wavelet methods,
but the average match result is still good.
Suite can write all the acceleration files seperately or integrally.
Suite can plot all the accelerations and spectrums interactively for users to inspect.
Suite can generate .tcl file to be used in OpenSees.
All the classes mentioned above can be stored as string file.
Currently they are json files with ext: .spectrum, .motion, .suite.
These files can be read by static methods in every class.
Change log:
-----------
2020-04-08 13:53:32 v1.0
- Join the three classes to one single file, refactor some functions.
- Unify some methods, and create good documentation.
- Add plotly interactive plots.
'''
import os
import re
import json
import random
import numpy as np
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from scipy.signal import lsim
from scipy.interpolate import interp1d
from scipy.optimize import least_squares
class Spectrum():
'''A class storing a spectrum.
It can be init-ed simply with (periods, spectrum) sampling points.
It can also be init-ed by static functions to generate Code-confined spectrums.
Parameters
----------
`periods` : List, numpy.ndarray
List of sampling periods.
It can be generated by the static function `Spectrum.periods_standard` .
Otherwise, users can specify desired period sampling points.
`spectrum` : List, numpy.ndarray
List of the spectrum points corresponding to sampling periods.
'''
def __init__(self, periods, spectrum):
self.periods = np.array(periods)
'''numpy.ndarray, sampling periods'''
self.spectrum = np.array(spectrum)
'''numpy.ndarray, spectrum corresponding to sampling periods.'''
def match(self, target, start, end, step=0.1):
'''Match the current spectrum with a target spectrum. Use Least Square method.
Parameters
----------
`target` : `Spectrum`
the spectrum to be matched.
`start`, `end` : float
the matching period boundaries.
`step` : float
evenly sample the periods with the step.
Returns
-------
`coefficient` : float
the coefficient to match the current spectrum to the target.
`srss` : float
the srss after match.
'''
periods = np.arange(start, end+step, step)
this_spectrum = np.array([self.get(p) for p in periods])
that_spectrum = np.array([target.get(p) for p in periods])
coefficient = sum(this_spectrum * that_spectrum) / sum(this_spectrum ** 2)
srss = (np.sum((this_spectrum * coefficient - that_spectrum) ** 2)) ** 0.5
return coefficient, srss
def get(self, period):
'''Get the spectrum value. Either a list or a number is accepted.
If the period exceeds the largest sampling period, the last spectrum value is returned.
The spectrum value not on sampling points are linearly interpolated.
Parameters
----------
`period` : number, list, numpy.ndarray
The interested period.
Returns
-------
`spectrum` : number, numpy.ndarray
if period is list-like, return a ndarray.
if period is a number, return a number.
'''
if not hasattr(period, '__iter__'):
i = self.periods.searchsorted(period)
if self.periods[i] == period:
return self.spectrum[i]
elif i >= len(self.periods):
print("Warning: given period exceeds the sampling range.")
return self.spectrum[-1]
else:
return self.spectrum[i] - (self.spectrum[i]-self.spectrum[i-1]) / (self.periods[i] - self.periods[i-1]) * (self.periods[i] - period)
else:
return np.array([self.get(p) for p in period])
def scale(self, factor_x=1, factor_y=1):
'''Scale the spectrum.
Parameters
----------
`factor_x` : number
scaling factor in x direction (periods)
`factor_y` : number
scaling factor in y direction (spectrum)
Returns
-------
`spectrum` : `Spectrum`
'''
return Spectrum(periods=self.periods*factor_x, spectrum=self.spectrum*factor_y)
def plot(self, filename=None, engine='pyplot'):
'''Plot the spectrum.
Parameters
----------
`filename` : string, None
if None, show it. Else, save it to the filename
`engine` : 'pyplot' or 'plotly'
Plot engine.
Outputs
-------
A file with name `filename`.png or `filename`.html will be generated.
'''
if engine == 'pyplot':
plt.figure(figsize=(4, 3))
plt.plot(self.periods, self.spectrum, marker='o')
plt.xlabel('Period (s)')
plt.ylabel('Spetrum')
plt.xlim([0, self.periods[-1]])
if filename is None:
plt.show()
else:
plt.savefig(filename+'.png', dpi=200)
elif engine == 'plotly':
fig = go.Figure()
fig.add_scatter(dict(
x='self.periods',
y='self.spectrum',
mode='lines+markers',
))
if filename is None:
fig.show()
else:
fig.write_html(filename+'.html')
else:
print('ERROR: engine should be in "pyplot" or "plotly"')
def to_dict(self):
'''Convert the instance properties to a dict.
Returns
-------
`it` : dict
'''
return dict(
periods=self.periods.tolist(),
spectrum=self.spectrum.tolist()
)
def to_json(self):
'''Convert the properties to json.
Returns
-------
`json` : string
'''
return json.dumps(self.to_dict())
def save(self, filename):
'''Save the spectrum to a file. Currently use json form.
Parameters
----------
`filename` : string
The file name to be saved.
The extension '.spectrum' will be automatically added.
Outputs
-------
A file with name `filename`.spectrum will be generated.
'''
with open(filename + '.spectrum', 'w') as f:
f.write(self.to_json())
def write_csv(self, filename):
'''Write the spectrum to a csv file.
Parameters
----------
`filename` : string
The extension '.csv' will be automatically added.
Outputs
-------
A file named `filename`.csv will be generated.
'''
np.savetxt(filename+'.csv', np.vstack(
[self.periods, self.spectrum]).T, fmt="%.3f,%.7f", header='Period,Spectrum')
@staticmethod
def read_csv(filename):
'''Create an instance by reading the written csv file.
Parameters
----------
`filename` : string
The filename to read.
Returns
-------
`spectrum` : `Spectrum`
New instance.
'''
if not filename.endswith('.csv'):
filename += '.csv'
data = np.loadtxt(filename, delimiter=',')
return Spectrum(periods=data[:, 0], spectrum=data[:, 1])
@staticmethod
def read_dict(it):
'''Create an instance from the dict.
Parameters
----------
`it` : dict
The dict generated by `Spectrum.to_dict`
Returns
-------
`spectrum` : `Spectrum`
New instance.
'''
return Spectrum(periods=it['periods'], spectrum=it['spectrum'])
@staticmethod
def read_json(string):
'''Create an instance from json string.
Parameters
----------
`string` : str
The json string
Returns
-------
`spectrum` : `Spectrum`
New instance.
'''
it = json.loads(string)
return Spectrum.read_dict(it)
@staticmethod
def load(filename):
'''Load the spectrum from file.
Parameters
----------
`filename` : str
Returns
-------
`spectrum` : `Spectrum`
New instance.
'''
if not filename.endswith('.spectrum'):
filename += '.spectrum'
with open(filename, 'r') as f:
string = f.read()
return Spectrum.read_json(string)
@staticmethod
def periods_standard(until=6):
'''Get a standard sampling of the periods.
The sampling points are more sparse with the increase of period.
Parameters
----------
`until` : number
The end of the period sampling list. It should >= 4.0.
Returns
-------
`periods` : numpy.ndarray
The standard sampling points.
'''
assert until >= 4
return np.hstack([np.arange(0, 0.2, 0.01),
np.arange(0.2, 0.5, 0.02),
np.arange(0.5, 1.0, 0.05),
np.arange(1.0, 2.0, 0.1),
np.arange(2.0, 4.0, 0.2),
np.arange(4.0, until+0.5, 0.5)])
@staticmethod
def chinese_point(period, intensity, major, site, group, damping=0.05):
'''Get spectrum point for Chinese code, given a period.
Parameters
----------
`period` : number
The given period.
`intensity` : number
Intensity represented by a number.
Use 7.5 or 8.5 for half-level intensity.
`major` : boolean
If True, it's for major (rare) earthquake.
If False, for minor (frequent) earthquake.
`site` : int
Site number (Roman number in the code. 0 for I0, 1 for I1.)
`group` : int
Site class (Chinese number in the code.)
`damping` : float
Spectrum damping.
Returns
-------
`value` : float
the specific spectrum point value corresponding to the period.
'''
# alpha max
if intensity == 6:
alphamax = 0.28 if major else 0.04
elif intensity == 7:
alphamax = 0.50 if major else 0.08
elif intensity == 7.5:
alphamax = 0.72 if major else 0.12
elif intensity == 8:
alphamax = 0.9 if major else 0.16
elif intensity == 8.5:
alphamax = 1.2 if major else 0.24
elif intensity == 9:
alphamax = 1.40 if major else 0.32
else:
raise("ERROR: intensity not in [6, 7, 7.5, 8, 8.5, 9]")
# tg
tg_table = [[0.20, 0.25, 0.35, 0.45, 0.65],
[0.25, 0.3, 0.4, 0.55, 0.75],
[0.3, 0.35, 0.45, 0.65, 0.9]]
assert group in [1, 2, 3] and site in [0, 1, 2, 3, 4]
tg = tg_table[group-1][site]
# other parameters
gamma = 0.9 + (0.05 - damping) / (0.3 + 6*damping)
eta1 = 0.02 + (0.05 - damping) / (4 + 32*damping)
eta2 = 1 + (0.05 - damping) / (0.08 + 1.6*damping)
# calculate
if period < 0.1:
return ((10*eta2 - 4.5) * period + 0.45) * alphamax
elif period < tg:
return eta2 * alphamax
elif period < 5*tg:
return (tg/period)**gamma * eta2 * alphamax
elif period <= 6:
return (eta2 * 0.2**gamma - eta1 * (period-5*tg)) * alphamax
else:
return (eta2 * 0.2**gamma - eta1 * (6-5*tg)) * alphamax
@staticmethod
def chinese(intensity=8, major=True, site=3, group=2, damping=0.05):
'''Return a new spectrum instance that confines to Chinese Code.
Use `Spectrum.periods_standard` to get sampling periods.
Parameters
----------
refer to `Spectrum.chinese_point`
Returns
-------
`spectrum` : `Spectrum`
'''
periods = Spectrum.periods_standard(until=6)
spectrum = [Spectrum.chinese_point(p,
intensity=intensity,
major=major,
site=site,
group=group,
damping=damping) for p in periods]
return Spectrum(periods, spectrum)
class Motion(object):
'''A class plays with ground motion. Store accel time history and accel spectrum only.
'''
def __init__(self, folder, name, damping=0.05):
'''
Parameters
----------
`folder` : str
the folder this motion is stored.
`name` : str
The motion name. The motion will be saved to '{folder}/{name}.motion'
`damping` : float
The damping ratio of the motion spectrum.
'''
self.folder = folder
'''str. See `Motion`.'''
os.makedirs(folder, exist_ok=True)
self.name = name
'''str. See `Motion`.'''
self.accel = None
'''numpy.ndarray. Acceleration time history with evenly distributed sampling points.'''
self.dt = None
'''float. Acceleration time sampling delta t.'''
self.information = ''
'''str. Store massive infomation about the motion. '''
self.spectrum = None
'''`Spectrum`. The spectrum of the motion.'''
self.damping = damping
'''float. The damping ratio of the spectrum. Note: init with 0.05.'''
def get_PGA(self):
'''
Returns
-------
`value` : float
The peak ground acceleration.
'''
return np.max(abs(self.accel))
def get_duration(self):
'''
Returns
-------
`value` : float
The duration of the motion.
'''
return (len(self.accel) - 1) * self.dt
def get_times(self):
'''
Returns
-------
`value` : numpy.ndarray
The time series of the motion
'''
return np.arange(len(self.accel)) * self.dt
def generate_spectrum(self, periods=None):
'''Use scipy LTS signal processing module to generate the spectrum.
Note that the accel spectrum is pseudo-spectrum derived from disp spectrum.
So it's only accurate when damping is small.
If `Motion.dt` is still None, which usually occurs if the PEER file is not well formated,
a blank spectrum will be returned.
Parameters
----------
`periods` : List-like, None
The period sampling points.
If is None, `Spectrum.periods_standard` will be called.
Returns
-------
`spectrum` : `Spectrum`
The generated spectrum.
Outputs
-------
`Motion.spectrum` will be changed.
'''
if periods is None:
periods = Spectrum.periods_standard()
if self.dt is not None:
print("Generating spectrum ...")
omegas = 2 * np.pi / periods[1:]
num = np.array([-1])
spectrum = np.zeros(len(periods))
spectrum[0] = self.get_PGA()
for i, omega in enumerate(omegas):
den = np.array([1, 2*self.damping*omega, omega**2])
_, y, _ = lsim((num, den), self.accel, self.get_times())
spectrum[i+1] = np.max(abs(y)) * omega ** 2
self.spectrum = Spectrum(periods, spectrum)
else:
print("Dt is not known. Assigning empty spectrum.")
self.spectrum = Spectrum(periods=[], spectrum=[])
return self.spectrum
def load_peer(self, peer_filename):
'''Load acceleration time-history files downloaded from PEER.
The strings in the file will be parsed and also stored in self.information.
Parameters
----------
`peer_filename` : str
Outputs
-------
`Motion.dt`, `Motion.accel`, `Motion.information`, `Motion.spectrum` will all be changed.
'''
self.information += f'peer_filename: {peer_filename}'
try:
f = open(peer_filename, 'r')
for _ in range(3):
self.information += f.readline()
s = f.readline()
self.information += s
re_dt = re.compile(r".*[ 0](\.[0-9]*) ?SEC")
mt = re_dt.match(s)
try:
self.dt = float(mt.group(1).replace('.', '0.'))
print(f'Extracted dt = {self.dt:.4f}')
except:
print(f'ERROR: cannot find dt from {s}')
self.dt = 0.000001
series = []
datastring = f.readlines()
for dtline in datastring:
for dt in dtline.split(" "):
if dt:
try:
series.append(float(dt.replace(".", "0.")))
except ValueError:
break
self.accel = np.array(series)
print(f'Extracted npts = {len(self.accel)}')
except IOError:
print(f'No file named {self.name} is found.')
def convert_peer(self, peer_filename):
'''A recipe to convert the peer acceleration file to the .motion file.
Parameters
----------
`peer_filename` : str
`folder` : str
the folder to save .motion file.
Outputs
-------
A .motion file will be created in `folder` with the PEER filename.
'''
print(f'Converting PEER file {peer_filename} to folder {self.folder} with name {self.name}')
self.load_peer(peer_filename)
self.generate_spectrum()
self.save()
def trim(self, new_name, new_folder=None, left=1.0e-3, right=1.0e-3, prepend_zero=True, log=True, log_folder=None):
'''Trim the motion with two coefficients of PGA.
This method is used when a long near-zero time-history is stored in the data file.
Parameters
----------
`new_name` : str
`new_folder` : str, None
`left`, `right` : float
The coefficient of PGA to trim the time history.
Search from left or right, when the target point is found,
search for the nearest near-zero point. Then trim off the small values.
`prepend_zero` : boolean
If True, prepend a zero value to the trimmed acceleration.
`log` : boolean
If the log picture will be generated.
`log_folder` : str, None
Path for the folder to save the trim logs.
If None, save in the previous folder.
Returns
-------
`motion` : `Motion`
The trimmed motion.
Outputs
-------
If log, plot a figure in log_folder named '{new_name}_trimlog.png'
'''
mo = self.copy()
mo.name = new_name
mo.folder = new_folder if new_folder is not None else self.folder
pga = self.get_PGA()
# search left
i = 0
while abs(self.accel[i]) < pga * left:
i += 1
while i > 0 and self.accel[i] * self.accel[i-1] > 0:
i -= 1
i_left = i
# search right
i = -1
while abs(self.accel[i]) < pga * right:
i -= 1
while i < -1 and self.accel[i] * self.accel[i+1] > 0:
i += 1
i_right = len(self.accel) + i
if prepend_zero:
mo.accel = np.hstack([0, self.accel[i_left:i_right]])
else:
mo.accel = self.accel[i_left:i_right]
mo.generate_spectrum()
print(
f'Motion is trimmed from {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.')
if log:
log_folder = self.folder if log_folder is None else log_folder
os.makedirs(log_folder, exist_ok=True)
_, ax = plt.subplots(3, 1, figsize=(7, 5))
ax[0].plot(self.get_times(), self.accel, label='before')
ax[1].plot(mo.get_times(), mo.accel, label='after')
ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before')
ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after')
for i in range(3):
ax[i].legend()
plt.suptitle(f'Trim log: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.')
plt.savefig(f'{log_folder}/{mo.name}_trimlog.png', dpi=200)
return mo
def change_dt(self, dt, new_name, new_folder=None, log=True, log_folder=None):
'''Change dt for the accel. Linearly interpolate the acceleration, and re-sample.
Note: no low-pass filters are used. May cause new peaks in the Fourier spectrum.
Parameters
----------
`dt` : float
New sampling delta t.
`new_name` : str
`new_folder` : str, None
`log` : boolean
If log, create a log picture comparing the before and after accels and spectrum.
`log_folder` : str, None
Path for the folder to save the trim logs.
If None, save in the previous folder.
Returns
-------
`motion` : `Motion`
Outputs
-------
If log, create a picture named '{log_folder}/{new_name}_dtlog.png'.
'''
if dt == self.dt:
print(f'Motion {self.name} do not need to change dt')
return self.copy()
else:
mo = self.copy()
mo.name = new_name
mo.folder = new_folder if new_folder is not None else self.folder
f = interp1d(self.get_times(), self.accel)
npts = np.floor(self.dt * len(self.accel) / dt)
mo.accel = f(np.arange(npts)*dt)
mo.dt = dt
mo.generate_spectrum()
print(f'Motion dt changed from {self.name}:{self.dt:.4f}s to {mo.name}:{dt:.4f}s.')
if log:
log_folder = self.folder if log_folder is None else log_folder
os.makedirs(log_folder, exist_ok=True)
_, ax = plt.subplots(3, 1, figsize=(7, 5))
ax[0].plot(self.get_times(), self.accel, label='before')
ax[1].plot(mo.get_times(), mo.accel, label='after')
ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before')
ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after')
for i in range(3):
ax[i].legend()
plt.suptitle(f'Change dt: {self.name}:{self.dt:.4f}s to {mo.name}:{dt:.4f}s.')
plt.savefig(f'{log_folder}/{mo.name}_dtlog.png', dpi=200)
return mo
def truncate(self, time, new_name, new_folder=None, log=True, log_folder=None):
'''Truncate the acceleration.
Sometimes, a ground motion file may have two peaks or more.
So if only one peak is needed, this method can be used to skip the other peaks.
Parameters
----------
`time` : float
The accels after the time will be cut off.
`new_name` : str
`new_folder` : str, None
`log` : boolean
`log_folder` : str, None
Returns
-------
`motion` : `Motion`
Outputs
-------
If log is True, a figure named '{log_folder}/{new_name}_trunclog.png' will be created.
'''
i = round(time / self.dt)
mo = self.copy()
mo.name = new_name
mo.folder = new_folder if new_folder is not None else self.folder
if i >= len(self.accel):
print(f"WARNING: time is longer than duration. {self.name} is not truncated.")
return mo
mo.accel = self.accel[:i]
mo.generate_spectrum()
print(f'Motion truncated: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.')
if log:
log_folder = self.folder if log_folder is None else log_folder
os.makedirs(log_folder, exist_ok=True)
_, ax = plt.subplots(3, 1, figsize=(7, 5))
ax[0].plot(self.get_times(), self.accel, label='before')
ax[1].plot(mo.get_times(), mo.accel, label='after')
ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before')
ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after')
for i in range(3):
ax[i].legend()
plt.suptitle(f'Truncate: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.')
plt.savefig(f'{log_folder}/{mo.name}_trunclog.png', dpi=200)
return mo
def scale(self, new_name, new_folder=None, factor_x=1, factor_y=1):
'''Scale the ground motion.
Parameters
----------
`new_name` : str
`new_folder` : str, None
`factor_x` : float
The scaling factor on the time/period axis.
`factor_y` : float
The scaling factor on the accel axis.
Returns
-------
`motion` : Motion
'''
mo = self.copy()
mo.name = new_name
mo.folder = new_folder if new_folder is not None else self.folder
if factor_x != 1:
mo.dt = self.dt * factor_x
if factor_y != 1:
mo.accel = self.accel * factor_y
mo.spectrum = self.spectrum.scale(factor_x, factor_y)
return mo
def plot_timehistory(self, folder=None, save=True, engine='pyplot'):
'''Plot the time history of the acceleration.
Parameters
----------
`folder` : str, None
Store folder. If None, use `Motion.folder`
`save` : boolean
Save the figure or not.
`engine` : 'pyplot' or 'plotly'
Outputs
-------
if save is True, create a figure '{folder}/{self.name}_th'.
'''
folder = self.folder if folder is None else folder
if engine == 'pyplot':
plt.figure()
t = self.get_times()
plt.plot(t, self.accel)
if save:
plt.savefig(f'{folder}/{self.name}_th.png', dpi=200)
else:
plt.show()
elif engine == 'plotly':
fig = go.Figure()
fig.add_trace(go.Scatter(
x=self.get_times(),
y=self.accel,
mode='lines',
name=self.name
))
if save:
fig.write_html(f'{folder}/{self.name}_th.html')
else:
fig.show()
else:
print('ERROR: engine should be in "pyplot" or "plotly"')
def plot(self, folder=None, save=True, engine='pyplot'):
'''Plot the time history and the spectrum.
Parameters
----------
See `Motion.plot_timehistory`
Outputs
-------
Create a figure named '{folder}/{name}'
'''
folder = self.folder if folder is None else folder
if engine == 'pyplot':
_, ax = plt.subplots(2, 1, figsize=(7, 8))
ax[0].plot(self.get_times(), self.accel)
ax[0].set_xlabel('Time(s)')
ax[0].set_ylabel('Acceleration(g)')
ax[0].set_title(f'PGA={self.get_PGA():.4f}')
ax[1].plot(self.spectrum.periods, self.spectrum.spectrum)
ax[1].set_xlabel('Period(s)')
ax[1].set_ylabel('Spectrum Acceleration(g)')
ax[1].set_title(f'damping={self.damping:.4f}')
plt.subplots_adjust(hspace=0.3)
if save:
plt.savefig(f'{folder}/{self.name}.png', dpi=200)
else:
plt.show()
elif engine == 'plotly':
fig = make_subplots(rows=1, cols=2)
fig.add_trace(go.Scatter(
x=self.get_times(),
y=self.accel,
mode='lines',
name='accel'
), row=1, col=2)
fig.add_trace(go.Scatter(
x=self.spectrum.periods,
y=self.spectrum.spectrum,
mode='lines',
name='spectrum'
), row=1, col=1)
if save:
fig.write_html(f'{folder}/{self.name}.html')
else:
fig.show()
else:
print('Engine should be in "pyplot" or "plotly"')
def write_accel(self, filename, time=True):
'''Write the acceleration timehistory to a single file.
Parameters
----------
`filename` : str
The file path to store the accelration.
`time` : boolean
If True, a time column is created.
To know dt in the file content, time should be True.
Otherwise, dt should be stored in the filename.
Outputs
-------
An text file with acceleration information will be written.
'''
if time:
data = np.vstack([self.get_times(), self.accel])
np.savetxt(filename, data.T, fmt="%.4f,%.7e", header="Time,Accel")
else:
np.savetxt(filename, self.accel, fmt="%.7e")
def write_info(self, filename):
'''Write the information to a single file.
Parameters
----------
`filename` : str
Outputs
-------
A text file with information will be created.
'''
with open(filename, 'w') as f:
f.write(f'dt={self.dt:.3}\n')
f.write(f'npts={len(self.accel)}\n')
f.write(f'damping={self.damping:.3}\n')
f.write(self.information)
def write_spectrum(self, filename):
'''Write the spectrum to a single file. Use the form of csv.
Parameters
----------
`filename` : str
Outputs
-------
A csv file will be created.
'''
self.spectrum.write_csv(filename)
def read_accel(self, filename, dt=True):
'''Load an single-column acceleration file to the current instance.
Parameters
----------
`dt` : float, boolean
if dt is float, `Motion.dt` will be set to dt.
Else if dt is True, `Motion.dt` will be found in the file.
Outputs
-------
Change `Motion.dt` and `Motion.accel` in the CURRENT instance.
'''
if not isinstance(dt, float):
data = np.loadtxt(filename)
self.accel = data[:, 1]
self.dt = data[1, 0]
else:
self.accel = np.loadtxt(filename)
self.dt = dt
def read_spectrum(self, filename):
'''Read the spectrum from csv file.
Parameters
----------
`filename` : str
Outputs
-------
The `Motion.spectrum` of the CURRENT instance will be changed.
'''
spectrum = Spectrum.read_csv(filename)
self.spectrum = spectrum
def read_info(self, filename):
'''Read info from text file.
Parameters
----------
`filename` : str
Outputs
-------
The `Motion.information` of the CURRENT instance will be changed.
'''
with open(filename, 'r') as f:
self.dt = float(f.readline().replace("dt=", ""))
_ = int(f.readline().replace("npts=", ""))
self.damping = float(f.readline().replace("damping=", ""))
self.information = f.read()
def to_dict(self):
'''Convert the instance properties to dict.
Returns
-------
`it` : dict
'''
return dict(
dt=self.dt,
npts=len(self.accel),
damping=self.damping,
information=self.information,
accel=self.accel.tolist(),
spectrum=self.spectrum.to_dict(),
folder=self.folder,
name=self.name,
)
def to_json(self):
'''Convert the instance properties to string using json format.
Returns
-------
`string` : str
'''
return json.dumps(self.to_dict())
def save(self):
'''Save the instance to a file, which is defined by `Motion.folder` and `Motion.name`
Outputs
-------
A file named '{self.folder}/{self.name}.motion' will be created.
'''
with open(f'{self.folder}/{self.name}.motion', 'w') as f:
f.write(self.to_json())
@staticmethod
def read_dict(it):
'''Create a instance by the dict generated by `Motion.to_dict`.
Parameters
----------
`it` : dict
Returns
-------
`mo` : `Motion`
'''
mo = Motion(folder=it['folder'], name=it['name'])
mo.dt = it['dt']
mo.damping = it['damping']
mo.information = it['information']
mo.accel = np.array(it['accel'])
mo.spectrum = Spectrum.read_dict(it['spectrum'])
return mo
@staticmethod
def read_json(string):
'''Create a new instance by the string generated by `Motion.to_json`.
Parameters
----------
`string` : str
Returns
-------
`mo` : `Motion`
'''
it = json.loads(string)
return Motion.read_dict(it)
@staticmethod
def load(filename):
'''Load the file created from `Motion.save` method.
Parameters
----------
`filename` : str
Returns
-------
`mo` : `Motion`
'''
if not filename.endswith('.motion'):
filename += '.motion'
with open(filename, 'r') as f:
return Motion.read_json(f.read())
def copy(self):
'''Copy the instance.
Returns
-------
`mo` : `Motion`
'''
return Motion.read_dict(self.to_dict())
class Suite(object):
'''A class that contains mainly a series of `Motion` instances,
and corresponding series of factors, matching the suite of motions to the target `Spectrum`.
'''
def __init__(self, folder, name, target_spectrum, period_bound):
'''
Parameters
----------
`folder` : str
A path-like string for the suite.
`name` : str
A specific name for the suite. The instance will be saved in '{folder}/{name}.suite'
`target_spectrum` : `Spectrum`
The target spectrum of the suite.
`period_bound` : List<float> len=2
The matching period boundaries.
'''
self.folder = folder
'''str. See `Suite`'''
os.makedirs(folder, exist_ok=True)
self.name = name
'''str. See `Suite`'''
self.motions = []
'''List<`Motion`>. Store all the motions.'''
self.factors = []
'''List<float>. Store all the factors that should be applied to the motions.'''
self.target_spectrum = target_spectrum
'''`Spectrum`. See `Suite`'''
self.period_bound = period_bound
'''List<float>. See `Suite`'''
self.period_samples = np.linspace(period_bound[0], period_bound[1], 200)
'''numpy.ndarray. The period within the `period_bound` is evenly sampled, with 200 points.'''
self.spectrum_matrix = None
'''numpy.ndarray. A matrix with shape (len(`Suite.period_samples`), len(`Suite.motions`)).
This matrix times the `Suite.factors` vector products the average spectrum with `Suite.period_samples`.'''
self.target_vector = target_spectrum.get(self.period_samples)
'''numpy.ndarray. The vector generated by the target_spectrum sampling the `Suite.period_samples`'''
def set_folder(self, folder):
'''Reset the folder for the suite and the motions.
Parameters
----------
`folder` : str
Outputs
-------
`Suite.folder` and `Suite.motions` will be changed.
'''
if folder is not None:
self.folder = folder
os.makedirs(folder, exist_ok=True)
for mo in self.motions:
mo.folder = folder
def add_motion(self, motion, factor=1):
'''Add motion to the suite together with a factor, to keep the lengths are identical.
Parameters
----------
`motion` : `Motion`
`factor` : number
Outputs
-------
`Suite.motions` and `Suite.factors` will be changed.
'''
self.motions.append(motion)
self.factors.append(factor)
def load_motions_from_names(self, names, folder=None):
'''Give a list of name and the folder, load motions.
Parameters
----------
`names` : List<str>
The list of motion names.
`folder` : str, None
The folder of the motions.
If is None, use `Suite.folder`
Outputs
-------
`Suite.motions` and `Suite.factors` will be changed.
'''
folder = self.folder if folder is None else folder
for name in names:
mo = Motion.load(f'{folder}/{name}.motion')
self.add_motion(mo)
print(f'Loaded motion {mo.name}')
self.reset_spectrum_matrix()
def load_motions_from_folder(self, folder=None):
'''Find all the .motion file from the given folder, and load them.
Parameters
----------
`folder` : str, None
The folder to search the .motion files.
If is None, use `Suite.folder` instead.
Outputs
-------
`Suite.motions` and `Suite.factors` will be changed.
'''
folder = self.folder if folder is None else folder
filenames = os.listdir(self.folder)
for filename in filenames:
if filename.endswith('.motion'):
mo = Motion.load(f'{folder}/{filename}')
self.add_motion(mo)
print(f'Loadad motion {mo.name}')
self.reset_spectrum_matrix()
def filter_by_IDs(self, ids, new_name, new_folder=None):
'''Give a series of selected IDs, filter the motions.
Parameters
----------
`ids` : List<int>
The list of indexes of the selected motions in the current suite.
`new_name` : str
`new_folder` : str, None
Returns
-------
`suite` : `Suite`
A new suite with the selected motions and their factors only.
'''
suite = self.copy()
suite.name = new_name
suite.set_folder(new_folder)
suite.motions = [self.motions[i] for i in ids]
suite.factors = [self.factors[i] for i in ids]
suite.reset_spectrum_matrix()
print(f'The motions have been filtered. {len(ids)} motions remain.')
return suite
def filter_by_file(self, filename, new_name, new_folder=None):
'''Give a file with indexes selected.
Parameters
----------
Refer to `Suite.filter_by_IDs`.
Returns
-------
Refer to `Suite.filter_by_IDs`.
'''
with open(filename, 'r') as f:
lines = f.readlines()
ids = [int(l) for l in lines]
return self.filter_by_IDs(ids=ids, new_name=new_name, new_folder=new_folder)
def get_average_spectrum(self):
'''Return a spectrum which is the average spectrum of the suite.
Outputs
-------
`spectrum` : `Spectrum`
'''
periods = self.motions[0].spectrum.periods
average_spectrum = self.motions[0].spectrum.spectrum * self.factors[0]
for i in range(1, len(self.motions)):
average_spectrum += self.motions[i].spectrum.get(periods) * self.factors[i]
average_spectrum /= len(self.motions)
return Spectrum(periods, average_spectrum)
def get_srss(self):
'''Get the srss of the current averate spectrum to the target spectrum in the period boundary.
Returns
-------
`srss` : float
'''
return np.linalg.norm(np.matmul(self.spectrum_matrix, self.factors) - self.target_vector).item()
def reset_spectrum_matrix(self):
'''Reset the spectrum matrix, which produces the average spectrum
in the boundary by multiplying factors vector.
This method should be called everytime `Suite.motions` change.
Outputs
-------
`Suite.spectrum_matrix` and `Suite.target_vector` will be changed.
'''
mat = np.zeros((len(self.period_samples), len(self.motions)))
for i in range(len(self.motions)):
spectrum = self.motions[i].spectrum.get(self.period_samples)
mat[:, i] = spectrum / len(self.motions)
self.spectrum_matrix = mat
self.target_vector = self.target_spectrum.get(self.period_samples)
def eliminate_neg(self):
'''Eliminate negative target vector by multiplying a factor for every individual.
In other words, make the average spectrum above the target spectrum.
Returns
-------
`factor` : float
The global factor that should be applied to each of the factors.
Outputs
-------
`Suite.factors` are all amplified by the returned `factor`.
'''
error = np.matmul(self.spectrum_matrix, self.factors) - self.target_vector
argmin = np.argmin(error)
factor = 1 / (1 + error[argmin] / self.target_vector[argmin])
print(f'Multiply {factor:.4} to eliminate negative spectrum.')
self.factors = [self.factors[i] * factor for i in range(len(self.factors))]
return factor
def match_individual_LSQ(self):
'''Use Least square method for each individual motion to match the spectrum.
Returns
-------
`residuals` : numpy.ndarray
srss of each motion matched to the target spectrum.
Outputs
-------
`Suite.factors` are changed.
'''
residuals = np.zeros(len(self.motions))
for i in range(len(self.motions)):
this = self.spectrum_matrix[:, i] * len(self.motions)
that = self.target_vector
factor = np.dot(this, that) / np.dot(this, this)
residual = np.linalg.norm(this * factor - that)
self.factors[i] = factor
residuals[i] = residual
print(f"Individual LSQ: Factor Max={np.max(self.factors):.4}, Residual Max={np.max(residuals):.4}")
return residuals
def filter_optimize(self, count, times, use_lsq=True, output_count=0, new_folder=None, lower_bound=0.6, upper_bound=1.4, dimension=100000):
'''Use Monte Carlo simulation and Least Square method to optimize the suite.
after running, a bunch of suites will be written to the new_folder.
Parameters
----------
`count` : int
The number of motions in the resulting suite.
`times` : int
The number of times that MC simulation with dimension is run.
The total number of samples of MC simulation will be {times} * {dimension}.
`use_lsq` : boolean
Whether use LSQ optimizer to better optimize the suite.
`output_count` : int
The number of outputs. if 0, output all.
`new_folder` : str
`lower_bound`, `upper_bound` : float
For the purpose that the individual motions are not too far from the individually matched factors,
the lower and upper boundaries of amplifying the individually matched factors are set.
This is only useful when `use_lsq` is set, to add boundaries to the LSQ optimizer.
`dimension` : int
Each time the Monte Carlo simulation is run,
means that {dimension} times of simulation have been done by using a matrix target.
Returns
-------
List<`Suite`> with len=`output_count`.
The list is sorted. The best answer is the first result. However, visual check should be done.
Outputs
-------
Every running after Monte Carlo simulation and LSQ optimization, the spectrum of the suite will be plotted.
'''
new_folder = new_folder if new_folder is not None else self.folder
os.makedirs(new_folder, exist_ok=True)
suites = []
srsses = []
for i in range(times):
suite_new = self.filter_montecarlo(count=count, new_name=f'mc{i:03d}', new_folder=new_folder)
suites.append(suite_new)
srsses.append(suite_new.get_srss())
if use_lsq:
suite_new2 = suite_new.optimize_LSQ(
new_name=f'lsq{i:03d}',
new_folder=new_folder,
lower_bound=lower_bound,
upper_bound=upper_bound
)
suites.append(suite_new2)
srsses.append(suite_new2.get_srss())
argsorted = np.argsort(np.array(srsses))
results = [suites[i] for i in argsorted[:output_count]]
for i, suite in enumerate(results):
suite.name = f'no{i}-{suite.name}'
suite.save()
suite.plot_all_spectrums(engine='pyplot')
return results
def filter_montecarlo(self, count, new_name, new_folder=None, dimension=100000):
'''Use Monte Carlo simulation to filter the suite to count numbers of motions.
Parameters
----------
Refer to `Suite.filter_optimize`.
Returns
-------
`suite` : `Suite`
The optimized result after {dimension} times of MC simulations.
'''
print(f'Running Monte Carlo simulation, dimension={dimension}')
self.match_individual_LSQ()
factors = np.array(self.factors)
xs = np.zeros((len(self.motions), dimension))
for i in range(dimension):
indexes = random.sample(range(len(self.motions)), count)
xs[indexes, i] = factors[indexes]
mul = np.matmul(self.spectrum_matrix * len(self.motions) / count, xs)
loss = mul - self.target_vector.reshape(-1, 1)
argmin = np.argmin(loss, axis=0)
lossmin = np.min(loss, axis=0)
target_at_lossmin = self.target_vector[argmin]
amps = 1 / (lossmin / target_at_lossmin + 1)
lossAmp = mul * amps - self.target_vector.reshape(-1, 1)
srss = np.linalg.norm(lossAmp, axis=0)
arg = np.argmin(srss)
factors = xs[:, arg] * amps[arg]
suvives = np.where(factors > 1.0e-3)
suite = self.copy()
suite.name = new_name
suite.set_folder(new_folder)
suite.motions = [self.motions[i] for i in suvives[0]]
suite.factors = [factors[i] for i in suvives[0]]
suite.reset_spectrum_matrix()
return suite
def _loss(self, x):
'''Utility function: calculate the loss for optimization.'''
factors = x * np.array(self.factors)
mul = np.matmul(self.spectrum_matrix, factors)
loss = mul - self.target_vector
argmin = np.argmin(loss)
amp = 1 / (loss[argmin] / self.target_vector[argmin] + 1)
return mul * amp - self.target_vector
def optimize_LSQ(self, new_name, new_folder=None, lower_bound=0.6, upper_bound=1.4):
'''Use Least Square method from scipy to optimize the factors.
Parameters
----------
Refer to `Suite.filter_optimize`.
Returns
-------
`suite` : `Suite`
Optimized suite.
'''
print('Running Bounded Least Square optimization ...')
res = least_squares(self._loss,
np.ones(len(self.motions)),
bounds=(lower_bound, upper_bound),
verbose=0)
factors = res.x * self.factors
suite = self.copy()
suite.name = new_name
suite.set_folder(new_folder)
suite.factors = factors.tolist()
suite.eliminate_neg()
return suite
def load_peer_folder(self, peer_folder, h1=True, h2=True, v=False):
'''From a folder load all the PEER motions into the suite.
Extract the downloaded peer zip file first.
Parameters
----------
`peer_folder` : str
The folder path of the extracted PEER data. Where `_SearchResults.csv` can be found.
`h1`, `h2`, `v` : boolean
Whether load the three directional files or not.
Outputs
-------
`Suite.motions`, `Suite.factors` will be changed.
All the PEER motions will be saved in .motion format in the new folder.
'''
with open(f'{peer_folder}/_SearchResults.csv', "r") as f:
for _ in range(34):
f.readline()
i = 0
positions = []
if h1:
positions.append(19)
if h2:
positions.append(20)
if v:
positions.append(21)
line = f.readline()
while line != '\n':
cells = line.split(',')
for pos in positions:
filename = cells[pos].replace(' ', '')
motion_name = filename.replace('.AT2', '')
print(f'Loading {motion_name}')
mo = Motion(folder=self.folder, name=motion_name)
mo.load_peer(peer_filename=f'{peer_folder}/{filename}')
mo.generate_spectrum()
mo.save()
self.add_motion(mo)
i += 1
line = f.readline()
self.reset_spectrum_matrix()
print(f"Successfully loaded {i} earthquakes from peer folder {peer_folder} to {self.folder}.")
return True
def beautify(self, new_name, run_steps=[1,2,3,4], dt=0.01, new_folder=None, trunc_dict={}, left=1.0e-3, right=1.0e-3, prepend_zero=True):
'''Beautify the suite in 4 steps:
1. truncate the motions, see `Motion.truncate`
2. trim the near-zero parts, see `Motion.trim`
3. change dt to uniform, see `Motion.change_dt`
4. rename the motions to a series.
This procedure is always logged to the current folder.
Parameters
----------
`new_name` : str
`run_steps` : List<int>
The step number to run. Users can select which one out of the 4 steps to run.
The order will also follow this List.
`dt` : float
The uniform delta t.
`new_folder` : str
`trunc_dict` : dict
A dict that defines the `time` parameter in `Motion.truncate`
In the dict, keys are the indexes of the motions in the suite, values are the `time` values.
`left`, `right` : float
The parameters of PGA to trim. Refer to `Motion.trim`.
Returns
-------
`suite` : `Suite`
Outputs
-------
All the logs and the pre- and post- beautify average spectrum.
'''
suite = self.copy()
spectrum_hist = suite.get_average_spectrum()
suite.plot_all_spectrums(engine='pyplot')
for i, motion in enumerate(self.motions):
mo = motion.copy()
mo.name = f'{self.name}-{i}-b'
for step in run_steps:
if step == 1:
if i in trunc_dict:
mo = mo.truncate(time=trunc_dict[i], new_name=mo.name+'1')
elif step == 2:
mo = mo.trim(new_name=mo.name+'2', left=left, right=right)
elif step == 3:
mo = mo.change_dt(dt=dt, new_name=mo.name+'3')
elif step == 4:
mo.name = f'{i:02}' if len(self.motions) <= 100 else f'{i:03}'
else:
print(f'step={step} is not recognized and ignored.')
suite.motions[i] = mo
spectrum_new = suite.get_average_spectrum()
plt.figure()
plt.plot(spectrum_hist.periods, spectrum_hist.spectrum, label='before')
plt.plot(spectrum_new.periods, spectrum_new.spectrum, label='after')
plt.plot(suite.target_spectrum.periods, suite.target_spectrum.spectrum, label='target')
plt.legend()
plt.suptitle(f'Beautify: {self.name} -> {new_name}')
plt.savefig(f'{self.folder}/{self.name}-beautify.png', dpi=200)
suite.name = new_name
if new_folder is not None:
suite.set_folder(new_folder)
return suite
def scale(self, new_name, new_folder=None, factor_x=1, factor_y=1, scale_target_spectrum=True):
'''Scale the motions both horizontally and vertically.
Parameters
----------
`new_name` : str
`new_folder` : str, None
`factor_x`, `factor_y` : float
The factors to amplify in x axis and y axis.
`scale_target_spectrum` : boolean
Whether scale the target spectrum as well.
Returns
-------
`suite` : `Suite`
'''
suite = self.copy()
suite.name = new_name
suite.set_folder(new_folder)
for i, motion in enumerate(self.motions):
suite.motions[i] = motion.scale(
new_name=motion.name, factor_x=factor_x, factor_y=factor_y)
if scale_target_spectrum:
suite.target_spectrum = self.target_spectrum.scale(
factor_x=factor_x, factor_y=factor_y)
if factor_x != 1:
suite.period_bound = [p * factor_x for p in self.period_bound]
suite.period_samples = self.period_samples * factor_x
if factor_y != 1:
suite.factors = [f*factor_y for f in self.factors]
suite.reset_spectrum_matrix()
return suite
def write_TCL(self, folder=None, filename=None):
'''Write the suite to TCL forms for OpenSees.
Currently the TCL contains only one dict.
The keys of the dict is the motion names. Then the essential properties are stored.
At the same time, the accelerations for each motion is written to a text file.
Parameters
----------
`folder` : str, None
The folder to write the tcl files.
If is None, use `Suite.folder`.
`filename` : str, None
The tcl file name. If is None, use `Suite.name`.
Outputs
-------
A tcl file and a series of acceleration files.
'''
folder = self.folder if folder is None else folder
filename = self.name if filename is None else filename.replace(
'.tcl', '') + '.tcl'
os.makedirs(folder, exist_ok=True)
config = ''
config += 'dict set motion keys {'
names = [mo.name for mo in self.motions]
config += ' '.join(names)
config += '}\n\n'
for i, mo in enumerate(self.motions):
mo.write_accel(f'{folder}/{mo.name}', time=False)
config += f'dict set motion {mo.name} path {folder}/{mo.name}.acc\n'
config += f'dict set motion {mo.name} npts {len(mo.accel)}\n'
config += f'dict set motion {mo.name} dt {mo.dt:.4f}\n'
config += f'dict set motion {mo.name} amp {self.factors[i]:.4f}\n\n'
with open(f'{folder}/{filename}.tcl', 'w') as f:
f.write(config)
def plot_all_accels(self, folder=None, filename=None, save=True, engine='pyplot'):
'''Plot amplified accelograms to a single file.
Parameters
----------
`folder` : str, None
If is None, use `Suite.folder`.
`filename` : str, None
If is None, use `Suite.name` + '_accel'
`save` : boolean
If True, save the file. Else, show immediately.
`engine` : 'pyplot' or 'plotly'
Outputs
-------
A figure is plotted.
'''
folder = self.folder if folder is None else folder
if engine == 'plotly':
filename = f'{self.name}_accel.html' if filename is None else filename + '.html'
fig = go.Figure()
for i, motion in enumerate(self.motions):
fig.add_trace(go.Scatter(
x=np.arange(len(motion.accels)) * motion.dt,
y=motion.accels,
mode='lines',
name=str(i) + motion.name,
text=motion.name
))
if save:
fig.write_html(f'{folder}/{filename}')
else:
fig.show()
elif engine == 'pyplot':
print('Pyplot is not supported.')
else:
print('ERROR: engine should be in "pyplot" and "plotly"')
def plot_all_spectrums(self, folder=None, filename=None, save=True, engine='pyplot'):
'''Plot all amplified spectrums in one file.
Parameters
----------
Refer to `Suite.plot_all_accels`.
The filename if is None will be `Suite.name` + '_spectrum'
Outputs
-------
A figure will be generated.
'''
if engine == 'plotly':
folder = self.folder if folder is None else folder
filename = f'{self.name}_spectrum.html' if filename is None else filename + '.html'
fig = go.Figure()
for i, motion in enumerate(self.motions):
fig.add_trace(go.Scatter(
x=motion.spectrum.periods,
y=motion.spectrum.spectrum * self.factors[i],
mode='lines',
name=motion.name,
text=motion.name
))
spectrum1 = self.target_spectrum
spectrum2 = self.get_average_spectrum()
fig.add_trace(go.Scatter(
x=spectrum1.periods,
y=spectrum1.spectrum,
mode='lines',
name='Target',
text='Target',
line=dict(width=5),
))
fig.add_trace(go.Scatter(
x=spectrum2.periods,
y=spectrum2.spectrum,
mode='lines',
name='Average',
text='Average',
line=dict(width=5),
))
if save:
fig.write_html(f'{folder}/{filename}')
else:
fig.show()
elif engine == 'pyplot':
folder = self.folder if folder is None else folder
filename = f'{self.name}_spectrum.png' if filename is None else filename + '.png'
plt.figure(figsize=(7, 5))
for i, motion in enumerate(self.motions):
plt.plot(motion.spectrum.periods, motion.spectrum.spectrum * self.factors[i], label=motion.name, c='silver')
plt.plot(self.target_spectrum.periods, self.target_spectrum.spectrum, label='Target', linewidth=3)
averageSpectrum = self.get_average_spectrum()
plt.plot(averageSpectrum.periods, averageSpectrum.spectrum, label="average", linewidth=2)
plt.legend(loc='best', fontsize='x-small')
plt.title(f'SRSS={self.get_srss():.4f}')
if save:
plt.savefig(os.path.join(folder, filename), dpi=200)
else:
plt.show()
else:
print("ERROR: wrong engine name. use 'plotly' or 'pyplot'.")
def plot_individual(self, save=True):
'''Plot all individual ground motions and the spectrum.
Parameters
----------
Refer to `Suite.plot_all_accels`.
Outputs
-------
If save, a figure for each motion will be generated.
'''
for i, mo in enumerate(self.motions):
_, ax = plt.subplots(2, 1, figsize=(7, 5))
ax[0].plot(np.arange(len(mo.accel)) * mo.dt,
mo.accel * self.factors[i])
ax[1].plot(mo.spectrum.periods,
mo.spectrum.spectrum * self.factors[i])
ax[1].plot(self.target_spectrum.periods,
self.target_spectrum.spectrum, linewidth=2)
plt.suptitle(
f'{i}: Name {mo.name}\nfactor={self.factors[i]:.4}, PGA={mo.get_PGA()*self.factors[i]:.4}, dt={mo.dt:.3f}')
filename = f'{self.folder}/motion{i}.png'
if save:
plt.savefig(filename, dpi=200)
else:
plt.show()
def plot_interactive(self, save=True):
'''Plot the spectrum and the acceleration time-histories to an interactive html file.
Parameters
----------
`save` : boolean
If True, write to html file. Otherwise, show directly without saving.
Outputs
-------
A html file with interactive plotting sliders.
'''
fig = make_subplots(rows=1, cols=2)
for i in range(len(self.motions)):
fig.add_trace(go.Scatter(
x=self.motions[i].spectrum.periods,
y=self.motions[i].spectrum.spectrum * self.factors[i],
name=self.motions[i].name,
), row=1, col=1)
fig.add_trace(go.Scatter(
x=self.motions[i].get_times(),
y=self.motions[i].accel * self.factors[i],
name=self.motions[i].name,
), row=1, col=2)
spectrum_average = self.get_average_spectrum()
fig.add_trace(go.Scatter(
x=spectrum_average.periods,
y=spectrum_average.spectrum,
name='average',
line={'width': 5}
))
fig.add_trace(go.Scatter(
x=self.target_spectrum.periods,
y=self.target_spectrum.spectrum,
name='target',
line={'width': 5}
))
def _title(i):
return f'Motion #{i}: factor={self.factors[i]:.2f}, duration={len(self.motions[i].accel)*self.motions[i].dt:.2f}, PGA={self.motions[i].get_PGA()*self.factors[i]:.4}'
steps = []
for i in range(len(self.motions)):
step = dict(
method='update',
args=[
{'visible': [j//2 == i for j in range(len(self.motions)*2)] + [False, True]},
{'title': _title(i)}
],
label=str(i),
)
steps.append(step)
sliders = [dict(active=0, steps=steps, len=0.9, currentvalue={'visible': False})]
updatemenus = [dict(
buttons=[dict(
label='All',
method='update',
args=[
{'visible': [True] * (len(self.motions)*2+2)},
{'title': f'Suite {self.name}: {len(self.motions)} motions, SRSS={self.get_srss():.4}'}
]
)],
type='buttons',
x=1,
y=0,
xanchor='right',
yanchor='top',
pad={'t': 30},
)]
fig.update_layout(sliders=sliders, title=f'Suite {self.name}: {len(self.motions)} motions, SRSS={self.get_srss():.4}', updatemenus=updatemenus)
if save:
fig.write_html(f'{self.folder}/{self.name}.html')
else:
fig.show()
def to_dict(self, separate_motion=True):
'''Convert all the properties to dict.
Parameters
----------
`separate_motion` : boolean
If True, the motion details are not included into the dict.
User has to make sure these motions are not moved.
Returns
-------
`it` : dict
'''
return dict(
folder=self.folder,
name=self.name,
motions=[mo.to_dict() for mo in self.motions] if not separate_motion else [
f'{mo.folder}/{mo.name}' for mo in self.motions],
factors=self.factors if type(self.factors[0] == float) else [
fact.item() for fact in self.factors],
period_bound=self.period_bound,
target_spectrum=self.target_spectrum.to_dict(),
period_samples=self.period_samples.tolist(),
srss=self.get_srss(),
average_spectrum=self.get_average_spectrum().to_dict(),
)
def to_json(self, separate_motion=True):
'''Convert the properties to json.
Parameters
----------
Refer to `Suite.to_dict`.
Returns
-------
`json` : str
'''
return json.dumps(self.to_dict(separate_motion=separate_motion))
def save(self, separate_motion=True, rewrite=False):
'''Save the suite to a .suite file.
Parameters
----------
`separate_motion` : boolean
If True, the motion details are not included into the file.
User has to make sure the motions are not moved.
`rewrite` : boolean
If True, all the '.motion' files will be rewritten.
otherwise, the motion files will be ignored.
Outputs
-------
Create a .suite file to store the suite.
'''
with open(f'{self.folder}/{self.name}.suite', 'w') as f:
f.write(self.to_json(separate_motion=separate_motion))
if separate_motion:
for mo in self.motions:
if not os.path.exists(f'{mo.folder}/{mo.name}.motion'):
mo.save()
print(f'Motion saved to {mo.folder}/{mo.name}.motion')
else:
if rewrite:
mo.save()
print(f'Motion {mo.folder}/{mo.name}.motion is written.')
print(f'Saved to {self.folder}/{self.name}.suite')
@staticmethod
def read_dict(it):
'''Read the dictionary representing the properties of the class.
If key 'motions' is a instance of dict, then construct motions with dict.
else, key 'motions' should be a list of {folder}/{name}, then construct with the .motion file.
Parameters
----------
`it` : dict
The dict generated by `Suite.to_dict`
Returns
-------
`suite` : `Suite`
'''
suite = Suite(
folder=it['folder'],
name=it['name'],
target_spectrum=Spectrum.read_dict(it['target_spectrum']),
period_bound=it['period_bound']
)
for i, mo_content in enumerate(it['motions']):
if isinstance(mo_content, dict):
suite.add_motion(Motion.read_dict(
mo_content), it['factors'][i])
else:
suite.add_motion(Motion.load(
f'{mo_content}.motion'), it['factors'][i])
suite.period_samples = np.array(it['period_samples'])
suite.target_vector = suite.target_spectrum.get(suite.period_samples)
suite.reset_spectrum_matrix()
return suite
@staticmethod
def load(filename):
'''Load a the suite from a file.
Parameters
----------
`filename` : str
File that stores the suite.
Returns
-------
`suite` : `Suite`
'''
if not filename.endswith('.suite'):
filename += '.suite'
with open(filename, 'r') as f:
return Suite.read_dict(json.loads(f.read()))
def copy(self):
'''Copy the instance.
Returns
-------
`suite` : `Suite`
'''
return Suite.read_dict(self.to_dict())
Classes
class Motion (folder, name, damping=0.05)
-
A class plays with ground motion. Store accel time history and accel spectrum only.
Parameters
folder
: str
the folder this motion is stored.name
: str
The motion name. The motion will be saved to '{folder}/{name}.motion'damping
: float
The damping ratio of the motion spectrum.Expand source code
class Motion(object): '''A class plays with ground motion. Store accel time history and accel spectrum only. ''' def __init__(self, folder, name, damping=0.05): ''' Parameters ---------- `folder` : str the folder this motion is stored. `name` : str The motion name. The motion will be saved to '{folder}/{name}.motion' `damping` : float The damping ratio of the motion spectrum. ''' self.folder = folder '''str. See `Motion`.''' os.makedirs(folder, exist_ok=True) self.name = name '''str. See `Motion`.''' self.accel = None '''numpy.ndarray. Acceleration time history with evenly distributed sampling points.''' self.dt = None '''float. Acceleration time sampling delta t.''' self.information = '' '''str. Store massive infomation about the motion. ''' self.spectrum = None '''`Spectrum`. The spectrum of the motion.''' self.damping = damping '''float. The damping ratio of the spectrum. Note: init with 0.05.''' def get_PGA(self): ''' Returns ------- `value` : float The peak ground acceleration. ''' return np.max(abs(self.accel)) def get_duration(self): ''' Returns ------- `value` : float The duration of the motion. ''' return (len(self.accel) - 1) * self.dt def get_times(self): ''' Returns ------- `value` : numpy.ndarray The time series of the motion ''' return np.arange(len(self.accel)) * self.dt def generate_spectrum(self, periods=None): '''Use scipy LTS signal processing module to generate the spectrum. Note that the accel spectrum is pseudo-spectrum derived from disp spectrum. So it's only accurate when damping is small. If `Motion.dt` is still None, which usually occurs if the PEER file is not well formated, a blank spectrum will be returned. Parameters ---------- `periods` : List-like, None The period sampling points. If is None, `Spectrum.periods_standard` will be called. Returns ------- `spectrum` : `Spectrum` The generated spectrum. Outputs ------- `Motion.spectrum` will be changed. ''' if periods is None: periods = Spectrum.periods_standard() if self.dt is not None: print("Generating spectrum ...") omegas = 2 * np.pi / periods[1:] num = np.array([-1]) spectrum = np.zeros(len(periods)) spectrum[0] = self.get_PGA() for i, omega in enumerate(omegas): den = np.array([1, 2*self.damping*omega, omega**2]) _, y, _ = lsim((num, den), self.accel, self.get_times()) spectrum[i+1] = np.max(abs(y)) * omega ** 2 self.spectrum = Spectrum(periods, spectrum) else: print("Dt is not known. Assigning empty spectrum.") self.spectrum = Spectrum(periods=[], spectrum=[]) return self.spectrum def load_peer(self, peer_filename): '''Load acceleration time-history files downloaded from PEER. The strings in the file will be parsed and also stored in self.information. Parameters ---------- `peer_filename` : str Outputs ------- `Motion.dt`, `Motion.accel`, `Motion.information`, `Motion.spectrum` will all be changed. ''' self.information += f'peer_filename: {peer_filename}' try: f = open(peer_filename, 'r') for _ in range(3): self.information += f.readline() s = f.readline() self.information += s re_dt = re.compile(r".*[ 0](\.[0-9]*) ?SEC") mt = re_dt.match(s) try: self.dt = float(mt.group(1).replace('.', '0.')) print(f'Extracted dt = {self.dt:.4f}') except: print(f'ERROR: cannot find dt from {s}') self.dt = 0.000001 series = [] datastring = f.readlines() for dtline in datastring: for dt in dtline.split(" "): if dt: try: series.append(float(dt.replace(".", "0."))) except ValueError: break self.accel = np.array(series) print(f'Extracted npts = {len(self.accel)}') except IOError: print(f'No file named {self.name} is found.') def convert_peer(self, peer_filename): '''A recipe to convert the peer acceleration file to the .motion file. Parameters ---------- `peer_filename` : str `folder` : str the folder to save .motion file. Outputs ------- A .motion file will be created in `folder` with the PEER filename. ''' print(f'Converting PEER file {peer_filename} to folder {self.folder} with name {self.name}') self.load_peer(peer_filename) self.generate_spectrum() self.save() def trim(self, new_name, new_folder=None, left=1.0e-3, right=1.0e-3, prepend_zero=True, log=True, log_folder=None): '''Trim the motion with two coefficients of PGA. This method is used when a long near-zero time-history is stored in the data file. Parameters ---------- `new_name` : str `new_folder` : str, None `left`, `right` : float The coefficient of PGA to trim the time history. Search from left or right, when the target point is found, search for the nearest near-zero point. Then trim off the small values. `prepend_zero` : boolean If True, prepend a zero value to the trimmed acceleration. `log` : boolean If the log picture will be generated. `log_folder` : str, None Path for the folder to save the trim logs. If None, save in the previous folder. Returns ------- `motion` : `Motion` The trimmed motion. Outputs ------- If log, plot a figure in log_folder named '{new_name}_trimlog.png' ''' mo = self.copy() mo.name = new_name mo.folder = new_folder if new_folder is not None else self.folder pga = self.get_PGA() # search left i = 0 while abs(self.accel[i]) < pga * left: i += 1 while i > 0 and self.accel[i] * self.accel[i-1] > 0: i -= 1 i_left = i # search right i = -1 while abs(self.accel[i]) < pga * right: i -= 1 while i < -1 and self.accel[i] * self.accel[i+1] > 0: i += 1 i_right = len(self.accel) + i if prepend_zero: mo.accel = np.hstack([0, self.accel[i_left:i_right]]) else: mo.accel = self.accel[i_left:i_right] mo.generate_spectrum() print( f'Motion is trimmed from {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.') if log: log_folder = self.folder if log_folder is None else log_folder os.makedirs(log_folder, exist_ok=True) _, ax = plt.subplots(3, 1, figsize=(7, 5)) ax[0].plot(self.get_times(), self.accel, label='before') ax[1].plot(mo.get_times(), mo.accel, label='after') ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before') ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after') for i in range(3): ax[i].legend() plt.suptitle(f'Trim log: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.') plt.savefig(f'{log_folder}/{mo.name}_trimlog.png', dpi=200) return mo def change_dt(self, dt, new_name, new_folder=None, log=True, log_folder=None): '''Change dt for the accel. Linearly interpolate the acceleration, and re-sample. Note: no low-pass filters are used. May cause new peaks in the Fourier spectrum. Parameters ---------- `dt` : float New sampling delta t. `new_name` : str `new_folder` : str, None `log` : boolean If log, create a log picture comparing the before and after accels and spectrum. `log_folder` : str, None Path for the folder to save the trim logs. If None, save in the previous folder. Returns ------- `motion` : `Motion` Outputs ------- If log, create a picture named '{log_folder}/{new_name}_dtlog.png'. ''' if dt == self.dt: print(f'Motion {self.name} do not need to change dt') return self.copy() else: mo = self.copy() mo.name = new_name mo.folder = new_folder if new_folder is not None else self.folder f = interp1d(self.get_times(), self.accel) npts = np.floor(self.dt * len(self.accel) / dt) mo.accel = f(np.arange(npts)*dt) mo.dt = dt mo.generate_spectrum() print(f'Motion dt changed from {self.name}:{self.dt:.4f}s to {mo.name}:{dt:.4f}s.') if log: log_folder = self.folder if log_folder is None else log_folder os.makedirs(log_folder, exist_ok=True) _, ax = plt.subplots(3, 1, figsize=(7, 5)) ax[0].plot(self.get_times(), self.accel, label='before') ax[1].plot(mo.get_times(), mo.accel, label='after') ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before') ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after') for i in range(3): ax[i].legend() plt.suptitle(f'Change dt: {self.name}:{self.dt:.4f}s to {mo.name}:{dt:.4f}s.') plt.savefig(f'{log_folder}/{mo.name}_dtlog.png', dpi=200) return mo def truncate(self, time, new_name, new_folder=None, log=True, log_folder=None): '''Truncate the acceleration. Sometimes, a ground motion file may have two peaks or more. So if only one peak is needed, this method can be used to skip the other peaks. Parameters ---------- `time` : float The accels after the time will be cut off. `new_name` : str `new_folder` : str, None `log` : boolean `log_folder` : str, None Returns ------- `motion` : `Motion` Outputs ------- If log is True, a figure named '{log_folder}/{new_name}_trunclog.png' will be created. ''' i = round(time / self.dt) mo = self.copy() mo.name = new_name mo.folder = new_folder if new_folder is not None else self.folder if i >= len(self.accel): print(f"WARNING: time is longer than duration. {self.name} is not truncated.") return mo mo.accel = self.accel[:i] mo.generate_spectrum() print(f'Motion truncated: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.') if log: log_folder = self.folder if log_folder is None else log_folder os.makedirs(log_folder, exist_ok=True) _, ax = plt.subplots(3, 1, figsize=(7, 5)) ax[0].plot(self.get_times(), self.accel, label='before') ax[1].plot(mo.get_times(), mo.accel, label='after') ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before') ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after') for i in range(3): ax[i].legend() plt.suptitle(f'Truncate: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.') plt.savefig(f'{log_folder}/{mo.name}_trunclog.png', dpi=200) return mo def scale(self, new_name, new_folder=None, factor_x=1, factor_y=1): '''Scale the ground motion. Parameters ---------- `new_name` : str `new_folder` : str, None `factor_x` : float The scaling factor on the time/period axis. `factor_y` : float The scaling factor on the accel axis. Returns ------- `motion` : Motion ''' mo = self.copy() mo.name = new_name mo.folder = new_folder if new_folder is not None else self.folder if factor_x != 1: mo.dt = self.dt * factor_x if factor_y != 1: mo.accel = self.accel * factor_y mo.spectrum = self.spectrum.scale(factor_x, factor_y) return mo def plot_timehistory(self, folder=None, save=True, engine='pyplot'): '''Plot the time history of the acceleration. Parameters ---------- `folder` : str, None Store folder. If None, use `Motion.folder` `save` : boolean Save the figure or not. `engine` : 'pyplot' or 'plotly' Outputs ------- if save is True, create a figure '{folder}/{self.name}_th'. ''' folder = self.folder if folder is None else folder if engine == 'pyplot': plt.figure() t = self.get_times() plt.plot(t, self.accel) if save: plt.savefig(f'{folder}/{self.name}_th.png', dpi=200) else: plt.show() elif engine == 'plotly': fig = go.Figure() fig.add_trace(go.Scatter( x=self.get_times(), y=self.accel, mode='lines', name=self.name )) if save: fig.write_html(f'{folder}/{self.name}_th.html') else: fig.show() else: print('ERROR: engine should be in "pyplot" or "plotly"') def plot(self, folder=None, save=True, engine='pyplot'): '''Plot the time history and the spectrum. Parameters ---------- See `Motion.plot_timehistory` Outputs ------- Create a figure named '{folder}/{name}' ''' folder = self.folder if folder is None else folder if engine == 'pyplot': _, ax = plt.subplots(2, 1, figsize=(7, 8)) ax[0].plot(self.get_times(), self.accel) ax[0].set_xlabel('Time(s)') ax[0].set_ylabel('Acceleration(g)') ax[0].set_title(f'PGA={self.get_PGA():.4f}') ax[1].plot(self.spectrum.periods, self.spectrum.spectrum) ax[1].set_xlabel('Period(s)') ax[1].set_ylabel('Spectrum Acceleration(g)') ax[1].set_title(f'damping={self.damping:.4f}') plt.subplots_adjust(hspace=0.3) if save: plt.savefig(f'{folder}/{self.name}.png', dpi=200) else: plt.show() elif engine == 'plotly': fig = make_subplots(rows=1, cols=2) fig.add_trace(go.Scatter( x=self.get_times(), y=self.accel, mode='lines', name='accel' ), row=1, col=2) fig.add_trace(go.Scatter( x=self.spectrum.periods, y=self.spectrum.spectrum, mode='lines', name='spectrum' ), row=1, col=1) if save: fig.write_html(f'{folder}/{self.name}.html') else: fig.show() else: print('Engine should be in "pyplot" or "plotly"') def write_accel(self, filename, time=True): '''Write the acceleration timehistory to a single file. Parameters ---------- `filename` : str The file path to store the accelration. `time` : boolean If True, a time column is created. To know dt in the file content, time should be True. Otherwise, dt should be stored in the filename. Outputs ------- An text file with acceleration information will be written. ''' if time: data = np.vstack([self.get_times(), self.accel]) np.savetxt(filename, data.T, fmt="%.4f,%.7e", header="Time,Accel") else: np.savetxt(filename, self.accel, fmt="%.7e") def write_info(self, filename): '''Write the information to a single file. Parameters ---------- `filename` : str Outputs ------- A text file with information will be created. ''' with open(filename, 'w') as f: f.write(f'dt={self.dt:.3}\n') f.write(f'npts={len(self.accel)}\n') f.write(f'damping={self.damping:.3}\n') f.write(self.information) def write_spectrum(self, filename): '''Write the spectrum to a single file. Use the form of csv. Parameters ---------- `filename` : str Outputs ------- A csv file will be created. ''' self.spectrum.write_csv(filename) def read_accel(self, filename, dt=True): '''Load an single-column acceleration file to the current instance. Parameters ---------- `dt` : float, boolean if dt is float, `Motion.dt` will be set to dt. Else if dt is True, `Motion.dt` will be found in the file. Outputs ------- Change `Motion.dt` and `Motion.accel` in the CURRENT instance. ''' if not isinstance(dt, float): data = np.loadtxt(filename) self.accel = data[:, 1] self.dt = data[1, 0] else: self.accel = np.loadtxt(filename) self.dt = dt def read_spectrum(self, filename): '''Read the spectrum from csv file. Parameters ---------- `filename` : str Outputs ------- The `Motion.spectrum` of the CURRENT instance will be changed. ''' spectrum = Spectrum.read_csv(filename) self.spectrum = spectrum def read_info(self, filename): '''Read info from text file. Parameters ---------- `filename` : str Outputs ------- The `Motion.information` of the CURRENT instance will be changed. ''' with open(filename, 'r') as f: self.dt = float(f.readline().replace("dt=", "")) _ = int(f.readline().replace("npts=", "")) self.damping = float(f.readline().replace("damping=", "")) self.information = f.read() def to_dict(self): '''Convert the instance properties to dict. Returns ------- `it` : dict ''' return dict( dt=self.dt, npts=len(self.accel), damping=self.damping, information=self.information, accel=self.accel.tolist(), spectrum=self.spectrum.to_dict(), folder=self.folder, name=self.name, ) def to_json(self): '''Convert the instance properties to string using json format. Returns ------- `string` : str ''' return json.dumps(self.to_dict()) def save(self): '''Save the instance to a file, which is defined by `Motion.folder` and `Motion.name` Outputs ------- A file named '{self.folder}/{self.name}.motion' will be created. ''' with open(f'{self.folder}/{self.name}.motion', 'w') as f: f.write(self.to_json()) @staticmethod def read_dict(it): '''Create a instance by the dict generated by `Motion.to_dict`. Parameters ---------- `it` : dict Returns ------- `mo` : `Motion` ''' mo = Motion(folder=it['folder'], name=it['name']) mo.dt = it['dt'] mo.damping = it['damping'] mo.information = it['information'] mo.accel = np.array(it['accel']) mo.spectrum = Spectrum.read_dict(it['spectrum']) return mo @staticmethod def read_json(string): '''Create a new instance by the string generated by `Motion.to_json`. Parameters ---------- `string` : str Returns ------- `mo` : `Motion` ''' it = json.loads(string) return Motion.read_dict(it) @staticmethod def load(filename): '''Load the file created from `Motion.save` method. Parameters ---------- `filename` : str Returns ------- `mo` : `Motion` ''' if not filename.endswith('.motion'): filename += '.motion' with open(filename, 'r') as f: return Motion.read_json(f.read()) def copy(self): '''Copy the instance. Returns ------- `mo` : `Motion` ''' return Motion.read_dict(self.to_dict())
Static methods
def load(filename)
-
Load the file created from
Motion.save()
method.Parameters
filename
: strReturns
mo
:Motion
Expand source code
@staticmethod def load(filename): '''Load the file created from `Motion.save` method. Parameters ---------- `filename` : str Returns ------- `mo` : `Motion` ''' if not filename.endswith('.motion'): filename += '.motion' with open(filename, 'r') as f: return Motion.read_json(f.read())
def read_dict(it)
-
Create a instance by the dict generated by
Motion.to_dict()
.Parameters
it
: dictReturns
mo
:Motion
Expand source code
@staticmethod def read_dict(it): '''Create a instance by the dict generated by `Motion.to_dict`. Parameters ---------- `it` : dict Returns ------- `mo` : `Motion` ''' mo = Motion(folder=it['folder'], name=it['name']) mo.dt = it['dt'] mo.damping = it['damping'] mo.information = it['information'] mo.accel = np.array(it['accel']) mo.spectrum = Spectrum.read_dict(it['spectrum']) return mo
def read_json(string)
-
Create a new instance by the string generated by
Motion.to_json()
.Parameters
string
: strReturns
mo
:Motion
Expand source code
@staticmethod def read_json(string): '''Create a new instance by the string generated by `Motion.to_json`. Parameters ---------- `string` : str Returns ------- `mo` : `Motion` ''' it = json.loads(string) return Motion.read_dict(it)
Instance variables
var accel
-
numpy.ndarray. Acceleration time history with evenly distributed sampling points.
var damping
-
float. The damping ratio of the spectrum. Note: init with 0.05.
var dt
-
float. Acceleration time sampling delta t.
var folder
-
str. See
Motion
. var information
-
str. Store massive infomation about the motion.
var name
-
str. See
Motion
. var spectrum
-
Spectrum
. The spectrum of the motion.
Methods
def change_dt(self, dt, new_name, new_folder=None, log=True, log_folder=None)
-
Change dt for the accel. Linearly interpolate the acceleration, and re-sample. Note: no low-pass filters are used. May cause new peaks in the Fourier spectrum.
Parameters
dt
: float
New sampling delta t.new_name
: strnew_folder
: str, Nonelog
: boolean
If log, create a log picture comparing the before and after accels and spectrum.log_folder
: str, None
Path for the folder to save the trim logs. If None, save in the previous folder.Returns
motion
:Motion
Outputs
If log, create a picture named '{log_folder}/{new_name}_dtlog.png'.
Expand source code
def change_dt(self, dt, new_name, new_folder=None, log=True, log_folder=None): '''Change dt for the accel. Linearly interpolate the acceleration, and re-sample. Note: no low-pass filters are used. May cause new peaks in the Fourier spectrum. Parameters ---------- `dt` : float New sampling delta t. `new_name` : str `new_folder` : str, None `log` : boolean If log, create a log picture comparing the before and after accels and spectrum. `log_folder` : str, None Path for the folder to save the trim logs. If None, save in the previous folder. Returns ------- `motion` : `Motion` Outputs ------- If log, create a picture named '{log_folder}/{new_name}_dtlog.png'. ''' if dt == self.dt: print(f'Motion {self.name} do not need to change dt') return self.copy() else: mo = self.copy() mo.name = new_name mo.folder = new_folder if new_folder is not None else self.folder f = interp1d(self.get_times(), self.accel) npts = np.floor(self.dt * len(self.accel) / dt) mo.accel = f(np.arange(npts)*dt) mo.dt = dt mo.generate_spectrum() print(f'Motion dt changed from {self.name}:{self.dt:.4f}s to {mo.name}:{dt:.4f}s.') if log: log_folder = self.folder if log_folder is None else log_folder os.makedirs(log_folder, exist_ok=True) _, ax = plt.subplots(3, 1, figsize=(7, 5)) ax[0].plot(self.get_times(), self.accel, label='before') ax[1].plot(mo.get_times(), mo.accel, label='after') ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before') ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after') for i in range(3): ax[i].legend() plt.suptitle(f'Change dt: {self.name}:{self.dt:.4f}s to {mo.name}:{dt:.4f}s.') plt.savefig(f'{log_folder}/{mo.name}_dtlog.png', dpi=200) return mo
def convert_peer(self, peer_filename)
-
A recipe to convert the peer acceleration file to the .motion file.
Parameters
peer_filename
: strfolder
: str
the folder to save .motion file.Outputs
A .motion file will be created in
folder
with the PEER filename.Expand source code
def convert_peer(self, peer_filename): '''A recipe to convert the peer acceleration file to the .motion file. Parameters ---------- `peer_filename` : str `folder` : str the folder to save .motion file. Outputs ------- A .motion file will be created in `folder` with the PEER filename. ''' print(f'Converting PEER file {peer_filename} to folder {self.folder} with name {self.name}') self.load_peer(peer_filename) self.generate_spectrum() self.save()
def copy(self)
-
Copy the instance.
Returns
mo
:Motion
Expand source code
def copy(self): '''Copy the instance. Returns ------- `mo` : `Motion` ''' return Motion.read_dict(self.to_dict())
def generate_spectrum(self, periods=None)
-
Use scipy LTS signal processing module to generate the spectrum.
Note that the accel spectrum is pseudo-spectrum derived from disp spectrum.
So it's only accurate when damping is small.
IfMotion.dt
is still None, which usually occurs if the PEER file is not well formated, a blank spectrum will be returned.Parameters
periods
: List-like, None
The period sampling points.
If is None,Spectrum.periods_standard()
will be called.Returns
spectrum
:Spectrum
The generated spectrum.Outputs
Motion.spectrum
will be changed.Expand source code
def generate_spectrum(self, periods=None): '''Use scipy LTS signal processing module to generate the spectrum. Note that the accel spectrum is pseudo-spectrum derived from disp spectrum. So it's only accurate when damping is small. If `Motion.dt` is still None, which usually occurs if the PEER file is not well formated, a blank spectrum will be returned. Parameters ---------- `periods` : List-like, None The period sampling points. If is None, `Spectrum.periods_standard` will be called. Returns ------- `spectrum` : `Spectrum` The generated spectrum. Outputs ------- `Motion.spectrum` will be changed. ''' if periods is None: periods = Spectrum.periods_standard() if self.dt is not None: print("Generating spectrum ...") omegas = 2 * np.pi / periods[1:] num = np.array([-1]) spectrum = np.zeros(len(periods)) spectrum[0] = self.get_PGA() for i, omega in enumerate(omegas): den = np.array([1, 2*self.damping*omega, omega**2]) _, y, _ = lsim((num, den), self.accel, self.get_times()) spectrum[i+1] = np.max(abs(y)) * omega ** 2 self.spectrum = Spectrum(periods, spectrum) else: print("Dt is not known. Assigning empty spectrum.") self.spectrum = Spectrum(periods=[], spectrum=[]) return self.spectrum
def get_PGA(self)
-
Returns
value
: float
The peak ground acceleration.Expand source code
def get_PGA(self): ''' Returns ------- `value` : float The peak ground acceleration. ''' return np.max(abs(self.accel))
def get_duration(self)
-
Returns
value
: float
The duration of the motion.Expand source code
def get_duration(self): ''' Returns ------- `value` : float The duration of the motion. ''' return (len(self.accel) - 1) * self.dt
def get_times(self)
-
Returns
value
: numpy.ndarray
The time series of the motionExpand source code
def get_times(self): ''' Returns ------- `value` : numpy.ndarray The time series of the motion ''' return np.arange(len(self.accel)) * self.dt
def load_peer(self, peer_filename)
-
Load acceleration time-history files downloaded from PEER.
The strings in the file will be parsed and also stored in self.information.Parameters
peer_filename
: strOutputs
Motion.dt
,Motion.accel
,Motion.information
,Motion.spectrum
will all be changed.Expand source code
def load_peer(self, peer_filename): '''Load acceleration time-history files downloaded from PEER. The strings in the file will be parsed and also stored in self.information. Parameters ---------- `peer_filename` : str Outputs ------- `Motion.dt`, `Motion.accel`, `Motion.information`, `Motion.spectrum` will all be changed. ''' self.information += f'peer_filename: {peer_filename}' try: f = open(peer_filename, 'r') for _ in range(3): self.information += f.readline() s = f.readline() self.information += s re_dt = re.compile(r".*[ 0](\.[0-9]*) ?SEC") mt = re_dt.match(s) try: self.dt = float(mt.group(1).replace('.', '0.')) print(f'Extracted dt = {self.dt:.4f}') except: print(f'ERROR: cannot find dt from {s}') self.dt = 0.000001 series = [] datastring = f.readlines() for dtline in datastring: for dt in dtline.split(" "): if dt: try: series.append(float(dt.replace(".", "0."))) except ValueError: break self.accel = np.array(series) print(f'Extracted npts = {len(self.accel)}') except IOError: print(f'No file named {self.name} is found.')
def plot(self, folder=None, save=True, engine='pyplot')
-
Plot the time history and the spectrum.
Parameters
Outputs
Create a figure named '{folder}/{name}'
Expand source code
def plot(self, folder=None, save=True, engine='pyplot'): '''Plot the time history and the spectrum. Parameters ---------- See `Motion.plot_timehistory` Outputs ------- Create a figure named '{folder}/{name}' ''' folder = self.folder if folder is None else folder if engine == 'pyplot': _, ax = plt.subplots(2, 1, figsize=(7, 8)) ax[0].plot(self.get_times(), self.accel) ax[0].set_xlabel('Time(s)') ax[0].set_ylabel('Acceleration(g)') ax[0].set_title(f'PGA={self.get_PGA():.4f}') ax[1].plot(self.spectrum.periods, self.spectrum.spectrum) ax[1].set_xlabel('Period(s)') ax[1].set_ylabel('Spectrum Acceleration(g)') ax[1].set_title(f'damping={self.damping:.4f}') plt.subplots_adjust(hspace=0.3) if save: plt.savefig(f'{folder}/{self.name}.png', dpi=200) else: plt.show() elif engine == 'plotly': fig = make_subplots(rows=1, cols=2) fig.add_trace(go.Scatter( x=self.get_times(), y=self.accel, mode='lines', name='accel' ), row=1, col=2) fig.add_trace(go.Scatter( x=self.spectrum.periods, y=self.spectrum.spectrum, mode='lines', name='spectrum' ), row=1, col=1) if save: fig.write_html(f'{folder}/{self.name}.html') else: fig.show() else: print('Engine should be in "pyplot" or "plotly"')
def plot_timehistory(self, folder=None, save=True, engine='pyplot')
-
Plot the time history of the acceleration.
Parameters
folder
: str, None
Store folder. If None, useMotion.folder
save
: boolean Save the figure or not.engine
: 'pyplot' or 'plotly'Outputs
if save is True, create a figure '{folder}/{self.name}_th'.
Expand source code
def plot_timehistory(self, folder=None, save=True, engine='pyplot'): '''Plot the time history of the acceleration. Parameters ---------- `folder` : str, None Store folder. If None, use `Motion.folder` `save` : boolean Save the figure or not. `engine` : 'pyplot' or 'plotly' Outputs ------- if save is True, create a figure '{folder}/{self.name}_th'. ''' folder = self.folder if folder is None else folder if engine == 'pyplot': plt.figure() t = self.get_times() plt.plot(t, self.accel) if save: plt.savefig(f'{folder}/{self.name}_th.png', dpi=200) else: plt.show() elif engine == 'plotly': fig = go.Figure() fig.add_trace(go.Scatter( x=self.get_times(), y=self.accel, mode='lines', name=self.name )) if save: fig.write_html(f'{folder}/{self.name}_th.html') else: fig.show() else: print('ERROR: engine should be in "pyplot" or "plotly"')
def read_accel(self, filename, dt=True)
-
Load an single-column acceleration file to the current instance.
Parameters
dt
: float, boolean if dt is float,Motion.dt
will be set to dt. Else if dt is True,Motion.dt
will be found in the file.Outputs
Change
Motion.dt
andMotion.accel
in the CURRENT instance.Expand source code
def read_accel(self, filename, dt=True): '''Load an single-column acceleration file to the current instance. Parameters ---------- `dt` : float, boolean if dt is float, `Motion.dt` will be set to dt. Else if dt is True, `Motion.dt` will be found in the file. Outputs ------- Change `Motion.dt` and `Motion.accel` in the CURRENT instance. ''' if not isinstance(dt, float): data = np.loadtxt(filename) self.accel = data[:, 1] self.dt = data[1, 0] else: self.accel = np.loadtxt(filename) self.dt = dt
def read_info(self, filename)
-
Read info from text file.
Parameters
filename
: strOutputs
The
Motion.information
of the CURRENT instance will be changed.Expand source code
def read_info(self, filename): '''Read info from text file. Parameters ---------- `filename` : str Outputs ------- The `Motion.information` of the CURRENT instance will be changed. ''' with open(filename, 'r') as f: self.dt = float(f.readline().replace("dt=", "")) _ = int(f.readline().replace("npts=", "")) self.damping = float(f.readline().replace("damping=", "")) self.information = f.read()
def read_spectrum(self, filename)
-
Read the spectrum from csv file.
Parameters
filename
: strOutputs
The
Motion.spectrum
of the CURRENT instance will be changed.Expand source code
def read_spectrum(self, filename): '''Read the spectrum from csv file. Parameters ---------- `filename` : str Outputs ------- The `Motion.spectrum` of the CURRENT instance will be changed. ''' spectrum = Spectrum.read_csv(filename) self.spectrum = spectrum
def save(self)
-
Save the instance to a file, which is defined by
Motion.folder
andMotion.name
Outputs
A file named '{self.folder}/{self.name}.motion' will be created.
Expand source code
def save(self): '''Save the instance to a file, which is defined by `Motion.folder` and `Motion.name` Outputs ------- A file named '{self.folder}/{self.name}.motion' will be created. ''' with open(f'{self.folder}/{self.name}.motion', 'w') as f: f.write(self.to_json())
def scale(self, new_name, new_folder=None, factor_x=1, factor_y=1)
-
Scale the ground motion.
Parameters
new_name
: strnew_folder
: str, Nonefactor_x
: float
The scaling factor on the time/period axis.factor_y
: float The scaling factor on the accel axis.Returns
motion
: MotionExpand source code
def scale(self, new_name, new_folder=None, factor_x=1, factor_y=1): '''Scale the ground motion. Parameters ---------- `new_name` : str `new_folder` : str, None `factor_x` : float The scaling factor on the time/period axis. `factor_y` : float The scaling factor on the accel axis. Returns ------- `motion` : Motion ''' mo = self.copy() mo.name = new_name mo.folder = new_folder if new_folder is not None else self.folder if factor_x != 1: mo.dt = self.dt * factor_x if factor_y != 1: mo.accel = self.accel * factor_y mo.spectrum = self.spectrum.scale(factor_x, factor_y) return mo
def to_dict(self)
-
Convert the instance properties to dict.
Returns
it
: dictExpand source code
def to_dict(self): '''Convert the instance properties to dict. Returns ------- `it` : dict ''' return dict( dt=self.dt, npts=len(self.accel), damping=self.damping, information=self.information, accel=self.accel.tolist(), spectrum=self.spectrum.to_dict(), folder=self.folder, name=self.name, )
def to_json(self)
-
Convert the instance properties to string using json format.
Returns
string
: strExpand source code
def to_json(self): '''Convert the instance properties to string using json format. Returns ------- `string` : str ''' return json.dumps(self.to_dict())
def trim(self, new_name, new_folder=None, left=0.001, right=0.001, prepend_zero=True, log=True, log_folder=None)
-
Trim the motion with two coefficients of PGA.
This method is used when a long near-zero time-history is stored in the data file.Parameters
new_name
: strnew_folder
: str, Noneleft
,right
: float The coefficient of PGA to trim the time history. Search from left or right, when the target point is found, search for the nearest near-zero point. Then trim off the small values.prepend_zero
: boolean
If True, prepend a zero value to the trimmed acceleration.log
: boolean
If the log picture will be generated.log_folder
: str, None Path for the folder to save the trim logs. If None, save in the previous folder.Returns
motion
:Motion
The trimmed motion.Outputs
If log, plot a figure in log_folder named '{new_name}_trimlog.png'
Expand source code
def trim(self, new_name, new_folder=None, left=1.0e-3, right=1.0e-3, prepend_zero=True, log=True, log_folder=None): '''Trim the motion with two coefficients of PGA. This method is used when a long near-zero time-history is stored in the data file. Parameters ---------- `new_name` : str `new_folder` : str, None `left`, `right` : float The coefficient of PGA to trim the time history. Search from left or right, when the target point is found, search for the nearest near-zero point. Then trim off the small values. `prepend_zero` : boolean If True, prepend a zero value to the trimmed acceleration. `log` : boolean If the log picture will be generated. `log_folder` : str, None Path for the folder to save the trim logs. If None, save in the previous folder. Returns ------- `motion` : `Motion` The trimmed motion. Outputs ------- If log, plot a figure in log_folder named '{new_name}_trimlog.png' ''' mo = self.copy() mo.name = new_name mo.folder = new_folder if new_folder is not None else self.folder pga = self.get_PGA() # search left i = 0 while abs(self.accel[i]) < pga * left: i += 1 while i > 0 and self.accel[i] * self.accel[i-1] > 0: i -= 1 i_left = i # search right i = -1 while abs(self.accel[i]) < pga * right: i -= 1 while i < -1 and self.accel[i] * self.accel[i+1] > 0: i += 1 i_right = len(self.accel) + i if prepend_zero: mo.accel = np.hstack([0, self.accel[i_left:i_right]]) else: mo.accel = self.accel[i_left:i_right] mo.generate_spectrum() print( f'Motion is trimmed from {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.') if log: log_folder = self.folder if log_folder is None else log_folder os.makedirs(log_folder, exist_ok=True) _, ax = plt.subplots(3, 1, figsize=(7, 5)) ax[0].plot(self.get_times(), self.accel, label='before') ax[1].plot(mo.get_times(), mo.accel, label='after') ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before') ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after') for i in range(3): ax[i].legend() plt.suptitle(f'Trim log: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.') plt.savefig(f'{log_folder}/{mo.name}_trimlog.png', dpi=200) return mo
def truncate(self, time, new_name, new_folder=None, log=True, log_folder=None)
-
Truncate the acceleration. Sometimes, a ground motion file may have two peaks or more. So if only one peak is needed, this method can be used to skip the other peaks.
Parameters
time
: float The accels after the time will be cut off.new_name
: strnew_folder
: str, Nonelog
: booleanlog_folder
: str, NoneReturns
motion
:Motion
Outputs
If log is True, a figure named '{log_folder}/{new_name}_trunclog.png' will be created.
Expand source code
def truncate(self, time, new_name, new_folder=None, log=True, log_folder=None): '''Truncate the acceleration. Sometimes, a ground motion file may have two peaks or more. So if only one peak is needed, this method can be used to skip the other peaks. Parameters ---------- `time` : float The accels after the time will be cut off. `new_name` : str `new_folder` : str, None `log` : boolean `log_folder` : str, None Returns ------- `motion` : `Motion` Outputs ------- If log is True, a figure named '{log_folder}/{new_name}_trunclog.png' will be created. ''' i = round(time / self.dt) mo = self.copy() mo.name = new_name mo.folder = new_folder if new_folder is not None else self.folder if i >= len(self.accel): print(f"WARNING: time is longer than duration. {self.name} is not truncated.") return mo mo.accel = self.accel[:i] mo.generate_spectrum() print(f'Motion truncated: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.') if log: log_folder = self.folder if log_folder is None else log_folder os.makedirs(log_folder, exist_ok=True) _, ax = plt.subplots(3, 1, figsize=(7, 5)) ax[0].plot(self.get_times(), self.accel, label='before') ax[1].plot(mo.get_times(), mo.accel, label='after') ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before') ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after') for i in range(3): ax[i].legend() plt.suptitle(f'Truncate: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.') plt.savefig(f'{log_folder}/{mo.name}_trunclog.png', dpi=200) return mo
def write_accel(self, filename, time=True)
-
Write the acceleration timehistory to a single file.
Parameters
filename
: str
The file path to store the accelration.time
: boolean
If True, a time column is created. To know dt in the file content, time should be True. Otherwise, dt should be stored in the filename.Outputs
An text file with acceleration information will be written.
Expand source code
def write_accel(self, filename, time=True): '''Write the acceleration timehistory to a single file. Parameters ---------- `filename` : str The file path to store the accelration. `time` : boolean If True, a time column is created. To know dt in the file content, time should be True. Otherwise, dt should be stored in the filename. Outputs ------- An text file with acceleration information will be written. ''' if time: data = np.vstack([self.get_times(), self.accel]) np.savetxt(filename, data.T, fmt="%.4f,%.7e", header="Time,Accel") else: np.savetxt(filename, self.accel, fmt="%.7e")
def write_info(self, filename)
-
Write the information to a single file.
Parameters
filename
: strOutputs
A text file with information will be created.
Expand source code
def write_info(self, filename): '''Write the information to a single file. Parameters ---------- `filename` : str Outputs ------- A text file with information will be created. ''' with open(filename, 'w') as f: f.write(f'dt={self.dt:.3}\n') f.write(f'npts={len(self.accel)}\n') f.write(f'damping={self.damping:.3}\n') f.write(self.information)
def write_spectrum(self, filename)
-
Write the spectrum to a single file. Use the form of csv.
Parameters
filename
: strOutputs
A csv file will be created.
Expand source code
def write_spectrum(self, filename): '''Write the spectrum to a single file. Use the form of csv. Parameters ---------- `filename` : str Outputs ------- A csv file will be created. ''' self.spectrum.write_csv(filename)
class Spectrum (periods, spectrum)
-
A class storing a spectrum.
It can be init-ed simply with (periods, spectrum) sampling points.
It can also be init-ed by static functions to generate Code-confined spectrums.Parameters
periods
: List, numpy.ndarray
List of sampling periods.
It can be generated by the static functionSpectrum.periods_standard()
. Otherwise, users can specify desired period sampling points.spectrum
: List, numpy.ndarray
List of the spectrum points corresponding to sampling periods.Expand source code
class Spectrum(): '''A class storing a spectrum. It can be init-ed simply with (periods, spectrum) sampling points. It can also be init-ed by static functions to generate Code-confined spectrums. Parameters ---------- `periods` : List, numpy.ndarray List of sampling periods. It can be generated by the static function `Spectrum.periods_standard` . Otherwise, users can specify desired period sampling points. `spectrum` : List, numpy.ndarray List of the spectrum points corresponding to sampling periods. ''' def __init__(self, periods, spectrum): self.periods = np.array(periods) '''numpy.ndarray, sampling periods''' self.spectrum = np.array(spectrum) '''numpy.ndarray, spectrum corresponding to sampling periods.''' def match(self, target, start, end, step=0.1): '''Match the current spectrum with a target spectrum. Use Least Square method. Parameters ---------- `target` : `Spectrum` the spectrum to be matched. `start`, `end` : float the matching period boundaries. `step` : float evenly sample the periods with the step. Returns ------- `coefficient` : float the coefficient to match the current spectrum to the target. `srss` : float the srss after match. ''' periods = np.arange(start, end+step, step) this_spectrum = np.array([self.get(p) for p in periods]) that_spectrum = np.array([target.get(p) for p in periods]) coefficient = sum(this_spectrum * that_spectrum) / sum(this_spectrum ** 2) srss = (np.sum((this_spectrum * coefficient - that_spectrum) ** 2)) ** 0.5 return coefficient, srss def get(self, period): '''Get the spectrum value. Either a list or a number is accepted. If the period exceeds the largest sampling period, the last spectrum value is returned. The spectrum value not on sampling points are linearly interpolated. Parameters ---------- `period` : number, list, numpy.ndarray The interested period. Returns ------- `spectrum` : number, numpy.ndarray if period is list-like, return a ndarray. if period is a number, return a number. ''' if not hasattr(period, '__iter__'): i = self.periods.searchsorted(period) if self.periods[i] == period: return self.spectrum[i] elif i >= len(self.periods): print("Warning: given period exceeds the sampling range.") return self.spectrum[-1] else: return self.spectrum[i] - (self.spectrum[i]-self.spectrum[i-1]) / (self.periods[i] - self.periods[i-1]) * (self.periods[i] - period) else: return np.array([self.get(p) for p in period]) def scale(self, factor_x=1, factor_y=1): '''Scale the spectrum. Parameters ---------- `factor_x` : number scaling factor in x direction (periods) `factor_y` : number scaling factor in y direction (spectrum) Returns ------- `spectrum` : `Spectrum` ''' return Spectrum(periods=self.periods*factor_x, spectrum=self.spectrum*factor_y) def plot(self, filename=None, engine='pyplot'): '''Plot the spectrum. Parameters ---------- `filename` : string, None if None, show it. Else, save it to the filename `engine` : 'pyplot' or 'plotly' Plot engine. Outputs ------- A file with name `filename`.png or `filename`.html will be generated. ''' if engine == 'pyplot': plt.figure(figsize=(4, 3)) plt.plot(self.periods, self.spectrum, marker='o') plt.xlabel('Period (s)') plt.ylabel('Spetrum') plt.xlim([0, self.periods[-1]]) if filename is None: plt.show() else: plt.savefig(filename+'.png', dpi=200) elif engine == 'plotly': fig = go.Figure() fig.add_scatter(dict( x='self.periods', y='self.spectrum', mode='lines+markers', )) if filename is None: fig.show() else: fig.write_html(filename+'.html') else: print('ERROR: engine should be in "pyplot" or "plotly"') def to_dict(self): '''Convert the instance properties to a dict. Returns ------- `it` : dict ''' return dict( periods=self.periods.tolist(), spectrum=self.spectrum.tolist() ) def to_json(self): '''Convert the properties to json. Returns ------- `json` : string ''' return json.dumps(self.to_dict()) def save(self, filename): '''Save the spectrum to a file. Currently use json form. Parameters ---------- `filename` : string The file name to be saved. The extension '.spectrum' will be automatically added. Outputs ------- A file with name `filename`.spectrum will be generated. ''' with open(filename + '.spectrum', 'w') as f: f.write(self.to_json()) def write_csv(self, filename): '''Write the spectrum to a csv file. Parameters ---------- `filename` : string The extension '.csv' will be automatically added. Outputs ------- A file named `filename`.csv will be generated. ''' np.savetxt(filename+'.csv', np.vstack( [self.periods, self.spectrum]).T, fmt="%.3f,%.7f", header='Period,Spectrum') @staticmethod def read_csv(filename): '''Create an instance by reading the written csv file. Parameters ---------- `filename` : string The filename to read. Returns ------- `spectrum` : `Spectrum` New instance. ''' if not filename.endswith('.csv'): filename += '.csv' data = np.loadtxt(filename, delimiter=',') return Spectrum(periods=data[:, 0], spectrum=data[:, 1]) @staticmethod def read_dict(it): '''Create an instance from the dict. Parameters ---------- `it` : dict The dict generated by `Spectrum.to_dict` Returns ------- `spectrum` : `Spectrum` New instance. ''' return Spectrum(periods=it['periods'], spectrum=it['spectrum']) @staticmethod def read_json(string): '''Create an instance from json string. Parameters ---------- `string` : str The json string Returns ------- `spectrum` : `Spectrum` New instance. ''' it = json.loads(string) return Spectrum.read_dict(it) @staticmethod def load(filename): '''Load the spectrum from file. Parameters ---------- `filename` : str Returns ------- `spectrum` : `Spectrum` New instance. ''' if not filename.endswith('.spectrum'): filename += '.spectrum' with open(filename, 'r') as f: string = f.read() return Spectrum.read_json(string) @staticmethod def periods_standard(until=6): '''Get a standard sampling of the periods. The sampling points are more sparse with the increase of period. Parameters ---------- `until` : number The end of the period sampling list. It should >= 4.0. Returns ------- `periods` : numpy.ndarray The standard sampling points. ''' assert until >= 4 return np.hstack([np.arange(0, 0.2, 0.01), np.arange(0.2, 0.5, 0.02), np.arange(0.5, 1.0, 0.05), np.arange(1.0, 2.0, 0.1), np.arange(2.0, 4.0, 0.2), np.arange(4.0, until+0.5, 0.5)]) @staticmethod def chinese_point(period, intensity, major, site, group, damping=0.05): '''Get spectrum point for Chinese code, given a period. Parameters ---------- `period` : number The given period. `intensity` : number Intensity represented by a number. Use 7.5 or 8.5 for half-level intensity. `major` : boolean If True, it's for major (rare) earthquake. If False, for minor (frequent) earthquake. `site` : int Site number (Roman number in the code. 0 for I0, 1 for I1.) `group` : int Site class (Chinese number in the code.) `damping` : float Spectrum damping. Returns ------- `value` : float the specific spectrum point value corresponding to the period. ''' # alpha max if intensity == 6: alphamax = 0.28 if major else 0.04 elif intensity == 7: alphamax = 0.50 if major else 0.08 elif intensity == 7.5: alphamax = 0.72 if major else 0.12 elif intensity == 8: alphamax = 0.9 if major else 0.16 elif intensity == 8.5: alphamax = 1.2 if major else 0.24 elif intensity == 9: alphamax = 1.40 if major else 0.32 else: raise("ERROR: intensity not in [6, 7, 7.5, 8, 8.5, 9]") # tg tg_table = [[0.20, 0.25, 0.35, 0.45, 0.65], [0.25, 0.3, 0.4, 0.55, 0.75], [0.3, 0.35, 0.45, 0.65, 0.9]] assert group in [1, 2, 3] and site in [0, 1, 2, 3, 4] tg = tg_table[group-1][site] # other parameters gamma = 0.9 + (0.05 - damping) / (0.3 + 6*damping) eta1 = 0.02 + (0.05 - damping) / (4 + 32*damping) eta2 = 1 + (0.05 - damping) / (0.08 + 1.6*damping) # calculate if period < 0.1: return ((10*eta2 - 4.5) * period + 0.45) * alphamax elif period < tg: return eta2 * alphamax elif period < 5*tg: return (tg/period)**gamma * eta2 * alphamax elif period <= 6: return (eta2 * 0.2**gamma - eta1 * (period-5*tg)) * alphamax else: return (eta2 * 0.2**gamma - eta1 * (6-5*tg)) * alphamax @staticmethod def chinese(intensity=8, major=True, site=3, group=2, damping=0.05): '''Return a new spectrum instance that confines to Chinese Code. Use `Spectrum.periods_standard` to get sampling periods. Parameters ---------- refer to `Spectrum.chinese_point` Returns ------- `spectrum` : `Spectrum` ''' periods = Spectrum.periods_standard(until=6) spectrum = [Spectrum.chinese_point(p, intensity=intensity, major=major, site=site, group=group, damping=damping) for p in periods] return Spectrum(periods, spectrum)
Static methods
def chinese(intensity=8, major=True, site=3, group=2, damping=0.05)
-
Return a new spectrum instance that confines to Chinese Code.
UseSpectrum.periods_standard()
to get sampling periods.Parameters
refer to
Spectrum.chinese_point()
Returns
spectrum
:Spectrum
Expand source code
@staticmethod def chinese(intensity=8, major=True, site=3, group=2, damping=0.05): '''Return a new spectrum instance that confines to Chinese Code. Use `Spectrum.periods_standard` to get sampling periods. Parameters ---------- refer to `Spectrum.chinese_point` Returns ------- `spectrum` : `Spectrum` ''' periods = Spectrum.periods_standard(until=6) spectrum = [Spectrum.chinese_point(p, intensity=intensity, major=major, site=site, group=group, damping=damping) for p in periods] return Spectrum(periods, spectrum)
def chinese_point(period, intensity, major, site, group, damping=0.05)
-
Get spectrum point for Chinese code, given a period.
Parameters
period
: number
The given period.intensity
: number
Intensity represented by a number.
Use 7.5 or 8.5 for half-level intensity.major
: boolean If True, it's for major (rare) earthquake.
If False, for minor (frequent) earthquake.site
: int Site number (Roman number in the code. 0 for I0, 1 for I1.)group
: int
Site class (Chinese number in the code.)damping
: float
Spectrum damping.Returns
value
: float the specific spectrum point value corresponding to the period.Expand source code
@staticmethod def chinese_point(period, intensity, major, site, group, damping=0.05): '''Get spectrum point for Chinese code, given a period. Parameters ---------- `period` : number The given period. `intensity` : number Intensity represented by a number. Use 7.5 or 8.5 for half-level intensity. `major` : boolean If True, it's for major (rare) earthquake. If False, for minor (frequent) earthquake. `site` : int Site number (Roman number in the code. 0 for I0, 1 for I1.) `group` : int Site class (Chinese number in the code.) `damping` : float Spectrum damping. Returns ------- `value` : float the specific spectrum point value corresponding to the period. ''' # alpha max if intensity == 6: alphamax = 0.28 if major else 0.04 elif intensity == 7: alphamax = 0.50 if major else 0.08 elif intensity == 7.5: alphamax = 0.72 if major else 0.12 elif intensity == 8: alphamax = 0.9 if major else 0.16 elif intensity == 8.5: alphamax = 1.2 if major else 0.24 elif intensity == 9: alphamax = 1.40 if major else 0.32 else: raise("ERROR: intensity not in [6, 7, 7.5, 8, 8.5, 9]") # tg tg_table = [[0.20, 0.25, 0.35, 0.45, 0.65], [0.25, 0.3, 0.4, 0.55, 0.75], [0.3, 0.35, 0.45, 0.65, 0.9]] assert group in [1, 2, 3] and site in [0, 1, 2, 3, 4] tg = tg_table[group-1][site] # other parameters gamma = 0.9 + (0.05 - damping) / (0.3 + 6*damping) eta1 = 0.02 + (0.05 - damping) / (4 + 32*damping) eta2 = 1 + (0.05 - damping) / (0.08 + 1.6*damping) # calculate if period < 0.1: return ((10*eta2 - 4.5) * period + 0.45) * alphamax elif period < tg: return eta2 * alphamax elif period < 5*tg: return (tg/period)**gamma * eta2 * alphamax elif period <= 6: return (eta2 * 0.2**gamma - eta1 * (period-5*tg)) * alphamax else: return (eta2 * 0.2**gamma - eta1 * (6-5*tg)) * alphamax
def load(filename)
-
Load the spectrum from file.
Parameters
filename
: strReturns
spectrum
:Spectrum
New instance.Expand source code
@staticmethod def load(filename): '''Load the spectrum from file. Parameters ---------- `filename` : str Returns ------- `spectrum` : `Spectrum` New instance. ''' if not filename.endswith('.spectrum'): filename += '.spectrum' with open(filename, 'r') as f: string = f.read() return Spectrum.read_json(string)
def periods_standard(until=6)
-
Get a standard sampling of the periods.
The sampling points are more sparse with the increase of period.Parameters
until
: number
The end of the period sampling list. It should >= 4.0.Returns
periods
: numpy.ndarray
The standard sampling points.Expand source code
@staticmethod def periods_standard(until=6): '''Get a standard sampling of the periods. The sampling points are more sparse with the increase of period. Parameters ---------- `until` : number The end of the period sampling list. It should >= 4.0. Returns ------- `periods` : numpy.ndarray The standard sampling points. ''' assert until >= 4 return np.hstack([np.arange(0, 0.2, 0.01), np.arange(0.2, 0.5, 0.02), np.arange(0.5, 1.0, 0.05), np.arange(1.0, 2.0, 0.1), np.arange(2.0, 4.0, 0.2), np.arange(4.0, until+0.5, 0.5)])
def read_csv(filename)
-
Create an instance by reading the written csv file.
Parameters
filename
: string
The filename to read.Returns
spectrum
:Spectrum
New instance.Expand source code
@staticmethod def read_csv(filename): '''Create an instance by reading the written csv file. Parameters ---------- `filename` : string The filename to read. Returns ------- `spectrum` : `Spectrum` New instance. ''' if not filename.endswith('.csv'): filename += '.csv' data = np.loadtxt(filename, delimiter=',') return Spectrum(periods=data[:, 0], spectrum=data[:, 1])
def read_dict(it)
-
Create an instance from the dict.
Parameters
it
: dict
The dict generated bySpectrum.to_dict()
Returns
spectrum
:Spectrum
New instance.Expand source code
@staticmethod def read_dict(it): '''Create an instance from the dict. Parameters ---------- `it` : dict The dict generated by `Spectrum.to_dict` Returns ------- `spectrum` : `Spectrum` New instance. ''' return Spectrum(periods=it['periods'], spectrum=it['spectrum'])
def read_json(string)
-
Create an instance from json string.
Parameters
string
: str
The json stringReturns
spectrum
:Spectrum
New instance.Expand source code
@staticmethod def read_json(string): '''Create an instance from json string. Parameters ---------- `string` : str The json string Returns ------- `spectrum` : `Spectrum` New instance. ''' it = json.loads(string) return Spectrum.read_dict(it)
Instance variables
var periods
-
numpy.ndarray, sampling periods
var spectrum
-
numpy.ndarray, spectrum corresponding to sampling periods.
Methods
def get(self, period)
-
Get the spectrum value. Either a list or a number is accepted.
If the period exceeds the largest sampling period, the last spectrum value is returned.
The spectrum value not on sampling points are linearly interpolated.Parameters
period
: number, list, numpy.ndarray
The interested period.Returns
spectrum
: number, numpy.ndarray
if period is list-like, return a ndarray. if period is a number, return a number.Expand source code
def get(self, period): '''Get the spectrum value. Either a list or a number is accepted. If the period exceeds the largest sampling period, the last spectrum value is returned. The spectrum value not on sampling points are linearly interpolated. Parameters ---------- `period` : number, list, numpy.ndarray The interested period. Returns ------- `spectrum` : number, numpy.ndarray if period is list-like, return a ndarray. if period is a number, return a number. ''' if not hasattr(period, '__iter__'): i = self.periods.searchsorted(period) if self.periods[i] == period: return self.spectrum[i] elif i >= len(self.periods): print("Warning: given period exceeds the sampling range.") return self.spectrum[-1] else: return self.spectrum[i] - (self.spectrum[i]-self.spectrum[i-1]) / (self.periods[i] - self.periods[i-1]) * (self.periods[i] - period) else: return np.array([self.get(p) for p in period])
def match(self, target, start, end, step=0.1)
-
Match the current spectrum with a target spectrum. Use Least Square method.
Parameters
target
:Spectrum
the spectrum to be matched.start
,end
: float
the matching period boundaries.step
: float evenly sample the periods with the step.Returns
coefficient
: float
the coefficient to match the current spectrum to the target.srss
: float
the srss after match.Expand source code
def match(self, target, start, end, step=0.1): '''Match the current spectrum with a target spectrum. Use Least Square method. Parameters ---------- `target` : `Spectrum` the spectrum to be matched. `start`, `end` : float the matching period boundaries. `step` : float evenly sample the periods with the step. Returns ------- `coefficient` : float the coefficient to match the current spectrum to the target. `srss` : float the srss after match. ''' periods = np.arange(start, end+step, step) this_spectrum = np.array([self.get(p) for p in periods]) that_spectrum = np.array([target.get(p) for p in periods]) coefficient = sum(this_spectrum * that_spectrum) / sum(this_spectrum ** 2) srss = (np.sum((this_spectrum * coefficient - that_spectrum) ** 2)) ** 0.5 return coefficient, srss
def plot(self, filename=None, engine='pyplot')
-
Plot the spectrum.
Parameters
filename
: string, None
if None, show it. Else, save it to the filenameengine
: 'pyplot' or 'plotly'
Plot engine.Outputs
A file with name
filename
.png orfilename
.html will be generated.Expand source code
def plot(self, filename=None, engine='pyplot'): '''Plot the spectrum. Parameters ---------- `filename` : string, None if None, show it. Else, save it to the filename `engine` : 'pyplot' or 'plotly' Plot engine. Outputs ------- A file with name `filename`.png or `filename`.html will be generated. ''' if engine == 'pyplot': plt.figure(figsize=(4, 3)) plt.plot(self.periods, self.spectrum, marker='o') plt.xlabel('Period (s)') plt.ylabel('Spetrum') plt.xlim([0, self.periods[-1]]) if filename is None: plt.show() else: plt.savefig(filename+'.png', dpi=200) elif engine == 'plotly': fig = go.Figure() fig.add_scatter(dict( x='self.periods', y='self.spectrum', mode='lines+markers', )) if filename is None: fig.show() else: fig.write_html(filename+'.html') else: print('ERROR: engine should be in "pyplot" or "plotly"')
def save(self, filename)
-
Save the spectrum to a file. Currently use json form.
Parameters
filename
: string
The file name to be saved.
The extension '.spectrum' will be automatically added.Outputs
A file with name
filename
.spectrum will be generated.Expand source code
def save(self, filename): '''Save the spectrum to a file. Currently use json form. Parameters ---------- `filename` : string The file name to be saved. The extension '.spectrum' will be automatically added. Outputs ------- A file with name `filename`.spectrum will be generated. ''' with open(filename + '.spectrum', 'w') as f: f.write(self.to_json())
def scale(self, factor_x=1, factor_y=1)
-
Scale the spectrum.
Parameters
factor_x
: number
scaling factor in x direction (periods)factor_y
: number
scaling factor in y direction (spectrum)Returns
spectrum
:Spectrum
Expand source code
def scale(self, factor_x=1, factor_y=1): '''Scale the spectrum. Parameters ---------- `factor_x` : number scaling factor in x direction (periods) `factor_y` : number scaling factor in y direction (spectrum) Returns ------- `spectrum` : `Spectrum` ''' return Spectrum(periods=self.periods*factor_x, spectrum=self.spectrum*factor_y)
def to_dict(self)
-
Convert the instance properties to a dict.
Returns
it
: dictExpand source code
def to_dict(self): '''Convert the instance properties to a dict. Returns ------- `it` : dict ''' return dict( periods=self.periods.tolist(), spectrum=self.spectrum.tolist() )
def to_json(self)
-
Convert the properties to json.
Returns
json
: stringExpand source code
def to_json(self): '''Convert the properties to json. Returns ------- `json` : string ''' return json.dumps(self.to_dict())
def write_csv(self, filename)
-
Write the spectrum to a csv file.
Parameters
filename
: string
The extension '.csv' will be automatically added.Outputs
A file named
filename
.csv will be generated.Expand source code
def write_csv(self, filename): '''Write the spectrum to a csv file. Parameters ---------- `filename` : string The extension '.csv' will be automatically added. Outputs ------- A file named `filename`.csv will be generated. ''' np.savetxt(filename+'.csv', np.vstack( [self.periods, self.spectrum]).T, fmt="%.3f,%.7f", header='Period,Spectrum')
class Suite (folder, name, target_spectrum, period_bound)
-
A class that contains mainly a series of
Motion
instances, and corresponding series of factors, matching the suite of motions to the targetSpectrum
.Parameters
folder
: str
A path-like string for the suite.name
: str A specific name for the suite. The instance will be saved in '{folder}/{name}.suite'target_spectrum
:Spectrum
The target spectrum of the suite.period_bound
: Listlen=2
The matching period boundaries.Expand source code
class Suite(object): '''A class that contains mainly a series of `Motion` instances, and corresponding series of factors, matching the suite of motions to the target `Spectrum`. ''' def __init__(self, folder, name, target_spectrum, period_bound): ''' Parameters ---------- `folder` : str A path-like string for the suite. `name` : str A specific name for the suite. The instance will be saved in '{folder}/{name}.suite' `target_spectrum` : `Spectrum` The target spectrum of the suite. `period_bound` : List<float> len=2 The matching period boundaries. ''' self.folder = folder '''str. See `Suite`''' os.makedirs(folder, exist_ok=True) self.name = name '''str. See `Suite`''' self.motions = [] '''List<`Motion`>. Store all the motions.''' self.factors = [] '''List<float>. Store all the factors that should be applied to the motions.''' self.target_spectrum = target_spectrum '''`Spectrum`. See `Suite`''' self.period_bound = period_bound '''List<float>. See `Suite`''' self.period_samples = np.linspace(period_bound[0], period_bound[1], 200) '''numpy.ndarray. The period within the `period_bound` is evenly sampled, with 200 points.''' self.spectrum_matrix = None '''numpy.ndarray. A matrix with shape (len(`Suite.period_samples`), len(`Suite.motions`)). This matrix times the `Suite.factors` vector products the average spectrum with `Suite.period_samples`.''' self.target_vector = target_spectrum.get(self.period_samples) '''numpy.ndarray. The vector generated by the target_spectrum sampling the `Suite.period_samples`''' def set_folder(self, folder): '''Reset the folder for the suite and the motions. Parameters ---------- `folder` : str Outputs ------- `Suite.folder` and `Suite.motions` will be changed. ''' if folder is not None: self.folder = folder os.makedirs(folder, exist_ok=True) for mo in self.motions: mo.folder = folder def add_motion(self, motion, factor=1): '''Add motion to the suite together with a factor, to keep the lengths are identical. Parameters ---------- `motion` : `Motion` `factor` : number Outputs ------- `Suite.motions` and `Suite.factors` will be changed. ''' self.motions.append(motion) self.factors.append(factor) def load_motions_from_names(self, names, folder=None): '''Give a list of name and the folder, load motions. Parameters ---------- `names` : List<str> The list of motion names. `folder` : str, None The folder of the motions. If is None, use `Suite.folder` Outputs ------- `Suite.motions` and `Suite.factors` will be changed. ''' folder = self.folder if folder is None else folder for name in names: mo = Motion.load(f'{folder}/{name}.motion') self.add_motion(mo) print(f'Loaded motion {mo.name}') self.reset_spectrum_matrix() def load_motions_from_folder(self, folder=None): '''Find all the .motion file from the given folder, and load them. Parameters ---------- `folder` : str, None The folder to search the .motion files. If is None, use `Suite.folder` instead. Outputs ------- `Suite.motions` and `Suite.factors` will be changed. ''' folder = self.folder if folder is None else folder filenames = os.listdir(self.folder) for filename in filenames: if filename.endswith('.motion'): mo = Motion.load(f'{folder}/{filename}') self.add_motion(mo) print(f'Loadad motion {mo.name}') self.reset_spectrum_matrix() def filter_by_IDs(self, ids, new_name, new_folder=None): '''Give a series of selected IDs, filter the motions. Parameters ---------- `ids` : List<int> The list of indexes of the selected motions in the current suite. `new_name` : str `new_folder` : str, None Returns ------- `suite` : `Suite` A new suite with the selected motions and their factors only. ''' suite = self.copy() suite.name = new_name suite.set_folder(new_folder) suite.motions = [self.motions[i] for i in ids] suite.factors = [self.factors[i] for i in ids] suite.reset_spectrum_matrix() print(f'The motions have been filtered. {len(ids)} motions remain.') return suite def filter_by_file(self, filename, new_name, new_folder=None): '''Give a file with indexes selected. Parameters ---------- Refer to `Suite.filter_by_IDs`. Returns ------- Refer to `Suite.filter_by_IDs`. ''' with open(filename, 'r') as f: lines = f.readlines() ids = [int(l) for l in lines] return self.filter_by_IDs(ids=ids, new_name=new_name, new_folder=new_folder) def get_average_spectrum(self): '''Return a spectrum which is the average spectrum of the suite. Outputs ------- `spectrum` : `Spectrum` ''' periods = self.motions[0].spectrum.periods average_spectrum = self.motions[0].spectrum.spectrum * self.factors[0] for i in range(1, len(self.motions)): average_spectrum += self.motions[i].spectrum.get(periods) * self.factors[i] average_spectrum /= len(self.motions) return Spectrum(periods, average_spectrum) def get_srss(self): '''Get the srss of the current averate spectrum to the target spectrum in the period boundary. Returns ------- `srss` : float ''' return np.linalg.norm(np.matmul(self.spectrum_matrix, self.factors) - self.target_vector).item() def reset_spectrum_matrix(self): '''Reset the spectrum matrix, which produces the average spectrum in the boundary by multiplying factors vector. This method should be called everytime `Suite.motions` change. Outputs ------- `Suite.spectrum_matrix` and `Suite.target_vector` will be changed. ''' mat = np.zeros((len(self.period_samples), len(self.motions))) for i in range(len(self.motions)): spectrum = self.motions[i].spectrum.get(self.period_samples) mat[:, i] = spectrum / len(self.motions) self.spectrum_matrix = mat self.target_vector = self.target_spectrum.get(self.period_samples) def eliminate_neg(self): '''Eliminate negative target vector by multiplying a factor for every individual. In other words, make the average spectrum above the target spectrum. Returns ------- `factor` : float The global factor that should be applied to each of the factors. Outputs ------- `Suite.factors` are all amplified by the returned `factor`. ''' error = np.matmul(self.spectrum_matrix, self.factors) - self.target_vector argmin = np.argmin(error) factor = 1 / (1 + error[argmin] / self.target_vector[argmin]) print(f'Multiply {factor:.4} to eliminate negative spectrum.') self.factors = [self.factors[i] * factor for i in range(len(self.factors))] return factor def match_individual_LSQ(self): '''Use Least square method for each individual motion to match the spectrum. Returns ------- `residuals` : numpy.ndarray srss of each motion matched to the target spectrum. Outputs ------- `Suite.factors` are changed. ''' residuals = np.zeros(len(self.motions)) for i in range(len(self.motions)): this = self.spectrum_matrix[:, i] * len(self.motions) that = self.target_vector factor = np.dot(this, that) / np.dot(this, this) residual = np.linalg.norm(this * factor - that) self.factors[i] = factor residuals[i] = residual print(f"Individual LSQ: Factor Max={np.max(self.factors):.4}, Residual Max={np.max(residuals):.4}") return residuals def filter_optimize(self, count, times, use_lsq=True, output_count=0, new_folder=None, lower_bound=0.6, upper_bound=1.4, dimension=100000): '''Use Monte Carlo simulation and Least Square method to optimize the suite. after running, a bunch of suites will be written to the new_folder. Parameters ---------- `count` : int The number of motions in the resulting suite. `times` : int The number of times that MC simulation with dimension is run. The total number of samples of MC simulation will be {times} * {dimension}. `use_lsq` : boolean Whether use LSQ optimizer to better optimize the suite. `output_count` : int The number of outputs. if 0, output all. `new_folder` : str `lower_bound`, `upper_bound` : float For the purpose that the individual motions are not too far from the individually matched factors, the lower and upper boundaries of amplifying the individually matched factors are set. This is only useful when `use_lsq` is set, to add boundaries to the LSQ optimizer. `dimension` : int Each time the Monte Carlo simulation is run, means that {dimension} times of simulation have been done by using a matrix target. Returns ------- List<`Suite`> with len=`output_count`. The list is sorted. The best answer is the first result. However, visual check should be done. Outputs ------- Every running after Monte Carlo simulation and LSQ optimization, the spectrum of the suite will be plotted. ''' new_folder = new_folder if new_folder is not None else self.folder os.makedirs(new_folder, exist_ok=True) suites = [] srsses = [] for i in range(times): suite_new = self.filter_montecarlo(count=count, new_name=f'mc{i:03d}', new_folder=new_folder) suites.append(suite_new) srsses.append(suite_new.get_srss()) if use_lsq: suite_new2 = suite_new.optimize_LSQ( new_name=f'lsq{i:03d}', new_folder=new_folder, lower_bound=lower_bound, upper_bound=upper_bound ) suites.append(suite_new2) srsses.append(suite_new2.get_srss()) argsorted = np.argsort(np.array(srsses)) results = [suites[i] for i in argsorted[:output_count]] for i, suite in enumerate(results): suite.name = f'no{i}-{suite.name}' suite.save() suite.plot_all_spectrums(engine='pyplot') return results def filter_montecarlo(self, count, new_name, new_folder=None, dimension=100000): '''Use Monte Carlo simulation to filter the suite to count numbers of motions. Parameters ---------- Refer to `Suite.filter_optimize`. Returns ------- `suite` : `Suite` The optimized result after {dimension} times of MC simulations. ''' print(f'Running Monte Carlo simulation, dimension={dimension}') self.match_individual_LSQ() factors = np.array(self.factors) xs = np.zeros((len(self.motions), dimension)) for i in range(dimension): indexes = random.sample(range(len(self.motions)), count) xs[indexes, i] = factors[indexes] mul = np.matmul(self.spectrum_matrix * len(self.motions) / count, xs) loss = mul - self.target_vector.reshape(-1, 1) argmin = np.argmin(loss, axis=0) lossmin = np.min(loss, axis=0) target_at_lossmin = self.target_vector[argmin] amps = 1 / (lossmin / target_at_lossmin + 1) lossAmp = mul * amps - self.target_vector.reshape(-1, 1) srss = np.linalg.norm(lossAmp, axis=0) arg = np.argmin(srss) factors = xs[:, arg] * amps[arg] suvives = np.where(factors > 1.0e-3) suite = self.copy() suite.name = new_name suite.set_folder(new_folder) suite.motions = [self.motions[i] for i in suvives[0]] suite.factors = [factors[i] for i in suvives[0]] suite.reset_spectrum_matrix() return suite def _loss(self, x): '''Utility function: calculate the loss for optimization.''' factors = x * np.array(self.factors) mul = np.matmul(self.spectrum_matrix, factors) loss = mul - self.target_vector argmin = np.argmin(loss) amp = 1 / (loss[argmin] / self.target_vector[argmin] + 1) return mul * amp - self.target_vector def optimize_LSQ(self, new_name, new_folder=None, lower_bound=0.6, upper_bound=1.4): '''Use Least Square method from scipy to optimize the factors. Parameters ---------- Refer to `Suite.filter_optimize`. Returns ------- `suite` : `Suite` Optimized suite. ''' print('Running Bounded Least Square optimization ...') res = least_squares(self._loss, np.ones(len(self.motions)), bounds=(lower_bound, upper_bound), verbose=0) factors = res.x * self.factors suite = self.copy() suite.name = new_name suite.set_folder(new_folder) suite.factors = factors.tolist() suite.eliminate_neg() return suite def load_peer_folder(self, peer_folder, h1=True, h2=True, v=False): '''From a folder load all the PEER motions into the suite. Extract the downloaded peer zip file first. Parameters ---------- `peer_folder` : str The folder path of the extracted PEER data. Where `_SearchResults.csv` can be found. `h1`, `h2`, `v` : boolean Whether load the three directional files or not. Outputs ------- `Suite.motions`, `Suite.factors` will be changed. All the PEER motions will be saved in .motion format in the new folder. ''' with open(f'{peer_folder}/_SearchResults.csv', "r") as f: for _ in range(34): f.readline() i = 0 positions = [] if h1: positions.append(19) if h2: positions.append(20) if v: positions.append(21) line = f.readline() while line != '\n': cells = line.split(',') for pos in positions: filename = cells[pos].replace(' ', '') motion_name = filename.replace('.AT2', '') print(f'Loading {motion_name}') mo = Motion(folder=self.folder, name=motion_name) mo.load_peer(peer_filename=f'{peer_folder}/{filename}') mo.generate_spectrum() mo.save() self.add_motion(mo) i += 1 line = f.readline() self.reset_spectrum_matrix() print(f"Successfully loaded {i} earthquakes from peer folder {peer_folder} to {self.folder}.") return True def beautify(self, new_name, run_steps=[1,2,3,4], dt=0.01, new_folder=None, trunc_dict={}, left=1.0e-3, right=1.0e-3, prepend_zero=True): '''Beautify the suite in 4 steps: 1. truncate the motions, see `Motion.truncate` 2. trim the near-zero parts, see `Motion.trim` 3. change dt to uniform, see `Motion.change_dt` 4. rename the motions to a series. This procedure is always logged to the current folder. Parameters ---------- `new_name` : str `run_steps` : List<int> The step number to run. Users can select which one out of the 4 steps to run. The order will also follow this List. `dt` : float The uniform delta t. `new_folder` : str `trunc_dict` : dict A dict that defines the `time` parameter in `Motion.truncate` In the dict, keys are the indexes of the motions in the suite, values are the `time` values. `left`, `right` : float The parameters of PGA to trim. Refer to `Motion.trim`. Returns ------- `suite` : `Suite` Outputs ------- All the logs and the pre- and post- beautify average spectrum. ''' suite = self.copy() spectrum_hist = suite.get_average_spectrum() suite.plot_all_spectrums(engine='pyplot') for i, motion in enumerate(self.motions): mo = motion.copy() mo.name = f'{self.name}-{i}-b' for step in run_steps: if step == 1: if i in trunc_dict: mo = mo.truncate(time=trunc_dict[i], new_name=mo.name+'1') elif step == 2: mo = mo.trim(new_name=mo.name+'2', left=left, right=right) elif step == 3: mo = mo.change_dt(dt=dt, new_name=mo.name+'3') elif step == 4: mo.name = f'{i:02}' if len(self.motions) <= 100 else f'{i:03}' else: print(f'step={step} is not recognized and ignored.') suite.motions[i] = mo spectrum_new = suite.get_average_spectrum() plt.figure() plt.plot(spectrum_hist.periods, spectrum_hist.spectrum, label='before') plt.plot(spectrum_new.periods, spectrum_new.spectrum, label='after') plt.plot(suite.target_spectrum.periods, suite.target_spectrum.spectrum, label='target') plt.legend() plt.suptitle(f'Beautify: {self.name} -> {new_name}') plt.savefig(f'{self.folder}/{self.name}-beautify.png', dpi=200) suite.name = new_name if new_folder is not None: suite.set_folder(new_folder) return suite def scale(self, new_name, new_folder=None, factor_x=1, factor_y=1, scale_target_spectrum=True): '''Scale the motions both horizontally and vertically. Parameters ---------- `new_name` : str `new_folder` : str, None `factor_x`, `factor_y` : float The factors to amplify in x axis and y axis. `scale_target_spectrum` : boolean Whether scale the target spectrum as well. Returns ------- `suite` : `Suite` ''' suite = self.copy() suite.name = new_name suite.set_folder(new_folder) for i, motion in enumerate(self.motions): suite.motions[i] = motion.scale( new_name=motion.name, factor_x=factor_x, factor_y=factor_y) if scale_target_spectrum: suite.target_spectrum = self.target_spectrum.scale( factor_x=factor_x, factor_y=factor_y) if factor_x != 1: suite.period_bound = [p * factor_x for p in self.period_bound] suite.period_samples = self.period_samples * factor_x if factor_y != 1: suite.factors = [f*factor_y for f in self.factors] suite.reset_spectrum_matrix() return suite def write_TCL(self, folder=None, filename=None): '''Write the suite to TCL forms for OpenSees. Currently the TCL contains only one dict. The keys of the dict is the motion names. Then the essential properties are stored. At the same time, the accelerations for each motion is written to a text file. Parameters ---------- `folder` : str, None The folder to write the tcl files. If is None, use `Suite.folder`. `filename` : str, None The tcl file name. If is None, use `Suite.name`. Outputs ------- A tcl file and a series of acceleration files. ''' folder = self.folder if folder is None else folder filename = self.name if filename is None else filename.replace( '.tcl', '') + '.tcl' os.makedirs(folder, exist_ok=True) config = '' config += 'dict set motion keys {' names = [mo.name for mo in self.motions] config += ' '.join(names) config += '}\n\n' for i, mo in enumerate(self.motions): mo.write_accel(f'{folder}/{mo.name}', time=False) config += f'dict set motion {mo.name} path {folder}/{mo.name}.acc\n' config += f'dict set motion {mo.name} npts {len(mo.accel)}\n' config += f'dict set motion {mo.name} dt {mo.dt:.4f}\n' config += f'dict set motion {mo.name} amp {self.factors[i]:.4f}\n\n' with open(f'{folder}/{filename}.tcl', 'w') as f: f.write(config) def plot_all_accels(self, folder=None, filename=None, save=True, engine='pyplot'): '''Plot amplified accelograms to a single file. Parameters ---------- `folder` : str, None If is None, use `Suite.folder`. `filename` : str, None If is None, use `Suite.name` + '_accel' `save` : boolean If True, save the file. Else, show immediately. `engine` : 'pyplot' or 'plotly' Outputs ------- A figure is plotted. ''' folder = self.folder if folder is None else folder if engine == 'plotly': filename = f'{self.name}_accel.html' if filename is None else filename + '.html' fig = go.Figure() for i, motion in enumerate(self.motions): fig.add_trace(go.Scatter( x=np.arange(len(motion.accels)) * motion.dt, y=motion.accels, mode='lines', name=str(i) + motion.name, text=motion.name )) if save: fig.write_html(f'{folder}/{filename}') else: fig.show() elif engine == 'pyplot': print('Pyplot is not supported.') else: print('ERROR: engine should be in "pyplot" and "plotly"') def plot_all_spectrums(self, folder=None, filename=None, save=True, engine='pyplot'): '''Plot all amplified spectrums in one file. Parameters ---------- Refer to `Suite.plot_all_accels`. The filename if is None will be `Suite.name` + '_spectrum' Outputs ------- A figure will be generated. ''' if engine == 'plotly': folder = self.folder if folder is None else folder filename = f'{self.name}_spectrum.html' if filename is None else filename + '.html' fig = go.Figure() for i, motion in enumerate(self.motions): fig.add_trace(go.Scatter( x=motion.spectrum.periods, y=motion.spectrum.spectrum * self.factors[i], mode='lines', name=motion.name, text=motion.name )) spectrum1 = self.target_spectrum spectrum2 = self.get_average_spectrum() fig.add_trace(go.Scatter( x=spectrum1.periods, y=spectrum1.spectrum, mode='lines', name='Target', text='Target', line=dict(width=5), )) fig.add_trace(go.Scatter( x=spectrum2.periods, y=spectrum2.spectrum, mode='lines', name='Average', text='Average', line=dict(width=5), )) if save: fig.write_html(f'{folder}/{filename}') else: fig.show() elif engine == 'pyplot': folder = self.folder if folder is None else folder filename = f'{self.name}_spectrum.png' if filename is None else filename + '.png' plt.figure(figsize=(7, 5)) for i, motion in enumerate(self.motions): plt.plot(motion.spectrum.periods, motion.spectrum.spectrum * self.factors[i], label=motion.name, c='silver') plt.plot(self.target_spectrum.periods, self.target_spectrum.spectrum, label='Target', linewidth=3) averageSpectrum = self.get_average_spectrum() plt.plot(averageSpectrum.periods, averageSpectrum.spectrum, label="average", linewidth=2) plt.legend(loc='best', fontsize='x-small') plt.title(f'SRSS={self.get_srss():.4f}') if save: plt.savefig(os.path.join(folder, filename), dpi=200) else: plt.show() else: print("ERROR: wrong engine name. use 'plotly' or 'pyplot'.") def plot_individual(self, save=True): '''Plot all individual ground motions and the spectrum. Parameters ---------- Refer to `Suite.plot_all_accels`. Outputs ------- If save, a figure for each motion will be generated. ''' for i, mo in enumerate(self.motions): _, ax = plt.subplots(2, 1, figsize=(7, 5)) ax[0].plot(np.arange(len(mo.accel)) * mo.dt, mo.accel * self.factors[i]) ax[1].plot(mo.spectrum.periods, mo.spectrum.spectrum * self.factors[i]) ax[1].plot(self.target_spectrum.periods, self.target_spectrum.spectrum, linewidth=2) plt.suptitle( f'{i}: Name {mo.name}\nfactor={self.factors[i]:.4}, PGA={mo.get_PGA()*self.factors[i]:.4}, dt={mo.dt:.3f}') filename = f'{self.folder}/motion{i}.png' if save: plt.savefig(filename, dpi=200) else: plt.show() def plot_interactive(self, save=True): '''Plot the spectrum and the acceleration time-histories to an interactive html file. Parameters ---------- `save` : boolean If True, write to html file. Otherwise, show directly without saving. Outputs ------- A html file with interactive plotting sliders. ''' fig = make_subplots(rows=1, cols=2) for i in range(len(self.motions)): fig.add_trace(go.Scatter( x=self.motions[i].spectrum.periods, y=self.motions[i].spectrum.spectrum * self.factors[i], name=self.motions[i].name, ), row=1, col=1) fig.add_trace(go.Scatter( x=self.motions[i].get_times(), y=self.motions[i].accel * self.factors[i], name=self.motions[i].name, ), row=1, col=2) spectrum_average = self.get_average_spectrum() fig.add_trace(go.Scatter( x=spectrum_average.periods, y=spectrum_average.spectrum, name='average', line={'width': 5} )) fig.add_trace(go.Scatter( x=self.target_spectrum.periods, y=self.target_spectrum.spectrum, name='target', line={'width': 5} )) def _title(i): return f'Motion #{i}: factor={self.factors[i]:.2f}, duration={len(self.motions[i].accel)*self.motions[i].dt:.2f}, PGA={self.motions[i].get_PGA()*self.factors[i]:.4}' steps = [] for i in range(len(self.motions)): step = dict( method='update', args=[ {'visible': [j//2 == i for j in range(len(self.motions)*2)] + [False, True]}, {'title': _title(i)} ], label=str(i), ) steps.append(step) sliders = [dict(active=0, steps=steps, len=0.9, currentvalue={'visible': False})] updatemenus = [dict( buttons=[dict( label='All', method='update', args=[ {'visible': [True] * (len(self.motions)*2+2)}, {'title': f'Suite {self.name}: {len(self.motions)} motions, SRSS={self.get_srss():.4}'} ] )], type='buttons', x=1, y=0, xanchor='right', yanchor='top', pad={'t': 30}, )] fig.update_layout(sliders=sliders, title=f'Suite {self.name}: {len(self.motions)} motions, SRSS={self.get_srss():.4}', updatemenus=updatemenus) if save: fig.write_html(f'{self.folder}/{self.name}.html') else: fig.show() def to_dict(self, separate_motion=True): '''Convert all the properties to dict. Parameters ---------- `separate_motion` : boolean If True, the motion details are not included into the dict. User has to make sure these motions are not moved. Returns ------- `it` : dict ''' return dict( folder=self.folder, name=self.name, motions=[mo.to_dict() for mo in self.motions] if not separate_motion else [ f'{mo.folder}/{mo.name}' for mo in self.motions], factors=self.factors if type(self.factors[0] == float) else [ fact.item() for fact in self.factors], period_bound=self.period_bound, target_spectrum=self.target_spectrum.to_dict(), period_samples=self.period_samples.tolist(), srss=self.get_srss(), average_spectrum=self.get_average_spectrum().to_dict(), ) def to_json(self, separate_motion=True): '''Convert the properties to json. Parameters ---------- Refer to `Suite.to_dict`. Returns ------- `json` : str ''' return json.dumps(self.to_dict(separate_motion=separate_motion)) def save(self, separate_motion=True, rewrite=False): '''Save the suite to a .suite file. Parameters ---------- `separate_motion` : boolean If True, the motion details are not included into the file. User has to make sure the motions are not moved. `rewrite` : boolean If True, all the '.motion' files will be rewritten. otherwise, the motion files will be ignored. Outputs ------- Create a .suite file to store the suite. ''' with open(f'{self.folder}/{self.name}.suite', 'w') as f: f.write(self.to_json(separate_motion=separate_motion)) if separate_motion: for mo in self.motions: if not os.path.exists(f'{mo.folder}/{mo.name}.motion'): mo.save() print(f'Motion saved to {mo.folder}/{mo.name}.motion') else: if rewrite: mo.save() print(f'Motion {mo.folder}/{mo.name}.motion is written.') print(f'Saved to {self.folder}/{self.name}.suite') @staticmethod def read_dict(it): '''Read the dictionary representing the properties of the class. If key 'motions' is a instance of dict, then construct motions with dict. else, key 'motions' should be a list of {folder}/{name}, then construct with the .motion file. Parameters ---------- `it` : dict The dict generated by `Suite.to_dict` Returns ------- `suite` : `Suite` ''' suite = Suite( folder=it['folder'], name=it['name'], target_spectrum=Spectrum.read_dict(it['target_spectrum']), period_bound=it['period_bound'] ) for i, mo_content in enumerate(it['motions']): if isinstance(mo_content, dict): suite.add_motion(Motion.read_dict( mo_content), it['factors'][i]) else: suite.add_motion(Motion.load( f'{mo_content}.motion'), it['factors'][i]) suite.period_samples = np.array(it['period_samples']) suite.target_vector = suite.target_spectrum.get(suite.period_samples) suite.reset_spectrum_matrix() return suite @staticmethod def load(filename): '''Load a the suite from a file. Parameters ---------- `filename` : str File that stores the suite. Returns ------- `suite` : `Suite` ''' if not filename.endswith('.suite'): filename += '.suite' with open(filename, 'r') as f: return Suite.read_dict(json.loads(f.read())) def copy(self): '''Copy the instance. Returns ------- `suite` : `Suite` ''' return Suite.read_dict(self.to_dict())
Static methods
def load(filename)
-
Load a the suite from a file.
Parameters
filename
: str
File that stores the suite.Returns
suite
:Suite
Expand source code
@staticmethod def load(filename): '''Load a the suite from a file. Parameters ---------- `filename` : str File that stores the suite. Returns ------- `suite` : `Suite` ''' if not filename.endswith('.suite'): filename += '.suite' with open(filename, 'r') as f: return Suite.read_dict(json.loads(f.read()))
def read_dict(it)
-
Read the dictionary representing the properties of the class.
If key 'motions' is a instance of dict, then construct motions with dict.
else, key 'motions' should be a list of {folder}/{name}, then construct with the .motion file.Parameters
it
: dict
The dict generated bySuite.to_dict()
Returns
suite
:Suite
Expand source code
@staticmethod def read_dict(it): '''Read the dictionary representing the properties of the class. If key 'motions' is a instance of dict, then construct motions with dict. else, key 'motions' should be a list of {folder}/{name}, then construct with the .motion file. Parameters ---------- `it` : dict The dict generated by `Suite.to_dict` Returns ------- `suite` : `Suite` ''' suite = Suite( folder=it['folder'], name=it['name'], target_spectrum=Spectrum.read_dict(it['target_spectrum']), period_bound=it['period_bound'] ) for i, mo_content in enumerate(it['motions']): if isinstance(mo_content, dict): suite.add_motion(Motion.read_dict( mo_content), it['factors'][i]) else: suite.add_motion(Motion.load( f'{mo_content}.motion'), it['factors'][i]) suite.period_samples = np.array(it['period_samples']) suite.target_vector = suite.target_spectrum.get(suite.period_samples) suite.reset_spectrum_matrix() return suite
Instance variables
var factors
-
List
. Store all the factors that should be applied to the motions. var folder
-
str. See
Suite
var motions
-
List<
Motion
>. Store all the motions. var name
-
str. See
Suite
var period_bound
-
List
. See Suite
var period_samples
-
numpy.ndarray. The period within the
period_bound
is evenly sampled, with 200 points. var spectrum_matrix
-
numpy.ndarray. A matrix with shape (len(
Suite.period_samples
), len(Suite.motions
)). This matrix times theSuite.factors
vector products the average spectrum withSuite.period_samples
. var target_spectrum
-
var target_vector
-
numpy.ndarray. The vector generated by the target_spectrum sampling the
Suite.period_samples
Methods
def add_motion(self, motion, factor=1)
-
Add motion to the suite together with a factor, to keep the lengths are identical.
Parameters
motion
:Motion
factor
: numberOutputs
Suite.motions
andSuite.factors
will be changed.Expand source code
def add_motion(self, motion, factor=1): '''Add motion to the suite together with a factor, to keep the lengths are identical. Parameters ---------- `motion` : `Motion` `factor` : number Outputs ------- `Suite.motions` and `Suite.factors` will be changed. ''' self.motions.append(motion) self.factors.append(factor)
def beautify(self, new_name, run_steps=[1, 2, 3, 4], dt=0.01, new_folder=None, trunc_dict={}, left=0.001, right=0.001, prepend_zero=True)
-
Beautify the suite in 4 steps:
- truncate the motions, see
Motion.truncate()
- trim the near-zero parts, see
Motion.trim()
- change dt to uniform, see
Motion.change_dt()
- rename the motions to a series.
This procedure is always logged to the current folder.
Parameters
new_name
: strrun_steps
: List
The step number to run. Users can select which one out of the 4 steps to run. The order will also follow this List.dt
: float
The uniform delta t.new_folder
: strtrunc_dict
: dict
A dict that defines thetime
parameter inMotion.truncate()
In the dict, keys are the indexes of the motions in the suite, values are thetime
values.left
,right
: float
The parameters of PGA to trim. Refer toMotion.trim()
.Returns
suite
:Suite
Outputs
All the logs and the pre- and post- beautify average spectrum.
Expand source code
def beautify(self, new_name, run_steps=[1,2,3,4], dt=0.01, new_folder=None, trunc_dict={}, left=1.0e-3, right=1.0e-3, prepend_zero=True): '''Beautify the suite in 4 steps: 1. truncate the motions, see `Motion.truncate` 2. trim the near-zero parts, see `Motion.trim` 3. change dt to uniform, see `Motion.change_dt` 4. rename the motions to a series. This procedure is always logged to the current folder. Parameters ---------- `new_name` : str `run_steps` : List<int> The step number to run. Users can select which one out of the 4 steps to run. The order will also follow this List. `dt` : float The uniform delta t. `new_folder` : str `trunc_dict` : dict A dict that defines the `time` parameter in `Motion.truncate` In the dict, keys are the indexes of the motions in the suite, values are the `time` values. `left`, `right` : float The parameters of PGA to trim. Refer to `Motion.trim`. Returns ------- `suite` : `Suite` Outputs ------- All the logs and the pre- and post- beautify average spectrum. ''' suite = self.copy() spectrum_hist = suite.get_average_spectrum() suite.plot_all_spectrums(engine='pyplot') for i, motion in enumerate(self.motions): mo = motion.copy() mo.name = f'{self.name}-{i}-b' for step in run_steps: if step == 1: if i in trunc_dict: mo = mo.truncate(time=trunc_dict[i], new_name=mo.name+'1') elif step == 2: mo = mo.trim(new_name=mo.name+'2', left=left, right=right) elif step == 3: mo = mo.change_dt(dt=dt, new_name=mo.name+'3') elif step == 4: mo.name = f'{i:02}' if len(self.motions) <= 100 else f'{i:03}' else: print(f'step={step} is not recognized and ignored.') suite.motions[i] = mo spectrum_new = suite.get_average_spectrum() plt.figure() plt.plot(spectrum_hist.periods, spectrum_hist.spectrum, label='before') plt.plot(spectrum_new.periods, spectrum_new.spectrum, label='after') plt.plot(suite.target_spectrum.periods, suite.target_spectrum.spectrum, label='target') plt.legend() plt.suptitle(f'Beautify: {self.name} -> {new_name}') plt.savefig(f'{self.folder}/{self.name}-beautify.png', dpi=200) suite.name = new_name if new_folder is not None: suite.set_folder(new_folder) return suite
- truncate the motions, see
def copy(self)
-
Copy the instance.
Returns
suite
:Suite
Expand source code
def copy(self): '''Copy the instance. Returns ------- `suite` : `Suite` ''' return Suite.read_dict(self.to_dict())
def eliminate_neg(self)
-
Eliminate negative target vector by multiplying a factor for every individual. In other words, make the average spectrum above the target spectrum.
Returns
factor
: float
The global factor that should be applied to each of the factors.Outputs
Suite.factors
are all amplified by the returnedfactor
.Expand source code
def eliminate_neg(self): '''Eliminate negative target vector by multiplying a factor for every individual. In other words, make the average spectrum above the target spectrum. Returns ------- `factor` : float The global factor that should be applied to each of the factors. Outputs ------- `Suite.factors` are all amplified by the returned `factor`. ''' error = np.matmul(self.spectrum_matrix, self.factors) - self.target_vector argmin = np.argmin(error) factor = 1 / (1 + error[argmin] / self.target_vector[argmin]) print(f'Multiply {factor:.4} to eliminate negative spectrum.') self.factors = [self.factors[i] * factor for i in range(len(self.factors))] return factor
def filter_by_IDs(self, ids, new_name, new_folder=None)
-
Give a series of selected IDs, filter the motions.
Parameters
ids
: List
The list of indexes of the selected motions in the current suite.new_name
: strnew_folder
: str, NoneReturns
suite
:Suite
A new suite with the selected motions and their factors only.Expand source code
def filter_by_IDs(self, ids, new_name, new_folder=None): '''Give a series of selected IDs, filter the motions. Parameters ---------- `ids` : List<int> The list of indexes of the selected motions in the current suite. `new_name` : str `new_folder` : str, None Returns ------- `suite` : `Suite` A new suite with the selected motions and their factors only. ''' suite = self.copy() suite.name = new_name suite.set_folder(new_folder) suite.motions = [self.motions[i] for i in ids] suite.factors = [self.factors[i] for i in ids] suite.reset_spectrum_matrix() print(f'The motions have been filtered. {len(ids)} motions remain.') return suite
def filter_by_file(self, filename, new_name, new_folder=None)
-
Give a file with indexes selected.
Parameters
Refer to
Suite.filter_by_IDs()
.Returns
Refer to
Suite.filter_by_IDs()
.Expand source code
def filter_by_file(self, filename, new_name, new_folder=None): '''Give a file with indexes selected. Parameters ---------- Refer to `Suite.filter_by_IDs`. Returns ------- Refer to `Suite.filter_by_IDs`. ''' with open(filename, 'r') as f: lines = f.readlines() ids = [int(l) for l in lines] return self.filter_by_IDs(ids=ids, new_name=new_name, new_folder=new_folder)
def filter_montecarlo(self, count, new_name, new_folder=None, dimension=100000)
-
Use Monte Carlo simulation to filter the suite to count numbers of motions.
Parameters
Refer to
Suite.filter_optimize()
.Returns
suite
:Suite
The optimized result after {dimension} times of MC simulations.Expand source code
def filter_montecarlo(self, count, new_name, new_folder=None, dimension=100000): '''Use Monte Carlo simulation to filter the suite to count numbers of motions. Parameters ---------- Refer to `Suite.filter_optimize`. Returns ------- `suite` : `Suite` The optimized result after {dimension} times of MC simulations. ''' print(f'Running Monte Carlo simulation, dimension={dimension}') self.match_individual_LSQ() factors = np.array(self.factors) xs = np.zeros((len(self.motions), dimension)) for i in range(dimension): indexes = random.sample(range(len(self.motions)), count) xs[indexes, i] = factors[indexes] mul = np.matmul(self.spectrum_matrix * len(self.motions) / count, xs) loss = mul - self.target_vector.reshape(-1, 1) argmin = np.argmin(loss, axis=0) lossmin = np.min(loss, axis=0) target_at_lossmin = self.target_vector[argmin] amps = 1 / (lossmin / target_at_lossmin + 1) lossAmp = mul * amps - self.target_vector.reshape(-1, 1) srss = np.linalg.norm(lossAmp, axis=0) arg = np.argmin(srss) factors = xs[:, arg] * amps[arg] suvives = np.where(factors > 1.0e-3) suite = self.copy() suite.name = new_name suite.set_folder(new_folder) suite.motions = [self.motions[i] for i in suvives[0]] suite.factors = [factors[i] for i in suvives[0]] suite.reset_spectrum_matrix() return suite
def filter_optimize(self, count, times, use_lsq=True, output_count=0, new_folder=None, lower_bound=0.6, upper_bound=1.4, dimension=100000)
-
Use Monte Carlo simulation and Least Square method to optimize the suite. after running, a bunch of suites will be written to the new_folder.
Parameters
count
: int
The number of motions in the resulting suite.times
: int
The number of times that MC simulation with dimension is run.
The total number of samples of MC simulation will be {times} * {dimension}.use_lsq
: boolean
Whether use LSQ optimizer to better optimize the suite.output_count
: int
The number of outputs. if 0, output all.new_folder
: strlower_bound
,upper_bound
: float
For the purpose that the individual motions are not too far from the individually matched factors, the lower and upper boundaries of amplifying the individually matched factors are set.
This is only useful whenuse_lsq
is set, to add boundaries to the LSQ optimizer.dimension
: int
Each time the Monte Carlo simulation is run, means that {dimension} times of simulation have been done by using a matrix target.Returns
List<
Suite
> with len=output_count
.
The list is sorted. The best answer is the first result. However, visual check should be done.Outputs
Every running after Monte Carlo simulation and LSQ optimization, the spectrum of the suite will be plotted.
Expand source code
def filter_optimize(self, count, times, use_lsq=True, output_count=0, new_folder=None, lower_bound=0.6, upper_bound=1.4, dimension=100000): '''Use Monte Carlo simulation and Least Square method to optimize the suite. after running, a bunch of suites will be written to the new_folder. Parameters ---------- `count` : int The number of motions in the resulting suite. `times` : int The number of times that MC simulation with dimension is run. The total number of samples of MC simulation will be {times} * {dimension}. `use_lsq` : boolean Whether use LSQ optimizer to better optimize the suite. `output_count` : int The number of outputs. if 0, output all. `new_folder` : str `lower_bound`, `upper_bound` : float For the purpose that the individual motions are not too far from the individually matched factors, the lower and upper boundaries of amplifying the individually matched factors are set. This is only useful when `use_lsq` is set, to add boundaries to the LSQ optimizer. `dimension` : int Each time the Monte Carlo simulation is run, means that {dimension} times of simulation have been done by using a matrix target. Returns ------- List<`Suite`> with len=`output_count`. The list is sorted. The best answer is the first result. However, visual check should be done. Outputs ------- Every running after Monte Carlo simulation and LSQ optimization, the spectrum of the suite will be plotted. ''' new_folder = new_folder if new_folder is not None else self.folder os.makedirs(new_folder, exist_ok=True) suites = [] srsses = [] for i in range(times): suite_new = self.filter_montecarlo(count=count, new_name=f'mc{i:03d}', new_folder=new_folder) suites.append(suite_new) srsses.append(suite_new.get_srss()) if use_lsq: suite_new2 = suite_new.optimize_LSQ( new_name=f'lsq{i:03d}', new_folder=new_folder, lower_bound=lower_bound, upper_bound=upper_bound ) suites.append(suite_new2) srsses.append(suite_new2.get_srss()) argsorted = np.argsort(np.array(srsses)) results = [suites[i] for i in argsorted[:output_count]] for i, suite in enumerate(results): suite.name = f'no{i}-{suite.name}' suite.save() suite.plot_all_spectrums(engine='pyplot') return results
def get_average_spectrum(self)
-
Return a spectrum which is the average spectrum of the suite.
Outputs
spectrum
:Spectrum
Expand source code
def get_average_spectrum(self): '''Return a spectrum which is the average spectrum of the suite. Outputs ------- `spectrum` : `Spectrum` ''' periods = self.motions[0].spectrum.periods average_spectrum = self.motions[0].spectrum.spectrum * self.factors[0] for i in range(1, len(self.motions)): average_spectrum += self.motions[i].spectrum.get(periods) * self.factors[i] average_spectrum /= len(self.motions) return Spectrum(periods, average_spectrum)
def get_srss(self)
-
Get the srss of the current averate spectrum to the target spectrum in the period boundary.
Returns
srss
: floatExpand source code
def get_srss(self): '''Get the srss of the current averate spectrum to the target spectrum in the period boundary. Returns ------- `srss` : float ''' return np.linalg.norm(np.matmul(self.spectrum_matrix, self.factors) - self.target_vector).item()
def load_motions_from_folder(self, folder=None)
-
Find all the .motion file from the given folder, and load them.
Parameters
folder
: str, None
The folder to search the .motion files.
If is None, useSuite.folder
instead.Outputs
Suite.motions
andSuite.factors
will be changed.Expand source code
def load_motions_from_folder(self, folder=None): '''Find all the .motion file from the given folder, and load them. Parameters ---------- `folder` : str, None The folder to search the .motion files. If is None, use `Suite.folder` instead. Outputs ------- `Suite.motions` and `Suite.factors` will be changed. ''' folder = self.folder if folder is None else folder filenames = os.listdir(self.folder) for filename in filenames: if filename.endswith('.motion'): mo = Motion.load(f'{folder}/{filename}') self.add_motion(mo) print(f'Loadad motion {mo.name}') self.reset_spectrum_matrix()
def load_motions_from_names(self, names, folder=None)
-
Give a list of name and the folder, load motions.
Parameters
names
: List
The list of motion names.folder
: str, None
The folder of the motions.
If is None, useSuite.folder
Outputs
Suite.motions
andSuite.factors
will be changed.Expand source code
def load_motions_from_names(self, names, folder=None): '''Give a list of name and the folder, load motions. Parameters ---------- `names` : List<str> The list of motion names. `folder` : str, None The folder of the motions. If is None, use `Suite.folder` Outputs ------- `Suite.motions` and `Suite.factors` will be changed. ''' folder = self.folder if folder is None else folder for name in names: mo = Motion.load(f'{folder}/{name}.motion') self.add_motion(mo) print(f'Loaded motion {mo.name}') self.reset_spectrum_matrix()
def load_peer_folder(self, peer_folder, h1=True, h2=True, v=False)
-
From a folder load all the PEER motions into the suite.
Extract the downloaded peer zip file first.Parameters
peer_folder
: str
The folder path of the extracted PEER data. Where_SearchResults.csv
can be found.h1
,h2
,v
: boolean
Whether load the three directional files or not.Outputs
Suite.motions
,Suite.factors
will be changed.
All the PEER motions will be saved in .motion format in the new folder.Expand source code
def load_peer_folder(self, peer_folder, h1=True, h2=True, v=False): '''From a folder load all the PEER motions into the suite. Extract the downloaded peer zip file first. Parameters ---------- `peer_folder` : str The folder path of the extracted PEER data. Where `_SearchResults.csv` can be found. `h1`, `h2`, `v` : boolean Whether load the three directional files or not. Outputs ------- `Suite.motions`, `Suite.factors` will be changed. All the PEER motions will be saved in .motion format in the new folder. ''' with open(f'{peer_folder}/_SearchResults.csv', "r") as f: for _ in range(34): f.readline() i = 0 positions = [] if h1: positions.append(19) if h2: positions.append(20) if v: positions.append(21) line = f.readline() while line != '\n': cells = line.split(',') for pos in positions: filename = cells[pos].replace(' ', '') motion_name = filename.replace('.AT2', '') print(f'Loading {motion_name}') mo = Motion(folder=self.folder, name=motion_name) mo.load_peer(peer_filename=f'{peer_folder}/{filename}') mo.generate_spectrum() mo.save() self.add_motion(mo) i += 1 line = f.readline() self.reset_spectrum_matrix() print(f"Successfully loaded {i} earthquakes from peer folder {peer_folder} to {self.folder}.") return True
def match_individual_LSQ(self)
-
Use Least square method for each individual motion to match the spectrum.
Returns
residuals
: numpy.ndarray
srss of each motion matched to the target spectrum.Outputs
Suite.factors
are changed.Expand source code
def match_individual_LSQ(self): '''Use Least square method for each individual motion to match the spectrum. Returns ------- `residuals` : numpy.ndarray srss of each motion matched to the target spectrum. Outputs ------- `Suite.factors` are changed. ''' residuals = np.zeros(len(self.motions)) for i in range(len(self.motions)): this = self.spectrum_matrix[:, i] * len(self.motions) that = self.target_vector factor = np.dot(this, that) / np.dot(this, this) residual = np.linalg.norm(this * factor - that) self.factors[i] = factor residuals[i] = residual print(f"Individual LSQ: Factor Max={np.max(self.factors):.4}, Residual Max={np.max(residuals):.4}") return residuals
def optimize_LSQ(self, new_name, new_folder=None, lower_bound=0.6, upper_bound=1.4)
-
Use Least Square method from scipy to optimize the factors.
Parameters
Refer to
Suite.filter_optimize()
.Returns
suite
:Suite
Optimized suite.Expand source code
def optimize_LSQ(self, new_name, new_folder=None, lower_bound=0.6, upper_bound=1.4): '''Use Least Square method from scipy to optimize the factors. Parameters ---------- Refer to `Suite.filter_optimize`. Returns ------- `suite` : `Suite` Optimized suite. ''' print('Running Bounded Least Square optimization ...') res = least_squares(self._loss, np.ones(len(self.motions)), bounds=(lower_bound, upper_bound), verbose=0) factors = res.x * self.factors suite = self.copy() suite.name = new_name suite.set_folder(new_folder) suite.factors = factors.tolist() suite.eliminate_neg() return suite
def plot_all_accels(self, folder=None, filename=None, save=True, engine='pyplot')
-
Plot amplified accelograms to a single file.
Parameters
folder
: str, None
If is None, useSuite.folder
.filename
: str, None
If is None, useSuite.name
+ '_accel'save
: boolean
If True, save the file. Else, show immediately.engine
: 'pyplot' or 'plotly'Outputs
A figure is plotted.
Expand source code
def plot_all_accels(self, folder=None, filename=None, save=True, engine='pyplot'): '''Plot amplified accelograms to a single file. Parameters ---------- `folder` : str, None If is None, use `Suite.folder`. `filename` : str, None If is None, use `Suite.name` + '_accel' `save` : boolean If True, save the file. Else, show immediately. `engine` : 'pyplot' or 'plotly' Outputs ------- A figure is plotted. ''' folder = self.folder if folder is None else folder if engine == 'plotly': filename = f'{self.name}_accel.html' if filename is None else filename + '.html' fig = go.Figure() for i, motion in enumerate(self.motions): fig.add_trace(go.Scatter( x=np.arange(len(motion.accels)) * motion.dt, y=motion.accels, mode='lines', name=str(i) + motion.name, text=motion.name )) if save: fig.write_html(f'{folder}/{filename}') else: fig.show() elif engine == 'pyplot': print('Pyplot is not supported.') else: print('ERROR: engine should be in "pyplot" and "plotly"')
def plot_all_spectrums(self, folder=None, filename=None, save=True, engine='pyplot')
-
Plot all amplified spectrums in one file.
Parameters
Refer to
Suite.plot_all_accels()
.The filename if is None will be
Suite.name
+ '_spectrum'Outputs
A figure will be generated.
Expand source code
def plot_all_spectrums(self, folder=None, filename=None, save=True, engine='pyplot'): '''Plot all amplified spectrums in one file. Parameters ---------- Refer to `Suite.plot_all_accels`. The filename if is None will be `Suite.name` + '_spectrum' Outputs ------- A figure will be generated. ''' if engine == 'plotly': folder = self.folder if folder is None else folder filename = f'{self.name}_spectrum.html' if filename is None else filename + '.html' fig = go.Figure() for i, motion in enumerate(self.motions): fig.add_trace(go.Scatter( x=motion.spectrum.periods, y=motion.spectrum.spectrum * self.factors[i], mode='lines', name=motion.name, text=motion.name )) spectrum1 = self.target_spectrum spectrum2 = self.get_average_spectrum() fig.add_trace(go.Scatter( x=spectrum1.periods, y=spectrum1.spectrum, mode='lines', name='Target', text='Target', line=dict(width=5), )) fig.add_trace(go.Scatter( x=spectrum2.periods, y=spectrum2.spectrum, mode='lines', name='Average', text='Average', line=dict(width=5), )) if save: fig.write_html(f'{folder}/{filename}') else: fig.show() elif engine == 'pyplot': folder = self.folder if folder is None else folder filename = f'{self.name}_spectrum.png' if filename is None else filename + '.png' plt.figure(figsize=(7, 5)) for i, motion in enumerate(self.motions): plt.plot(motion.spectrum.periods, motion.spectrum.spectrum * self.factors[i], label=motion.name, c='silver') plt.plot(self.target_spectrum.periods, self.target_spectrum.spectrum, label='Target', linewidth=3) averageSpectrum = self.get_average_spectrum() plt.plot(averageSpectrum.periods, averageSpectrum.spectrum, label="average", linewidth=2) plt.legend(loc='best', fontsize='x-small') plt.title(f'SRSS={self.get_srss():.4f}') if save: plt.savefig(os.path.join(folder, filename), dpi=200) else: plt.show() else: print("ERROR: wrong engine name. use 'plotly' or 'pyplot'.")
def plot_individual(self, save=True)
-
Plot all individual ground motions and the spectrum.
Parameters
Refer to
Suite.plot_all_accels()
.Outputs
If save, a figure for each motion will be generated.
Expand source code
def plot_individual(self, save=True): '''Plot all individual ground motions and the spectrum. Parameters ---------- Refer to `Suite.plot_all_accels`. Outputs ------- If save, a figure for each motion will be generated. ''' for i, mo in enumerate(self.motions): _, ax = plt.subplots(2, 1, figsize=(7, 5)) ax[0].plot(np.arange(len(mo.accel)) * mo.dt, mo.accel * self.factors[i]) ax[1].plot(mo.spectrum.periods, mo.spectrum.spectrum * self.factors[i]) ax[1].plot(self.target_spectrum.periods, self.target_spectrum.spectrum, linewidth=2) plt.suptitle( f'{i}: Name {mo.name}\nfactor={self.factors[i]:.4}, PGA={mo.get_PGA()*self.factors[i]:.4}, dt={mo.dt:.3f}') filename = f'{self.folder}/motion{i}.png' if save: plt.savefig(filename, dpi=200) else: plt.show()
def plot_interactive(self, save=True)
-
Plot the spectrum and the acceleration time-histories to an interactive html file.
Parameters
save
: boolean
If True, write to html file. Otherwise, show directly without saving.Outputs
A html file with interactive plotting sliders.
Expand source code
def plot_interactive(self, save=True): '''Plot the spectrum and the acceleration time-histories to an interactive html file. Parameters ---------- `save` : boolean If True, write to html file. Otherwise, show directly without saving. Outputs ------- A html file with interactive plotting sliders. ''' fig = make_subplots(rows=1, cols=2) for i in range(len(self.motions)): fig.add_trace(go.Scatter( x=self.motions[i].spectrum.periods, y=self.motions[i].spectrum.spectrum * self.factors[i], name=self.motions[i].name, ), row=1, col=1) fig.add_trace(go.Scatter( x=self.motions[i].get_times(), y=self.motions[i].accel * self.factors[i], name=self.motions[i].name, ), row=1, col=2) spectrum_average = self.get_average_spectrum() fig.add_trace(go.Scatter( x=spectrum_average.periods, y=spectrum_average.spectrum, name='average', line={'width': 5} )) fig.add_trace(go.Scatter( x=self.target_spectrum.periods, y=self.target_spectrum.spectrum, name='target', line={'width': 5} )) def _title(i): return f'Motion #{i}: factor={self.factors[i]:.2f}, duration={len(self.motions[i].accel)*self.motions[i].dt:.2f}, PGA={self.motions[i].get_PGA()*self.factors[i]:.4}' steps = [] for i in range(len(self.motions)): step = dict( method='update', args=[ {'visible': [j//2 == i for j in range(len(self.motions)*2)] + [False, True]}, {'title': _title(i)} ], label=str(i), ) steps.append(step) sliders = [dict(active=0, steps=steps, len=0.9, currentvalue={'visible': False})] updatemenus = [dict( buttons=[dict( label='All', method='update', args=[ {'visible': [True] * (len(self.motions)*2+2)}, {'title': f'Suite {self.name}: {len(self.motions)} motions, SRSS={self.get_srss():.4}'} ] )], type='buttons', x=1, y=0, xanchor='right', yanchor='top', pad={'t': 30}, )] fig.update_layout(sliders=sliders, title=f'Suite {self.name}: {len(self.motions)} motions, SRSS={self.get_srss():.4}', updatemenus=updatemenus) if save: fig.write_html(f'{self.folder}/{self.name}.html') else: fig.show()
def reset_spectrum_matrix(self)
-
Reset the spectrum matrix, which produces the average spectrum in the boundary by multiplying factors vector. This method should be called everytime
Suite.motions
change.Outputs
Suite.spectrum_matrix
andSuite.target_vector
will be changed.Expand source code
def reset_spectrum_matrix(self): '''Reset the spectrum matrix, which produces the average spectrum in the boundary by multiplying factors vector. This method should be called everytime `Suite.motions` change. Outputs ------- `Suite.spectrum_matrix` and `Suite.target_vector` will be changed. ''' mat = np.zeros((len(self.period_samples), len(self.motions))) for i in range(len(self.motions)): spectrum = self.motions[i].spectrum.get(self.period_samples) mat[:, i] = spectrum / len(self.motions) self.spectrum_matrix = mat self.target_vector = self.target_spectrum.get(self.period_samples)
def save(self, separate_motion=True, rewrite=False)
-
Save the suite to a .suite file.
Parameters
separate_motion
: boolean
If True, the motion details are not included into the file.
User has to make sure the motions are not moved.rewrite
: boolean
If True, all the '.motion' files will be rewritten.
otherwise, the motion files will be ignored.Outputs
Create a .suite file to store the suite.
Expand source code
def save(self, separate_motion=True, rewrite=False): '''Save the suite to a .suite file. Parameters ---------- `separate_motion` : boolean If True, the motion details are not included into the file. User has to make sure the motions are not moved. `rewrite` : boolean If True, all the '.motion' files will be rewritten. otherwise, the motion files will be ignored. Outputs ------- Create a .suite file to store the suite. ''' with open(f'{self.folder}/{self.name}.suite', 'w') as f: f.write(self.to_json(separate_motion=separate_motion)) if separate_motion: for mo in self.motions: if not os.path.exists(f'{mo.folder}/{mo.name}.motion'): mo.save() print(f'Motion saved to {mo.folder}/{mo.name}.motion') else: if rewrite: mo.save() print(f'Motion {mo.folder}/{mo.name}.motion is written.') print(f'Saved to {self.folder}/{self.name}.suite')
def scale(self, new_name, new_folder=None, factor_x=1, factor_y=1, scale_target_spectrum=True)
-
Scale the motions both horizontally and vertically.
Parameters
new_name
: strnew_folder
: str, Nonefactor_x
,factor_y
: float
The factors to amplify in x axis and y axis.scale_target_spectrum
: boolean
Whether scale the target spectrum as well.Returns
suite
:Suite
Expand source code
def scale(self, new_name, new_folder=None, factor_x=1, factor_y=1, scale_target_spectrum=True): '''Scale the motions both horizontally and vertically. Parameters ---------- `new_name` : str `new_folder` : str, None `factor_x`, `factor_y` : float The factors to amplify in x axis and y axis. `scale_target_spectrum` : boolean Whether scale the target spectrum as well. Returns ------- `suite` : `Suite` ''' suite = self.copy() suite.name = new_name suite.set_folder(new_folder) for i, motion in enumerate(self.motions): suite.motions[i] = motion.scale( new_name=motion.name, factor_x=factor_x, factor_y=factor_y) if scale_target_spectrum: suite.target_spectrum = self.target_spectrum.scale( factor_x=factor_x, factor_y=factor_y) if factor_x != 1: suite.period_bound = [p * factor_x for p in self.period_bound] suite.period_samples = self.period_samples * factor_x if factor_y != 1: suite.factors = [f*factor_y for f in self.factors] suite.reset_spectrum_matrix() return suite
def set_folder(self, folder)
-
Reset the folder for the suite and the motions.
Parameters
folder
: strOutputs
Suite.folder
andSuite.motions
will be changed.Expand source code
def set_folder(self, folder): '''Reset the folder for the suite and the motions. Parameters ---------- `folder` : str Outputs ------- `Suite.folder` and `Suite.motions` will be changed. ''' if folder is not None: self.folder = folder os.makedirs(folder, exist_ok=True) for mo in self.motions: mo.folder = folder
def to_dict(self, separate_motion=True)
-
Convert all the properties to dict.
Parameters
separate_motion
: boolean
If True, the motion details are not included into the dict.
User has to make sure these motions are not moved.Returns
it
: dictExpand source code
def to_dict(self, separate_motion=True): '''Convert all the properties to dict. Parameters ---------- `separate_motion` : boolean If True, the motion details are not included into the dict. User has to make sure these motions are not moved. Returns ------- `it` : dict ''' return dict( folder=self.folder, name=self.name, motions=[mo.to_dict() for mo in self.motions] if not separate_motion else [ f'{mo.folder}/{mo.name}' for mo in self.motions], factors=self.factors if type(self.factors[0] == float) else [ fact.item() for fact in self.factors], period_bound=self.period_bound, target_spectrum=self.target_spectrum.to_dict(), period_samples=self.period_samples.tolist(), srss=self.get_srss(), average_spectrum=self.get_average_spectrum().to_dict(), )
def to_json(self, separate_motion=True)
-
Convert the properties to json.
Parameters
Refer to
Suite.to_dict()
.Returns
json
: strExpand source code
def to_json(self, separate_motion=True): '''Convert the properties to json. Parameters ---------- Refer to `Suite.to_dict`. Returns ------- `json` : str ''' return json.dumps(self.to_dict(separate_motion=separate_motion))
def write_TCL(self, folder=None, filename=None)
-
Write the suite to TCL forms for OpenSees.
Currently the TCL contains only one dict. The keys of the dict is the motion names. Then the essential properties are stored.
At the same time, the accelerations for each motion is written to a text file.Parameters
folder
: str, None
The folder to write the tcl files. If is None, useSuite.folder
.filename
: str, None
The tcl file name. If is None, useSuite.name
.Outputs
A tcl file and a series of acceleration files.
Expand source code
def write_TCL(self, folder=None, filename=None): '''Write the suite to TCL forms for OpenSees. Currently the TCL contains only one dict. The keys of the dict is the motion names. Then the essential properties are stored. At the same time, the accelerations for each motion is written to a text file. Parameters ---------- `folder` : str, None The folder to write the tcl files. If is None, use `Suite.folder`. `filename` : str, None The tcl file name. If is None, use `Suite.name`. Outputs ------- A tcl file and a series of acceleration files. ''' folder = self.folder if folder is None else folder filename = self.name if filename is None else filename.replace( '.tcl', '') + '.tcl' os.makedirs(folder, exist_ok=True) config = '' config += 'dict set motion keys {' names = [mo.name for mo in self.motions] config += ' '.join(names) config += '}\n\n' for i, mo in enumerate(self.motions): mo.write_accel(f'{folder}/{mo.name}', time=False) config += f'dict set motion {mo.name} path {folder}/{mo.name}.acc\n' config += f'dict set motion {mo.name} npts {len(mo.accel)}\n' config += f'dict set motion {mo.name} dt {mo.dt:.4f}\n' config += f'dict set motion {mo.name} amp {self.factors[i]:.4f}\n\n' with open(f'{folder}/{filename}.tcl', 'w') as f: f.write(config)