Skip to content

Commit

Permalink
Merge pull request #82 from VOD555/new_timer
Browse files Browse the repository at this point in the history
add new timers for "map" process and time for blocks to start working
  • Loading branch information
orbeckst authored May 15, 2019
2 parents aa1fd21 + 6e046da commit 9da8872
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 13 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,19 @@ The rules for this file:
* release numbers follow "Semantic Versioning" http://semver.org

------------------------------------------------------------------------------
**/**/** VOD555

* 0.2.*

Enhancements
* add timer for the time to start the workers

11/02/18 VOD555, richardjgowers, mimischi, iparask, orbeckst, kain88-de

* 0.2.0

Enhancements
* add add timing for _conclude and _prepare (Issue #49)
* add timing for _conclude and _prepare (Issue #49)
* add parallel particle-particle RDF calculation module pmda.rdf (Issue #41)
* add readonly_attributes context manager to ParallelAnalysisBase
* add parallel implementation of Leaflet Finder (Issue #47)
Expand Down
37 changes: 26 additions & 11 deletions pmda/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
from contextlib import contextmanager
import warnings

import time
from six.moves import range

import MDAnalysis as mda
from dask.delayed import delayed
import dask
Expand All @@ -35,14 +35,16 @@ class Timing(object):
store various timing results obtained during a parallel analysis run
"""

def __init__(self, io, compute, total, universe, prepare, conclude):
def __init__(self, io, compute, total, universe, prepare,
conclude, wait=None):
self._io = io
self._compute = compute
self._total = total
self._cumulate = np.sum(io) + np.sum(compute)
self._universe = universe
self._prepare = prepare
self._conclude = conclude
self._wait = wait

@property
def io(self):
Expand Down Expand Up @@ -83,6 +85,11 @@ def conclude(self):
"""time to conclude"""
return self._conclude

@property
def wait(self):
"""time each block waited, from the scheduler call until it started working"""
return self._wait


class ParallelAnalysisBase(object):
"""Base class for defining parallel multi frame analysis
Expand Down Expand Up @@ -349,30 +356,38 @@ def run(self,
with self.readonly_attributes():
for bslice in slices:
task = delayed(
self._dask_helper, pure=False)(
bslice,
self._indices,
self._top,
self._traj, )
self._dask_helper, pure=False)(
bslice,
self._indices,
self._top,
self._traj, )
blocks.append(task)
blocks = delayed(blocks)

# record the time when scheduler starts working
wait_start = time.time()
res = blocks.compute(**scheduler_kwargs)
# hack to handle n_frames == 0 in this framework
if len(res) == 0:
# everything else wants list of block tuples
res = [([], [], [], 0)]
self._results = np.asarray([el[0] for el in res])
res = [([], [], [], 0, 0)]
with timeit() as conclude:
self._results = np.asarray([el[0] for el in res])
self._conclude()

self.timing = Timing(
np.hstack([el[1] for el in res]),
np.hstack([el[2] for el in res]), total.elapsed,
np.array([el[3] for el in res]), time_prepare, conclude.elapsed)
np.array([el[3] for el in res]), time_prepare,
conclude.elapsed,
# waiting time = wait_end - wait_start
np.array([el[4]-wait_start for el in res]))
return self

def _dask_helper(self, bslice, indices, top, traj):
"""helper function to actually setup dask graph"""
# wait_end needs to be first line for accurate timing
wait_end = time.time()
with timeit() as b_universe:
u = mda.Universe(top, traj)
agroups = [u.atoms[idx] for idx in indices]
Expand All @@ -393,7 +408,7 @@ def _dask_helper(self, bslice, indices, top, traj):
times_compute.append(b_compute.elapsed)

return np.asarray(res), np.asarray(times_io), np.asarray(
times_compute), b_universe.elapsed
times_compute), b_universe.elapsed, wait_end

@staticmethod
def _reduce(res, result_single_frame):
Expand Down
5 changes: 4 additions & 1 deletion pmda/test/test_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ def test_timeing():
universe = np.arange(2)
prepare = 3
conclude = 6
wait = 12

timing = parallel.Timing(io, compute, total, universe, prepare, conclude)
timing = parallel.Timing(io, compute, total,
universe, prepare, conclude, wait)

np.testing.assert_equal(timing.io, io)
np.testing.assert_equal(timing.compute, compute)
Expand All @@ -36,6 +38,7 @@ def test_timeing():
np.testing.assert_equal(timing.cumulate_time, np.sum(io) + np.sum(compute))
np.testing.assert_equal(timing.prepare, prepare)
np.testing.assert_equal(timing.conclude, conclude)
np.testing.assert_equal(timing.wait, wait)


class NoneAnalysis(parallel.ParallelAnalysisBase):
Expand Down

0 comments on commit 9da8872

Please sign in to comment.