Parallel Computing with ytree¶
At present, ytree
itself is not parallelized, although
parallelizing with Dask is on the development
roadmap for ytree 3.1. However, parallel merger tree analysis can be
accomplished using the parallel computing capabilities of yt.
Note
Before reading this section, consult the
Parallel Computation With yt section of the yt
documentation to
learn how to configure yt
for running in parallel.
ytree
can be run in parallel by making use of the
parallel_objects
function in yt
. This functionality is built on MPI, so it
can be used to parallelize analysis across multiple nodes of a
distributed computing system. This function powers the two primary
strategies for parallelizing merger tree analysis:
Parallelizing over Trees and Parallelizing over Halos. These two can also be
combined for Multi-level Parallelism. The most efficient strategy will
depend on the nature of your analysis.
In all cases, scripts must be run with mpirun
to work in
parallel. For example, to run on 4 processors, do:
$ mpirun -np 4 python my_analysis.py
where “my_analysis.py” is the name of the script.
Parallelizing over Trees¶
In this strategy, parallelism is achieved by distributing the list of trees to be analyzed over the available processors. Each processor will work on a single tree in serial. Results for all trees will be collected at the end and saved by the root process (i.e., the process with rank 0). In this example, the “analysis” performed will be facilitated through an Analysis Field, called “test_field”. However, the analysis can be anything your heart desires.
All parallel computation in yt
begins by importing yt
and
calling the
enable_parallelism
function.
import yt
yt.enable_parallelism()
import ytree
We will then load some data and create an analysis field.
a = ytree.load("arbor/arbor.h5")
if "test_field" not in a.field_list:
a.add_analysis_field("test_field", default=-1, units="Msun")
The serial version of our analysis contains two loops, one over all trees and another over all halos in each tree. It looks likes the following:
for my_tree in a[:]:
yt.mylog.info(f"Analyzing tree: {my_tree}.")
for my_halo in my_tree["forest"]:
my_halo["test_field"] = 2 * my_halo["mass"] # this is our analysis
To parallelize this loop over all trees, we insert a call to
parallel_objects
.
arbor_storage = {}
for tree_store, my_tree in yt.parallel_objects(a[:], storage=arbor_storage):
yt.mylog.info(f"Analyzing tree: {my_tree}.")
for my_halo in my_tree["forest"]:
my_halo["test_field"] = 2 * my_halo["mass"] # this is our analysis
tree_store.result = my_tree.field_data["test_field"]
As we will see below, the arbor_storage
dictionary created at the
top will be used after the loop to combine the results on the root
processor. For each iteration of the loop, we are given a local
storage object, called tree_store
. All results we want to return
to the root process are assigned to tree_store.result
. In the
above example, the field_data
attribute associated with
my_tree
is a dictionary containing recently queried field data,
including our new “test_field”. We will assign the entire array of
“test_field” values to the result storage. Using yt.mylog.info
to
print will show us which processor is doing what.
Now, we combine the results on the root process and save the new field.
if yt.is_root():
my_trees = []
for i, my_tree in enumerate(a[:]):
my_tree.field_data["test_field"] = arbor_storage[i]
my_trees.append(my_tree)
a.save_arbor(trees=my_trees)
The is_root
function can be used to figure out the
root process of a group. By default, entries in the arbor_storage
dictionary are stored by the index of the loop, so for example, entry
0
will correspond to the first iteration of the original parallel
loop.
Parallelizing over Halos¶
In this strategy, multiple processors work together on a single tree
by splitting up the halos in that tree. This time, we leave the outer
loop over all trees in serial and add
parallel_objects
to the inner loop.
my_trees = []
for my_tree in a[:]:
if yt.is_root():
yt.mylog.info(f"Analyzing tree: {my_tree}.")
tree_storage = {}
for halo_store, my_halo in yt.parallel_objects(
my_tree["forest"], storage=tree_storage):
halo_store.result_id = my_halo.tree_id
halo_store.result = 2 * my_halo["mass"] # this is our analysis
Just as before, we create a dictionary, called tree_storage
, that
will be used to combine the results at the end of the loop. We use the
local results storage, here called halo_store
, to store both the
result that we want to keep and an id using
halo_store.result_id
. We set the result id explicitly to help
re-assemble the results in the correct order. For example, this will
ensure correct collection of results when getting nodes by “tree” or
“prog” as well as “forest”. Now, we collect the results for the tree.
my_trees = []
# this is the outer loop from above
for my_tree in a[:]:
if yt.is_root():
yt.mylog.info(f"Analyzing tree: {my_tree}.")
### code block from above ###
if yt.is_root():
for tree_id, result in tree_storage.items():
my_halo = my_tree.get_node("forest", tree_id)
my_halo["test_field"] = result
my_trees.append(my_tree)
# save the trees
if yt.is_root():
a.save_arbor(trees=my_trees)
Note, the above code is inside the outer loop over all trees shown above. Note as well, to ensure that the tree has all of the new values for the “test_field”, we only need to loop over all the relevant halos and assign the field value to them. The rest happens under the hood.
Multi-level Parallelism¶
With some care, nested loops with calls to
parallel_objects
can be created to parallelize over both trees and halos within a
tree. By default,
parallel_objects
will split the work evenly among all processors, assigning one loop
iteration to a single processor. However, the njobs
keyword allows
us to set explicitly the number of process groups over which to divide
up work. In the example below, we restrict the outer loop to two
process groups by setting njobs=2
. For example, if we are running
with four processors, each iteration of the outer loop will be
assigned to two processors working together as a group.
arbor_storage = {}
for tree_store, my_tree in yt.parallel_objects(
a[:], storage=arbor_storage, njobs=2):
if yt.is_root():
yt.mylog.info(f"Analyzing tree: {my_tree}.")
tree_storage = {}
for halo_store, my_halo in yt.parallel_objects(
my_tree["forest"], storage=tree_storage):
halo_store.result_id = my_halo.tree_id
halo_store.result = 2 * my_halo["mass"] # this is our analysis
# combine results for this tree
if yt.is_root():
for tree_id, result in tree_storage.items():
my_halo = my_tree.get_node("forest", tree_id)
my_halo["test_field"] = result
tree_store.result = my_tree.field_data["test_field"]
else:
tree_store.result_id = None
# combine results for all trees
if yt.is_root():
my_trees = []
for i, my_tree in enumerate(a[:]):
my_tree.field_data["test_field"] = arbor_storage[i]
my_trees.append(my_tree)
a.save_arbor(trees=my_trees)
Note, that we use yt.is_root()
inside the outer loop to combine
results from the inner loop. This is allowed because
is_root
will return True for the root of a process
group, not just the global root process. Within the outer loop, the
root is the first process of each of the two groups of two
processes. Add some calls to yt.mylog.info
to prove this to
yourself.
The code above looks mostly like a combination of the previous two
examples, but with a few notable differences. First, the addition of
the njobs
keyword in the outer loop. Second, when combining the
results of the inner loop over all halos, if we are NOT the root
process, we set tree_store.result_id
to None. Without this, the
results from the non-root processes (that we are not actually
collecting) will clobber those from the root processes and nothing
will be saved.
Saving Intermediate Results¶
Often the analysis is computationally expensive enough to want to save results as they come instead of waiting for all halos to be analyzed. This can be useful if results require a lot of memory or the code takes a long time to run and you would like to restart from a partially completed state. In the example below, analysis is performed on blocks of eight trees at a time. Each block is done in parallel, the results are saved, and analysis resumes.
a = ytree.load("arbor/arbor.h5")
if "test_field" not in a.field_list:
a.add_analysis_field("test_field", default=-1, units="Msun")
block_size = 8
my_trees = list(a[:])
n_blocks = int(np.ceil(len(my_trees) / block_size))
for ib in range(n_blocks):
start = ib * block_size
end = min(start + block_size, len(my_trees))
tree_storage = {}
for tree_store, itree in yt.parallel_objects(
range(start, end), storage=tree_storage, dynamic=False):
my_tree = my_trees[itree]
for my_halo in my_tree["tree"]:
my_halo["test_field"] = 2 * my_halo["mass"]
tree_store.result_id = itree
tree_store.result = my_tree.field_data["test_field"]
if yt.is_root():
# re-assemble results on root processor
for itree, results in sorted(tree_storage.items()):
my_tree = my_trees[itree]
my_tree.field_data["test_field"] = results
a.save_arbor(trees=my_trees)
# now reload it and restore the list of trees
a = ytree.load(a.filename)
my_trees = list(a[:])
There are some notable differences between this example and those
above. First, we explicitly create a list of trees with my_trees =
list(a[:])
so we can restore it after saving and reloading. Second,
we loop over range(start, end)
instead of over trees so we can
loop over a block of trees at a time.
Like with most things, more is possible than what is shown here and there are other ways to do what is demonstrated. Parallel computing can be very satisfying. Enjoy!