Skip to content

Commit

Permalink
[skip CI] Merge branch 'master' of github.com:gem/oq-engine
Browse files Browse the repository at this point in the history
  • Loading branch information
micheles committed Dec 8, 2018
2 parents 2399c31 + 7729fa2 commit aca7593
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 93 deletions.
8 changes: 3 additions & 5 deletions openquake/calculators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,10 @@ def block_splitter(self, sources, weight=get_weight):
:returns: an iterator over blocks of sources
"""
ct = self.oqparam.concurrent_tasks or 1
minweight = source.MINWEIGHT * (math.sqrt(len(self.sitecol))
if self.sitecol else 1)
maxweight = self.csm.get_maxweight(weight, ct, minweight)
maxweight = self.csm.get_maxweight(weight, ct, source.MINWEIGHT)
if not hasattr(self, 'logged'):
if maxweight == minweight:
logging.info('Using minweight=%d', minweight)
if maxweight == source.MINWEIGHT:
logging.info('Using minweight=%d', source.MINWEIGHT)
else:
logging.info('Using maxweight=%d', maxweight)
self.logged = True
Expand Down
4 changes: 2 additions & 2 deletions openquake/calculators/event_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ def weight_src(src):
calc_times += dic['calc_times']
if dic['eff_ruptures']:
eff_ruptures += dic['eff_ruptures']
if dic['eb_ruptures']:
if dic['rup_array']:
with mon:
self.rupser.save(dic['eb_ruptures'])
self.rupser.save(dic['rup_array'])
self.rupser.close()

# logic tree reduction, must be called before storing the events
Expand Down
7 changes: 1 addition & 6 deletions openquake/calculators/getters.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from openquake.hazardlib.gsim.base import ContextMaker, FarAwayRupture
from openquake.hazardlib import calc, geo, probability_map, stats
from openquake.hazardlib.geo.mesh import Mesh, RectangularMesh
from openquake.hazardlib.source.rupture import BaseRupture, EBRupture, classes
from openquake.hazardlib.source.rupture import EBRupture, classes
from openquake.risklib.riskinput import rsi2str
from openquake.commonlib.calc import _gmvs_to_haz_curve

Expand All @@ -33,8 +33,6 @@
F32 = numpy.float32
U64 = numpy.uint64

BaseRupture.init() # initialize rupture codes


class PmapGetter(object):
"""
Expand Down Expand Up @@ -514,9 +512,6 @@ def __iter__(self):
rupture.hypocenter = geo.Point(*rec['hypo'])
rupture.occurrence_rate = rec['occurrence_rate']
rupture.tectonic_region_type = self.trt
pmfx = rec['pmfx']
if pmfx != -1:
rupture.pmf = dstore['pmfs'][pmfx]
if surface_cls is geo.PlanarSurface:
rupture.surface = geo.PlanarSurface.from_array(
mesh[:, 0, :])
Expand Down
5 changes: 3 additions & 2 deletions openquake/calculators/scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@

from openquake.hazardlib.calc.gmf import GmfComputer
from openquake.hazardlib.gsim.base import ContextMaker
from openquake.commonlib import readinput, source, calc
from openquake.hazardlib.calc.stochastic import get_rup_array
from openquake.hazardlib.source.rupture import EBRupture, events_dt
from openquake.commonlib import readinput, source, calc
from openquake.calculators import base


Expand Down Expand Up @@ -62,7 +63,7 @@ def pre_execute(self):
events[rlz * E: rlz * E + E]['rlz'] = rlz
self.datastore['events'] = events
rupser = calc.RuptureSerializer(self.datastore)
rupser.save([ebr])
rupser.save(get_rup_array([ebr]))
rupser.close()
self.computer = GmfComputer(
ebr, self.sitecol, oq.imtls, self.cmaker, oq.truncation_level,
Expand Down
4 changes: 3 additions & 1 deletion openquake/calculators/ucerf_event_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,10 @@ def build_ruptures(sources, src_filter, param, monitor):
tot_occ = sum(n_occ.values())
dic = {'eff_ruptures': {src.src_group_id: src.num_ruptures}}
with filt_mon:
dic['eb_ruptures'] = stochastic.build_eb_ruptures(
eb_ruptures = stochastic.build_eb_ruptures(
src, num_ses, cmaker, sitecol, n_occ.items())
dic['rup_array'] = (stochastic.get_rup_array(eb_ruptures)
if eb_ruptures else ())
dt = time.time() - t0
dic['calc_times'] = {src.id: numpy.array([tot_occ, len(sitecol), dt], F32)}
return dic
Expand Down
81 changes: 14 additions & 67 deletions openquake/commonlib/calc.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from openquake.baselib import hdf5, general
from openquake.baselib.python3compat import decode
from openquake.hazardlib.source.rupture import BaseRupture
from openquake.hazardlib.geo.mesh import surface_to_array, point3d
from openquake.hazardlib.gsim.base import ContextMaker
from openquake.hazardlib.imt import from_string
from openquake.hazardlib import calc, probability_map
Expand All @@ -40,6 +39,8 @@
U64 = numpy.uint64
F64 = numpy.float64

BaseRupture.init()

# ############## utilities for the classical calculator ############### #


Expand Down Expand Up @@ -354,81 +355,27 @@ class RuptureSerializer(object):
Serialize event based ruptures on an HDF5 files. Populate the datasets
`ruptures` and `sids`.
"""
rupture_dt = numpy.dtype([
('serial', U32), ('srcidx', U16), ('grp_id', U16), ('code', U8),
('n_occ', U16),
('gidx1', U32), ('gidx2', U32),
('pmfx', I32), ('mag', F32), ('rake', F32), ('occurrence_rate', F32),
('hypo', (F32, 3)), ('sy', U16), ('sz', U16)])

pmfs_dt = numpy.dtype([('serial', U32), ('pmf', hdf5.vfloat32)])

@classmethod
def get_array_nbytes(cls, ebruptures, offset):
"""
Convert a list of EBRuptures into a numpy composite array
"""
lst = []
geoms = []
nbytes = 0
for ebrupture in ebruptures:
rup = ebrupture.rupture
mesh = surface_to_array(rup.surface)
sy, sz = mesh.shape[1:]
# sanity checks
assert sy < TWO16, 'Too many multisurfaces: %d' % sy
assert sz < TWO16, 'The rupture mesh spacing is too small'
hypo = rup.hypocenter.x, rup.hypocenter.y, rup.hypocenter.z
rate = getattr(rup, 'occurrence_rate', numpy.nan)
points = mesh.reshape(3, -1).T # shape (n, 3)
n = len(points)
tup = (ebrupture.serial, ebrupture.srcidx, ebrupture.grp_id,
rup.code, ebrupture.n_occ,
offset, offset + n, getattr(ebrupture, 'pmfx', -1),
rup.mag, rup.rake, rate, hypo, sy, sz)
offset += n
lst.append(tup)
geoms.append(numpy.array([tuple(p) for p in points], point3d))
nbytes += cls.rupture_dt.itemsize + mesh.nbytes
geom = numpy.concatenate(geoms)
return numpy.array(lst, cls.rupture_dt), geom, nbytes

def __init__(self, datastore):
self.datastore = datastore
self.nbytes = 0
self.nruptures = 0
datastore.create_dset('ruptures', self.rupture_dt, attrs={'nbytes': 0})
datastore.create_dset('rupgeoms', point3d)
datastore.create_dset('ruptures', calc.stochastic.rupture_dt,
attrs={'nbytes': 0})
datastore.create_dset('rupgeoms', calc.stochastic.point3d)

def save(self, ebruptures):
def save(self, rup_array):
"""
Populate a dictionary of site IDs tuples and save the ruptures.
:param ebruptures: a list of EBRupture objects to save
Store the ruptures in array format.
"""
pmfbytes = 0
self.nruptures += len(ebruptures)
for ebr in ebruptures:
rup = ebr.rupture
if hasattr(rup, 'pmf'):
pmfs = numpy.array([(ebr.serial, rup.pmf)], self.pmfs_dt)
dset = self.datastore.extend('pmfs', pmfs)
ebr.pmfx = len(dset) - 1
pmfbytes += self.pmfs_dt.itemsize + rup.pmf.nbytes

# store the ruptures in a compact format
self.nruptures += len(rup_array)
offset = len(self.datastore['rupgeoms'])
array, geom, nbytes = self.get_array_nbytes(ebruptures, offset)
rup_array.array['gidx1'] += offset
rup_array.array['gidx2'] += offset
previous = self.datastore.get_attr('ruptures', 'nbytes', 0)
dset = self.datastore.extend(
'ruptures', array, nbytes=previous + nbytes)
self.datastore.extend('rupgeoms', geom)
# save nbytes occupied by the PMFs
if pmfbytes:
if 'nbytes' in dset.attrs:
dset.attrs['nbytes'] += pmfbytes
else:
dset.attrs['nbytes'] = pmfbytes
self.datastore.extend(
'ruptures', rup_array, nbytes=previous + rup_array.nbytes)
self.datastore.extend('rupgeoms', rup_array.geom)
# TODO: PMFs for nonparametric ruptures are not stored
self.datastore.flush()

def close(self):
Expand Down
54 changes: 51 additions & 3 deletions openquake/hazardlib/calc/stochastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,23 @@
import sys
import time
import numpy
from openquake.baselib import hdf5
from openquake.baselib.general import AccumDict
from openquake.baselib.performance import Monitor
from openquake.baselib.python3compat import raise_
from openquake.hazardlib.source.rupture import EBRupture
from openquake.hazardlib.source.rupture import BaseRupture, EBRupture
from openquake.hazardlib.contexts import ContextMaker, FarAwayRupture
from openquake.hazardlib.geo.mesh import surface_to_array, point3d

TWO16 = 2 ** 16 # 65,536
TWO32 = 2 ** 32 # 4,294,967,296
F64 = numpy.float64
U16 = numpy.uint16
U32 = numpy.uint32
U64 = numpy.uint64
U8 = numpy.uint8
I32 = numpy.int32
F32 = numpy.float32
MAX_RUPTURES = 1000


Expand Down Expand Up @@ -88,6 +93,47 @@ def stochastic_event_set(sources, source_site_filter=source_site_noop_filter):

# ######################## rupture calculator ############################ #

rupture_dt = numpy.dtype([
('serial', U32), ('srcidx', U16), ('grp_id', U16), ('code', U8),
('n_occ', U16), ('gidx1', U32), ('gidx2', U32),
('pmfx', I32), ('mag', F32), ('rake', F32), ('occurrence_rate', F32),
('hypo', (F32, 3)), ('sy', U16), ('sz', U16)])


def get_rup_array(ebruptures):
"""
Convert a list of EBRuptures into a numpy composite array
"""
if not BaseRupture._code:
BaseRupture.init() # initialize rupture codes

lst = []
geoms = []
nbytes = 0
offset = 0
for ebrupture in ebruptures:
rup = ebrupture.rupture
mesh = surface_to_array(rup.surface)
sy, sz = mesh.shape[1:]
# sanity checks
assert sy < TWO16, 'Too many multisurfaces: %d' % sy
assert sz < TWO16, 'The rupture mesh spacing is too small'
hypo = rup.hypocenter.x, rup.hypocenter.y, rup.hypocenter.z
rate = getattr(rup, 'occurrence_rate', numpy.nan)
points = mesh.reshape(3, -1).T # shape (n, 3)
n = len(points)
tup = (ebrupture.serial, ebrupture.srcidx, ebrupture.grp_id,
rup.code, ebrupture.n_occ, offset, offset + n, -1,
rup.mag, rup.rake, rate, hypo, sy, sz)
offset += n
lst.append(tup)
geoms.append(numpy.array([tuple(p) for p in points], point3d))
nbytes += rupture_dt.itemsize + mesh.nbytes
dic = dict(geom=numpy.concatenate(geoms), nbytes=nbytes)
# TODO: PMFs for nonparametric ruptures are not converted
return hdf5.ArrayWrapper(numpy.array(lst, rupture_dt), dic)


def sample_ruptures(sources, param, src_filter=source_site_noop_filter,
monitor=Monitor()):
"""
Expand Down Expand Up @@ -116,7 +162,7 @@ def sample_ruptures(sources, param, src_filter=source_site_noop_filter,
for src, sites in src_filter(sources):
t0 = time.time()
if len(eb_ruptures) > MAX_RUPTURES:
yield AccumDict(eb_ruptures=eb_ruptures,
yield AccumDict(rup_array=get_rup_array(eb_ruptures),
calc_times={},
eff_ruptures={})
eb_ruptures.clear()
Expand All @@ -126,7 +172,9 @@ def sample_ruptures(sources, param, src_filter=source_site_noop_filter,
eff_ruptures += src.num_ruptures
dt = time.time() - t0
calc_times[src.id] += numpy.array([n_occ, src.nsites, dt])
yield AccumDict(eb_ruptures=eb_ruptures, calc_times=calc_times,
yield AccumDict(rup_array=get_rup_array(eb_ruptures)
if eb_ruptures else (),
calc_times=calc_times,
eff_ruptures={grp_id: eff_ruptures})


Expand Down
10 changes: 5 additions & 5 deletions openquake/hazardlib/geo/surface/gridded.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def get_rx_distance(self, mesh):
:returns:
Numpy array of distances in km.
"""
raise NotImplementedError
raise NotImplementedError('GriddedSurface')

def get_top_edge_depth(self):
"""
Expand All @@ -116,7 +116,7 @@ def get_top_edge_depth(self):
Float value, the vertical distance between the earth surface
and the shallowest point in surface's top edge in km.
"""
raise NotImplementedError
raise NotImplementedError('GriddedSurface')

def get_strike(self):
"""
Expand Down Expand Up @@ -150,7 +150,7 @@ def get_width(self):
:returns:
Float value, the surface width
"""
raise NotImplementedError
raise NotImplementedError('GriddedSurface')

def get_area(self):
"""
Expand All @@ -159,7 +159,7 @@ def get_area(self):
:returns:
Float value, the surface area
"""
raise NotImplementedError
raise NotImplementedError('GriddedSurface')

def get_middle_point(self):
"""
Expand All @@ -185,4 +185,4 @@ def get_ry0_distance(self, mesh):
:param mesh:
:class:`~openquake.hazardlib.geo.mesh.Mesh` of points
"""
raise NotImplementedError
raise NotImplementedError('GriddedSurface')
4 changes: 2 additions & 2 deletions openquake/hazardlib/tests/calc/stochastic_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ def test_nankai(self):
param = dict(ses_per_logic_tree_path=10, filter_distance='rjb',
gsims=[SiMidorikawa1999SInter()])
dic = sum(sample_ruptures(group, param, s_filter), {})
self.assertEqual(len(dic['eb_ruptures']), 5)
self.assertEqual(len(dic['rup_array']), 5)
self.assertEqual(len(dic['calc_times']), 15) # mutex sources

# test no filtering 1
ruptures = list(stochastic_event_set(group))
self.assertEqual(len(ruptures), 19)

# test no filtering 2
ruptures = sum(sample_ruptures(group, param), {})['eb_ruptures']
ruptures = sum(sample_ruptures(group, param), {})['rup_array']
self.assertEqual(len(ruptures), 5)

0 comments on commit aca7593

Please sign in to comment.