Module pyearthquake

Introduction

pyearthquake is a light-weight module with basic earthquake engineering utilities. The aim of this module is to help engineers play with earthquakes by python programming.

Features

  1. Convert .AT2 file downloaded from PEER database to nicely aligned form.
  2. Select and match ground motions to the target spectrum with Monte Carlo simulation and Least Square optimization.
  3. Beautify the ground motions by truncating, trimming, changing dt, and renaming.
  4. Store a suite of motions in a single file or separate files.
  5. Generate displacement spectrums and pesudo acceleration spectrums.
  6. Inspect a suite or a motion with nice html interactive plots, thanks to plotly.
  7. Create Code Spectrums. Currently support Chinese Code.

Fact Sheet

Version: 1.0
Author: Hanlin Dong
License: MIT
Last Update: 2020-04-08

Content Briefing

Currently, it has three main classes: Spectrum, Motion, and Suite

Spectrum : a class storing seismic spectrum.
Some static methods are created confining to Code Spectrums.

Motion : a class storing acceleration timehistory.
Motion has utility functions to nicely parse PEER motion file.
Motion also generates spectrums.

Suite : a class storing a suite of motions and their target spectrum.
Suite has utilities to load a PEER file folder.
Suite has optimization functions to select a bunch of earthquakes out of a large number of earthquakes with Monte Carlo simulation and bounded least square optimization. The advantage is that the shape of the accels is not changed unlike wavelet methods, but the average match result is still good.
Suite can write all the acceleration files seperately or integrally.
Suite can plot all the accelerations and spectrums interactively for users to inspect.
Suite can generate .tcl file to be used in OpenSees.

All the classes mentioned above can be stored as string file. Currently they are json files with ext: .spectrum, .motion, .suite. These files can be read by static methods in every class.

Change log:

2020-04-08 13:53:32 v1.0

  • Join the three classes to one single file, refactor some functions.
  • Unify some methods, and create good documentation.
  • Add plotly interactive plots.
Expand source code
'''
Introduction
============

`pyearthquake` is a light-weight module with basic earthquake engineering utilities.
The aim of this module is to help engineers play with earthquakes by python programming.

*Features*
--------

1. Convert .AT2 file downloaded from PEER database to nicely aligned form.  
2. Select and match ground motions to the target spectrum with Monte Carlo simulation and Least Square optimization.
3. Beautify the ground motions by truncating, trimming, changing dt, and renaming.
4. Store a suite of motions in a single file or separate files.
5. Generate displacement spectrums and pesudo acceleration spectrums.
6. Inspect a suite or a motion with nice html interactive plots, thanks to [plotly](https://plot.ly).  
7. Create Code Spectrums. Currently support Chinese Code.

Fact Sheet  
----------  
Version: 1.0   
Author: [Hanlin Dong](http://www.hanlindong.com)  
License: MIT  
Last Update: 2020-04-08   

Content Briefing
----------------

Currently, it has three main classes: `Spectrum`, `Motion`, and `Suite`

`Spectrum` : a class storing seismic spectrum.  
    Some static methods are created confining to Code Spectrums.

`Motion` : a class storing acceleration timehistory.  
    Motion has utility functions to nicely parse PEER motion file.  
    Motion also generates spectrums.

`Suite` : a class storing a suite of motions and their target spectrum.  
    Suite has utilities to load a PEER file folder.  
    Suite has optimization functions to select a bunch of earthquakes out of a large number of earthquakes
        with Monte Carlo simulation and bounded least square optimization.
        The advantage is that the shape of the accels is not changed unlike wavelet methods,
        but the average match result is still good.  
    Suite can write all the acceleration files seperately or integrally.   
    Suite can plot all the accelerations and spectrums interactively for users to inspect.  
    Suite can generate .tcl file to be used in OpenSees.  

All the classes mentioned above can be stored as string file.
    Currently they are json files with ext: .spectrum, .motion, .suite.
    These files can be read by static methods in every class.

Change log:
-----------
2020-04-08 13:53:32 v1.0  

- Join the three classes to one single file, refactor some functions.  
- Unify some methods, and create good documentation.
- Add plotly interactive plots.
'''

import os
import re
import json
import random
import numpy as np
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from scipy.signal import lsim
from scipy.interpolate import interp1d
from scipy.optimize import least_squares


class Spectrum():
    '''A class storing a spectrum.  
    It can be init-ed simply with (periods, spectrum) sampling points.  
    It can also be init-ed by static functions to generate Code-confined spectrums.

    Parameters  
    ----------  
    `periods` : List, numpy.ndarray  
        List of sampling periods.  
        It can be generated by the static function `Spectrum.periods_standard` .
        Otherwise, users can specify desired period sampling points.  

    `spectrum` : List, numpy.ndarray  
        List of the spectrum points corresponding to sampling periods.  
    '''

    def __init__(self, periods, spectrum):
        self.periods = np.array(periods)
        '''numpy.ndarray, sampling periods'''
        self.spectrum = np.array(spectrum)
        '''numpy.ndarray, spectrum corresponding to sampling periods.'''

    def match(self, target, start, end, step=0.1):
        '''Match the current spectrum with a target spectrum. Use Least Square method.  

        Parameters  
        ----------  
        `target` : `Spectrum`  
            the spectrum to be matched.  

        `start`, `end` : float  
            the matching period boundaries.  

        `step` : float
            evenly sample the periods with the step.

        Returns
        -------
        `coefficient` : float  
            the coefficient to match the current spectrum to the target.

        `srss` : float   
            the srss after match.
        '''
        periods = np.arange(start, end+step, step)
        this_spectrum = np.array([self.get(p) for p in periods])
        that_spectrum = np.array([target.get(p) for p in periods])
        coefficient = sum(this_spectrum * that_spectrum) / sum(this_spectrum ** 2)
        srss = (np.sum((this_spectrum * coefficient - that_spectrum) ** 2)) ** 0.5
        return coefficient, srss

    def get(self, period):
        '''Get the spectrum value. Either a list or a number is accepted.  
        If the period exceeds the largest sampling period, the last spectrum value is returned.  
        The spectrum value not on sampling points are linearly interpolated.  

        Parameters
        ----------
        `period` : number, list, numpy.ndarray  
            The interested period.

        Returns
        -------
        `spectrum` : number, numpy.ndarray  
            if period is list-like, return a ndarray.
            if period is a number, return a number.
        '''
        if not hasattr(period, '__iter__'):
            i = self.periods.searchsorted(period)
            if self.periods[i] == period:
                return self.spectrum[i]
            elif i >= len(self.periods):
                print("Warning: given period exceeds the sampling range.")
                return self.spectrum[-1]
            else:
                return self.spectrum[i] - (self.spectrum[i]-self.spectrum[i-1]) / (self.periods[i] - self.periods[i-1]) * (self.periods[i] - period)
        else:
            return np.array([self.get(p) for p in period])

    def scale(self, factor_x=1, factor_y=1):
        '''Scale the spectrum.

        Parameters
        ----------
        `factor_x` : number  
            scaling factor in x direction (periods)  

        `factor_y` : number  
            scaling factor in y direction (spectrum)

        Returns
        -------
        `spectrum` : `Spectrum`
        '''
        return Spectrum(periods=self.periods*factor_x, spectrum=self.spectrum*factor_y)

    def plot(self, filename=None, engine='pyplot'):
        '''Plot the spectrum.

        Parameters
        ----------
        `filename` : string, None  
            if None, show it. Else, save it to the filename  

        `engine` : 'pyplot' or 'plotly'  
            Plot engine.

        Outputs
        -------
        A file with name `filename`.png or `filename`.html will be generated.
        '''
        if engine == 'pyplot':
            plt.figure(figsize=(4, 3))
            plt.plot(self.periods, self.spectrum, marker='o')
            plt.xlabel('Period (s)')
            plt.ylabel('Spetrum')
            plt.xlim([0, self.periods[-1]])
            if filename is None:
                plt.show()
            else:
                plt.savefig(filename+'.png', dpi=200)
        elif engine == 'plotly':
            fig = go.Figure()
            fig.add_scatter(dict(
                x='self.periods',
                y='self.spectrum',
                mode='lines+markers',
            ))
            if filename is None:
                fig.show()
            else:
                fig.write_html(filename+'.html')
        else:
            print('ERROR: engine should be in "pyplot" or "plotly"')

    def to_dict(self):
        '''Convert the instance properties to a dict.  

        Returns
        -------
        `it` : dict
        '''
        return dict(
            periods=self.periods.tolist(),
            spectrum=self.spectrum.tolist()
        )

    def to_json(self):
        '''Convert the properties to json.  

        Returns
        -------
        `json` : string  
        '''
        return json.dumps(self.to_dict())

    def save(self, filename):
        '''Save the spectrum to a file. Currently use json form.  

        Parameters
        ----------
        `filename` : string  
            The file name to be saved.   
            The extension '.spectrum' will be automatically added.  

        Outputs
        -------
        A file with name `filename`.spectrum will be generated.
        '''
        with open(filename + '.spectrum', 'w') as f:
            f.write(self.to_json())

    def write_csv(self, filename):
        '''Write the spectrum to a csv file.  

        Parameters
        ----------
        `filename` : string  
            The extension '.csv' will be automatically added.

        Outputs
        -------
        A file named `filename`.csv will be generated.
        '''
        np.savetxt(filename+'.csv', np.vstack(
            [self.periods, self.spectrum]).T, fmt="%.3f,%.7f", header='Period,Spectrum')

    @staticmethod
    def read_csv(filename):
        '''Create an instance by reading the written csv file.  

        Parameters
        ----------
        `filename` : string  
            The filename to read.

        Returns
        -------
        `spectrum` : `Spectrum`  
            New instance.
        '''
        if not filename.endswith('.csv'):
            filename += '.csv'
        data = np.loadtxt(filename, delimiter=',')
        return Spectrum(periods=data[:, 0], spectrum=data[:, 1])

    @staticmethod
    def read_dict(it):
        '''Create an instance from the dict.  

        Parameters
        ----------
        `it` : dict  
            The dict generated by `Spectrum.to_dict`

        Returns
        -------
        `spectrum` : `Spectrum`  
            New instance.
        '''
        return Spectrum(periods=it['periods'], spectrum=it['spectrum'])

    @staticmethod
    def read_json(string):
        '''Create an instance from json string.  

        Parameters
        ----------
        `string` : str  
            The json string 

        Returns
        -------
        `spectrum` : `Spectrum`  
            New instance.
        '''
        it = json.loads(string)
        return Spectrum.read_dict(it)

    @staticmethod
    def load(filename):
        '''Load the spectrum from file.  

        Parameters
        ----------
        `filename` : str  

        Returns
        -------
        `spectrum` : `Spectrum`  
            New instance.
        '''
        if not filename.endswith('.spectrum'):
            filename += '.spectrum'
        with open(filename, 'r') as f:
            string = f.read()
        return Spectrum.read_json(string)

    @staticmethod
    def periods_standard(until=6):
        '''Get a standard sampling of the periods.  
        The sampling points are more sparse with the increase of period.  

        Parameters
        ----------  
        `until` : number  
            The end of the period sampling list. It should >= 4.0.

        Returns
        -------
        `periods` : numpy.ndarray  
            The standard sampling points.
        '''
        assert until >= 4
        return np.hstack([np.arange(0, 0.2, 0.01),
                          np.arange(0.2, 0.5, 0.02),
                          np.arange(0.5, 1.0, 0.05),
                          np.arange(1.0, 2.0, 0.1),
                          np.arange(2.0, 4.0, 0.2),
                          np.arange(4.0, until+0.5, 0.5)])

    @staticmethod
    def chinese_point(period, intensity, major, site, group, damping=0.05):
        '''Get spectrum point for Chinese code, given a period.  

        Parameters
        ----------  
        `period` : number  
            The given period.  

        `intensity` : number  
            Intensity represented by a number.   
            Use 7.5 or 8.5 for half-level intensity.  

        `major` : boolean
            If True, it's for major (rare) earthquake.  
            If False, for minor (frequent) earthquake.  

        `site` : int
            Site number (Roman number in the code. 0 for I0, 1 for I1.)  

        `group` : int  
            Site class (Chinese number in the code.)  

        `damping` : float  
            Spectrum damping.  

        Returns
        -------  
        `value` : float
            the specific spectrum point value corresponding to the period.  
        '''
        # alpha max
        if intensity == 6:
            alphamax = 0.28 if major else 0.04
        elif intensity == 7:
            alphamax = 0.50 if major else 0.08
        elif intensity == 7.5:
            alphamax = 0.72 if major else 0.12
        elif intensity == 8:
            alphamax = 0.9 if major else 0.16
        elif intensity == 8.5:
            alphamax = 1.2 if major else 0.24
        elif intensity == 9:
            alphamax = 1.40 if major else 0.32
        else:
            raise("ERROR: intensity not in [6, 7, 7.5, 8, 8.5, 9]")
        # tg
        tg_table = [[0.20, 0.25, 0.35, 0.45, 0.65],
                    [0.25, 0.3, 0.4, 0.55, 0.75],
                    [0.3, 0.35, 0.45, 0.65, 0.9]]
        assert group in [1, 2, 3] and site in [0, 1, 2, 3, 4]
        tg = tg_table[group-1][site]
        # other parameters
        gamma = 0.9 + (0.05 - damping) / (0.3 + 6*damping)
        eta1 = 0.02 + (0.05 - damping) / (4 + 32*damping)
        eta2 = 1 + (0.05 - damping) / (0.08 + 1.6*damping)
        # calculate
        if period < 0.1:
            return ((10*eta2 - 4.5) * period + 0.45) * alphamax
        elif period < tg:
            return eta2 * alphamax
        elif period < 5*tg:
            return (tg/period)**gamma * eta2 * alphamax
        elif period <= 6:
            return (eta2 * 0.2**gamma - eta1 * (period-5*tg)) * alphamax
        else:
            return (eta2 * 0.2**gamma - eta1 * (6-5*tg)) * alphamax

    @staticmethod
    def chinese(intensity=8, major=True, site=3, group=2, damping=0.05):
        '''Return a new spectrum instance that confines to Chinese Code.  
        Use `Spectrum.periods_standard` to get sampling periods.  

        Parameters
        ----------
        refer to `Spectrum.chinese_point`

        Returns
        -------
        `spectrum` : `Spectrum`
        '''
        periods = Spectrum.periods_standard(until=6)
        spectrum = [Spectrum.chinese_point(p, 
                                           intensity=intensity,
                                           major=major,
                                           site=site,
                                           group=group,
                                           damping=damping) for p in periods]
        return Spectrum(periods, spectrum)


class Motion(object):
    '''A class plays with ground motion. Store accel time history and accel spectrum only.  
    '''

    def __init__(self, folder, name, damping=0.05):
        '''
        Parameters
        ----------
        `folder` : str   
            the folder this motion is stored.

        `name` : str  
            The motion name. The motion will be saved to '{folder}/{name}.motion'

        `damping` : float  
            The damping ratio of the motion spectrum.  
        '''
        self.folder = folder
        '''str. See `Motion`.'''
        os.makedirs(folder, exist_ok=True)
        self.name = name
        '''str. See `Motion`.'''
        self.accel = None
        '''numpy.ndarray. Acceleration time history with evenly distributed sampling points.'''
        self.dt = None
        '''float. Acceleration time sampling delta t.'''
        self.information = ''
        '''str. Store massive infomation about the motion. '''
        self.spectrum = None
        '''`Spectrum`. The spectrum of the motion.'''
        self.damping = damping
        '''float. The damping ratio of the spectrum. Note: init with 0.05.'''

    def get_PGA(self):
        '''
        Returns
        -------
        `value` : float  
            The peak ground acceleration.  
        '''
        return np.max(abs(self.accel))

    def get_duration(self):
        '''
        Returns
        -------
        `value` : float  
            The duration of the motion.  
        '''
        return (len(self.accel) - 1) * self.dt

    def get_times(self):
        '''
        Returns
        -------
        `value` : numpy.ndarray  
            The time series of the motion  
        '''
        return np.arange(len(self.accel)) * self.dt

    def generate_spectrum(self, periods=None):
        '''Use scipy LTS signal processing module to generate the spectrum.  
        Note that the accel spectrum is pseudo-spectrum derived from disp spectrum.  
        So it's only accurate when damping is small.  
        If `Motion.dt` is still None, which usually occurs if the PEER file is not well formated, 
            a blank spectrum will be returned.  

        Parameters
        ----------
        `periods` : List-like, None    
            The period sampling points.   
            If is None, `Spectrum.periods_standard` will be called.

        Returns
        -------
        `spectrum` : `Spectrum`  
            The generated spectrum.

        Outputs
        -------
        `Motion.spectrum` will be changed.
        '''
        if periods is None:
            periods = Spectrum.periods_standard()
        if self.dt is not None:
            print("Generating spectrum ...")
            omegas = 2 * np.pi / periods[1:]
            num = np.array([-1])
            spectrum = np.zeros(len(periods))
            spectrum[0] = self.get_PGA()
            for i, omega in enumerate(omegas):
                den = np.array([1, 2*self.damping*omega, omega**2])
                _, y, _ = lsim((num, den), self.accel, self.get_times())
                spectrum[i+1] = np.max(abs(y)) * omega ** 2
            self.spectrum = Spectrum(periods, spectrum)
        else:
            print("Dt is not known. Assigning empty spectrum.")
            self.spectrum = Spectrum(periods=[], spectrum=[])
        return self.spectrum

    def load_peer(self, peer_filename):
        '''Load acceleration time-history files downloaded from PEER.  
        The strings in the file will be parsed and also stored in self.information.  

        Parameters
        ----------
        `peer_filename` : str

        Outputs
        -------
        `Motion.dt`, `Motion.accel`, `Motion.information`, `Motion.spectrum` will all be changed.  
        '''
        self.information += f'peer_filename: {peer_filename}'
        try:
            f = open(peer_filename, 'r')
            for _ in range(3):
                self.information += f.readline()
            s = f.readline()
            self.information += s
            re_dt = re.compile(r".*[ 0](\.[0-9]*) ?SEC")
            mt = re_dt.match(s)
            try:
                self.dt = float(mt.group(1).replace('.', '0.'))
                print(f'Extracted dt = {self.dt:.4f}')
            except:
                print(f'ERROR: cannot find dt from {s}')
                self.dt = 0.000001
            series = []
            datastring = f.readlines()
            for dtline in datastring:
                for dt in dtline.split("  "):
                    if dt:
                        try:
                            series.append(float(dt.replace(".", "0.")))
                        except ValueError:
                            break
            self.accel = np.array(series)
            print(f'Extracted npts = {len(self.accel)}')
        except IOError:
            print(f'No file named {self.name} is found.')

    def convert_peer(self, peer_filename):
        '''A recipe to convert the peer acceleration file to the .motion file.  

        Parameters
        ----------
        `peer_filename` : str  

        `folder` : str  
            the folder to save .motion file.

        Outputs
        -------
        A .motion file will be created in `folder` with the PEER filename.
        '''
        print(f'Converting PEER file {peer_filename} to folder {self.folder} with name {self.name}')
        self.load_peer(peer_filename)
        self.generate_spectrum()
        self.save()

    def trim(self, new_name, new_folder=None, left=1.0e-3, right=1.0e-3, prepend_zero=True, log=True, log_folder=None):
        '''Trim the motion with two coefficients of PGA.  
        This method is used when a long near-zero time-history is stored in the data file.  

        Parameters
        ----------
        `new_name` : str  

        `new_folder` : str, None  

        `left`, `right` : float 
            The coefficient of PGA to trim the time history.
            Search from left or right, when the target point is found,
            search for the nearest near-zero point. Then trim off the small values.  


        `prepend_zero` : boolean  
            If True, prepend a zero value to the trimmed acceleration.

        `log` : boolean  
            If the log picture will be generated.

        `log_folder` : str, None
            Path for the folder to save the trim logs.
            If None, save in the previous folder.

        Returns
        -------
        `motion` : `Motion`  
            The trimmed motion.

        Outputs
        -------
        If log, plot a figure in log_folder named '{new_name}_trimlog.png'
        '''
        mo = self.copy()
        mo.name = new_name
        mo.folder = new_folder if new_folder is not None else self.folder
        pga = self.get_PGA()
        # search left
        i = 0
        while abs(self.accel[i]) < pga * left:
            i += 1
        while i > 0 and self.accel[i] * self.accel[i-1] > 0:
            i -= 1
        i_left = i
        # search right
        i = -1
        while abs(self.accel[i]) < pga * right:
            i -= 1
        while i < -1 and self.accel[i] * self.accel[i+1] > 0:
            i += 1
        i_right = len(self.accel) + i
        if prepend_zero:
            mo.accel = np.hstack([0, self.accel[i_left:i_right]])
        else:
            mo.accel = self.accel[i_left:i_right]
        mo.generate_spectrum()
        print(
            f'Motion is trimmed from {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.')
        if log:
            log_folder = self.folder if log_folder is None else log_folder
            os.makedirs(log_folder, exist_ok=True)
            _, ax = plt.subplots(3, 1, figsize=(7, 5))
            ax[0].plot(self.get_times(), self.accel, label='before')
            ax[1].plot(mo.get_times(), mo.accel, label='after')
            ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before')
            ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after')
            for i in range(3):
                ax[i].legend()
            plt.suptitle(f'Trim log: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.')
            plt.savefig(f'{log_folder}/{mo.name}_trimlog.png', dpi=200)
        return mo

    def change_dt(self, dt, new_name, new_folder=None, log=True, log_folder=None):
        '''Change dt for the accel. Linearly interpolate the acceleration, and re-sample.
        Note: no low-pass filters are used. May cause new peaks in the Fourier spectrum.  

        Parameters
        ----------
        `dt` : float  
            New sampling delta t. 

        `new_name` : str  

        `new_folder` : str, None  

        `log` : boolean  
            If log, create a log picture comparing the before and after accels and spectrum.

        `log_folder` : str, None  
            Path for the folder to save the trim logs.
            If None, save in the previous folder.

        Returns
        -------
        `motion` : `Motion`  

        Outputs
        -------
        If log, create a picture named '{log_folder}/{new_name}_dtlog.png'.
        '''
        if dt == self.dt:
            print(f'Motion {self.name} do not need to change dt')
            return self.copy()
        else:
            mo = self.copy()
            mo.name = new_name
            mo.folder = new_folder if new_folder is not None else self.folder
            f = interp1d(self.get_times(), self.accel)
            npts = np.floor(self.dt * len(self.accel) / dt)
            mo.accel = f(np.arange(npts)*dt)
            mo.dt = dt
            mo.generate_spectrum()
            print(f'Motion dt changed from {self.name}:{self.dt:.4f}s to {mo.name}:{dt:.4f}s.')
        if log:
            log_folder = self.folder if log_folder is None else log_folder
            os.makedirs(log_folder, exist_ok=True)
            _, ax = plt.subplots(3, 1, figsize=(7, 5))
            ax[0].plot(self.get_times(), self.accel, label='before')
            ax[1].plot(mo.get_times(), mo.accel, label='after')
            ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before')
            ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after')
            for i in range(3):
                ax[i].legend()
            plt.suptitle(f'Change dt: {self.name}:{self.dt:.4f}s to {mo.name}:{dt:.4f}s.')
            plt.savefig(f'{log_folder}/{mo.name}_dtlog.png', dpi=200)
        return mo

    def truncate(self, time, new_name, new_folder=None, log=True, log_folder=None):
        '''Truncate the acceleration. 
        Sometimes, a ground motion file may have two peaks or more. 
        So if only one peak is needed, this method can be used to skip the other peaks.  

        Parameters
        ----------
        `time` : float
            The accels after the time will be cut off.  

        `new_name` : str

        `new_folder` : str, None  

        `log` : boolean  

        `log_folder` : str, None  

        Returns
        -------
        `motion` : `Motion`

        Outputs
        -------
        If log is True, a figure named '{log_folder}/{new_name}_trunclog.png' will be created.
        '''
        i = round(time / self.dt)
        mo = self.copy()
        mo.name = new_name
        mo.folder = new_folder if new_folder is not None else self.folder
        if i >= len(self.accel):
            print(f"WARNING: time is longer than duration. {self.name} is not truncated.")
            return mo
        mo.accel = self.accel[:i]
        mo.generate_spectrum()
        print(f'Motion truncated: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.')
        if log:
            log_folder = self.folder if log_folder is None else log_folder
            os.makedirs(log_folder, exist_ok=True)
            _, ax = plt.subplots(3, 1, figsize=(7, 5))
            ax[0].plot(self.get_times(), self.accel, label='before')
            ax[1].plot(mo.get_times(), mo.accel, label='after')
            ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before')
            ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after')
            for i in range(3):
                ax[i].legend()
            plt.suptitle(f'Truncate: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.')
            plt.savefig(f'{log_folder}/{mo.name}_trunclog.png', dpi=200)
        return mo

    def scale(self, new_name, new_folder=None, factor_x=1, factor_y=1):
        '''Scale the ground motion.  

        Parameters
        ----------
        `new_name` : str

        `new_folder` : str, None

        `factor_x` : float  
            The scaling factor on the time/period axis.  

        `factor_y` : float
            The scaling factor on the accel axis.

        Returns
        -------
        `motion` : Motion
        '''
        mo = self.copy()
        mo.name = new_name
        mo.folder = new_folder if new_folder is not None else self.folder
        if factor_x != 1:
            mo.dt = self.dt * factor_x
        if factor_y != 1:
            mo.accel = self.accel * factor_y
        mo.spectrum = self.spectrum.scale(factor_x, factor_y)
        return mo

    def plot_timehistory(self, folder=None, save=True, engine='pyplot'):
        '''Plot the time history of the acceleration.

        Parameters
        ----------
        `folder` : str, None  
            Store folder. If None, use `Motion.folder` 

        `save` : boolean
            Save the figure or not. 

        `engine` : 'pyplot' or 'plotly'  

        Outputs
        -------
        if save is True, create a figure '{folder}/{self.name}_th'.
        '''
        folder = self.folder if folder is None else folder
        if engine == 'pyplot':
            plt.figure()
            t = self.get_times()
            plt.plot(t, self.accel)
            if save:
                plt.savefig(f'{folder}/{self.name}_th.png', dpi=200)
            else:
                plt.show()
        elif engine == 'plotly':
            fig = go.Figure()
            fig.add_trace(go.Scatter(
                x=self.get_times(),
                y=self.accel,
                mode='lines',
                name=self.name
            ))
            if save:
                fig.write_html(f'{folder}/{self.name}_th.html')
            else:
                fig.show()
        else:
            print('ERROR: engine should be in "pyplot" or "plotly"')

    def plot(self, folder=None, save=True, engine='pyplot'):
        '''Plot the time history and the spectrum.

        Parameters
        ----------
        See `Motion.plot_timehistory`

        Outputs
        -------
        Create a figure named '{folder}/{name}'
        '''
        folder = self.folder if folder is None else folder
        if engine == 'pyplot':
            _, ax = plt.subplots(2, 1, figsize=(7, 8))
            ax[0].plot(self.get_times(), self.accel)
            ax[0].set_xlabel('Time(s)')
            ax[0].set_ylabel('Acceleration(g)')
            ax[0].set_title(f'PGA={self.get_PGA():.4f}')
            ax[1].plot(self.spectrum.periods, self.spectrum.spectrum)
            ax[1].set_xlabel('Period(s)')
            ax[1].set_ylabel('Spectrum Acceleration(g)')
            ax[1].set_title(f'damping={self.damping:.4f}')
            plt.subplots_adjust(hspace=0.3)
            if save:
                plt.savefig(f'{folder}/{self.name}.png', dpi=200)
            else:
                plt.show()
        elif engine == 'plotly':
            fig = make_subplots(rows=1, cols=2)
            fig.add_trace(go.Scatter(
                x=self.get_times(),
                y=self.accel,
                mode='lines',
                name='accel'
            ), row=1, col=2)
            fig.add_trace(go.Scatter(
                x=self.spectrum.periods,
                y=self.spectrum.spectrum,
                mode='lines',
                name='spectrum'
            ), row=1, col=1)
            if save:
                fig.write_html(f'{folder}/{self.name}.html')
            else:
                fig.show()
        else:
            print('Engine should be in "pyplot" or "plotly"')

    def write_accel(self, filename, time=True):
        '''Write the acceleration timehistory to a single file.  

        Parameters
        ----------
        `filename` : str  
            The file path to store the accelration.  

        `time` : boolean  
            If True, a time column is created.
            To know dt in the file content, time should be True.
            Otherwise, dt should be stored in the filename.

        Outputs
        -------
        An text file with acceleration information will be written.
        '''
        if time:
            data = np.vstack([self.get_times(), self.accel])
            np.savetxt(filename, data.T, fmt="%.4f,%.7e", header="Time,Accel")
        else:
            np.savetxt(filename, self.accel, fmt="%.7e")

    def write_info(self, filename):
        '''Write the information to a single file.  

        Parameters
        ----------
        `filename` : str

        Outputs
        -------
        A text file with information will be created.
        '''
        with open(filename, 'w') as f:
            f.write(f'dt={self.dt:.3}\n')
            f.write(f'npts={len(self.accel)}\n')
            f.write(f'damping={self.damping:.3}\n')
            f.write(self.information)

    def write_spectrum(self, filename):
        '''Write the spectrum to a single file. Use the form of csv.

        Parameters
        ----------
        `filename` : str

        Outputs
        -------
        A csv file will be created.
        '''
        self.spectrum.write_csv(filename)

    def read_accel(self, filename, dt=True):
        '''Load an single-column acceleration file to the current instance.

        Parameters
        ----------
        `dt` : float, boolean
            if dt is float, `Motion.dt` will be set to dt.
            Else if dt is True, `Motion.dt` will be found in the file.

        Outputs
        -------
        Change `Motion.dt` and `Motion.accel` in the CURRENT instance.
        '''
        if not isinstance(dt, float):
            data = np.loadtxt(filename)
            self.accel = data[:, 1]
            self.dt = data[1, 0]
        else:
            self.accel = np.loadtxt(filename)
            self.dt = dt

    def read_spectrum(self, filename):
        '''Read the spectrum from csv file.  

        Parameters
        ----------
        `filename` : str

        Outputs
        -------
        The `Motion.spectrum` of the CURRENT instance will be changed.
        '''
        spectrum = Spectrum.read_csv(filename)
        self.spectrum = spectrum

    def read_info(self, filename):
        '''Read info from text file.  

        Parameters
        ----------
        `filename` : str

        Outputs
        -------
        The `Motion.information` of the CURRENT instance will be changed.
        '''
        with open(filename, 'r') as f:
            self.dt = float(f.readline().replace("dt=", ""))
            _ = int(f.readline().replace("npts=", ""))
            self.damping = float(f.readline().replace("damping=", ""))
            self.information = f.read()

    def to_dict(self):
        '''Convert the instance properties to dict.  

        Returns
        -------
        `it` : dict
        '''
        return dict(
            dt=self.dt,
            npts=len(self.accel),
            damping=self.damping,
            information=self.information,
            accel=self.accel.tolist(),
            spectrum=self.spectrum.to_dict(),
            folder=self.folder,
            name=self.name,
        )

    def to_json(self):
        '''Convert the instance properties to string using json format.  

        Returns
        -------
        `string` : str
        '''
        return json.dumps(self.to_dict())

    def save(self):
        '''Save the instance to a file, which is defined by `Motion.folder` and `Motion.name`  

        Outputs
        -------
        A file named '{self.folder}/{self.name}.motion' will be created.
        '''
        with open(f'{self.folder}/{self.name}.motion', 'w') as f:
            f.write(self.to_json())

    @staticmethod
    def read_dict(it):
        '''Create a instance by the dict generated by `Motion.to_dict`.  

        Parameters
        ----------
        `it` : dict

        Returns
        -------
        `mo` : `Motion`
        '''
        mo = Motion(folder=it['folder'], name=it['name'])
        mo.dt = it['dt']
        mo.damping = it['damping']
        mo.information = it['information']
        mo.accel = np.array(it['accel'])
        mo.spectrum = Spectrum.read_dict(it['spectrum'])
        return mo

    @staticmethod
    def read_json(string):
        '''Create a new instance by the string generated by `Motion.to_json`.  

        Parameters
        ----------
        `string` : str

        Returns
        -------
        `mo` : `Motion`
        '''
        it = json.loads(string)
        return Motion.read_dict(it)

    @staticmethod
    def load(filename):
        '''Load the file created from `Motion.save` method.  

        Parameters
        ----------
        `filename` : str

        Returns
        -------
        `mo` : `Motion`
        '''
        if not filename.endswith('.motion'):
            filename += '.motion'
        with open(filename, 'r') as f:
            return Motion.read_json(f.read())

    def copy(self):
        '''Copy the instance.

        Returns
        -------
        `mo` : `Motion`
        '''
        return Motion.read_dict(self.to_dict())


class Suite(object):
    '''A class that contains mainly a series of `Motion` instances, 
    and corresponding series of factors, matching the suite of motions to the target `Spectrum`.  
    '''

    def __init__(self, folder, name, target_spectrum, period_bound):
        '''
        Parameters
        ----------
        `folder` : str  
            A path-like string for the suite.

        `name` : str
            A specific name for the suite. The instance will be saved in '{folder}/{name}.suite'  

        `target_spectrum` : `Spectrum`  
            The target spectrum of the suite.  

        `period_bound` : List<float> len=2  
            The matching period boundaries.
        '''
        self.folder = folder
        '''str. See `Suite`'''
        os.makedirs(folder, exist_ok=True)
        self.name = name
        '''str. See `Suite`'''
        self.motions = []
        '''List<`Motion`>. Store all the motions.'''
        self.factors = []
        '''List<float>. Store all the factors that should be applied to the motions.'''
        self.target_spectrum = target_spectrum
        '''`Spectrum`. See `Suite`'''
        self.period_bound = period_bound
        '''List<float>. See `Suite`'''
        self.period_samples = np.linspace(period_bound[0], period_bound[1], 200)
        '''numpy.ndarray. The period within the `period_bound` is evenly sampled, with 200 points.'''
        self.spectrum_matrix = None
        '''numpy.ndarray. A matrix with shape (len(`Suite.period_samples`), len(`Suite.motions`)).
        This matrix times the `Suite.factors` vector products the average spectrum with `Suite.period_samples`.'''
        self.target_vector = target_spectrum.get(self.period_samples)
        '''numpy.ndarray. The vector generated by the target_spectrum sampling the `Suite.period_samples`'''

    def set_folder(self, folder):
        '''Reset the folder for the suite and the motions.

        Parameters
        ----------
        `folder` : str

        Outputs
        -------
        `Suite.folder` and `Suite.motions` will be changed.
        '''
        if folder is not None:
            self.folder = folder
            os.makedirs(folder, exist_ok=True)
            for mo in self.motions:
                mo.folder = folder

    def add_motion(self, motion, factor=1):
        '''Add motion to the suite together with a factor, to keep the lengths are identical.

        Parameters
        ----------
        `motion` : `Motion`

        `factor` : number

        Outputs
        -------
        `Suite.motions` and `Suite.factors` will be changed.
        '''
        self.motions.append(motion)
        self.factors.append(factor)

    def load_motions_from_names(self, names, folder=None):
        '''Give a list of name and the folder, load motions.

        Parameters
        ----------
        `names` : List<str>  
            The list of motion names.

        `folder` : str, None  
            The folder of the motions.  
            If is None, use `Suite.folder`

        Outputs
        -------
        `Suite.motions` and `Suite.factors` will be changed.
        '''
        folder = self.folder if folder is None else folder
        for name in names:
            mo = Motion.load(f'{folder}/{name}.motion')
            self.add_motion(mo)
            print(f'Loaded motion {mo.name}')
        self.reset_spectrum_matrix()

    def load_motions_from_folder(self, folder=None):
        '''Find all the .motion file from the given folder, and load them.  

        Parameters
        ----------
        `folder` : str, None   
            The folder to search the .motion files.  
            If is None, use `Suite.folder` instead.  

        Outputs
        -------
        `Suite.motions` and `Suite.factors` will be changed.
        '''
        folder = self.folder if folder is None else folder
        filenames = os.listdir(self.folder)
        for filename in filenames:
            if filename.endswith('.motion'):
                mo = Motion.load(f'{folder}/{filename}')
                self.add_motion(mo)
                print(f'Loadad motion {mo.name}')
        self.reset_spectrum_matrix()

    def filter_by_IDs(self, ids, new_name, new_folder=None):
        '''Give a series of selected IDs, filter the motions.  

        Parameters
        ----------
        `ids` : List<int>  
            The list of indexes of the selected motions in the current suite.  

        `new_name` : str  

        `new_folder` : str, None

        Returns
        -------
        `suite` : `Suite`  
            A new suite with the selected motions and their factors only.  
        '''
        suite = self.copy()
        suite.name = new_name
        suite.set_folder(new_folder)
        suite.motions = [self.motions[i] for i in ids]
        suite.factors = [self.factors[i] for i in ids]
        suite.reset_spectrum_matrix()
        print(f'The motions have been filtered. {len(ids)} motions remain.')
        return suite

    def filter_by_file(self, filename, new_name, new_folder=None):
        '''Give a file with indexes selected.  

        Parameters
        ----------
        Refer to `Suite.filter_by_IDs`.  

        Returns
        -------
        Refer to `Suite.filter_by_IDs`.  
        '''
        with open(filename, 'r') as f:
            lines = f.readlines()
        ids = [int(l) for l in lines]
        return self.filter_by_IDs(ids=ids, new_name=new_name, new_folder=new_folder)

    def get_average_spectrum(self):
        '''Return a spectrum which is the average spectrum of the suite.

        Outputs
        -------
        `spectrum` : `Spectrum`  
        '''
        periods = self.motions[0].spectrum.periods
        average_spectrum = self.motions[0].spectrum.spectrum * self.factors[0]
        for i in range(1, len(self.motions)):
            average_spectrum += self.motions[i].spectrum.get(periods) * self.factors[i]
        average_spectrum /= len(self.motions)
        return Spectrum(periods, average_spectrum)

    def get_srss(self):
        '''Get the srss of the current averate spectrum to the target spectrum in the period boundary.

        Returns
        -------
        `srss` : float
        '''
        return np.linalg.norm(np.matmul(self.spectrum_matrix, self.factors) - self.target_vector).item()

    def reset_spectrum_matrix(self):
        '''Reset the spectrum matrix, which produces the average spectrum 
        in the boundary by multiplying factors vector.
        This method should be called everytime `Suite.motions` change.

        Outputs
        -------
        `Suite.spectrum_matrix` and `Suite.target_vector` will be changed.  
        '''
        mat = np.zeros((len(self.period_samples), len(self.motions)))
        for i in range(len(self.motions)):
            spectrum = self.motions[i].spectrum.get(self.period_samples)
            mat[:, i] = spectrum / len(self.motions)
        self.spectrum_matrix = mat
        self.target_vector = self.target_spectrum.get(self.period_samples)

    def eliminate_neg(self):
        '''Eliminate negative target vector by multiplying a factor for every individual.
        In other words, make the average spectrum above the target spectrum.

        Returns
        -------
        `factor` : float  
            The global factor that should be applied to each of the factors.  

        Outputs
        -------
        `Suite.factors` are all amplified by the returned `factor`.
        '''
        error = np.matmul(self.spectrum_matrix, self.factors) - self.target_vector
        argmin = np.argmin(error)
        factor = 1 / (1 + error[argmin] / self.target_vector[argmin])
        print(f'Multiply {factor:.4} to eliminate negative spectrum.')
        self.factors = [self.factors[i] * factor for i in range(len(self.factors))]
        return factor

    def match_individual_LSQ(self):
        '''Use Least square method for each individual motion to match the spectrum.

        Returns
        -------
        `residuals` : numpy.ndarray  
            srss of each motion matched to the target spectrum.  

        Outputs
        -------
        `Suite.factors` are changed.
        '''
        residuals = np.zeros(len(self.motions))
        for i in range(len(self.motions)):
            this = self.spectrum_matrix[:, i] * len(self.motions)
            that = self.target_vector
            factor = np.dot(this, that) / np.dot(this, this)
            residual = np.linalg.norm(this * factor - that)
            self.factors[i] = factor
            residuals[i] = residual
        print(f"Individual LSQ: Factor Max={np.max(self.factors):.4}, Residual Max={np.max(residuals):.4}")
        return residuals

    def filter_optimize(self, count, times, use_lsq=True, output_count=0, new_folder=None, lower_bound=0.6, upper_bound=1.4, dimension=100000):
        '''Use Monte Carlo simulation and Least Square method to optimize the suite.
        after running, a bunch of suites will be written to the new_folder.

        Parameters  
        ----------
        `count` : int  
            The number of motions in the resulting suite.  

        `times` : int  
            The number of times that MC simulation with dimension is run.  
            The total number of samples of MC simulation will be {times} * {dimension}.  

        `use_lsq` : boolean  
            Whether use LSQ optimizer to better optimize the suite.  

        `output_count` : int  
            The number of outputs. if 0, output all.  

        `new_folder` : str 
        `lower_bound`, `upper_bound` : float  
            For the purpose that the individual motions are not too far from the individually matched factors,
            the lower and upper boundaries of amplifying the individually matched factors are set.  
            This is only useful when `use_lsq` is set, to add boundaries to the LSQ optimizer.  

        `dimension` : int  
            Each time the Monte Carlo simulation is run, 
            means that {dimension} times of simulation have been done by using a matrix target.

        Returns
        -------
        List<`Suite`> with len=`output_count`.  
        The list is sorted. The best answer is the first result. However, visual check should be done.

        Outputs
        -------
        Every running after Monte Carlo simulation and LSQ optimization, the spectrum of the suite will be plotted.
        '''
        new_folder = new_folder if new_folder is not None else self.folder
        os.makedirs(new_folder, exist_ok=True)
        suites = []
        srsses = []
        for i in range(times):
            suite_new = self.filter_montecarlo(count=count, new_name=f'mc{i:03d}', new_folder=new_folder)
            suites.append(suite_new)
            srsses.append(suite_new.get_srss())
            if use_lsq:
                suite_new2 = suite_new.optimize_LSQ(
                    new_name=f'lsq{i:03d}', 
                    new_folder=new_folder, 
                    lower_bound=lower_bound, 
                    upper_bound=upper_bound
                )
                suites.append(suite_new2)
                srsses.append(suite_new2.get_srss())
        argsorted = np.argsort(np.array(srsses))
        results = [suites[i] for i in argsorted[:output_count]]
        for i, suite in enumerate(results):
            suite.name = f'no{i}-{suite.name}'
            suite.save()
            suite.plot_all_spectrums(engine='pyplot')
        return results

    def filter_montecarlo(self, count, new_name, new_folder=None, dimension=100000):
        '''Use Monte Carlo simulation to filter the suite to count numbers of motions.

        Parameters
        ---------- 
        Refer to `Suite.filter_optimize`.  

        Returns
        -------
        `suite` : `Suite`   
            The optimized result after {dimension} times of MC simulations.
        '''
        print(f'Running Monte Carlo simulation, dimension={dimension}')
        self.match_individual_LSQ()
        factors = np.array(self.factors)
        xs = np.zeros((len(self.motions), dimension))
        for i in range(dimension):
            indexes = random.sample(range(len(self.motions)), count)
            xs[indexes, i] = factors[indexes]
        mul = np.matmul(self.spectrum_matrix * len(self.motions) / count, xs)
        loss = mul - self.target_vector.reshape(-1, 1)
        argmin = np.argmin(loss, axis=0)
        lossmin = np.min(loss, axis=0)
        target_at_lossmin = self.target_vector[argmin]
        amps = 1 / (lossmin / target_at_lossmin + 1)
        lossAmp = mul * amps - self.target_vector.reshape(-1, 1)
        srss = np.linalg.norm(lossAmp, axis=0)
        arg = np.argmin(srss)
        factors = xs[:, arg] * amps[arg]
        suvives = np.where(factors > 1.0e-3)
        suite = self.copy()
        suite.name = new_name
        suite.set_folder(new_folder)
        suite.motions = [self.motions[i] for i in suvives[0]]
        suite.factors = [factors[i] for i in suvives[0]]
        suite.reset_spectrum_matrix()
        return suite

    def _loss(self, x):
        '''Utility function: calculate the loss for optimization.'''
        factors = x * np.array(self.factors)
        mul = np.matmul(self.spectrum_matrix, factors)
        loss = mul - self.target_vector
        argmin = np.argmin(loss)
        amp = 1 / (loss[argmin] / self.target_vector[argmin] + 1)
        return mul * amp - self.target_vector

    def optimize_LSQ(self, new_name, new_folder=None, lower_bound=0.6, upper_bound=1.4):
        '''Use Least Square method from scipy to optimize the factors.

        Parameters
        ----------
        Refer to `Suite.filter_optimize`.   

        Returns
        -------
        `suite` : `Suite`  
            Optimized suite.
        '''
        print('Running Bounded Least Square optimization ...')
        res = least_squares(self._loss,
                            np.ones(len(self.motions)),
                            bounds=(lower_bound, upper_bound),
                            verbose=0)
        factors = res.x * self.factors
        suite = self.copy()
        suite.name = new_name
        suite.set_folder(new_folder)
        suite.factors = factors.tolist()
        suite.eliminate_neg()
        return suite

    def load_peer_folder(self, peer_folder, h1=True, h2=True, v=False):
        '''From a folder load all the PEER motions into the suite.  
        Extract the downloaded peer zip file first.

        Parameters
        ----------
        `peer_folder` : str  
            The folder path of the extracted PEER data. Where `_SearchResults.csv` can be found.

        `h1`, `h2`, `v` : boolean  
            Whether load the three directional files or not.  

        Outputs
        -------
        `Suite.motions`, `Suite.factors` will be changed.  
        All the PEER motions will be saved in .motion format in the new folder.  
        '''
        with open(f'{peer_folder}/_SearchResults.csv', "r") as f:
            for _ in range(34):
                f.readline()
            i = 0
            positions = []
            if h1:
                positions.append(19)
            if h2:
                positions.append(20)
            if v:
                positions.append(21)
            line = f.readline()
            while line != '\n':
                cells = line.split(',')
                for pos in positions:
                    filename = cells[pos].replace(' ', '')
                    motion_name = filename.replace('.AT2', '')
                    print(f'Loading {motion_name}')
                    mo = Motion(folder=self.folder, name=motion_name)
                    mo.load_peer(peer_filename=f'{peer_folder}/{filename}')
                    mo.generate_spectrum()
                    mo.save()
                    self.add_motion(mo)
                    i += 1
                line = f.readline()
        self.reset_spectrum_matrix()
        print(f"Successfully loaded {i} earthquakes from peer folder {peer_folder} to {self.folder}.")
        return True

    def beautify(self, new_name, run_steps=[1,2,3,4], dt=0.01, new_folder=None, trunc_dict={}, left=1.0e-3, right=1.0e-3, prepend_zero=True):
        '''Beautify the suite in 4 steps:  

        1. truncate the motions, see `Motion.truncate`  
        2. trim the near-zero parts, see `Motion.trim` 
        3. change dt to uniform, see `Motion.change_dt`
        4. rename the motions to a series.  

        This procedure is always logged to the current folder.  

        Parameters
        ----------
        `new_name` : str  

        `run_steps` : List<int>  
            The step number to run. Users can select which one out of the 4 steps to run.
            The order will also follow this List.  

        `dt` : float  
            The uniform delta t.   

        `new_folder` : str  

        `trunc_dict` : dict  
            A dict that defines the `time` parameter in `Motion.truncate`  
            In the dict, keys are the indexes of the motions in the suite, values are the `time` values.

        `left`, `right` : float  
            The parameters of PGA to trim. Refer to `Motion.trim`.  

        Returns
        -------
        `suite` : `Suite`

        Outputs
        -------
        All the logs and the pre- and post- beautify average spectrum.
        '''
        suite = self.copy()
        spectrum_hist = suite.get_average_spectrum()
        suite.plot_all_spectrums(engine='pyplot')
        for i, motion in enumerate(self.motions):
            mo = motion.copy()
            mo.name = f'{self.name}-{i}-b'
            for step in run_steps:
                if step == 1:
                    if i in trunc_dict:
                        mo = mo.truncate(time=trunc_dict[i], new_name=mo.name+'1')
                elif step == 2:
                    mo = mo.trim(new_name=mo.name+'2', left=left, right=right)
                elif step == 3:
                    mo = mo.change_dt(dt=dt, new_name=mo.name+'3')
                elif step == 4:
                    mo.name = f'{i:02}' if len(self.motions) <= 100 else f'{i:03}'
                else:
                    print(f'step={step} is not recognized and ignored.')
            suite.motions[i] = mo
        spectrum_new = suite.get_average_spectrum()
        plt.figure()
        plt.plot(spectrum_hist.periods, spectrum_hist.spectrum, label='before')
        plt.plot(spectrum_new.periods, spectrum_new.spectrum, label='after')
        plt.plot(suite.target_spectrum.periods, suite.target_spectrum.spectrum, label='target')
        plt.legend()
        plt.suptitle(f'Beautify: {self.name} -> {new_name}')
        plt.savefig(f'{self.folder}/{self.name}-beautify.png', dpi=200)
        suite.name = new_name 
        if new_folder is not None:
            suite.set_folder(new_folder)
        return suite

    def scale(self, new_name, new_folder=None, factor_x=1, factor_y=1, scale_target_spectrum=True):
        '''Scale the motions both horizontally and vertically.  

        Parameters
        ----------
        `new_name` : str  

        `new_folder` : str, None  

        `factor_x`, `factor_y` : float  
            The factors to amplify in x axis and y axis.  

        `scale_target_spectrum` : boolean  
            Whether scale the target spectrum as well.

        Returns
        -------
        `suite` : `Suite`
        '''
        suite = self.copy()
        suite.name = new_name
        suite.set_folder(new_folder)
        for i, motion in enumerate(self.motions):
            suite.motions[i] = motion.scale(
                new_name=motion.name, factor_x=factor_x, factor_y=factor_y)
        if scale_target_spectrum:
            suite.target_spectrum = self.target_spectrum.scale(
                factor_x=factor_x, factor_y=factor_y)
        if factor_x != 1:
            suite.period_bound = [p * factor_x for p in self.period_bound]
            suite.period_samples = self.period_samples * factor_x
        if factor_y != 1:
            suite.factors = [f*factor_y for f in self.factors]
        suite.reset_spectrum_matrix()
        return suite

    def write_TCL(self, folder=None, filename=None):
        '''Write the suite to TCL forms for OpenSees.  
        Currently the TCL contains only one dict. 
        The keys of the dict is the motion names. Then the essential properties are stored.  
        At the same time, the accelerations for each motion is written to a text file.

        Parameters
        ----------
        `folder` : str, None  
            The folder to write the tcl files.
            If is None, use `Suite.folder`.

        `filename` :  str, None  
            The tcl file name. If is None, use `Suite.name`.

        Outputs
        -------
        A tcl file and a series of acceleration files.
        '''
        folder = self.folder if folder is None else folder
        filename = self.name if filename is None else filename.replace(
            '.tcl', '') + '.tcl'
        os.makedirs(folder, exist_ok=True)
        config = ''
        config += 'dict set motion keys {'
        names = [mo.name for mo in self.motions]
        config += ' '.join(names)
        config += '}\n\n'
        for i, mo in enumerate(self.motions):
            mo.write_accel(f'{folder}/{mo.name}', time=False)
            config += f'dict set motion {mo.name} path {folder}/{mo.name}.acc\n'
            config += f'dict set motion {mo.name} npts {len(mo.accel)}\n'
            config += f'dict set motion {mo.name} dt {mo.dt:.4f}\n'
            config += f'dict set motion {mo.name} amp {self.factors[i]:.4f}\n\n'
        with open(f'{folder}/{filename}.tcl', 'w') as f:
            f.write(config)

    def plot_all_accels(self, folder=None, filename=None, save=True, engine='pyplot'):
        '''Plot amplified accelograms to a single file.  

        Parameters
        ----------
        `folder` : str, None  
            If is None, use `Suite.folder`.

        `filename` : str, None  
            If is None, use `Suite.name` + '_accel'

        `save` : boolean  
            If True, save the file. Else, show immediately.  

        `engine` : 'pyplot' or 'plotly'  

        Outputs
        -------
        A figure is plotted.
        '''
        folder = self.folder if folder is None else folder
        if engine == 'plotly':
            filename = f'{self.name}_accel.html' if filename is None else filename + '.html'
            fig = go.Figure()
            for i, motion in enumerate(self.motions):
                fig.add_trace(go.Scatter(
                    x=np.arange(len(motion.accels)) * motion.dt,
                    y=motion.accels,
                    mode='lines',
                    name=str(i) + motion.name,
                    text=motion.name
                ))
            if save:
                fig.write_html(f'{folder}/{filename}')
            else:
                fig.show()
        elif engine == 'pyplot':
            print('Pyplot is not supported.')
        else:
            print('ERROR: engine should be in "pyplot" and "plotly"')

    def plot_all_spectrums(self, folder=None, filename=None, save=True, engine='pyplot'):
        '''Plot all amplified spectrums in one file.  

        Parameters
        ----------
        Refer to `Suite.plot_all_accels`. 

        The filename if is None will be `Suite.name` + '_spectrum'  

        Outputs
        -------
        A figure will be generated.
        '''
        if engine == 'plotly':
            folder = self.folder if folder is None else folder
            filename = f'{self.name}_spectrum.html' if filename is None else filename + '.html'
            fig = go.Figure()
            for i, motion in enumerate(self.motions):
                fig.add_trace(go.Scatter(
                    x=motion.spectrum.periods,
                    y=motion.spectrum.spectrum * self.factors[i],
                    mode='lines',
                    name=motion.name,
                    text=motion.name
                ))
            spectrum1 = self.target_spectrum
            spectrum2 = self.get_average_spectrum()
            fig.add_trace(go.Scatter(
                x=spectrum1.periods,
                y=spectrum1.spectrum,
                mode='lines',
                name='Target',
                text='Target',
                line=dict(width=5),
            ))
            fig.add_trace(go.Scatter(
                x=spectrum2.periods,
                y=spectrum2.spectrum,
                mode='lines',
                name='Average',
                text='Average',
                line=dict(width=5),
            ))
            if save:
                fig.write_html(f'{folder}/{filename}')
            else:
                fig.show()
        elif engine == 'pyplot':
            folder = self.folder if folder is None else folder
            filename = f'{self.name}_spectrum.png' if filename is None else filename + '.png'
            plt.figure(figsize=(7, 5))
            for i, motion in enumerate(self.motions):
                plt.plot(motion.spectrum.periods, motion.spectrum.spectrum * self.factors[i], label=motion.name, c='silver')
            plt.plot(self.target_spectrum.periods, self.target_spectrum.spectrum, label='Target', linewidth=3)
            averageSpectrum = self.get_average_spectrum()
            plt.plot(averageSpectrum.periods, averageSpectrum.spectrum, label="average", linewidth=2)
            plt.legend(loc='best', fontsize='x-small')
            plt.title(f'SRSS={self.get_srss():.4f}')
            if save:
                plt.savefig(os.path.join(folder, filename), dpi=200)
            else:
                plt.show()
        else:
            print("ERROR: wrong engine name. use 'plotly' or 'pyplot'.")

    def plot_individual(self, save=True):
        '''Plot all individual ground motions and the spectrum.

        Parameters
        ----------
        Refer to `Suite.plot_all_accels`.  

        Outputs
        -------
        If save, a figure for each motion will be generated.  
        '''
        for i, mo in enumerate(self.motions):
            _, ax = plt.subplots(2, 1, figsize=(7, 5))
            ax[0].plot(np.arange(len(mo.accel)) * mo.dt,
                        mo.accel * self.factors[i])
            ax[1].plot(mo.spectrum.periods,
                        mo.spectrum.spectrum * self.factors[i])
            ax[1].plot(self.target_spectrum.periods,
                        self.target_spectrum.spectrum, linewidth=2)
            plt.suptitle(
                f'{i}: Name {mo.name}\nfactor={self.factors[i]:.4}, PGA={mo.get_PGA()*self.factors[i]:.4}, dt={mo.dt:.3f}')
            filename = f'{self.folder}/motion{i}.png'
            if save:
                plt.savefig(filename, dpi=200)
            else:
                plt.show()

    def plot_interactive(self, save=True):
        '''Plot the spectrum and the acceleration time-histories to an interactive html file.
        
        Parameters
        ----------
        `save` : boolean  
            If True, write to html file. Otherwise, show directly without saving.  
        
        Outputs
        -------
        A html file with interactive plotting sliders.
        '''
        fig = make_subplots(rows=1, cols=2)
        for i in range(len(self.motions)):
            fig.add_trace(go.Scatter(
                x=self.motions[i].spectrum.periods,
                y=self.motions[i].spectrum.spectrum * self.factors[i],
                name=self.motions[i].name,
            ), row=1, col=1)
            fig.add_trace(go.Scatter(
                x=self.motions[i].get_times(),
                y=self.motions[i].accel * self.factors[i],
                    name=self.motions[i].name,
            ), row=1, col=2)
        spectrum_average = self.get_average_spectrum()
        fig.add_trace(go.Scatter(
            x=spectrum_average.periods,
            y=spectrum_average.spectrum,
            name='average',
            line={'width': 5}
        ))
        fig.add_trace(go.Scatter(
            x=self.target_spectrum.periods,
            y=self.target_spectrum.spectrum,
            name='target',
            line={'width': 5}
        ))

        def _title(i):
            return f'Motion #{i}: factor={self.factors[i]:.2f}, duration={len(self.motions[i].accel)*self.motions[i].dt:.2f}, PGA={self.motions[i].get_PGA()*self.factors[i]:.4}'
        
        steps = []
        for i in range(len(self.motions)):
            step = dict(
                method='update',
                args=[
                    {'visible': [j//2 == i for j in range(len(self.motions)*2)] + [False, True]},
                    {'title': _title(i)}
                ],
                label=str(i),
            )
            steps.append(step)
        sliders = [dict(active=0, steps=steps, len=0.9, currentvalue={'visible': False})]
        updatemenus = [dict(
            buttons=[dict(
                label='All',
                method='update',
                args=[
                    {'visible': [True] * (len(self.motions)*2+2)},
                    {'title': f'Suite {self.name}: {len(self.motions)} motions, SRSS={self.get_srss():.4}'}
                ]
            )],
            type='buttons',
            x=1,
            y=0,
            xanchor='right',
            yanchor='top',
            pad={'t': 30},
        )]
        fig.update_layout(sliders=sliders, title=f'Suite {self.name}: {len(self.motions)} motions, SRSS={self.get_srss():.4}', updatemenus=updatemenus)
        if save:
            fig.write_html(f'{self.folder}/{self.name}.html')
        else:
            fig.show()
    
    def to_dict(self, separate_motion=True):
        '''Convert all the properties to dict.  

        Parameters
        ----------  
        `separate_motion` : boolean  
            If True, the motion details are not included into the dict.  
            User has to make sure these motions are not moved.

        Returns
        -------  
        `it` : dict  
        '''
        return dict(
            folder=self.folder,
            name=self.name,
            motions=[mo.to_dict() for mo in self.motions] if not separate_motion else [
                f'{mo.folder}/{mo.name}' for mo in self.motions],
            factors=self.factors if type(self.factors[0] == float) else [
                fact.item() for fact in self.factors],
            period_bound=self.period_bound,
            target_spectrum=self.target_spectrum.to_dict(),
            period_samples=self.period_samples.tolist(),
            srss=self.get_srss(),
            average_spectrum=self.get_average_spectrum().to_dict(),
        )

    def to_json(self, separate_motion=True):
        '''Convert the properties to json.  

        Parameters
        ----------
        Refer to `Suite.to_dict`.  

        Returns
        -------
        `json` : str
        '''
        return json.dumps(self.to_dict(separate_motion=separate_motion))

    def save(self, separate_motion=True, rewrite=False):
        '''Save the suite to a .suite file.  

        Parameters
        ----------  
        `separate_motion` : boolean  
            If True, the motion details are not included into the file.  
            User has to make sure the motions are not moved.

        `rewrite` : boolean   
            If True, all the '.motion' files will be rewritten.  
            otherwise, the motion files will be ignored.

        Outputs
        -------
        Create a .suite file to store the suite.
        '''
        with open(f'{self.folder}/{self.name}.suite', 'w') as f:
            f.write(self.to_json(separate_motion=separate_motion))
        if separate_motion:
            for mo in self.motions:
                if not os.path.exists(f'{mo.folder}/{mo.name}.motion'):
                    mo.save()
                    print(f'Motion saved to {mo.folder}/{mo.name}.motion')
                else:
                    if rewrite:
                        mo.save()
                        print(f'Motion {mo.folder}/{mo.name}.motion is written.')
        print(f'Saved to {self.folder}/{self.name}.suite')

    @staticmethod
    def read_dict(it):
        '''Read the dictionary representing the properties of the class.  
        If key 'motions' is a instance of dict, then construct motions with dict.   
        else, key 'motions' should be a list of {folder}/{name}, then construct with the .motion file.  

        Parameters
        ----------  
        `it` : dict  
            The dict generated by `Suite.to_dict`  

        Returns
        -------  
        `suite` : `Suite` 
        '''
        suite = Suite(
            folder=it['folder'],
            name=it['name'],
            target_spectrum=Spectrum.read_dict(it['target_spectrum']),
            period_bound=it['period_bound']
        )
        for i, mo_content in enumerate(it['motions']):
            if isinstance(mo_content, dict):
                suite.add_motion(Motion.read_dict(
                    mo_content), it['factors'][i])
            else:
                suite.add_motion(Motion.load(
                    f'{mo_content}.motion'), it['factors'][i])
        suite.period_samples = np.array(it['period_samples'])
        suite.target_vector = suite.target_spectrum.get(suite.period_samples)
        suite.reset_spectrum_matrix()
        return suite

    @staticmethod
    def load(filename):
        '''Load a the suite from a file.  

        Parameters
        ----------
        `filename` : str  
            File that stores the suite.

        Returns
        -------
        `suite` : `Suite`
        '''
        if not filename.endswith('.suite'):
            filename += '.suite'
        with open(filename, 'r') as f:
            return Suite.read_dict(json.loads(f.read()))

    def copy(self):
        '''Copy the instance.  
        Returns
        -------
        `suite` : `Suite`
        '''
        return Suite.read_dict(self.to_dict())

Classes

class Motion (folder, name, damping=0.05)

A class plays with ground motion. Store accel time history and accel spectrum only.

Parameters

folder : str
the folder this motion is stored.

name : str
The motion name. The motion will be saved to '{folder}/{name}.motion'

damping : float
The damping ratio of the motion spectrum.

Expand source code
class Motion(object):
    '''A class plays with ground motion. Store accel time history and accel spectrum only.  
    '''

    def __init__(self, folder, name, damping=0.05):
        '''
        Parameters
        ----------
        `folder` : str   
            the folder this motion is stored.

        `name` : str  
            The motion name. The motion will be saved to '{folder}/{name}.motion'

        `damping` : float  
            The damping ratio of the motion spectrum.  
        '''
        self.folder = folder
        '''str. See `Motion`.'''
        os.makedirs(folder, exist_ok=True)
        self.name = name
        '''str. See `Motion`.'''
        self.accel = None
        '''numpy.ndarray. Acceleration time history with evenly distributed sampling points.'''
        self.dt = None
        '''float. Acceleration time sampling delta t.'''
        self.information = ''
        '''str. Store massive infomation about the motion. '''
        self.spectrum = None
        '''`Spectrum`. The spectrum of the motion.'''
        self.damping = damping
        '''float. The damping ratio of the spectrum. Note: init with 0.05.'''

    def get_PGA(self):
        '''
        Returns
        -------
        `value` : float  
            The peak ground acceleration.  
        '''
        return np.max(abs(self.accel))

    def get_duration(self):
        '''
        Returns
        -------
        `value` : float  
            The duration of the motion.  
        '''
        return (len(self.accel) - 1) * self.dt

    def get_times(self):
        '''
        Returns
        -------
        `value` : numpy.ndarray  
            The time series of the motion  
        '''
        return np.arange(len(self.accel)) * self.dt

    def generate_spectrum(self, periods=None):
        '''Use scipy LTS signal processing module to generate the spectrum.  
        Note that the accel spectrum is pseudo-spectrum derived from disp spectrum.  
        So it's only accurate when damping is small.  
        If `Motion.dt` is still None, which usually occurs if the PEER file is not well formated, 
            a blank spectrum will be returned.  

        Parameters
        ----------
        `periods` : List-like, None    
            The period sampling points.   
            If is None, `Spectrum.periods_standard` will be called.

        Returns
        -------
        `spectrum` : `Spectrum`  
            The generated spectrum.

        Outputs
        -------
        `Motion.spectrum` will be changed.
        '''
        if periods is None:
            periods = Spectrum.periods_standard()
        if self.dt is not None:
            print("Generating spectrum ...")
            omegas = 2 * np.pi / periods[1:]
            num = np.array([-1])
            spectrum = np.zeros(len(periods))
            spectrum[0] = self.get_PGA()
            for i, omega in enumerate(omegas):
                den = np.array([1, 2*self.damping*omega, omega**2])
                _, y, _ = lsim((num, den), self.accel, self.get_times())
                spectrum[i+1] = np.max(abs(y)) * omega ** 2
            self.spectrum = Spectrum(periods, spectrum)
        else:
            print("Dt is not known. Assigning empty spectrum.")
            self.spectrum = Spectrum(periods=[], spectrum=[])
        return self.spectrum

    def load_peer(self, peer_filename):
        '''Load acceleration time-history files downloaded from PEER.  
        The strings in the file will be parsed and also stored in self.information.  

        Parameters
        ----------
        `peer_filename` : str

        Outputs
        -------
        `Motion.dt`, `Motion.accel`, `Motion.information`, `Motion.spectrum` will all be changed.  
        '''
        self.information += f'peer_filename: {peer_filename}'
        try:
            f = open(peer_filename, 'r')
            for _ in range(3):
                self.information += f.readline()
            s = f.readline()
            self.information += s
            re_dt = re.compile(r".*[ 0](\.[0-9]*) ?SEC")
            mt = re_dt.match(s)
            try:
                self.dt = float(mt.group(1).replace('.', '0.'))
                print(f'Extracted dt = {self.dt:.4f}')
            except:
                print(f'ERROR: cannot find dt from {s}')
                self.dt = 0.000001
            series = []
            datastring = f.readlines()
            for dtline in datastring:
                for dt in dtline.split("  "):
                    if dt:
                        try:
                            series.append(float(dt.replace(".", "0.")))
                        except ValueError:
                            break
            self.accel = np.array(series)
            print(f'Extracted npts = {len(self.accel)}')
        except IOError:
            print(f'No file named {self.name} is found.')

    def convert_peer(self, peer_filename):
        '''A recipe to convert the peer acceleration file to the .motion file.  

        Parameters
        ----------
        `peer_filename` : str  

        `folder` : str  
            the folder to save .motion file.

        Outputs
        -------
        A .motion file will be created in `folder` with the PEER filename.
        '''
        print(f'Converting PEER file {peer_filename} to folder {self.folder} with name {self.name}')
        self.load_peer(peer_filename)
        self.generate_spectrum()
        self.save()

    def trim(self, new_name, new_folder=None, left=1.0e-3, right=1.0e-3, prepend_zero=True, log=True, log_folder=None):
        '''Trim the motion with two coefficients of PGA.  
        This method is used when a long near-zero time-history is stored in the data file.  

        Parameters
        ----------
        `new_name` : str  

        `new_folder` : str, None  

        `left`, `right` : float 
            The coefficient of PGA to trim the time history.
            Search from left or right, when the target point is found,
            search for the nearest near-zero point. Then trim off the small values.  


        `prepend_zero` : boolean  
            If True, prepend a zero value to the trimmed acceleration.

        `log` : boolean  
            If the log picture will be generated.

        `log_folder` : str, None
            Path for the folder to save the trim logs.
            If None, save in the previous folder.

        Returns
        -------
        `motion` : `Motion`  
            The trimmed motion.

        Outputs
        -------
        If log, plot a figure in log_folder named '{new_name}_trimlog.png'
        '''
        mo = self.copy()
        mo.name = new_name
        mo.folder = new_folder if new_folder is not None else self.folder
        pga = self.get_PGA()
        # search left
        i = 0
        while abs(self.accel[i]) < pga * left:
            i += 1
        while i > 0 and self.accel[i] * self.accel[i-1] > 0:
            i -= 1
        i_left = i
        # search right
        i = -1
        while abs(self.accel[i]) < pga * right:
            i -= 1
        while i < -1 and self.accel[i] * self.accel[i+1] > 0:
            i += 1
        i_right = len(self.accel) + i
        if prepend_zero:
            mo.accel = np.hstack([0, self.accel[i_left:i_right]])
        else:
            mo.accel = self.accel[i_left:i_right]
        mo.generate_spectrum()
        print(
            f'Motion is trimmed from {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.')
        if log:
            log_folder = self.folder if log_folder is None else log_folder
            os.makedirs(log_folder, exist_ok=True)
            _, ax = plt.subplots(3, 1, figsize=(7, 5))
            ax[0].plot(self.get_times(), self.accel, label='before')
            ax[1].plot(mo.get_times(), mo.accel, label='after')
            ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before')
            ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after')
            for i in range(3):
                ax[i].legend()
            plt.suptitle(f'Trim log: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.')
            plt.savefig(f'{log_folder}/{mo.name}_trimlog.png', dpi=200)
        return mo

    def change_dt(self, dt, new_name, new_folder=None, log=True, log_folder=None):
        '''Change dt for the accel. Linearly interpolate the acceleration, and re-sample.
        Note: no low-pass filters are used. May cause new peaks in the Fourier spectrum.  

        Parameters
        ----------
        `dt` : float  
            New sampling delta t. 

        `new_name` : str  

        `new_folder` : str, None  

        `log` : boolean  
            If log, create a log picture comparing the before and after accels and spectrum.

        `log_folder` : str, None  
            Path for the folder to save the trim logs.
            If None, save in the previous folder.

        Returns
        -------
        `motion` : `Motion`  

        Outputs
        -------
        If log, create a picture named '{log_folder}/{new_name}_dtlog.png'.
        '''
        if dt == self.dt:
            print(f'Motion {self.name} do not need to change dt')
            return self.copy()
        else:
            mo = self.copy()
            mo.name = new_name
            mo.folder = new_folder if new_folder is not None else self.folder
            f = interp1d(self.get_times(), self.accel)
            npts = np.floor(self.dt * len(self.accel) / dt)
            mo.accel = f(np.arange(npts)*dt)
            mo.dt = dt
            mo.generate_spectrum()
            print(f'Motion dt changed from {self.name}:{self.dt:.4f}s to {mo.name}:{dt:.4f}s.')
        if log:
            log_folder = self.folder if log_folder is None else log_folder
            os.makedirs(log_folder, exist_ok=True)
            _, ax = plt.subplots(3, 1, figsize=(7, 5))
            ax[0].plot(self.get_times(), self.accel, label='before')
            ax[1].plot(mo.get_times(), mo.accel, label='after')
            ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before')
            ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after')
            for i in range(3):
                ax[i].legend()
            plt.suptitle(f'Change dt: {self.name}:{self.dt:.4f}s to {mo.name}:{dt:.4f}s.')
            plt.savefig(f'{log_folder}/{mo.name}_dtlog.png', dpi=200)
        return mo

    def truncate(self, time, new_name, new_folder=None, log=True, log_folder=None):
        '''Truncate the acceleration. 
        Sometimes, a ground motion file may have two peaks or more. 
        So if only one peak is needed, this method can be used to skip the other peaks.  

        Parameters
        ----------
        `time` : float
            The accels after the time will be cut off.  

        `new_name` : str

        `new_folder` : str, None  

        `log` : boolean  

        `log_folder` : str, None  

        Returns
        -------
        `motion` : `Motion`

        Outputs
        -------
        If log is True, a figure named '{log_folder}/{new_name}_trunclog.png' will be created.
        '''
        i = round(time / self.dt)
        mo = self.copy()
        mo.name = new_name
        mo.folder = new_folder if new_folder is not None else self.folder
        if i >= len(self.accel):
            print(f"WARNING: time is longer than duration. {self.name} is not truncated.")
            return mo
        mo.accel = self.accel[:i]
        mo.generate_spectrum()
        print(f'Motion truncated: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.')
        if log:
            log_folder = self.folder if log_folder is None else log_folder
            os.makedirs(log_folder, exist_ok=True)
            _, ax = plt.subplots(3, 1, figsize=(7, 5))
            ax[0].plot(self.get_times(), self.accel, label='before')
            ax[1].plot(mo.get_times(), mo.accel, label='after')
            ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before')
            ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after')
            for i in range(3):
                ax[i].legend()
            plt.suptitle(f'Truncate: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.')
            plt.savefig(f'{log_folder}/{mo.name}_trunclog.png', dpi=200)
        return mo

    def scale(self, new_name, new_folder=None, factor_x=1, factor_y=1):
        '''Scale the ground motion.  

        Parameters
        ----------
        `new_name` : str

        `new_folder` : str, None

        `factor_x` : float  
            The scaling factor on the time/period axis.  

        `factor_y` : float
            The scaling factor on the accel axis.

        Returns
        -------
        `motion` : Motion
        '''
        mo = self.copy()
        mo.name = new_name
        mo.folder = new_folder if new_folder is not None else self.folder
        if factor_x != 1:
            mo.dt = self.dt * factor_x
        if factor_y != 1:
            mo.accel = self.accel * factor_y
        mo.spectrum = self.spectrum.scale(factor_x, factor_y)
        return mo

    def plot_timehistory(self, folder=None, save=True, engine='pyplot'):
        '''Plot the time history of the acceleration.

        Parameters
        ----------
        `folder` : str, None  
            Store folder. If None, use `Motion.folder` 

        `save` : boolean
            Save the figure or not. 

        `engine` : 'pyplot' or 'plotly'  

        Outputs
        -------
        if save is True, create a figure '{folder}/{self.name}_th'.
        '''
        folder = self.folder if folder is None else folder
        if engine == 'pyplot':
            plt.figure()
            t = self.get_times()
            plt.plot(t, self.accel)
            if save:
                plt.savefig(f'{folder}/{self.name}_th.png', dpi=200)
            else:
                plt.show()
        elif engine == 'plotly':
            fig = go.Figure()
            fig.add_trace(go.Scatter(
                x=self.get_times(),
                y=self.accel,
                mode='lines',
                name=self.name
            ))
            if save:
                fig.write_html(f'{folder}/{self.name}_th.html')
            else:
                fig.show()
        else:
            print('ERROR: engine should be in "pyplot" or "plotly"')

    def plot(self, folder=None, save=True, engine='pyplot'):
        '''Plot the time history and the spectrum.

        Parameters
        ----------
        See `Motion.plot_timehistory`

        Outputs
        -------
        Create a figure named '{folder}/{name}'
        '''
        folder = self.folder if folder is None else folder
        if engine == 'pyplot':
            _, ax = plt.subplots(2, 1, figsize=(7, 8))
            ax[0].plot(self.get_times(), self.accel)
            ax[0].set_xlabel('Time(s)')
            ax[0].set_ylabel('Acceleration(g)')
            ax[0].set_title(f'PGA={self.get_PGA():.4f}')
            ax[1].plot(self.spectrum.periods, self.spectrum.spectrum)
            ax[1].set_xlabel('Period(s)')
            ax[1].set_ylabel('Spectrum Acceleration(g)')
            ax[1].set_title(f'damping={self.damping:.4f}')
            plt.subplots_adjust(hspace=0.3)
            if save:
                plt.savefig(f'{folder}/{self.name}.png', dpi=200)
            else:
                plt.show()
        elif engine == 'plotly':
            fig = make_subplots(rows=1, cols=2)
            fig.add_trace(go.Scatter(
                x=self.get_times(),
                y=self.accel,
                mode='lines',
                name='accel'
            ), row=1, col=2)
            fig.add_trace(go.Scatter(
                x=self.spectrum.periods,
                y=self.spectrum.spectrum,
                mode='lines',
                name='spectrum'
            ), row=1, col=1)
            if save:
                fig.write_html(f'{folder}/{self.name}.html')
            else:
                fig.show()
        else:
            print('Engine should be in "pyplot" or "plotly"')

    def write_accel(self, filename, time=True):
        '''Write the acceleration timehistory to a single file.  

        Parameters
        ----------
        `filename` : str  
            The file path to store the accelration.  

        `time` : boolean  
            If True, a time column is created.
            To know dt in the file content, time should be True.
            Otherwise, dt should be stored in the filename.

        Outputs
        -------
        An text file with acceleration information will be written.
        '''
        if time:
            data = np.vstack([self.get_times(), self.accel])
            np.savetxt(filename, data.T, fmt="%.4f,%.7e", header="Time,Accel")
        else:
            np.savetxt(filename, self.accel, fmt="%.7e")

    def write_info(self, filename):
        '''Write the information to a single file.  

        Parameters
        ----------
        `filename` : str

        Outputs
        -------
        A text file with information will be created.
        '''
        with open(filename, 'w') as f:
            f.write(f'dt={self.dt:.3}\n')
            f.write(f'npts={len(self.accel)}\n')
            f.write(f'damping={self.damping:.3}\n')
            f.write(self.information)

    def write_spectrum(self, filename):
        '''Write the spectrum to a single file. Use the form of csv.

        Parameters
        ----------
        `filename` : str

        Outputs
        -------
        A csv file will be created.
        '''
        self.spectrum.write_csv(filename)

    def read_accel(self, filename, dt=True):
        '''Load an single-column acceleration file to the current instance.

        Parameters
        ----------
        `dt` : float, boolean
            if dt is float, `Motion.dt` will be set to dt.
            Else if dt is True, `Motion.dt` will be found in the file.

        Outputs
        -------
        Change `Motion.dt` and `Motion.accel` in the CURRENT instance.
        '''
        if not isinstance(dt, float):
            data = np.loadtxt(filename)
            self.accel = data[:, 1]
            self.dt = data[1, 0]
        else:
            self.accel = np.loadtxt(filename)
            self.dt = dt

    def read_spectrum(self, filename):
        '''Read the spectrum from csv file.  

        Parameters
        ----------
        `filename` : str

        Outputs
        -------
        The `Motion.spectrum` of the CURRENT instance will be changed.
        '''
        spectrum = Spectrum.read_csv(filename)
        self.spectrum = spectrum

    def read_info(self, filename):
        '''Read info from text file.  

        Parameters
        ----------
        `filename` : str

        Outputs
        -------
        The `Motion.information` of the CURRENT instance will be changed.
        '''
        with open(filename, 'r') as f:
            self.dt = float(f.readline().replace("dt=", ""))
            _ = int(f.readline().replace("npts=", ""))
            self.damping = float(f.readline().replace("damping=", ""))
            self.information = f.read()

    def to_dict(self):
        '''Convert the instance properties to dict.  

        Returns
        -------
        `it` : dict
        '''
        return dict(
            dt=self.dt,
            npts=len(self.accel),
            damping=self.damping,
            information=self.information,
            accel=self.accel.tolist(),
            spectrum=self.spectrum.to_dict(),
            folder=self.folder,
            name=self.name,
        )

    def to_json(self):
        '''Convert the instance properties to string using json format.  

        Returns
        -------
        `string` : str
        '''
        return json.dumps(self.to_dict())

    def save(self):
        '''Save the instance to a file, which is defined by `Motion.folder` and `Motion.name`  

        Outputs
        -------
        A file named '{self.folder}/{self.name}.motion' will be created.
        '''
        with open(f'{self.folder}/{self.name}.motion', 'w') as f:
            f.write(self.to_json())

    @staticmethod
    def read_dict(it):
        '''Create a instance by the dict generated by `Motion.to_dict`.  

        Parameters
        ----------
        `it` : dict

        Returns
        -------
        `mo` : `Motion`
        '''
        mo = Motion(folder=it['folder'], name=it['name'])
        mo.dt = it['dt']
        mo.damping = it['damping']
        mo.information = it['information']
        mo.accel = np.array(it['accel'])
        mo.spectrum = Spectrum.read_dict(it['spectrum'])
        return mo

    @staticmethod
    def read_json(string):
        '''Create a new instance by the string generated by `Motion.to_json`.  

        Parameters
        ----------
        `string` : str

        Returns
        -------
        `mo` : `Motion`
        '''
        it = json.loads(string)
        return Motion.read_dict(it)

    @staticmethod
    def load(filename):
        '''Load the file created from `Motion.save` method.  

        Parameters
        ----------
        `filename` : str

        Returns
        -------
        `mo` : `Motion`
        '''
        if not filename.endswith('.motion'):
            filename += '.motion'
        with open(filename, 'r') as f:
            return Motion.read_json(f.read())

    def copy(self):
        '''Copy the instance.

        Returns
        -------
        `mo` : `Motion`
        '''
        return Motion.read_dict(self.to_dict())

Static methods

def load(filename)

Load the file created from Motion.save() method.

Parameters

filename : str

Returns

mo : Motion

Expand source code
@staticmethod
def load(filename):
    '''Load the file created from `Motion.save` method.  

    Parameters
    ----------
    `filename` : str

    Returns
    -------
    `mo` : `Motion`
    '''
    if not filename.endswith('.motion'):
        filename += '.motion'
    with open(filename, 'r') as f:
        return Motion.read_json(f.read())
def read_dict(it)

Create a instance by the dict generated by Motion.to_dict().

Parameters

it : dict

Returns

mo : Motion

Expand source code
@staticmethod
def read_dict(it):
    '''Create a instance by the dict generated by `Motion.to_dict`.  

    Parameters
    ----------
    `it` : dict

    Returns
    -------
    `mo` : `Motion`
    '''
    mo = Motion(folder=it['folder'], name=it['name'])
    mo.dt = it['dt']
    mo.damping = it['damping']
    mo.information = it['information']
    mo.accel = np.array(it['accel'])
    mo.spectrum = Spectrum.read_dict(it['spectrum'])
    return mo
def read_json(string)

Create a new instance by the string generated by Motion.to_json().

Parameters

string : str

Returns

mo : Motion

Expand source code
@staticmethod
def read_json(string):
    '''Create a new instance by the string generated by `Motion.to_json`.  

    Parameters
    ----------
    `string` : str

    Returns
    -------
    `mo` : `Motion`
    '''
    it = json.loads(string)
    return Motion.read_dict(it)

Instance variables

var accel

numpy.ndarray. Acceleration time history with evenly distributed sampling points.

var damping

float. The damping ratio of the spectrum. Note: init with 0.05.

var dt

float. Acceleration time sampling delta t.

var folder

str. See Motion.

var information

str. Store massive infomation about the motion.

var name

str. See Motion.

var spectrum

Spectrum. The spectrum of the motion.

Methods

def change_dt(self, dt, new_name, new_folder=None, log=True, log_folder=None)

Change dt for the accel. Linearly interpolate the acceleration, and re-sample. Note: no low-pass filters are used. May cause new peaks in the Fourier spectrum.

Parameters

dt : float
New sampling delta t.

new_name : str

new_folder : str, None

log : boolean
If log, create a log picture comparing the before and after accels and spectrum.

log_folder : str, None
Path for the folder to save the trim logs. If None, save in the previous folder.

Returns

motion : Motion

Outputs

If log, create a picture named '{log_folder}/{new_name}_dtlog.png'.

Expand source code
def change_dt(self, dt, new_name, new_folder=None, log=True, log_folder=None):
    '''Change dt for the accel. Linearly interpolate the acceleration, and re-sample.
    Note: no low-pass filters are used. May cause new peaks in the Fourier spectrum.  

    Parameters
    ----------
    `dt` : float  
        New sampling delta t. 

    `new_name` : str  

    `new_folder` : str, None  

    `log` : boolean  
        If log, create a log picture comparing the before and after accels and spectrum.

    `log_folder` : str, None  
        Path for the folder to save the trim logs.
        If None, save in the previous folder.

    Returns
    -------
    `motion` : `Motion`  

    Outputs
    -------
    If log, create a picture named '{log_folder}/{new_name}_dtlog.png'.
    '''
    if dt == self.dt:
        print(f'Motion {self.name} do not need to change dt')
        return self.copy()
    else:
        mo = self.copy()
        mo.name = new_name
        mo.folder = new_folder if new_folder is not None else self.folder
        f = interp1d(self.get_times(), self.accel)
        npts = np.floor(self.dt * len(self.accel) / dt)
        mo.accel = f(np.arange(npts)*dt)
        mo.dt = dt
        mo.generate_spectrum()
        print(f'Motion dt changed from {self.name}:{self.dt:.4f}s to {mo.name}:{dt:.4f}s.')
    if log:
        log_folder = self.folder if log_folder is None else log_folder
        os.makedirs(log_folder, exist_ok=True)
        _, ax = plt.subplots(3, 1, figsize=(7, 5))
        ax[0].plot(self.get_times(), self.accel, label='before')
        ax[1].plot(mo.get_times(), mo.accel, label='after')
        ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before')
        ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after')
        for i in range(3):
            ax[i].legend()
        plt.suptitle(f'Change dt: {self.name}:{self.dt:.4f}s to {mo.name}:{dt:.4f}s.')
        plt.savefig(f'{log_folder}/{mo.name}_dtlog.png', dpi=200)
    return mo
def convert_peer(self, peer_filename)

A recipe to convert the peer acceleration file to the .motion file.

Parameters

peer_filename : str

folder : str
the folder to save .motion file.

Outputs

A .motion file will be created in folder with the PEER filename.

Expand source code
def convert_peer(self, peer_filename):
    '''A recipe to convert the peer acceleration file to the .motion file.  

    Parameters
    ----------
    `peer_filename` : str  

    `folder` : str  
        the folder to save .motion file.

    Outputs
    -------
    A .motion file will be created in `folder` with the PEER filename.
    '''
    print(f'Converting PEER file {peer_filename} to folder {self.folder} with name {self.name}')
    self.load_peer(peer_filename)
    self.generate_spectrum()
    self.save()
def copy(self)

Copy the instance.

Returns

mo : Motion

Expand source code
def copy(self):
    '''Copy the instance.

    Returns
    -------
    `mo` : `Motion`
    '''
    return Motion.read_dict(self.to_dict())
def generate_spectrum(self, periods=None)

Use scipy LTS signal processing module to generate the spectrum.
Note that the accel spectrum is pseudo-spectrum derived from disp spectrum.
So it's only accurate when damping is small.
If Motion.dt is still None, which usually occurs if the PEER file is not well formated, a blank spectrum will be returned.

Parameters

periods : List-like, None
The period sampling points.
If is None, Spectrum.periods_standard() will be called.

Returns

spectrum : Spectrum
The generated spectrum.

Outputs

Motion.spectrum will be changed.

Expand source code
def generate_spectrum(self, periods=None):
    '''Use scipy LTS signal processing module to generate the spectrum.  
    Note that the accel spectrum is pseudo-spectrum derived from disp spectrum.  
    So it's only accurate when damping is small.  
    If `Motion.dt` is still None, which usually occurs if the PEER file is not well formated, 
        a blank spectrum will be returned.  

    Parameters
    ----------
    `periods` : List-like, None    
        The period sampling points.   
        If is None, `Spectrum.periods_standard` will be called.

    Returns
    -------
    `spectrum` : `Spectrum`  
        The generated spectrum.

    Outputs
    -------
    `Motion.spectrum` will be changed.
    '''
    if periods is None:
        periods = Spectrum.periods_standard()
    if self.dt is not None:
        print("Generating spectrum ...")
        omegas = 2 * np.pi / periods[1:]
        num = np.array([-1])
        spectrum = np.zeros(len(periods))
        spectrum[0] = self.get_PGA()
        for i, omega in enumerate(omegas):
            den = np.array([1, 2*self.damping*omega, omega**2])
            _, y, _ = lsim((num, den), self.accel, self.get_times())
            spectrum[i+1] = np.max(abs(y)) * omega ** 2
        self.spectrum = Spectrum(periods, spectrum)
    else:
        print("Dt is not known. Assigning empty spectrum.")
        self.spectrum = Spectrum(periods=[], spectrum=[])
    return self.spectrum
def get_PGA(self)

Returns

value : float
The peak ground acceleration.

Expand source code
def get_PGA(self):
    '''
    Returns
    -------
    `value` : float  
        The peak ground acceleration.  
    '''
    return np.max(abs(self.accel))
def get_duration(self)

Returns

value : float
The duration of the motion.

Expand source code
def get_duration(self):
    '''
    Returns
    -------
    `value` : float  
        The duration of the motion.  
    '''
    return (len(self.accel) - 1) * self.dt
def get_times(self)

Returns

value : numpy.ndarray
The time series of the motion

Expand source code
def get_times(self):
    '''
    Returns
    -------
    `value` : numpy.ndarray  
        The time series of the motion  
    '''
    return np.arange(len(self.accel)) * self.dt
def load_peer(self, peer_filename)

Load acceleration time-history files downloaded from PEER.
The strings in the file will be parsed and also stored in self.information.

Parameters

peer_filename : str

Outputs

Motion.dt, Motion.accel, Motion.information, Motion.spectrum will all be changed.

Expand source code
def load_peer(self, peer_filename):
    '''Load acceleration time-history files downloaded from PEER.  
    The strings in the file will be parsed and also stored in self.information.  

    Parameters
    ----------
    `peer_filename` : str

    Outputs
    -------
    `Motion.dt`, `Motion.accel`, `Motion.information`, `Motion.spectrum` will all be changed.  
    '''
    self.information += f'peer_filename: {peer_filename}'
    try:
        f = open(peer_filename, 'r')
        for _ in range(3):
            self.information += f.readline()
        s = f.readline()
        self.information += s
        re_dt = re.compile(r".*[ 0](\.[0-9]*) ?SEC")
        mt = re_dt.match(s)
        try:
            self.dt = float(mt.group(1).replace('.', '0.'))
            print(f'Extracted dt = {self.dt:.4f}')
        except:
            print(f'ERROR: cannot find dt from {s}')
            self.dt = 0.000001
        series = []
        datastring = f.readlines()
        for dtline in datastring:
            for dt in dtline.split("  "):
                if dt:
                    try:
                        series.append(float(dt.replace(".", "0.")))
                    except ValueError:
                        break
        self.accel = np.array(series)
        print(f'Extracted npts = {len(self.accel)}')
    except IOError:
        print(f'No file named {self.name} is found.')
def plot(self, folder=None, save=True, engine='pyplot')

Plot the time history and the spectrum.

Parameters

See Motion.plot_timehistory()

Outputs

Create a figure named '{folder}/{name}'

Expand source code
def plot(self, folder=None, save=True, engine='pyplot'):
    '''Plot the time history and the spectrum.

    Parameters
    ----------
    See `Motion.plot_timehistory`

    Outputs
    -------
    Create a figure named '{folder}/{name}'
    '''
    folder = self.folder if folder is None else folder
    if engine == 'pyplot':
        _, ax = plt.subplots(2, 1, figsize=(7, 8))
        ax[0].plot(self.get_times(), self.accel)
        ax[0].set_xlabel('Time(s)')
        ax[0].set_ylabel('Acceleration(g)')
        ax[0].set_title(f'PGA={self.get_PGA():.4f}')
        ax[1].plot(self.spectrum.periods, self.spectrum.spectrum)
        ax[1].set_xlabel('Period(s)')
        ax[1].set_ylabel('Spectrum Acceleration(g)')
        ax[1].set_title(f'damping={self.damping:.4f}')
        plt.subplots_adjust(hspace=0.3)
        if save:
            plt.savefig(f'{folder}/{self.name}.png', dpi=200)
        else:
            plt.show()
    elif engine == 'plotly':
        fig = make_subplots(rows=1, cols=2)
        fig.add_trace(go.Scatter(
            x=self.get_times(),
            y=self.accel,
            mode='lines',
            name='accel'
        ), row=1, col=2)
        fig.add_trace(go.Scatter(
            x=self.spectrum.periods,
            y=self.spectrum.spectrum,
            mode='lines',
            name='spectrum'
        ), row=1, col=1)
        if save:
            fig.write_html(f'{folder}/{self.name}.html')
        else:
            fig.show()
    else:
        print('Engine should be in "pyplot" or "plotly"')
def plot_timehistory(self, folder=None, save=True, engine='pyplot')

Plot the time history of the acceleration.

Parameters

folder : str, None
Store folder. If None, use Motion.folder

save : boolean Save the figure or not.

engine : 'pyplot' or 'plotly'

Outputs

if save is True, create a figure '{folder}/{self.name}_th'.

Expand source code
def plot_timehistory(self, folder=None, save=True, engine='pyplot'):
    '''Plot the time history of the acceleration.

    Parameters
    ----------
    `folder` : str, None  
        Store folder. If None, use `Motion.folder` 

    `save` : boolean
        Save the figure or not. 

    `engine` : 'pyplot' or 'plotly'  

    Outputs
    -------
    if save is True, create a figure '{folder}/{self.name}_th'.
    '''
    folder = self.folder if folder is None else folder
    if engine == 'pyplot':
        plt.figure()
        t = self.get_times()
        plt.plot(t, self.accel)
        if save:
            plt.savefig(f'{folder}/{self.name}_th.png', dpi=200)
        else:
            plt.show()
    elif engine == 'plotly':
        fig = go.Figure()
        fig.add_trace(go.Scatter(
            x=self.get_times(),
            y=self.accel,
            mode='lines',
            name=self.name
        ))
        if save:
            fig.write_html(f'{folder}/{self.name}_th.html')
        else:
            fig.show()
    else:
        print('ERROR: engine should be in "pyplot" or "plotly"')
def read_accel(self, filename, dt=True)

Load an single-column acceleration file to the current instance.

Parameters

dt : float, boolean if dt is float, Motion.dt will be set to dt. Else if dt is True, Motion.dt will be found in the file.

Outputs

Change Motion.dt and Motion.accel in the CURRENT instance.

Expand source code
def read_accel(self, filename, dt=True):
    '''Load an single-column acceleration file to the current instance.

    Parameters
    ----------
    `dt` : float, boolean
        if dt is float, `Motion.dt` will be set to dt.
        Else if dt is True, `Motion.dt` will be found in the file.

    Outputs
    -------
    Change `Motion.dt` and `Motion.accel` in the CURRENT instance.
    '''
    if not isinstance(dt, float):
        data = np.loadtxt(filename)
        self.accel = data[:, 1]
        self.dt = data[1, 0]
    else:
        self.accel = np.loadtxt(filename)
        self.dt = dt
def read_info(self, filename)

Read info from text file.

Parameters

filename : str

Outputs

The Motion.information of the CURRENT instance will be changed.

Expand source code
def read_info(self, filename):
    '''Read info from text file.  

    Parameters
    ----------
    `filename` : str

    Outputs
    -------
    The `Motion.information` of the CURRENT instance will be changed.
    '''
    with open(filename, 'r') as f:
        self.dt = float(f.readline().replace("dt=", ""))
        _ = int(f.readline().replace("npts=", ""))
        self.damping = float(f.readline().replace("damping=", ""))
        self.information = f.read()
def read_spectrum(self, filename)

Read the spectrum from csv file.

Parameters

filename : str

Outputs

The Motion.spectrum of the CURRENT instance will be changed.

Expand source code
def read_spectrum(self, filename):
    '''Read the spectrum from csv file.  

    Parameters
    ----------
    `filename` : str

    Outputs
    -------
    The `Motion.spectrum` of the CURRENT instance will be changed.
    '''
    spectrum = Spectrum.read_csv(filename)
    self.spectrum = spectrum
def save(self)

Save the instance to a file, which is defined by Motion.folder and Motion.name

Outputs

A file named '{self.folder}/{self.name}.motion' will be created.

Expand source code
def save(self):
    '''Save the instance to a file, which is defined by `Motion.folder` and `Motion.name`  

    Outputs
    -------
    A file named '{self.folder}/{self.name}.motion' will be created.
    '''
    with open(f'{self.folder}/{self.name}.motion', 'w') as f:
        f.write(self.to_json())
def scale(self, new_name, new_folder=None, factor_x=1, factor_y=1)

Scale the ground motion.

Parameters

new_name : str

new_folder : str, None

factor_x : float
The scaling factor on the time/period axis.

factor_y : float The scaling factor on the accel axis.

Returns

motion : Motion

Expand source code
def scale(self, new_name, new_folder=None, factor_x=1, factor_y=1):
    '''Scale the ground motion.  

    Parameters
    ----------
    `new_name` : str

    `new_folder` : str, None

    `factor_x` : float  
        The scaling factor on the time/period axis.  

    `factor_y` : float
        The scaling factor on the accel axis.

    Returns
    -------
    `motion` : Motion
    '''
    mo = self.copy()
    mo.name = new_name
    mo.folder = new_folder if new_folder is not None else self.folder
    if factor_x != 1:
        mo.dt = self.dt * factor_x
    if factor_y != 1:
        mo.accel = self.accel * factor_y
    mo.spectrum = self.spectrum.scale(factor_x, factor_y)
    return mo
def to_dict(self)

Convert the instance properties to dict.

Returns

it : dict

Expand source code
def to_dict(self):
    '''Convert the instance properties to dict.  

    Returns
    -------
    `it` : dict
    '''
    return dict(
        dt=self.dt,
        npts=len(self.accel),
        damping=self.damping,
        information=self.information,
        accel=self.accel.tolist(),
        spectrum=self.spectrum.to_dict(),
        folder=self.folder,
        name=self.name,
    )
def to_json(self)

Convert the instance properties to string using json format.

Returns

string : str

Expand source code
def to_json(self):
    '''Convert the instance properties to string using json format.  

    Returns
    -------
    `string` : str
    '''
    return json.dumps(self.to_dict())
def trim(self, new_name, new_folder=None, left=0.001, right=0.001, prepend_zero=True, log=True, log_folder=None)

Trim the motion with two coefficients of PGA.
This method is used when a long near-zero time-history is stored in the data file.

Parameters

new_name : str

new_folder : str, None

left, right : float The coefficient of PGA to trim the time history. Search from left or right, when the target point is found, search for the nearest near-zero point. Then trim off the small values.

prepend_zero : boolean
If True, prepend a zero value to the trimmed acceleration.

log : boolean
If the log picture will be generated.

log_folder : str, None Path for the folder to save the trim logs. If None, save in the previous folder.

Returns

motion : Motion
The trimmed motion.

Outputs

If log, plot a figure in log_folder named '{new_name}_trimlog.png'

Expand source code
def trim(self, new_name, new_folder=None, left=1.0e-3, right=1.0e-3, prepend_zero=True, log=True, log_folder=None):
    '''Trim the motion with two coefficients of PGA.  
    This method is used when a long near-zero time-history is stored in the data file.  

    Parameters
    ----------
    `new_name` : str  

    `new_folder` : str, None  

    `left`, `right` : float 
        The coefficient of PGA to trim the time history.
        Search from left or right, when the target point is found,
        search for the nearest near-zero point. Then trim off the small values.  


    `prepend_zero` : boolean  
        If True, prepend a zero value to the trimmed acceleration.

    `log` : boolean  
        If the log picture will be generated.

    `log_folder` : str, None
        Path for the folder to save the trim logs.
        If None, save in the previous folder.

    Returns
    -------
    `motion` : `Motion`  
        The trimmed motion.

    Outputs
    -------
    If log, plot a figure in log_folder named '{new_name}_trimlog.png'
    '''
    mo = self.copy()
    mo.name = new_name
    mo.folder = new_folder if new_folder is not None else self.folder
    pga = self.get_PGA()
    # search left
    i = 0
    while abs(self.accel[i]) < pga * left:
        i += 1
    while i > 0 and self.accel[i] * self.accel[i-1] > 0:
        i -= 1
    i_left = i
    # search right
    i = -1
    while abs(self.accel[i]) < pga * right:
        i -= 1
    while i < -1 and self.accel[i] * self.accel[i+1] > 0:
        i += 1
    i_right = len(self.accel) + i
    if prepend_zero:
        mo.accel = np.hstack([0, self.accel[i_left:i_right]])
    else:
        mo.accel = self.accel[i_left:i_right]
    mo.generate_spectrum()
    print(
        f'Motion is trimmed from {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.')
    if log:
        log_folder = self.folder if log_folder is None else log_folder
        os.makedirs(log_folder, exist_ok=True)
        _, ax = plt.subplots(3, 1, figsize=(7, 5))
        ax[0].plot(self.get_times(), self.accel, label='before')
        ax[1].plot(mo.get_times(), mo.accel, label='after')
        ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before')
        ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after')
        for i in range(3):
            ax[i].legend()
        plt.suptitle(f'Trim log: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.')
        plt.savefig(f'{log_folder}/{mo.name}_trimlog.png', dpi=200)
    return mo
def truncate(self, time, new_name, new_folder=None, log=True, log_folder=None)

Truncate the acceleration. Sometimes, a ground motion file may have two peaks or more. So if only one peak is needed, this method can be used to skip the other peaks.

Parameters

time : float The accels after the time will be cut off.

new_name : str

new_folder : str, None

log : boolean

log_folder : str, None

Returns

motion : Motion

Outputs

If log is True, a figure named '{log_folder}/{new_name}_trunclog.png' will be created.

Expand source code
def truncate(self, time, new_name, new_folder=None, log=True, log_folder=None):
    '''Truncate the acceleration. 
    Sometimes, a ground motion file may have two peaks or more. 
    So if only one peak is needed, this method can be used to skip the other peaks.  

    Parameters
    ----------
    `time` : float
        The accels after the time will be cut off.  

    `new_name` : str

    `new_folder` : str, None  

    `log` : boolean  

    `log_folder` : str, None  

    Returns
    -------
    `motion` : `Motion`

    Outputs
    -------
    If log is True, a figure named '{log_folder}/{new_name}_trunclog.png' will be created.
    '''
    i = round(time / self.dt)
    mo = self.copy()
    mo.name = new_name
    mo.folder = new_folder if new_folder is not None else self.folder
    if i >= len(self.accel):
        print(f"WARNING: time is longer than duration. {self.name} is not truncated.")
        return mo
    mo.accel = self.accel[:i]
    mo.generate_spectrum()
    print(f'Motion truncated: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.')
    if log:
        log_folder = self.folder if log_folder is None else log_folder
        os.makedirs(log_folder, exist_ok=True)
        _, ax = plt.subplots(3, 1, figsize=(7, 5))
        ax[0].plot(self.get_times(), self.accel, label='before')
        ax[1].plot(mo.get_times(), mo.accel, label='after')
        ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before')
        ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after')
        for i in range(3):
            ax[i].legend()
        plt.suptitle(f'Truncate: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.')
        plt.savefig(f'{log_folder}/{mo.name}_trunclog.png', dpi=200)
    return mo
def write_accel(self, filename, time=True)

Write the acceleration timehistory to a single file.

Parameters

filename : str
The file path to store the accelration.

time : boolean
If True, a time column is created. To know dt in the file content, time should be True. Otherwise, dt should be stored in the filename.

Outputs

An text file with acceleration information will be written.

Expand source code
def write_accel(self, filename, time=True):
    '''Write the acceleration timehistory to a single file.  

    Parameters
    ----------
    `filename` : str  
        The file path to store the accelration.  

    `time` : boolean  
        If True, a time column is created.
        To know dt in the file content, time should be True.
        Otherwise, dt should be stored in the filename.

    Outputs
    -------
    An text file with acceleration information will be written.
    '''
    if time:
        data = np.vstack([self.get_times(), self.accel])
        np.savetxt(filename, data.T, fmt="%.4f,%.7e", header="Time,Accel")
    else:
        np.savetxt(filename, self.accel, fmt="%.7e")
def write_info(self, filename)

Write the information to a single file.

Parameters

filename : str

Outputs

A text file with information will be created.

Expand source code
def write_info(self, filename):
    '''Write the information to a single file.  

    Parameters
    ----------
    `filename` : str

    Outputs
    -------
    A text file with information will be created.
    '''
    with open(filename, 'w') as f:
        f.write(f'dt={self.dt:.3}\n')
        f.write(f'npts={len(self.accel)}\n')
        f.write(f'damping={self.damping:.3}\n')
        f.write(self.information)
def write_spectrum(self, filename)

Write the spectrum to a single file. Use the form of csv.

Parameters

filename : str

Outputs

A csv file will be created.

Expand source code
def write_spectrum(self, filename):
    '''Write the spectrum to a single file. Use the form of csv.

    Parameters
    ----------
    `filename` : str

    Outputs
    -------
    A csv file will be created.
    '''
    self.spectrum.write_csv(filename)
class Spectrum (periods, spectrum)

A class storing a spectrum.
It can be init-ed simply with (periods, spectrum) sampling points.
It can also be init-ed by static functions to generate Code-confined spectrums.

Parameters

periods : List, numpy.ndarray
List of sampling periods.
It can be generated by the static function Spectrum.periods_standard() . Otherwise, users can specify desired period sampling points.

spectrum : List, numpy.ndarray
List of the spectrum points corresponding to sampling periods.

Expand source code
class Spectrum():
    '''A class storing a spectrum.  
    It can be init-ed simply with (periods, spectrum) sampling points.  
    It can also be init-ed by static functions to generate Code-confined spectrums.

    Parameters  
    ----------  
    `periods` : List, numpy.ndarray  
        List of sampling periods.  
        It can be generated by the static function `Spectrum.periods_standard` .
        Otherwise, users can specify desired period sampling points.  

    `spectrum` : List, numpy.ndarray  
        List of the spectrum points corresponding to sampling periods.  
    '''

    def __init__(self, periods, spectrum):
        self.periods = np.array(periods)
        '''numpy.ndarray, sampling periods'''
        self.spectrum = np.array(spectrum)
        '''numpy.ndarray, spectrum corresponding to sampling periods.'''

    def match(self, target, start, end, step=0.1):
        '''Match the current spectrum with a target spectrum. Use Least Square method.  

        Parameters  
        ----------  
        `target` : `Spectrum`  
            the spectrum to be matched.  

        `start`, `end` : float  
            the matching period boundaries.  

        `step` : float
            evenly sample the periods with the step.

        Returns
        -------
        `coefficient` : float  
            the coefficient to match the current spectrum to the target.

        `srss` : float   
            the srss after match.
        '''
        periods = np.arange(start, end+step, step)
        this_spectrum = np.array([self.get(p) for p in periods])
        that_spectrum = np.array([target.get(p) for p in periods])
        coefficient = sum(this_spectrum * that_spectrum) / sum(this_spectrum ** 2)
        srss = (np.sum((this_spectrum * coefficient - that_spectrum) ** 2)) ** 0.5
        return coefficient, srss

    def get(self, period):
        '''Get the spectrum value. Either a list or a number is accepted.  
        If the period exceeds the largest sampling period, the last spectrum value is returned.  
        The spectrum value not on sampling points are linearly interpolated.  

        Parameters
        ----------
        `period` : number, list, numpy.ndarray  
            The interested period.

        Returns
        -------
        `spectrum` : number, numpy.ndarray  
            if period is list-like, return a ndarray.
            if period is a number, return a number.
        '''
        if not hasattr(period, '__iter__'):
            i = self.periods.searchsorted(period)
            if self.periods[i] == period:
                return self.spectrum[i]
            elif i >= len(self.periods):
                print("Warning: given period exceeds the sampling range.")
                return self.spectrum[-1]
            else:
                return self.spectrum[i] - (self.spectrum[i]-self.spectrum[i-1]) / (self.periods[i] - self.periods[i-1]) * (self.periods[i] - period)
        else:
            return np.array([self.get(p) for p in period])

    def scale(self, factor_x=1, factor_y=1):
        '''Scale the spectrum.

        Parameters
        ----------
        `factor_x` : number  
            scaling factor in x direction (periods)  

        `factor_y` : number  
            scaling factor in y direction (spectrum)

        Returns
        -------
        `spectrum` : `Spectrum`
        '''
        return Spectrum(periods=self.periods*factor_x, spectrum=self.spectrum*factor_y)

    def plot(self, filename=None, engine='pyplot'):
        '''Plot the spectrum.

        Parameters
        ----------
        `filename` : string, None  
            if None, show it. Else, save it to the filename  

        `engine` : 'pyplot' or 'plotly'  
            Plot engine.

        Outputs
        -------
        A file with name `filename`.png or `filename`.html will be generated.
        '''
        if engine == 'pyplot':
            plt.figure(figsize=(4, 3))
            plt.plot(self.periods, self.spectrum, marker='o')
            plt.xlabel('Period (s)')
            plt.ylabel('Spetrum')
            plt.xlim([0, self.periods[-1]])
            if filename is None:
                plt.show()
            else:
                plt.savefig(filename+'.png', dpi=200)
        elif engine == 'plotly':
            fig = go.Figure()
            fig.add_scatter(dict(
                x='self.periods',
                y='self.spectrum',
                mode='lines+markers',
            ))
            if filename is None:
                fig.show()
            else:
                fig.write_html(filename+'.html')
        else:
            print('ERROR: engine should be in "pyplot" or "plotly"')

    def to_dict(self):
        '''Convert the instance properties to a dict.  

        Returns
        -------
        `it` : dict
        '''
        return dict(
            periods=self.periods.tolist(),
            spectrum=self.spectrum.tolist()
        )

    def to_json(self):
        '''Convert the properties to json.  

        Returns
        -------
        `json` : string  
        '''
        return json.dumps(self.to_dict())

    def save(self, filename):
        '''Save the spectrum to a file. Currently use json form.  

        Parameters
        ----------
        `filename` : string  
            The file name to be saved.   
            The extension '.spectrum' will be automatically added.  

        Outputs
        -------
        A file with name `filename`.spectrum will be generated.
        '''
        with open(filename + '.spectrum', 'w') as f:
            f.write(self.to_json())

    def write_csv(self, filename):
        '''Write the spectrum to a csv file.  

        Parameters
        ----------
        `filename` : string  
            The extension '.csv' will be automatically added.

        Outputs
        -------
        A file named `filename`.csv will be generated.
        '''
        np.savetxt(filename+'.csv', np.vstack(
            [self.periods, self.spectrum]).T, fmt="%.3f,%.7f", header='Period,Spectrum')

    @staticmethod
    def read_csv(filename):
        '''Create an instance by reading the written csv file.  

        Parameters
        ----------
        `filename` : string  
            The filename to read.

        Returns
        -------
        `spectrum` : `Spectrum`  
            New instance.
        '''
        if not filename.endswith('.csv'):
            filename += '.csv'
        data = np.loadtxt(filename, delimiter=',')
        return Spectrum(periods=data[:, 0], spectrum=data[:, 1])

    @staticmethod
    def read_dict(it):
        '''Create an instance from the dict.  

        Parameters
        ----------
        `it` : dict  
            The dict generated by `Spectrum.to_dict`

        Returns
        -------
        `spectrum` : `Spectrum`  
            New instance.
        '''
        return Spectrum(periods=it['periods'], spectrum=it['spectrum'])

    @staticmethod
    def read_json(string):
        '''Create an instance from json string.  

        Parameters
        ----------
        `string` : str  
            The json string 

        Returns
        -------
        `spectrum` : `Spectrum`  
            New instance.
        '''
        it = json.loads(string)
        return Spectrum.read_dict(it)

    @staticmethod
    def load(filename):
        '''Load the spectrum from file.  

        Parameters
        ----------
        `filename` : str  

        Returns
        -------
        `spectrum` : `Spectrum`  
            New instance.
        '''
        if not filename.endswith('.spectrum'):
            filename += '.spectrum'
        with open(filename, 'r') as f:
            string = f.read()
        return Spectrum.read_json(string)

    @staticmethod
    def periods_standard(until=6):
        '''Get a standard sampling of the periods.  
        The sampling points are more sparse with the increase of period.  

        Parameters
        ----------  
        `until` : number  
            The end of the period sampling list. It should >= 4.0.

        Returns
        -------
        `periods` : numpy.ndarray  
            The standard sampling points.
        '''
        assert until >= 4
        return np.hstack([np.arange(0, 0.2, 0.01),
                          np.arange(0.2, 0.5, 0.02),
                          np.arange(0.5, 1.0, 0.05),
                          np.arange(1.0, 2.0, 0.1),
                          np.arange(2.0, 4.0, 0.2),
                          np.arange(4.0, until+0.5, 0.5)])

    @staticmethod
    def chinese_point(period, intensity, major, site, group, damping=0.05):
        '''Get spectrum point for Chinese code, given a period.  

        Parameters
        ----------  
        `period` : number  
            The given period.  

        `intensity` : number  
            Intensity represented by a number.   
            Use 7.5 or 8.5 for half-level intensity.  

        `major` : boolean
            If True, it's for major (rare) earthquake.  
            If False, for minor (frequent) earthquake.  

        `site` : int
            Site number (Roman number in the code. 0 for I0, 1 for I1.)  

        `group` : int  
            Site class (Chinese number in the code.)  

        `damping` : float  
            Spectrum damping.  

        Returns
        -------  
        `value` : float
            the specific spectrum point value corresponding to the period.  
        '''
        # alpha max
        if intensity == 6:
            alphamax = 0.28 if major else 0.04
        elif intensity == 7:
            alphamax = 0.50 if major else 0.08
        elif intensity == 7.5:
            alphamax = 0.72 if major else 0.12
        elif intensity == 8:
            alphamax = 0.9 if major else 0.16
        elif intensity == 8.5:
            alphamax = 1.2 if major else 0.24
        elif intensity == 9:
            alphamax = 1.40 if major else 0.32
        else:
            raise("ERROR: intensity not in [6, 7, 7.5, 8, 8.5, 9]")
        # tg
        tg_table = [[0.20, 0.25, 0.35, 0.45, 0.65],
                    [0.25, 0.3, 0.4, 0.55, 0.75],
                    [0.3, 0.35, 0.45, 0.65, 0.9]]
        assert group in [1, 2, 3] and site in [0, 1, 2, 3, 4]
        tg = tg_table[group-1][site]
        # other parameters
        gamma = 0.9 + (0.05 - damping) / (0.3 + 6*damping)
        eta1 = 0.02 + (0.05 - damping) / (4 + 32*damping)
        eta2 = 1 + (0.05 - damping) / (0.08 + 1.6*damping)
        # calculate
        if period < 0.1:
            return ((10*eta2 - 4.5) * period + 0.45) * alphamax
        elif period < tg:
            return eta2 * alphamax
        elif period < 5*tg:
            return (tg/period)**gamma * eta2 * alphamax
        elif period <= 6:
            return (eta2 * 0.2**gamma - eta1 * (period-5*tg)) * alphamax
        else:
            return (eta2 * 0.2**gamma - eta1 * (6-5*tg)) * alphamax

    @staticmethod
    def chinese(intensity=8, major=True, site=3, group=2, damping=0.05):
        '''Return a new spectrum instance that confines to Chinese Code.  
        Use `Spectrum.periods_standard` to get sampling periods.  

        Parameters
        ----------
        refer to `Spectrum.chinese_point`

        Returns
        -------
        `spectrum` : `Spectrum`
        '''
        periods = Spectrum.periods_standard(until=6)
        spectrum = [Spectrum.chinese_point(p, 
                                           intensity=intensity,
                                           major=major,
                                           site=site,
                                           group=group,
                                           damping=damping) for p in periods]
        return Spectrum(periods, spectrum)

Static methods

def chinese(intensity=8, major=True, site=3, group=2, damping=0.05)

Return a new spectrum instance that confines to Chinese Code.
Use Spectrum.periods_standard() to get sampling periods.

Parameters

refer to Spectrum.chinese_point()

Returns

spectrum : Spectrum

Expand source code
@staticmethod
def chinese(intensity=8, major=True, site=3, group=2, damping=0.05):
    '''Return a new spectrum instance that confines to Chinese Code.  
    Use `Spectrum.periods_standard` to get sampling periods.  

    Parameters
    ----------
    refer to `Spectrum.chinese_point`

    Returns
    -------
    `spectrum` : `Spectrum`
    '''
    periods = Spectrum.periods_standard(until=6)
    spectrum = [Spectrum.chinese_point(p, 
                                       intensity=intensity,
                                       major=major,
                                       site=site,
                                       group=group,
                                       damping=damping) for p in periods]
    return Spectrum(periods, spectrum)
def chinese_point(period, intensity, major, site, group, damping=0.05)

Get spectrum point for Chinese code, given a period.

Parameters

period : number
The given period.

intensity : number
Intensity represented by a number.
Use 7.5 or 8.5 for half-level intensity.

major : boolean If True, it's for major (rare) earthquake.
If False, for minor (frequent) earthquake.

site : int Site number (Roman number in the code. 0 for I0, 1 for I1.)

group : int
Site class (Chinese number in the code.)

damping : float
Spectrum damping.

Returns

value : float the specific spectrum point value corresponding to the period.

Expand source code
@staticmethod
def chinese_point(period, intensity, major, site, group, damping=0.05):
    '''Get spectrum point for Chinese code, given a period.  

    Parameters
    ----------  
    `period` : number  
        The given period.  

    `intensity` : number  
        Intensity represented by a number.   
        Use 7.5 or 8.5 for half-level intensity.  

    `major` : boolean
        If True, it's for major (rare) earthquake.  
        If False, for minor (frequent) earthquake.  

    `site` : int
        Site number (Roman number in the code. 0 for I0, 1 for I1.)  

    `group` : int  
        Site class (Chinese number in the code.)  

    `damping` : float  
        Spectrum damping.  

    Returns
    -------  
    `value` : float
        the specific spectrum point value corresponding to the period.  
    '''
    # alpha max
    if intensity == 6:
        alphamax = 0.28 if major else 0.04
    elif intensity == 7:
        alphamax = 0.50 if major else 0.08
    elif intensity == 7.5:
        alphamax = 0.72 if major else 0.12
    elif intensity == 8:
        alphamax = 0.9 if major else 0.16
    elif intensity == 8.5:
        alphamax = 1.2 if major else 0.24
    elif intensity == 9:
        alphamax = 1.40 if major else 0.32
    else:
        raise("ERROR: intensity not in [6, 7, 7.5, 8, 8.5, 9]")
    # tg
    tg_table = [[0.20, 0.25, 0.35, 0.45, 0.65],
                [0.25, 0.3, 0.4, 0.55, 0.75],
                [0.3, 0.35, 0.45, 0.65, 0.9]]
    assert group in [1, 2, 3] and site in [0, 1, 2, 3, 4]
    tg = tg_table[group-1][site]
    # other parameters
    gamma = 0.9 + (0.05 - damping) / (0.3 + 6*damping)
    eta1 = 0.02 + (0.05 - damping) / (4 + 32*damping)
    eta2 = 1 + (0.05 - damping) / (0.08 + 1.6*damping)
    # calculate
    if period < 0.1:
        return ((10*eta2 - 4.5) * period + 0.45) * alphamax
    elif period < tg:
        return eta2 * alphamax
    elif period < 5*tg:
        return (tg/period)**gamma * eta2 * alphamax
    elif period <= 6:
        return (eta2 * 0.2**gamma - eta1 * (period-5*tg)) * alphamax
    else:
        return (eta2 * 0.2**gamma - eta1 * (6-5*tg)) * alphamax
def load(filename)

Load the spectrum from file.

Parameters

filename : str

Returns

spectrum : Spectrum
New instance.

Expand source code
@staticmethod
def load(filename):
    '''Load the spectrum from file.  

    Parameters
    ----------
    `filename` : str  

    Returns
    -------
    `spectrum` : `Spectrum`  
        New instance.
    '''
    if not filename.endswith('.spectrum'):
        filename += '.spectrum'
    with open(filename, 'r') as f:
        string = f.read()
    return Spectrum.read_json(string)
def periods_standard(until=6)

Get a standard sampling of the periods.
The sampling points are more sparse with the increase of period.

Parameters

until : number
The end of the period sampling list. It should >= 4.0.

Returns

periods : numpy.ndarray
The standard sampling points.

Expand source code
@staticmethod
def periods_standard(until=6):
    '''Get a standard sampling of the periods.  
    The sampling points are more sparse with the increase of period.  

    Parameters
    ----------  
    `until` : number  
        The end of the period sampling list. It should >= 4.0.

    Returns
    -------
    `periods` : numpy.ndarray  
        The standard sampling points.
    '''
    assert until >= 4
    return np.hstack([np.arange(0, 0.2, 0.01),
                      np.arange(0.2, 0.5, 0.02),
                      np.arange(0.5, 1.0, 0.05),
                      np.arange(1.0, 2.0, 0.1),
                      np.arange(2.0, 4.0, 0.2),
                      np.arange(4.0, until+0.5, 0.5)])
def read_csv(filename)

Create an instance by reading the written csv file.

Parameters

filename : string
The filename to read.

Returns

spectrum : Spectrum
New instance.

Expand source code
@staticmethod
def read_csv(filename):
    '''Create an instance by reading the written csv file.  

    Parameters
    ----------
    `filename` : string  
        The filename to read.

    Returns
    -------
    `spectrum` : `Spectrum`  
        New instance.
    '''
    if not filename.endswith('.csv'):
        filename += '.csv'
    data = np.loadtxt(filename, delimiter=',')
    return Spectrum(periods=data[:, 0], spectrum=data[:, 1])
def read_dict(it)

Create an instance from the dict.

Parameters

it : dict
The dict generated by Spectrum.to_dict()

Returns

spectrum : Spectrum
New instance.

Expand source code
@staticmethod
def read_dict(it):
    '''Create an instance from the dict.  

    Parameters
    ----------
    `it` : dict  
        The dict generated by `Spectrum.to_dict`

    Returns
    -------
    `spectrum` : `Spectrum`  
        New instance.
    '''
    return Spectrum(periods=it['periods'], spectrum=it['spectrum'])
def read_json(string)

Create an instance from json string.

Parameters

string : str
The json string

Returns

spectrum : Spectrum
New instance.

Expand source code
@staticmethod
def read_json(string):
    '''Create an instance from json string.  

    Parameters
    ----------
    `string` : str  
        The json string 

    Returns
    -------
    `spectrum` : `Spectrum`  
        New instance.
    '''
    it = json.loads(string)
    return Spectrum.read_dict(it)

Instance variables

var periods

numpy.ndarray, sampling periods

var spectrum

numpy.ndarray, spectrum corresponding to sampling periods.

Methods

def get(self, period)

Get the spectrum value. Either a list or a number is accepted.
If the period exceeds the largest sampling period, the last spectrum value is returned.
The spectrum value not on sampling points are linearly interpolated.

Parameters

period : number, list, numpy.ndarray
The interested period.

Returns

spectrum : number, numpy.ndarray
if period is list-like, return a ndarray. if period is a number, return a number.

Expand source code
def get(self, period):
    '''Get the spectrum value. Either a list or a number is accepted.  
    If the period exceeds the largest sampling period, the last spectrum value is returned.  
    The spectrum value not on sampling points are linearly interpolated.  

    Parameters
    ----------
    `period` : number, list, numpy.ndarray  
        The interested period.

    Returns
    -------
    `spectrum` : number, numpy.ndarray  
        if period is list-like, return a ndarray.
        if period is a number, return a number.
    '''
    if not hasattr(period, '__iter__'):
        i = self.periods.searchsorted(period)
        if self.periods[i] == period:
            return self.spectrum[i]
        elif i >= len(self.periods):
            print("Warning: given period exceeds the sampling range.")
            return self.spectrum[-1]
        else:
            return self.spectrum[i] - (self.spectrum[i]-self.spectrum[i-1]) / (self.periods[i] - self.periods[i-1]) * (self.periods[i] - period)
    else:
        return np.array([self.get(p) for p in period])
def match(self, target, start, end, step=0.1)

Match the current spectrum with a target spectrum. Use Least Square method.

Parameters

target : Spectrum
the spectrum to be matched.

start, end : float
the matching period boundaries.

step : float evenly sample the periods with the step.

Returns

coefficient : float
the coefficient to match the current spectrum to the target.

srss : float
the srss after match.

Expand source code
def match(self, target, start, end, step=0.1):
    '''Match the current spectrum with a target spectrum. Use Least Square method.  

    Parameters  
    ----------  
    `target` : `Spectrum`  
        the spectrum to be matched.  

    `start`, `end` : float  
        the matching period boundaries.  

    `step` : float
        evenly sample the periods with the step.

    Returns
    -------
    `coefficient` : float  
        the coefficient to match the current spectrum to the target.

    `srss` : float   
        the srss after match.
    '''
    periods = np.arange(start, end+step, step)
    this_spectrum = np.array([self.get(p) for p in periods])
    that_spectrum = np.array([target.get(p) for p in periods])
    coefficient = sum(this_spectrum * that_spectrum) / sum(this_spectrum ** 2)
    srss = (np.sum((this_spectrum * coefficient - that_spectrum) ** 2)) ** 0.5
    return coefficient, srss
def plot(self, filename=None, engine='pyplot')

Plot the spectrum.

Parameters

filename : string, None
if None, show it. Else, save it to the filename

engine : 'pyplot' or 'plotly'
Plot engine.

Outputs

A file with name filename.png or filename.html will be generated.

Expand source code
def plot(self, filename=None, engine='pyplot'):
    '''Plot the spectrum.

    Parameters
    ----------
    `filename` : string, None  
        if None, show it. Else, save it to the filename  

    `engine` : 'pyplot' or 'plotly'  
        Plot engine.

    Outputs
    -------
    A file with name `filename`.png or `filename`.html will be generated.
    '''
    if engine == 'pyplot':
        plt.figure(figsize=(4, 3))
        plt.plot(self.periods, self.spectrum, marker='o')
        plt.xlabel('Period (s)')
        plt.ylabel('Spetrum')
        plt.xlim([0, self.periods[-1]])
        if filename is None:
            plt.show()
        else:
            plt.savefig(filename+'.png', dpi=200)
    elif engine == 'plotly':
        fig = go.Figure()
        fig.add_scatter(dict(
            x='self.periods',
            y='self.spectrum',
            mode='lines+markers',
        ))
        if filename is None:
            fig.show()
        else:
            fig.write_html(filename+'.html')
    else:
        print('ERROR: engine should be in "pyplot" or "plotly"')
def save(self, filename)

Save the spectrum to a file. Currently use json form.

Parameters

filename : string
The file name to be saved.
The extension '.spectrum' will be automatically added.

Outputs

A file with name filename.spectrum will be generated.

Expand source code
def save(self, filename):
    '''Save the spectrum to a file. Currently use json form.  

    Parameters
    ----------
    `filename` : string  
        The file name to be saved.   
        The extension '.spectrum' will be automatically added.  

    Outputs
    -------
    A file with name `filename`.spectrum will be generated.
    '''
    with open(filename + '.spectrum', 'w') as f:
        f.write(self.to_json())
def scale(self, factor_x=1, factor_y=1)

Scale the spectrum.

Parameters

factor_x : number
scaling factor in x direction (periods)

factor_y : number
scaling factor in y direction (spectrum)

Returns

spectrum : Spectrum

Expand source code
def scale(self, factor_x=1, factor_y=1):
    '''Scale the spectrum.

    Parameters
    ----------
    `factor_x` : number  
        scaling factor in x direction (periods)  

    `factor_y` : number  
        scaling factor in y direction (spectrum)

    Returns
    -------
    `spectrum` : `Spectrum`
    '''
    return Spectrum(periods=self.periods*factor_x, spectrum=self.spectrum*factor_y)
def to_dict(self)

Convert the instance properties to a dict.

Returns

it : dict

Expand source code
def to_dict(self):
    '''Convert the instance properties to a dict.  

    Returns
    -------
    `it` : dict
    '''
    return dict(
        periods=self.periods.tolist(),
        spectrum=self.spectrum.tolist()
    )
def to_json(self)

Convert the properties to json.

Returns

json : string

Expand source code
def to_json(self):
    '''Convert the properties to json.  

    Returns
    -------
    `json` : string  
    '''
    return json.dumps(self.to_dict())
def write_csv(self, filename)

Write the spectrum to a csv file.

Parameters

filename : string
The extension '.csv' will be automatically added.

Outputs

A file named filename.csv will be generated.

Expand source code
def write_csv(self, filename):
    '''Write the spectrum to a csv file.  

    Parameters
    ----------
    `filename` : string  
        The extension '.csv' will be automatically added.

    Outputs
    -------
    A file named `filename`.csv will be generated.
    '''
    np.savetxt(filename+'.csv', np.vstack(
        [self.periods, self.spectrum]).T, fmt="%.3f,%.7f", header='Period,Spectrum')
class Suite (folder, name, target_spectrum, period_bound)

A class that contains mainly a series of Motion instances, and corresponding series of factors, matching the suite of motions to the target Spectrum.

Parameters

folder : str
A path-like string for the suite.

name : str A specific name for the suite. The instance will be saved in '{folder}/{name}.suite'

target_spectrum : Spectrum
The target spectrum of the suite.

period_bound : List len=2
The matching period boundaries.

Expand source code
class Suite(object):
    '''A class that contains mainly a series of `Motion` instances, 
    and corresponding series of factors, matching the suite of motions to the target `Spectrum`.  
    '''

    def __init__(self, folder, name, target_spectrum, period_bound):
        '''
        Parameters
        ----------
        `folder` : str  
            A path-like string for the suite.

        `name` : str
            A specific name for the suite. The instance will be saved in '{folder}/{name}.suite'  

        `target_spectrum` : `Spectrum`  
            The target spectrum of the suite.  

        `period_bound` : List<float> len=2  
            The matching period boundaries.
        '''
        self.folder = folder
        '''str. See `Suite`'''
        os.makedirs(folder, exist_ok=True)
        self.name = name
        '''str. See `Suite`'''
        self.motions = []
        '''List<`Motion`>. Store all the motions.'''
        self.factors = []
        '''List<float>. Store all the factors that should be applied to the motions.'''
        self.target_spectrum = target_spectrum
        '''`Spectrum`. See `Suite`'''
        self.period_bound = period_bound
        '''List<float>. See `Suite`'''
        self.period_samples = np.linspace(period_bound[0], period_bound[1], 200)
        '''numpy.ndarray. The period within the `period_bound` is evenly sampled, with 200 points.'''
        self.spectrum_matrix = None
        '''numpy.ndarray. A matrix with shape (len(`Suite.period_samples`), len(`Suite.motions`)).
        This matrix times the `Suite.factors` vector products the average spectrum with `Suite.period_samples`.'''
        self.target_vector = target_spectrum.get(self.period_samples)
        '''numpy.ndarray. The vector generated by the target_spectrum sampling the `Suite.period_samples`'''

    def set_folder(self, folder):
        '''Reset the folder for the suite and the motions.

        Parameters
        ----------
        `folder` : str

        Outputs
        -------
        `Suite.folder` and `Suite.motions` will be changed.
        '''
        if folder is not None:
            self.folder = folder
            os.makedirs(folder, exist_ok=True)
            for mo in self.motions:
                mo.folder = folder

    def add_motion(self, motion, factor=1):
        '''Add motion to the suite together with a factor, to keep the lengths are identical.

        Parameters
        ----------
        `motion` : `Motion`

        `factor` : number

        Outputs
        -------
        `Suite.motions` and `Suite.factors` will be changed.
        '''
        self.motions.append(motion)
        self.factors.append(factor)

    def load_motions_from_names(self, names, folder=None):
        '''Give a list of name and the folder, load motions.

        Parameters
        ----------
        `names` : List<str>  
            The list of motion names.

        `folder` : str, None  
            The folder of the motions.  
            If is None, use `Suite.folder`

        Outputs
        -------
        `Suite.motions` and `Suite.factors` will be changed.
        '''
        folder = self.folder if folder is None else folder
        for name in names:
            mo = Motion.load(f'{folder}/{name}.motion')
            self.add_motion(mo)
            print(f'Loaded motion {mo.name}')
        self.reset_spectrum_matrix()

    def load_motions_from_folder(self, folder=None):
        '''Find all the .motion file from the given folder, and load them.  

        Parameters
        ----------
        `folder` : str, None   
            The folder to search the .motion files.  
            If is None, use `Suite.folder` instead.  

        Outputs
        -------
        `Suite.motions` and `Suite.factors` will be changed.
        '''
        folder = self.folder if folder is None else folder
        filenames = os.listdir(self.folder)
        for filename in filenames:
            if filename.endswith('.motion'):
                mo = Motion.load(f'{folder}/{filename}')
                self.add_motion(mo)
                print(f'Loadad motion {mo.name}')
        self.reset_spectrum_matrix()

    def filter_by_IDs(self, ids, new_name, new_folder=None):
        '''Give a series of selected IDs, filter the motions.  

        Parameters
        ----------
        `ids` : List<int>  
            The list of indexes of the selected motions in the current suite.  

        `new_name` : str  

        `new_folder` : str, None

        Returns
        -------
        `suite` : `Suite`  
            A new suite with the selected motions and their factors only.  
        '''
        suite = self.copy()
        suite.name = new_name
        suite.set_folder(new_folder)
        suite.motions = [self.motions[i] for i in ids]
        suite.factors = [self.factors[i] for i in ids]
        suite.reset_spectrum_matrix()
        print(f'The motions have been filtered. {len(ids)} motions remain.')
        return suite

    def filter_by_file(self, filename, new_name, new_folder=None):
        '''Give a file with indexes selected.  

        Parameters
        ----------
        Refer to `Suite.filter_by_IDs`.  

        Returns
        -------
        Refer to `Suite.filter_by_IDs`.  
        '''
        with open(filename, 'r') as f:
            lines = f.readlines()
        ids = [int(l) for l in lines]
        return self.filter_by_IDs(ids=ids, new_name=new_name, new_folder=new_folder)

    def get_average_spectrum(self):
        '''Return a spectrum which is the average spectrum of the suite.

        Outputs
        -------
        `spectrum` : `Spectrum`  
        '''
        periods = self.motions[0].spectrum.periods
        average_spectrum = self.motions[0].spectrum.spectrum * self.factors[0]
        for i in range(1, len(self.motions)):
            average_spectrum += self.motions[i].spectrum.get(periods) * self.factors[i]
        average_spectrum /= len(self.motions)
        return Spectrum(periods, average_spectrum)

    def get_srss(self):
        '''Get the srss of the current averate spectrum to the target spectrum in the period boundary.

        Returns
        -------
        `srss` : float
        '''
        return np.linalg.norm(np.matmul(self.spectrum_matrix, self.factors) - self.target_vector).item()

    def reset_spectrum_matrix(self):
        '''Reset the spectrum matrix, which produces the average spectrum 
        in the boundary by multiplying factors vector.
        This method should be called everytime `Suite.motions` change.

        Outputs
        -------
        `Suite.spectrum_matrix` and `Suite.target_vector` will be changed.  
        '''
        mat = np.zeros((len(self.period_samples), len(self.motions)))
        for i in range(len(self.motions)):
            spectrum = self.motions[i].spectrum.get(self.period_samples)
            mat[:, i] = spectrum / len(self.motions)
        self.spectrum_matrix = mat
        self.target_vector = self.target_spectrum.get(self.period_samples)

    def eliminate_neg(self):
        '''Eliminate negative target vector by multiplying a factor for every individual.
        In other words, make the average spectrum above the target spectrum.

        Returns
        -------
        `factor` : float  
            The global factor that should be applied to each of the factors.  

        Outputs
        -------
        `Suite.factors` are all amplified by the returned `factor`.
        '''
        error = np.matmul(self.spectrum_matrix, self.factors) - self.target_vector
        argmin = np.argmin(error)
        factor = 1 / (1 + error[argmin] / self.target_vector[argmin])
        print(f'Multiply {factor:.4} to eliminate negative spectrum.')
        self.factors = [self.factors[i] * factor for i in range(len(self.factors))]
        return factor

    def match_individual_LSQ(self):
        '''Use Least square method for each individual motion to match the spectrum.

        Returns
        -------
        `residuals` : numpy.ndarray  
            srss of each motion matched to the target spectrum.  

        Outputs
        -------
        `Suite.factors` are changed.
        '''
        residuals = np.zeros(len(self.motions))
        for i in range(len(self.motions)):
            this = self.spectrum_matrix[:, i] * len(self.motions)
            that = self.target_vector
            factor = np.dot(this, that) / np.dot(this, this)
            residual = np.linalg.norm(this * factor - that)
            self.factors[i] = factor
            residuals[i] = residual
        print(f"Individual LSQ: Factor Max={np.max(self.factors):.4}, Residual Max={np.max(residuals):.4}")
        return residuals

    def filter_optimize(self, count, times, use_lsq=True, output_count=0, new_folder=None, lower_bound=0.6, upper_bound=1.4, dimension=100000):
        '''Use Monte Carlo simulation and Least Square method to optimize the suite.
        after running, a bunch of suites will be written to the new_folder.

        Parameters  
        ----------
        `count` : int  
            The number of motions in the resulting suite.  

        `times` : int  
            The number of times that MC simulation with dimension is run.  
            The total number of samples of MC simulation will be {times} * {dimension}.  

        `use_lsq` : boolean  
            Whether use LSQ optimizer to better optimize the suite.  

        `output_count` : int  
            The number of outputs. if 0, output all.  

        `new_folder` : str 
        `lower_bound`, `upper_bound` : float  
            For the purpose that the individual motions are not too far from the individually matched factors,
            the lower and upper boundaries of amplifying the individually matched factors are set.  
            This is only useful when `use_lsq` is set, to add boundaries to the LSQ optimizer.  

        `dimension` : int  
            Each time the Monte Carlo simulation is run, 
            means that {dimension} times of simulation have been done by using a matrix target.

        Returns
        -------
        List<`Suite`> with len=`output_count`.  
        The list is sorted. The best answer is the first result. However, visual check should be done.

        Outputs
        -------
        Every running after Monte Carlo simulation and LSQ optimization, the spectrum of the suite will be plotted.
        '''
        new_folder = new_folder if new_folder is not None else self.folder
        os.makedirs(new_folder, exist_ok=True)
        suites = []
        srsses = []
        for i in range(times):
            suite_new = self.filter_montecarlo(count=count, new_name=f'mc{i:03d}', new_folder=new_folder)
            suites.append(suite_new)
            srsses.append(suite_new.get_srss())
            if use_lsq:
                suite_new2 = suite_new.optimize_LSQ(
                    new_name=f'lsq{i:03d}', 
                    new_folder=new_folder, 
                    lower_bound=lower_bound, 
                    upper_bound=upper_bound
                )
                suites.append(suite_new2)
                srsses.append(suite_new2.get_srss())
        argsorted = np.argsort(np.array(srsses))
        results = [suites[i] for i in argsorted[:output_count]]
        for i, suite in enumerate(results):
            suite.name = f'no{i}-{suite.name}'
            suite.save()
            suite.plot_all_spectrums(engine='pyplot')
        return results

    def filter_montecarlo(self, count, new_name, new_folder=None, dimension=100000):
        '''Use Monte Carlo simulation to filter the suite to count numbers of motions.

        Parameters
        ---------- 
        Refer to `Suite.filter_optimize`.  

        Returns
        -------
        `suite` : `Suite`   
            The optimized result after {dimension} times of MC simulations.
        '''
        print(f'Running Monte Carlo simulation, dimension={dimension}')
        self.match_individual_LSQ()
        factors = np.array(self.factors)
        xs = np.zeros((len(self.motions), dimension))
        for i in range(dimension):
            indexes = random.sample(range(len(self.motions)), count)
            xs[indexes, i] = factors[indexes]
        mul = np.matmul(self.spectrum_matrix * len(self.motions) / count, xs)
        loss = mul - self.target_vector.reshape(-1, 1)
        argmin = np.argmin(loss, axis=0)
        lossmin = np.min(loss, axis=0)
        target_at_lossmin = self.target_vector[argmin]
        amps = 1 / (lossmin / target_at_lossmin + 1)
        lossAmp = mul * amps - self.target_vector.reshape(-1, 1)
        srss = np.linalg.norm(lossAmp, axis=0)
        arg = np.argmin(srss)
        factors = xs[:, arg] * amps[arg]
        suvives = np.where(factors > 1.0e-3)
        suite = self.copy()
        suite.name = new_name
        suite.set_folder(new_folder)
        suite.motions = [self.motions[i] for i in suvives[0]]
        suite.factors = [factors[i] for i in suvives[0]]
        suite.reset_spectrum_matrix()
        return suite

    def _loss(self, x):
        '''Utility function: calculate the loss for optimization.'''
        factors = x * np.array(self.factors)
        mul = np.matmul(self.spectrum_matrix, factors)
        loss = mul - self.target_vector
        argmin = np.argmin(loss)
        amp = 1 / (loss[argmin] / self.target_vector[argmin] + 1)
        return mul * amp - self.target_vector

    def optimize_LSQ(self, new_name, new_folder=None, lower_bound=0.6, upper_bound=1.4):
        '''Use Least Square method from scipy to optimize the factors.

        Parameters
        ----------
        Refer to `Suite.filter_optimize`.   

        Returns
        -------
        `suite` : `Suite`  
            Optimized suite.
        '''
        print('Running Bounded Least Square optimization ...')
        res = least_squares(self._loss,
                            np.ones(len(self.motions)),
                            bounds=(lower_bound, upper_bound),
                            verbose=0)
        factors = res.x * self.factors
        suite = self.copy()
        suite.name = new_name
        suite.set_folder(new_folder)
        suite.factors = factors.tolist()
        suite.eliminate_neg()
        return suite

    def load_peer_folder(self, peer_folder, h1=True, h2=True, v=False):
        '''From a folder load all the PEER motions into the suite.  
        Extract the downloaded peer zip file first.

        Parameters
        ----------
        `peer_folder` : str  
            The folder path of the extracted PEER data. Where `_SearchResults.csv` can be found.

        `h1`, `h2`, `v` : boolean  
            Whether load the three directional files or not.  

        Outputs
        -------
        `Suite.motions`, `Suite.factors` will be changed.  
        All the PEER motions will be saved in .motion format in the new folder.  
        '''
        with open(f'{peer_folder}/_SearchResults.csv', "r") as f:
            for _ in range(34):
                f.readline()
            i = 0
            positions = []
            if h1:
                positions.append(19)
            if h2:
                positions.append(20)
            if v:
                positions.append(21)
            line = f.readline()
            while line != '\n':
                cells = line.split(',')
                for pos in positions:
                    filename = cells[pos].replace(' ', '')
                    motion_name = filename.replace('.AT2', '')
                    print(f'Loading {motion_name}')
                    mo = Motion(folder=self.folder, name=motion_name)
                    mo.load_peer(peer_filename=f'{peer_folder}/{filename}')
                    mo.generate_spectrum()
                    mo.save()
                    self.add_motion(mo)
                    i += 1
                line = f.readline()
        self.reset_spectrum_matrix()
        print(f"Successfully loaded {i} earthquakes from peer folder {peer_folder} to {self.folder}.")
        return True

    def beautify(self, new_name, run_steps=[1,2,3,4], dt=0.01, new_folder=None, trunc_dict={}, left=1.0e-3, right=1.0e-3, prepend_zero=True):
        '''Beautify the suite in 4 steps:  

        1. truncate the motions, see `Motion.truncate`  
        2. trim the near-zero parts, see `Motion.trim` 
        3. change dt to uniform, see `Motion.change_dt`
        4. rename the motions to a series.  

        This procedure is always logged to the current folder.  

        Parameters
        ----------
        `new_name` : str  

        `run_steps` : List<int>  
            The step number to run. Users can select which one out of the 4 steps to run.
            The order will also follow this List.  

        `dt` : float  
            The uniform delta t.   

        `new_folder` : str  

        `trunc_dict` : dict  
            A dict that defines the `time` parameter in `Motion.truncate`  
            In the dict, keys are the indexes of the motions in the suite, values are the `time` values.

        `left`, `right` : float  
            The parameters of PGA to trim. Refer to `Motion.trim`.  

        Returns
        -------
        `suite` : `Suite`

        Outputs
        -------
        All the logs and the pre- and post- beautify average spectrum.
        '''
        suite = self.copy()
        spectrum_hist = suite.get_average_spectrum()
        suite.plot_all_spectrums(engine='pyplot')
        for i, motion in enumerate(self.motions):
            mo = motion.copy()
            mo.name = f'{self.name}-{i}-b'
            for step in run_steps:
                if step == 1:
                    if i in trunc_dict:
                        mo = mo.truncate(time=trunc_dict[i], new_name=mo.name+'1')
                elif step == 2:
                    mo = mo.trim(new_name=mo.name+'2', left=left, right=right)
                elif step == 3:
                    mo = mo.change_dt(dt=dt, new_name=mo.name+'3')
                elif step == 4:
                    mo.name = f'{i:02}' if len(self.motions) <= 100 else f'{i:03}'
                else:
                    print(f'step={step} is not recognized and ignored.')
            suite.motions[i] = mo
        spectrum_new = suite.get_average_spectrum()
        plt.figure()
        plt.plot(spectrum_hist.periods, spectrum_hist.spectrum, label='before')
        plt.plot(spectrum_new.periods, spectrum_new.spectrum, label='after')
        plt.plot(suite.target_spectrum.periods, suite.target_spectrum.spectrum, label='target')
        plt.legend()
        plt.suptitle(f'Beautify: {self.name} -> {new_name}')
        plt.savefig(f'{self.folder}/{self.name}-beautify.png', dpi=200)
        suite.name = new_name 
        if new_folder is not None:
            suite.set_folder(new_folder)
        return suite

    def scale(self, new_name, new_folder=None, factor_x=1, factor_y=1, scale_target_spectrum=True):
        '''Scale the motions both horizontally and vertically.  

        Parameters
        ----------
        `new_name` : str  

        `new_folder` : str, None  

        `factor_x`, `factor_y` : float  
            The factors to amplify in x axis and y axis.  

        `scale_target_spectrum` : boolean  
            Whether scale the target spectrum as well.

        Returns
        -------
        `suite` : `Suite`
        '''
        suite = self.copy()
        suite.name = new_name
        suite.set_folder(new_folder)
        for i, motion in enumerate(self.motions):
            suite.motions[i] = motion.scale(
                new_name=motion.name, factor_x=factor_x, factor_y=factor_y)
        if scale_target_spectrum:
            suite.target_spectrum = self.target_spectrum.scale(
                factor_x=factor_x, factor_y=factor_y)
        if factor_x != 1:
            suite.period_bound = [p * factor_x for p in self.period_bound]
            suite.period_samples = self.period_samples * factor_x
        if factor_y != 1:
            suite.factors = [f*factor_y for f in self.factors]
        suite.reset_spectrum_matrix()
        return suite

    def write_TCL(self, folder=None, filename=None):
        '''Write the suite to TCL forms for OpenSees.  
        Currently the TCL contains only one dict. 
        The keys of the dict is the motion names. Then the essential properties are stored.  
        At the same time, the accelerations for each motion is written to a text file.

        Parameters
        ----------
        `folder` : str, None  
            The folder to write the tcl files.
            If is None, use `Suite.folder`.

        `filename` :  str, None  
            The tcl file name. If is None, use `Suite.name`.

        Outputs
        -------
        A tcl file and a series of acceleration files.
        '''
        folder = self.folder if folder is None else folder
        filename = self.name if filename is None else filename.replace(
            '.tcl', '') + '.tcl'
        os.makedirs(folder, exist_ok=True)
        config = ''
        config += 'dict set motion keys {'
        names = [mo.name for mo in self.motions]
        config += ' '.join(names)
        config += '}\n\n'
        for i, mo in enumerate(self.motions):
            mo.write_accel(f'{folder}/{mo.name}', time=False)
            config += f'dict set motion {mo.name} path {folder}/{mo.name}.acc\n'
            config += f'dict set motion {mo.name} npts {len(mo.accel)}\n'
            config += f'dict set motion {mo.name} dt {mo.dt:.4f}\n'
            config += f'dict set motion {mo.name} amp {self.factors[i]:.4f}\n\n'
        with open(f'{folder}/{filename}.tcl', 'w') as f:
            f.write(config)

    def plot_all_accels(self, folder=None, filename=None, save=True, engine='pyplot'):
        '''Plot amplified accelograms to a single file.  

        Parameters
        ----------
        `folder` : str, None  
            If is None, use `Suite.folder`.

        `filename` : str, None  
            If is None, use `Suite.name` + '_accel'

        `save` : boolean  
            If True, save the file. Else, show immediately.  

        `engine` : 'pyplot' or 'plotly'  

        Outputs
        -------
        A figure is plotted.
        '''
        folder = self.folder if folder is None else folder
        if engine == 'plotly':
            filename = f'{self.name}_accel.html' if filename is None else filename + '.html'
            fig = go.Figure()
            for i, motion in enumerate(self.motions):
                fig.add_trace(go.Scatter(
                    x=np.arange(len(motion.accels)) * motion.dt,
                    y=motion.accels,
                    mode='lines',
                    name=str(i) + motion.name,
                    text=motion.name
                ))
            if save:
                fig.write_html(f'{folder}/{filename}')
            else:
                fig.show()
        elif engine == 'pyplot':
            print('Pyplot is not supported.')
        else:
            print('ERROR: engine should be in "pyplot" and "plotly"')

    def plot_all_spectrums(self, folder=None, filename=None, save=True, engine='pyplot'):
        '''Plot all amplified spectrums in one file.  

        Parameters
        ----------
        Refer to `Suite.plot_all_accels`. 

        The filename if is None will be `Suite.name` + '_spectrum'  

        Outputs
        -------
        A figure will be generated.
        '''
        if engine == 'plotly':
            folder = self.folder if folder is None else folder
            filename = f'{self.name}_spectrum.html' if filename is None else filename + '.html'
            fig = go.Figure()
            for i, motion in enumerate(self.motions):
                fig.add_trace(go.Scatter(
                    x=motion.spectrum.periods,
                    y=motion.spectrum.spectrum * self.factors[i],
                    mode='lines',
                    name=motion.name,
                    text=motion.name
                ))
            spectrum1 = self.target_spectrum
            spectrum2 = self.get_average_spectrum()
            fig.add_trace(go.Scatter(
                x=spectrum1.periods,
                y=spectrum1.spectrum,
                mode='lines',
                name='Target',
                text='Target',
                line=dict(width=5),
            ))
            fig.add_trace(go.Scatter(
                x=spectrum2.periods,
                y=spectrum2.spectrum,
                mode='lines',
                name='Average',
                text='Average',
                line=dict(width=5),
            ))
            if save:
                fig.write_html(f'{folder}/{filename}')
            else:
                fig.show()
        elif engine == 'pyplot':
            folder = self.folder if folder is None else folder
            filename = f'{self.name}_spectrum.png' if filename is None else filename + '.png'
            plt.figure(figsize=(7, 5))
            for i, motion in enumerate(self.motions):
                plt.plot(motion.spectrum.periods, motion.spectrum.spectrum * self.factors[i], label=motion.name, c='silver')
            plt.plot(self.target_spectrum.periods, self.target_spectrum.spectrum, label='Target', linewidth=3)
            averageSpectrum = self.get_average_spectrum()
            plt.plot(averageSpectrum.periods, averageSpectrum.spectrum, label="average", linewidth=2)
            plt.legend(loc='best', fontsize='x-small')
            plt.title(f'SRSS={self.get_srss():.4f}')
            if save:
                plt.savefig(os.path.join(folder, filename), dpi=200)
            else:
                plt.show()
        else:
            print("ERROR: wrong engine name. use 'plotly' or 'pyplot'.")

    def plot_individual(self, save=True):
        '''Plot all individual ground motions and the spectrum.

        Parameters
        ----------
        Refer to `Suite.plot_all_accels`.  

        Outputs
        -------
        If save, a figure for each motion will be generated.  
        '''
        for i, mo in enumerate(self.motions):
            _, ax = plt.subplots(2, 1, figsize=(7, 5))
            ax[0].plot(np.arange(len(mo.accel)) * mo.dt,
                        mo.accel * self.factors[i])
            ax[1].plot(mo.spectrum.periods,
                        mo.spectrum.spectrum * self.factors[i])
            ax[1].plot(self.target_spectrum.periods,
                        self.target_spectrum.spectrum, linewidth=2)
            plt.suptitle(
                f'{i}: Name {mo.name}\nfactor={self.factors[i]:.4}, PGA={mo.get_PGA()*self.factors[i]:.4}, dt={mo.dt:.3f}')
            filename = f'{self.folder}/motion{i}.png'
            if save:
                plt.savefig(filename, dpi=200)
            else:
                plt.show()

    def plot_interactive(self, save=True):
        '''Plot the spectrum and the acceleration time-histories to an interactive html file.
        
        Parameters
        ----------
        `save` : boolean  
            If True, write to html file. Otherwise, show directly without saving.  
        
        Outputs
        -------
        A html file with interactive plotting sliders.
        '''
        fig = make_subplots(rows=1, cols=2)
        for i in range(len(self.motions)):
            fig.add_trace(go.Scatter(
                x=self.motions[i].spectrum.periods,
                y=self.motions[i].spectrum.spectrum * self.factors[i],
                name=self.motions[i].name,
            ), row=1, col=1)
            fig.add_trace(go.Scatter(
                x=self.motions[i].get_times(),
                y=self.motions[i].accel * self.factors[i],
                    name=self.motions[i].name,
            ), row=1, col=2)
        spectrum_average = self.get_average_spectrum()
        fig.add_trace(go.Scatter(
            x=spectrum_average.periods,
            y=spectrum_average.spectrum,
            name='average',
            line={'width': 5}
        ))
        fig.add_trace(go.Scatter(
            x=self.target_spectrum.periods,
            y=self.target_spectrum.spectrum,
            name='target',
            line={'width': 5}
        ))

        def _title(i):
            return f'Motion #{i}: factor={self.factors[i]:.2f}, duration={len(self.motions[i].accel)*self.motions[i].dt:.2f}, PGA={self.motions[i].get_PGA()*self.factors[i]:.4}'
        
        steps = []
        for i in range(len(self.motions)):
            step = dict(
                method='update',
                args=[
                    {'visible': [j//2 == i for j in range(len(self.motions)*2)] + [False, True]},
                    {'title': _title(i)}
                ],
                label=str(i),
            )
            steps.append(step)
        sliders = [dict(active=0, steps=steps, len=0.9, currentvalue={'visible': False})]
        updatemenus = [dict(
            buttons=[dict(
                label='All',
                method='update',
                args=[
                    {'visible': [True] * (len(self.motions)*2+2)},
                    {'title': f'Suite {self.name}: {len(self.motions)} motions, SRSS={self.get_srss():.4}'}
                ]
            )],
            type='buttons',
            x=1,
            y=0,
            xanchor='right',
            yanchor='top',
            pad={'t': 30},
        )]
        fig.update_layout(sliders=sliders, title=f'Suite {self.name}: {len(self.motions)} motions, SRSS={self.get_srss():.4}', updatemenus=updatemenus)
        if save:
            fig.write_html(f'{self.folder}/{self.name}.html')
        else:
            fig.show()
    
    def to_dict(self, separate_motion=True):
        '''Convert all the properties to dict.  

        Parameters
        ----------  
        `separate_motion` : boolean  
            If True, the motion details are not included into the dict.  
            User has to make sure these motions are not moved.

        Returns
        -------  
        `it` : dict  
        '''
        return dict(
            folder=self.folder,
            name=self.name,
            motions=[mo.to_dict() for mo in self.motions] if not separate_motion else [
                f'{mo.folder}/{mo.name}' for mo in self.motions],
            factors=self.factors if type(self.factors[0] == float) else [
                fact.item() for fact in self.factors],
            period_bound=self.period_bound,
            target_spectrum=self.target_spectrum.to_dict(),
            period_samples=self.period_samples.tolist(),
            srss=self.get_srss(),
            average_spectrum=self.get_average_spectrum().to_dict(),
        )

    def to_json(self, separate_motion=True):
        '''Convert the properties to json.  

        Parameters
        ----------
        Refer to `Suite.to_dict`.  

        Returns
        -------
        `json` : str
        '''
        return json.dumps(self.to_dict(separate_motion=separate_motion))

    def save(self, separate_motion=True, rewrite=False):
        '''Save the suite to a .suite file.  

        Parameters
        ----------  
        `separate_motion` : boolean  
            If True, the motion details are not included into the file.  
            User has to make sure the motions are not moved.

        `rewrite` : boolean   
            If True, all the '.motion' files will be rewritten.  
            otherwise, the motion files will be ignored.

        Outputs
        -------
        Create a .suite file to store the suite.
        '''
        with open(f'{self.folder}/{self.name}.suite', 'w') as f:
            f.write(self.to_json(separate_motion=separate_motion))
        if separate_motion:
            for mo in self.motions:
                if not os.path.exists(f'{mo.folder}/{mo.name}.motion'):
                    mo.save()
                    print(f'Motion saved to {mo.folder}/{mo.name}.motion')
                else:
                    if rewrite:
                        mo.save()
                        print(f'Motion {mo.folder}/{mo.name}.motion is written.')
        print(f'Saved to {self.folder}/{self.name}.suite')

    @staticmethod
    def read_dict(it):
        '''Read the dictionary representing the properties of the class.  
        If key 'motions' is a instance of dict, then construct motions with dict.   
        else, key 'motions' should be a list of {folder}/{name}, then construct with the .motion file.  

        Parameters
        ----------  
        `it` : dict  
            The dict generated by `Suite.to_dict`  

        Returns
        -------  
        `suite` : `Suite` 
        '''
        suite = Suite(
            folder=it['folder'],
            name=it['name'],
            target_spectrum=Spectrum.read_dict(it['target_spectrum']),
            period_bound=it['period_bound']
        )
        for i, mo_content in enumerate(it['motions']):
            if isinstance(mo_content, dict):
                suite.add_motion(Motion.read_dict(
                    mo_content), it['factors'][i])
            else:
                suite.add_motion(Motion.load(
                    f'{mo_content}.motion'), it['factors'][i])
        suite.period_samples = np.array(it['period_samples'])
        suite.target_vector = suite.target_spectrum.get(suite.period_samples)
        suite.reset_spectrum_matrix()
        return suite

    @staticmethod
    def load(filename):
        '''Load a the suite from a file.  

        Parameters
        ----------
        `filename` : str  
            File that stores the suite.

        Returns
        -------
        `suite` : `Suite`
        '''
        if not filename.endswith('.suite'):
            filename += '.suite'
        with open(filename, 'r') as f:
            return Suite.read_dict(json.loads(f.read()))

    def copy(self):
        '''Copy the instance.  
        Returns
        -------
        `suite` : `Suite`
        '''
        return Suite.read_dict(self.to_dict())

Static methods

def load(filename)

Load a the suite from a file.

Parameters

filename : str
File that stores the suite.

Returns

suite : Suite

Expand source code
@staticmethod
def load(filename):
    '''Load a the suite from a file.  

    Parameters
    ----------
    `filename` : str  
        File that stores the suite.

    Returns
    -------
    `suite` : `Suite`
    '''
    if not filename.endswith('.suite'):
        filename += '.suite'
    with open(filename, 'r') as f:
        return Suite.read_dict(json.loads(f.read()))
def read_dict(it)

Read the dictionary representing the properties of the class.
If key 'motions' is a instance of dict, then construct motions with dict.
else, key 'motions' should be a list of {folder}/{name}, then construct with the .motion file.

Parameters

it : dict
The dict generated by Suite.to_dict()

Returns

suite : Suite

Expand source code
@staticmethod
def read_dict(it):
    '''Read the dictionary representing the properties of the class.  
    If key 'motions' is a instance of dict, then construct motions with dict.   
    else, key 'motions' should be a list of {folder}/{name}, then construct with the .motion file.  

    Parameters
    ----------  
    `it` : dict  
        The dict generated by `Suite.to_dict`  

    Returns
    -------  
    `suite` : `Suite` 
    '''
    suite = Suite(
        folder=it['folder'],
        name=it['name'],
        target_spectrum=Spectrum.read_dict(it['target_spectrum']),
        period_bound=it['period_bound']
    )
    for i, mo_content in enumerate(it['motions']):
        if isinstance(mo_content, dict):
            suite.add_motion(Motion.read_dict(
                mo_content), it['factors'][i])
        else:
            suite.add_motion(Motion.load(
                f'{mo_content}.motion'), it['factors'][i])
    suite.period_samples = np.array(it['period_samples'])
    suite.target_vector = suite.target_spectrum.get(suite.period_samples)
    suite.reset_spectrum_matrix()
    return suite

Instance variables

var factors

List. Store all the factors that should be applied to the motions.

var folder

str. See Suite

var motions

List<Motion>. Store all the motions.

var name

str. See Suite

var period_bound

List. See Suite

var period_samples

numpy.ndarray. The period within the period_bound is evenly sampled, with 200 points.

var spectrum_matrix

numpy.ndarray. A matrix with shape (len(Suite.period_samples), len(Suite.motions)). This matrix times the Suite.factors vector products the average spectrum with Suite.period_samples.

var target_spectrum

Spectrum. See Suite

var target_vector

numpy.ndarray. The vector generated by the target_spectrum sampling the Suite.period_samples

Methods

def add_motion(self, motion, factor=1)

Add motion to the suite together with a factor, to keep the lengths are identical.

Parameters

motion : Motion

factor : number

Outputs

Suite.motions and Suite.factors will be changed.

Expand source code
def add_motion(self, motion, factor=1):
    '''Add motion to the suite together with a factor, to keep the lengths are identical.

    Parameters
    ----------
    `motion` : `Motion`

    `factor` : number

    Outputs
    -------
    `Suite.motions` and `Suite.factors` will be changed.
    '''
    self.motions.append(motion)
    self.factors.append(factor)
def beautify(self, new_name, run_steps=[1, 2, 3, 4], dt=0.01, new_folder=None, trunc_dict={}, left=0.001, right=0.001, prepend_zero=True)

Beautify the suite in 4 steps:

  1. truncate the motions, see Motion.truncate()
  2. trim the near-zero parts, see Motion.trim()
  3. change dt to uniform, see Motion.change_dt()
  4. rename the motions to a series.

This procedure is always logged to the current folder.

Parameters

new_name : str

run_steps : List
The step number to run. Users can select which one out of the 4 steps to run. The order will also follow this List.

dt : float
The uniform delta t.

new_folder : str

trunc_dict : dict
A dict that defines the time parameter in Motion.truncate()
In the dict, keys are the indexes of the motions in the suite, values are the time values.

left, right : float
The parameters of PGA to trim. Refer to Motion.trim().

Returns

suite : Suite

Outputs

All the logs and the pre- and post- beautify average spectrum.

Expand source code
def beautify(self, new_name, run_steps=[1,2,3,4], dt=0.01, new_folder=None, trunc_dict={}, left=1.0e-3, right=1.0e-3, prepend_zero=True):
    '''Beautify the suite in 4 steps:  

    1. truncate the motions, see `Motion.truncate`  
    2. trim the near-zero parts, see `Motion.trim` 
    3. change dt to uniform, see `Motion.change_dt`
    4. rename the motions to a series.  

    This procedure is always logged to the current folder.  

    Parameters
    ----------
    `new_name` : str  

    `run_steps` : List<int>  
        The step number to run. Users can select which one out of the 4 steps to run.
        The order will also follow this List.  

    `dt` : float  
        The uniform delta t.   

    `new_folder` : str  

    `trunc_dict` : dict  
        A dict that defines the `time` parameter in `Motion.truncate`  
        In the dict, keys are the indexes of the motions in the suite, values are the `time` values.

    `left`, `right` : float  
        The parameters of PGA to trim. Refer to `Motion.trim`.  

    Returns
    -------
    `suite` : `Suite`

    Outputs
    -------
    All the logs and the pre- and post- beautify average spectrum.
    '''
    suite = self.copy()
    spectrum_hist = suite.get_average_spectrum()
    suite.plot_all_spectrums(engine='pyplot')
    for i, motion in enumerate(self.motions):
        mo = motion.copy()
        mo.name = f'{self.name}-{i}-b'
        for step in run_steps:
            if step == 1:
                if i in trunc_dict:
                    mo = mo.truncate(time=trunc_dict[i], new_name=mo.name+'1')
            elif step == 2:
                mo = mo.trim(new_name=mo.name+'2', left=left, right=right)
            elif step == 3:
                mo = mo.change_dt(dt=dt, new_name=mo.name+'3')
            elif step == 4:
                mo.name = f'{i:02}' if len(self.motions) <= 100 else f'{i:03}'
            else:
                print(f'step={step} is not recognized and ignored.')
        suite.motions[i] = mo
    spectrum_new = suite.get_average_spectrum()
    plt.figure()
    plt.plot(spectrum_hist.periods, spectrum_hist.spectrum, label='before')
    plt.plot(spectrum_new.periods, spectrum_new.spectrum, label='after')
    plt.plot(suite.target_spectrum.periods, suite.target_spectrum.spectrum, label='target')
    plt.legend()
    plt.suptitle(f'Beautify: {self.name} -> {new_name}')
    plt.savefig(f'{self.folder}/{self.name}-beautify.png', dpi=200)
    suite.name = new_name 
    if new_folder is not None:
        suite.set_folder(new_folder)
    return suite
def copy(self)

Copy the instance.
Returns


suite : Suite

Expand source code
def copy(self):
    '''Copy the instance.  
    Returns
    -------
    `suite` : `Suite`
    '''
    return Suite.read_dict(self.to_dict())
def eliminate_neg(self)

Eliminate negative target vector by multiplying a factor for every individual. In other words, make the average spectrum above the target spectrum.

Returns

factor : float
The global factor that should be applied to each of the factors.

Outputs

Suite.factors are all amplified by the returned factor.

Expand source code
def eliminate_neg(self):
    '''Eliminate negative target vector by multiplying a factor for every individual.
    In other words, make the average spectrum above the target spectrum.

    Returns
    -------
    `factor` : float  
        The global factor that should be applied to each of the factors.  

    Outputs
    -------
    `Suite.factors` are all amplified by the returned `factor`.
    '''
    error = np.matmul(self.spectrum_matrix, self.factors) - self.target_vector
    argmin = np.argmin(error)
    factor = 1 / (1 + error[argmin] / self.target_vector[argmin])
    print(f'Multiply {factor:.4} to eliminate negative spectrum.')
    self.factors = [self.factors[i] * factor for i in range(len(self.factors))]
    return factor
def filter_by_IDs(self, ids, new_name, new_folder=None)

Give a series of selected IDs, filter the motions.

Parameters

ids : List
The list of indexes of the selected motions in the current suite.

new_name : str

new_folder : str, None

Returns

suite : Suite
A new suite with the selected motions and their factors only.

Expand source code
def filter_by_IDs(self, ids, new_name, new_folder=None):
    '''Give a series of selected IDs, filter the motions.  

    Parameters
    ----------
    `ids` : List<int>  
        The list of indexes of the selected motions in the current suite.  

    `new_name` : str  

    `new_folder` : str, None

    Returns
    -------
    `suite` : `Suite`  
        A new suite with the selected motions and their factors only.  
    '''
    suite = self.copy()
    suite.name = new_name
    suite.set_folder(new_folder)
    suite.motions = [self.motions[i] for i in ids]
    suite.factors = [self.factors[i] for i in ids]
    suite.reset_spectrum_matrix()
    print(f'The motions have been filtered. {len(ids)} motions remain.')
    return suite
def filter_by_file(self, filename, new_name, new_folder=None)

Give a file with indexes selected.

Parameters

Refer to Suite.filter_by_IDs().

Returns

Refer to Suite.filter_by_IDs().

Expand source code
def filter_by_file(self, filename, new_name, new_folder=None):
    '''Give a file with indexes selected.  

    Parameters
    ----------
    Refer to `Suite.filter_by_IDs`.  

    Returns
    -------
    Refer to `Suite.filter_by_IDs`.  
    '''
    with open(filename, 'r') as f:
        lines = f.readlines()
    ids = [int(l) for l in lines]
    return self.filter_by_IDs(ids=ids, new_name=new_name, new_folder=new_folder)
def filter_montecarlo(self, count, new_name, new_folder=None, dimension=100000)

Use Monte Carlo simulation to filter the suite to count numbers of motions.

Parameters

Refer to Suite.filter_optimize().

Returns

suite : Suite
The optimized result after {dimension} times of MC simulations.

Expand source code
def filter_montecarlo(self, count, new_name, new_folder=None, dimension=100000):
    '''Use Monte Carlo simulation to filter the suite to count numbers of motions.

    Parameters
    ---------- 
    Refer to `Suite.filter_optimize`.  

    Returns
    -------
    `suite` : `Suite`   
        The optimized result after {dimension} times of MC simulations.
    '''
    print(f'Running Monte Carlo simulation, dimension={dimension}')
    self.match_individual_LSQ()
    factors = np.array(self.factors)
    xs = np.zeros((len(self.motions), dimension))
    for i in range(dimension):
        indexes = random.sample(range(len(self.motions)), count)
        xs[indexes, i] = factors[indexes]
    mul = np.matmul(self.spectrum_matrix * len(self.motions) / count, xs)
    loss = mul - self.target_vector.reshape(-1, 1)
    argmin = np.argmin(loss, axis=0)
    lossmin = np.min(loss, axis=0)
    target_at_lossmin = self.target_vector[argmin]
    amps = 1 / (lossmin / target_at_lossmin + 1)
    lossAmp = mul * amps - self.target_vector.reshape(-1, 1)
    srss = np.linalg.norm(lossAmp, axis=0)
    arg = np.argmin(srss)
    factors = xs[:, arg] * amps[arg]
    suvives = np.where(factors > 1.0e-3)
    suite = self.copy()
    suite.name = new_name
    suite.set_folder(new_folder)
    suite.motions = [self.motions[i] for i in suvives[0]]
    suite.factors = [factors[i] for i in suvives[0]]
    suite.reset_spectrum_matrix()
    return suite
def filter_optimize(self, count, times, use_lsq=True, output_count=0, new_folder=None, lower_bound=0.6, upper_bound=1.4, dimension=100000)

Use Monte Carlo simulation and Least Square method to optimize the suite. after running, a bunch of suites will be written to the new_folder.

Parameters

count : int
The number of motions in the resulting suite.

times : int
The number of times that MC simulation with dimension is run.
The total number of samples of MC simulation will be {times} * {dimension}.

use_lsq : boolean
Whether use LSQ optimizer to better optimize the suite.

output_count : int
The number of outputs. if 0, output all.

new_folder : str lower_bound, upper_bound : float
For the purpose that the individual motions are not too far from the individually matched factors, the lower and upper boundaries of amplifying the individually matched factors are set.
This is only useful when use_lsq is set, to add boundaries to the LSQ optimizer.

dimension : int
Each time the Monte Carlo simulation is run, means that {dimension} times of simulation have been done by using a matrix target.

Returns

List<Suite> with len=output_count.
The list is sorted. The best answer is the first result. However, visual check should be done.

Outputs

Every running after Monte Carlo simulation and LSQ optimization, the spectrum of the suite will be plotted.

Expand source code
def filter_optimize(self, count, times, use_lsq=True, output_count=0, new_folder=None, lower_bound=0.6, upper_bound=1.4, dimension=100000):
    '''Use Monte Carlo simulation and Least Square method to optimize the suite.
    after running, a bunch of suites will be written to the new_folder.

    Parameters  
    ----------
    `count` : int  
        The number of motions in the resulting suite.  

    `times` : int  
        The number of times that MC simulation with dimension is run.  
        The total number of samples of MC simulation will be {times} * {dimension}.  

    `use_lsq` : boolean  
        Whether use LSQ optimizer to better optimize the suite.  

    `output_count` : int  
        The number of outputs. if 0, output all.  

    `new_folder` : str 
    `lower_bound`, `upper_bound` : float  
        For the purpose that the individual motions are not too far from the individually matched factors,
        the lower and upper boundaries of amplifying the individually matched factors are set.  
        This is only useful when `use_lsq` is set, to add boundaries to the LSQ optimizer.  

    `dimension` : int  
        Each time the Monte Carlo simulation is run, 
        means that {dimension} times of simulation have been done by using a matrix target.

    Returns
    -------
    List<`Suite`> with len=`output_count`.  
    The list is sorted. The best answer is the first result. However, visual check should be done.

    Outputs
    -------
    Every running after Monte Carlo simulation and LSQ optimization, the spectrum of the suite will be plotted.
    '''
    new_folder = new_folder if new_folder is not None else self.folder
    os.makedirs(new_folder, exist_ok=True)
    suites = []
    srsses = []
    for i in range(times):
        suite_new = self.filter_montecarlo(count=count, new_name=f'mc{i:03d}', new_folder=new_folder)
        suites.append(suite_new)
        srsses.append(suite_new.get_srss())
        if use_lsq:
            suite_new2 = suite_new.optimize_LSQ(
                new_name=f'lsq{i:03d}', 
                new_folder=new_folder, 
                lower_bound=lower_bound, 
                upper_bound=upper_bound
            )
            suites.append(suite_new2)
            srsses.append(suite_new2.get_srss())
    argsorted = np.argsort(np.array(srsses))
    results = [suites[i] for i in argsorted[:output_count]]
    for i, suite in enumerate(results):
        suite.name = f'no{i}-{suite.name}'
        suite.save()
        suite.plot_all_spectrums(engine='pyplot')
    return results
def get_average_spectrum(self)

Return a spectrum which is the average spectrum of the suite.

Outputs

spectrum : Spectrum

Expand source code
def get_average_spectrum(self):
    '''Return a spectrum which is the average spectrum of the suite.

    Outputs
    -------
    `spectrum` : `Spectrum`  
    '''
    periods = self.motions[0].spectrum.periods
    average_spectrum = self.motions[0].spectrum.spectrum * self.factors[0]
    for i in range(1, len(self.motions)):
        average_spectrum += self.motions[i].spectrum.get(periods) * self.factors[i]
    average_spectrum /= len(self.motions)
    return Spectrum(periods, average_spectrum)
def get_srss(self)

Get the srss of the current averate spectrum to the target spectrum in the period boundary.

Returns

srss : float

Expand source code
def get_srss(self):
    '''Get the srss of the current averate spectrum to the target spectrum in the period boundary.

    Returns
    -------
    `srss` : float
    '''
    return np.linalg.norm(np.matmul(self.spectrum_matrix, self.factors) - self.target_vector).item()
def load_motions_from_folder(self, folder=None)

Find all the .motion file from the given folder, and load them.

Parameters

folder : str, None
The folder to search the .motion files.
If is None, use Suite.folder instead.

Outputs

Suite.motions and Suite.factors will be changed.

Expand source code
def load_motions_from_folder(self, folder=None):
    '''Find all the .motion file from the given folder, and load them.  

    Parameters
    ----------
    `folder` : str, None   
        The folder to search the .motion files.  
        If is None, use `Suite.folder` instead.  

    Outputs
    -------
    `Suite.motions` and `Suite.factors` will be changed.
    '''
    folder = self.folder if folder is None else folder
    filenames = os.listdir(self.folder)
    for filename in filenames:
        if filename.endswith('.motion'):
            mo = Motion.load(f'{folder}/{filename}')
            self.add_motion(mo)
            print(f'Loadad motion {mo.name}')
    self.reset_spectrum_matrix()
def load_motions_from_names(self, names, folder=None)

Give a list of name and the folder, load motions.

Parameters

names : List
The list of motion names.

folder : str, None
The folder of the motions.
If is None, use Suite.folder

Outputs

Suite.motions and Suite.factors will be changed.

Expand source code
def load_motions_from_names(self, names, folder=None):
    '''Give a list of name and the folder, load motions.

    Parameters
    ----------
    `names` : List<str>  
        The list of motion names.

    `folder` : str, None  
        The folder of the motions.  
        If is None, use `Suite.folder`

    Outputs
    -------
    `Suite.motions` and `Suite.factors` will be changed.
    '''
    folder = self.folder if folder is None else folder
    for name in names:
        mo = Motion.load(f'{folder}/{name}.motion')
        self.add_motion(mo)
        print(f'Loaded motion {mo.name}')
    self.reset_spectrum_matrix()
def load_peer_folder(self, peer_folder, h1=True, h2=True, v=False)

From a folder load all the PEER motions into the suite.
Extract the downloaded peer zip file first.

Parameters

peer_folder : str
The folder path of the extracted PEER data. Where _SearchResults.csv can be found.

h1, h2, v : boolean
Whether load the three directional files or not.

Outputs

Suite.motions, Suite.factors will be changed.
All the PEER motions will be saved in .motion format in the new folder.

Expand source code
def load_peer_folder(self, peer_folder, h1=True, h2=True, v=False):
    '''From a folder load all the PEER motions into the suite.  
    Extract the downloaded peer zip file first.

    Parameters
    ----------
    `peer_folder` : str  
        The folder path of the extracted PEER data. Where `_SearchResults.csv` can be found.

    `h1`, `h2`, `v` : boolean  
        Whether load the three directional files or not.  

    Outputs
    -------
    `Suite.motions`, `Suite.factors` will be changed.  
    All the PEER motions will be saved in .motion format in the new folder.  
    '''
    with open(f'{peer_folder}/_SearchResults.csv', "r") as f:
        for _ in range(34):
            f.readline()
        i = 0
        positions = []
        if h1:
            positions.append(19)
        if h2:
            positions.append(20)
        if v:
            positions.append(21)
        line = f.readline()
        while line != '\n':
            cells = line.split(',')
            for pos in positions:
                filename = cells[pos].replace(' ', '')
                motion_name = filename.replace('.AT2', '')
                print(f'Loading {motion_name}')
                mo = Motion(folder=self.folder, name=motion_name)
                mo.load_peer(peer_filename=f'{peer_folder}/{filename}')
                mo.generate_spectrum()
                mo.save()
                self.add_motion(mo)
                i += 1
            line = f.readline()
    self.reset_spectrum_matrix()
    print(f"Successfully loaded {i} earthquakes from peer folder {peer_folder} to {self.folder}.")
    return True
def match_individual_LSQ(self)

Use Least square method for each individual motion to match the spectrum.

Returns

residuals : numpy.ndarray
srss of each motion matched to the target spectrum.

Outputs

Suite.factors are changed.

Expand source code
def match_individual_LSQ(self):
    '''Use Least square method for each individual motion to match the spectrum.

    Returns
    -------
    `residuals` : numpy.ndarray  
        srss of each motion matched to the target spectrum.  

    Outputs
    -------
    `Suite.factors` are changed.
    '''
    residuals = np.zeros(len(self.motions))
    for i in range(len(self.motions)):
        this = self.spectrum_matrix[:, i] * len(self.motions)
        that = self.target_vector
        factor = np.dot(this, that) / np.dot(this, this)
        residual = np.linalg.norm(this * factor - that)
        self.factors[i] = factor
        residuals[i] = residual
    print(f"Individual LSQ: Factor Max={np.max(self.factors):.4}, Residual Max={np.max(residuals):.4}")
    return residuals
def optimize_LSQ(self, new_name, new_folder=None, lower_bound=0.6, upper_bound=1.4)

Use Least Square method from scipy to optimize the factors.

Parameters

Refer to Suite.filter_optimize().

Returns

suite : Suite
Optimized suite.

Expand source code
def optimize_LSQ(self, new_name, new_folder=None, lower_bound=0.6, upper_bound=1.4):
    '''Use Least Square method from scipy to optimize the factors.

    Parameters
    ----------
    Refer to `Suite.filter_optimize`.   

    Returns
    -------
    `suite` : `Suite`  
        Optimized suite.
    '''
    print('Running Bounded Least Square optimization ...')
    res = least_squares(self._loss,
                        np.ones(len(self.motions)),
                        bounds=(lower_bound, upper_bound),
                        verbose=0)
    factors = res.x * self.factors
    suite = self.copy()
    suite.name = new_name
    suite.set_folder(new_folder)
    suite.factors = factors.tolist()
    suite.eliminate_neg()
    return suite
def plot_all_accels(self, folder=None, filename=None, save=True, engine='pyplot')

Plot amplified accelograms to a single file.

Parameters

folder : str, None
If is None, use Suite.folder.

filename : str, None
If is None, use Suite.name + '_accel'

save : boolean
If True, save the file. Else, show immediately.

engine : 'pyplot' or 'plotly'

Outputs

A figure is plotted.

Expand source code
def plot_all_accels(self, folder=None, filename=None, save=True, engine='pyplot'):
    '''Plot amplified accelograms to a single file.  

    Parameters
    ----------
    `folder` : str, None  
        If is None, use `Suite.folder`.

    `filename` : str, None  
        If is None, use `Suite.name` + '_accel'

    `save` : boolean  
        If True, save the file. Else, show immediately.  

    `engine` : 'pyplot' or 'plotly'  

    Outputs
    -------
    A figure is plotted.
    '''
    folder = self.folder if folder is None else folder
    if engine == 'plotly':
        filename = f'{self.name}_accel.html' if filename is None else filename + '.html'
        fig = go.Figure()
        for i, motion in enumerate(self.motions):
            fig.add_trace(go.Scatter(
                x=np.arange(len(motion.accels)) * motion.dt,
                y=motion.accels,
                mode='lines',
                name=str(i) + motion.name,
                text=motion.name
            ))
        if save:
            fig.write_html(f'{folder}/{filename}')
        else:
            fig.show()
    elif engine == 'pyplot':
        print('Pyplot is not supported.')
    else:
        print('ERROR: engine should be in "pyplot" and "plotly"')
def plot_all_spectrums(self, folder=None, filename=None, save=True, engine='pyplot')

Plot all amplified spectrums in one file.

Parameters

Refer to Suite.plot_all_accels().

The filename if is None will be Suite.name + '_spectrum'

Outputs

A figure will be generated.

Expand source code
def plot_all_spectrums(self, folder=None, filename=None, save=True, engine='pyplot'):
    '''Plot all amplified spectrums in one file.  

    Parameters
    ----------
    Refer to `Suite.plot_all_accels`. 

    The filename if is None will be `Suite.name` + '_spectrum'  

    Outputs
    -------
    A figure will be generated.
    '''
    if engine == 'plotly':
        folder = self.folder if folder is None else folder
        filename = f'{self.name}_spectrum.html' if filename is None else filename + '.html'
        fig = go.Figure()
        for i, motion in enumerate(self.motions):
            fig.add_trace(go.Scatter(
                x=motion.spectrum.periods,
                y=motion.spectrum.spectrum * self.factors[i],
                mode='lines',
                name=motion.name,
                text=motion.name
            ))
        spectrum1 = self.target_spectrum
        spectrum2 = self.get_average_spectrum()
        fig.add_trace(go.Scatter(
            x=spectrum1.periods,
            y=spectrum1.spectrum,
            mode='lines',
            name='Target',
            text='Target',
            line=dict(width=5),
        ))
        fig.add_trace(go.Scatter(
            x=spectrum2.periods,
            y=spectrum2.spectrum,
            mode='lines',
            name='Average',
            text='Average',
            line=dict(width=5),
        ))
        if save:
            fig.write_html(f'{folder}/{filename}')
        else:
            fig.show()
    elif engine == 'pyplot':
        folder = self.folder if folder is None else folder
        filename = f'{self.name}_spectrum.png' if filename is None else filename + '.png'
        plt.figure(figsize=(7, 5))
        for i, motion in enumerate(self.motions):
            plt.plot(motion.spectrum.periods, motion.spectrum.spectrum * self.factors[i], label=motion.name, c='silver')
        plt.plot(self.target_spectrum.periods, self.target_spectrum.spectrum, label='Target', linewidth=3)
        averageSpectrum = self.get_average_spectrum()
        plt.plot(averageSpectrum.periods, averageSpectrum.spectrum, label="average", linewidth=2)
        plt.legend(loc='best', fontsize='x-small')
        plt.title(f'SRSS={self.get_srss():.4f}')
        if save:
            plt.savefig(os.path.join(folder, filename), dpi=200)
        else:
            plt.show()
    else:
        print("ERROR: wrong engine name. use 'plotly' or 'pyplot'.")
def plot_individual(self, save=True)

Plot all individual ground motions and the spectrum.

Parameters

Refer to Suite.plot_all_accels().

Outputs

If save, a figure for each motion will be generated.

Expand source code
def plot_individual(self, save=True):
    '''Plot all individual ground motions and the spectrum.

    Parameters
    ----------
    Refer to `Suite.plot_all_accels`.  

    Outputs
    -------
    If save, a figure for each motion will be generated.  
    '''
    for i, mo in enumerate(self.motions):
        _, ax = plt.subplots(2, 1, figsize=(7, 5))
        ax[0].plot(np.arange(len(mo.accel)) * mo.dt,
                    mo.accel * self.factors[i])
        ax[1].plot(mo.spectrum.periods,
                    mo.spectrum.spectrum * self.factors[i])
        ax[1].plot(self.target_spectrum.periods,
                    self.target_spectrum.spectrum, linewidth=2)
        plt.suptitle(
            f'{i}: Name {mo.name}\nfactor={self.factors[i]:.4}, PGA={mo.get_PGA()*self.factors[i]:.4}, dt={mo.dt:.3f}')
        filename = f'{self.folder}/motion{i}.png'
        if save:
            plt.savefig(filename, dpi=200)
        else:
            plt.show()
def plot_interactive(self, save=True)

Plot the spectrum and the acceleration time-histories to an interactive html file.

Parameters

save : boolean
If True, write to html file. Otherwise, show directly without saving.

Outputs

A html file with interactive plotting sliders.

Expand source code
def plot_interactive(self, save=True):
    '''Plot the spectrum and the acceleration time-histories to an interactive html file.
    
    Parameters
    ----------
    `save` : boolean  
        If True, write to html file. Otherwise, show directly without saving.  
    
    Outputs
    -------
    A html file with interactive plotting sliders.
    '''
    fig = make_subplots(rows=1, cols=2)
    for i in range(len(self.motions)):
        fig.add_trace(go.Scatter(
            x=self.motions[i].spectrum.periods,
            y=self.motions[i].spectrum.spectrum * self.factors[i],
            name=self.motions[i].name,
        ), row=1, col=1)
        fig.add_trace(go.Scatter(
            x=self.motions[i].get_times(),
            y=self.motions[i].accel * self.factors[i],
                name=self.motions[i].name,
        ), row=1, col=2)
    spectrum_average = self.get_average_spectrum()
    fig.add_trace(go.Scatter(
        x=spectrum_average.periods,
        y=spectrum_average.spectrum,
        name='average',
        line={'width': 5}
    ))
    fig.add_trace(go.Scatter(
        x=self.target_spectrum.periods,
        y=self.target_spectrum.spectrum,
        name='target',
        line={'width': 5}
    ))

    def _title(i):
        return f'Motion #{i}: factor={self.factors[i]:.2f}, duration={len(self.motions[i].accel)*self.motions[i].dt:.2f}, PGA={self.motions[i].get_PGA()*self.factors[i]:.4}'
    
    steps = []
    for i in range(len(self.motions)):
        step = dict(
            method='update',
            args=[
                {'visible': [j//2 == i for j in range(len(self.motions)*2)] + [False, True]},
                {'title': _title(i)}
            ],
            label=str(i),
        )
        steps.append(step)
    sliders = [dict(active=0, steps=steps, len=0.9, currentvalue={'visible': False})]
    updatemenus = [dict(
        buttons=[dict(
            label='All',
            method='update',
            args=[
                {'visible': [True] * (len(self.motions)*2+2)},
                {'title': f'Suite {self.name}: {len(self.motions)} motions, SRSS={self.get_srss():.4}'}
            ]
        )],
        type='buttons',
        x=1,
        y=0,
        xanchor='right',
        yanchor='top',
        pad={'t': 30},
    )]
    fig.update_layout(sliders=sliders, title=f'Suite {self.name}: {len(self.motions)} motions, SRSS={self.get_srss():.4}', updatemenus=updatemenus)
    if save:
        fig.write_html(f'{self.folder}/{self.name}.html')
    else:
        fig.show()
def reset_spectrum_matrix(self)

Reset the spectrum matrix, which produces the average spectrum in the boundary by multiplying factors vector. This method should be called everytime Suite.motions change.

Outputs

Suite.spectrum_matrix and Suite.target_vector will be changed.

Expand source code
def reset_spectrum_matrix(self):
    '''Reset the spectrum matrix, which produces the average spectrum 
    in the boundary by multiplying factors vector.
    This method should be called everytime `Suite.motions` change.

    Outputs
    -------
    `Suite.spectrum_matrix` and `Suite.target_vector` will be changed.  
    '''
    mat = np.zeros((len(self.period_samples), len(self.motions)))
    for i in range(len(self.motions)):
        spectrum = self.motions[i].spectrum.get(self.period_samples)
        mat[:, i] = spectrum / len(self.motions)
    self.spectrum_matrix = mat
    self.target_vector = self.target_spectrum.get(self.period_samples)
def save(self, separate_motion=True, rewrite=False)

Save the suite to a .suite file.

Parameters

separate_motion : boolean
If True, the motion details are not included into the file.
User has to make sure the motions are not moved.

rewrite : boolean
If True, all the '.motion' files will be rewritten.
otherwise, the motion files will be ignored.

Outputs

Create a .suite file to store the suite.

Expand source code
def save(self, separate_motion=True, rewrite=False):
    '''Save the suite to a .suite file.  

    Parameters
    ----------  
    `separate_motion` : boolean  
        If True, the motion details are not included into the file.  
        User has to make sure the motions are not moved.

    `rewrite` : boolean   
        If True, all the '.motion' files will be rewritten.  
        otherwise, the motion files will be ignored.

    Outputs
    -------
    Create a .suite file to store the suite.
    '''
    with open(f'{self.folder}/{self.name}.suite', 'w') as f:
        f.write(self.to_json(separate_motion=separate_motion))
    if separate_motion:
        for mo in self.motions:
            if not os.path.exists(f'{mo.folder}/{mo.name}.motion'):
                mo.save()
                print(f'Motion saved to {mo.folder}/{mo.name}.motion')
            else:
                if rewrite:
                    mo.save()
                    print(f'Motion {mo.folder}/{mo.name}.motion is written.')
    print(f'Saved to {self.folder}/{self.name}.suite')
def scale(self, new_name, new_folder=None, factor_x=1, factor_y=1, scale_target_spectrum=True)

Scale the motions both horizontally and vertically.

Parameters

new_name : str

new_folder : str, None

factor_x, factor_y : float
The factors to amplify in x axis and y axis.

scale_target_spectrum : boolean
Whether scale the target spectrum as well.

Returns

suite : Suite

Expand source code
def scale(self, new_name, new_folder=None, factor_x=1, factor_y=1, scale_target_spectrum=True):
    '''Scale the motions both horizontally and vertically.  

    Parameters
    ----------
    `new_name` : str  

    `new_folder` : str, None  

    `factor_x`, `factor_y` : float  
        The factors to amplify in x axis and y axis.  

    `scale_target_spectrum` : boolean  
        Whether scale the target spectrum as well.

    Returns
    -------
    `suite` : `Suite`
    '''
    suite = self.copy()
    suite.name = new_name
    suite.set_folder(new_folder)
    for i, motion in enumerate(self.motions):
        suite.motions[i] = motion.scale(
            new_name=motion.name, factor_x=factor_x, factor_y=factor_y)
    if scale_target_spectrum:
        suite.target_spectrum = self.target_spectrum.scale(
            factor_x=factor_x, factor_y=factor_y)
    if factor_x != 1:
        suite.period_bound = [p * factor_x for p in self.period_bound]
        suite.period_samples = self.period_samples * factor_x
    if factor_y != 1:
        suite.factors = [f*factor_y for f in self.factors]
    suite.reset_spectrum_matrix()
    return suite
def set_folder(self, folder)

Reset the folder for the suite and the motions.

Parameters

folder : str

Outputs

Suite.folder and Suite.motions will be changed.

Expand source code
def set_folder(self, folder):
    '''Reset the folder for the suite and the motions.

    Parameters
    ----------
    `folder` : str

    Outputs
    -------
    `Suite.folder` and `Suite.motions` will be changed.
    '''
    if folder is not None:
        self.folder = folder
        os.makedirs(folder, exist_ok=True)
        for mo in self.motions:
            mo.folder = folder
def to_dict(self, separate_motion=True)

Convert all the properties to dict.

Parameters

separate_motion : boolean
If True, the motion details are not included into the dict.
User has to make sure these motions are not moved.

Returns

it : dict

Expand source code
def to_dict(self, separate_motion=True):
    '''Convert all the properties to dict.  

    Parameters
    ----------  
    `separate_motion` : boolean  
        If True, the motion details are not included into the dict.  
        User has to make sure these motions are not moved.

    Returns
    -------  
    `it` : dict  
    '''
    return dict(
        folder=self.folder,
        name=self.name,
        motions=[mo.to_dict() for mo in self.motions] if not separate_motion else [
            f'{mo.folder}/{mo.name}' for mo in self.motions],
        factors=self.factors if type(self.factors[0] == float) else [
            fact.item() for fact in self.factors],
        period_bound=self.period_bound,
        target_spectrum=self.target_spectrum.to_dict(),
        period_samples=self.period_samples.tolist(),
        srss=self.get_srss(),
        average_spectrum=self.get_average_spectrum().to_dict(),
    )
def to_json(self, separate_motion=True)

Convert the properties to json.

Parameters

Refer to Suite.to_dict().

Returns

json : str

Expand source code
def to_json(self, separate_motion=True):
    '''Convert the properties to json.  

    Parameters
    ----------
    Refer to `Suite.to_dict`.  

    Returns
    -------
    `json` : str
    '''
    return json.dumps(self.to_dict(separate_motion=separate_motion))
def write_TCL(self, folder=None, filename=None)

Write the suite to TCL forms for OpenSees.
Currently the TCL contains only one dict. The keys of the dict is the motion names. Then the essential properties are stored.
At the same time, the accelerations for each motion is written to a text file.

Parameters

folder : str, None
The folder to write the tcl files. If is None, use Suite.folder.

filename : str, None
The tcl file name. If is None, use Suite.name.

Outputs

A tcl file and a series of acceleration files.

Expand source code
def write_TCL(self, folder=None, filename=None):
    '''Write the suite to TCL forms for OpenSees.  
    Currently the TCL contains only one dict. 
    The keys of the dict is the motion names. Then the essential properties are stored.  
    At the same time, the accelerations for each motion is written to a text file.

    Parameters
    ----------
    `folder` : str, None  
        The folder to write the tcl files.
        If is None, use `Suite.folder`.

    `filename` :  str, None  
        The tcl file name. If is None, use `Suite.name`.

    Outputs
    -------
    A tcl file and a series of acceleration files.
    '''
    folder = self.folder if folder is None else folder
    filename = self.name if filename is None else filename.replace(
        '.tcl', '') + '.tcl'
    os.makedirs(folder, exist_ok=True)
    config = ''
    config += 'dict set motion keys {'
    names = [mo.name for mo in self.motions]
    config += ' '.join(names)
    config += '}\n\n'
    for i, mo in enumerate(self.motions):
        mo.write_accel(f'{folder}/{mo.name}', time=False)
        config += f'dict set motion {mo.name} path {folder}/{mo.name}.acc\n'
        config += f'dict set motion {mo.name} npts {len(mo.accel)}\n'
        config += f'dict set motion {mo.name} dt {mo.dt:.4f}\n'
        config += f'dict set motion {mo.name} amp {self.factors[i]:.4f}\n\n'
    with open(f'{folder}/{filename}.tcl', 'w') as f:
        f.write(config)