Analyzing Merger Trees¶
This section describes the preferred method for performing analysis on merger trees that modifies the dataset (e.g., by creating or altering Analysis Fields) or writes additional files. The end of this section, Putting it all Together: Parallel Analysis, illustrates an analysis workflow that will run in parallel.
When performing the same analysis on a large number of items, we often think in terms of creating an “analysis pipeline”, where a series of discrete actions, including deciding whether to skip a given item, are embedded within a loop over all the items to analyze. For merger trees, this may look something like the following:
import yt
import ytree

a = ytree.load(...)
trees = list(a[:])

for tree in trees:
    for node in tree["forest"]:
        # only analyze above some minimum mass
        if node["mass"] < a.quan(1e11, "Msun"):
            continue

        # get simulation snapshot associated with this halo
        snap_fn = get_filename_from_redshift(node["redshift"])
        ds = yt.load(snap_fn)

        # get sphere using halo's center and radius
        center = node["position"].to("unitary")
        radius = node["virial_radius"].to("unitary")
        sp = ds.sphere((center, "unitary"), (radius, "unitary"))

        # calculate gas mass and save to field
        node["gas_mass"] = sp.quantities.total_quantity(("gas", "mass"))

        # make a projection and save an image
        p = yt.ProjectionPlot(ds, "x", ("gas", "density"),
                              data_source=sp, center=sp.center,
                              width=2 * sp.radius)
        p.save("my_analysis/projections/")
There are a few disadvantages to this approach. The inner loop is very long, and it can be difficult to grasp the full set of actions, especially if you were not the one who wrote it. If there is a step you no longer want to perform, a whole block of code must be commented out or removed, and it may be hard to tell whether doing so will break something else. Moving the operations into functions makes this simpler, but the inner loop can still grow into a large block of code. Likewise, if the structure of the loops over trees or nodes is more complicated than the above, the code can become difficult to digest.
The AnalysisPipeline¶
The AnalysisPipeline allows you to design the analysis workflow in advance and then use it to process a tree or node with a single function call. Skipping straight to the end, the loop from above will take the form:
for tree in trees:
    for node in tree["forest"]:
        ap.process_target(node)
In the above example, "ap" is an AnalysisPipeline object that we have defined earlier. We will now take a closer look at how to design a workflow using this method.
Creating an AnalysisPipeline¶
An AnalysisPipeline is instantiated with no required arguments. Optionally, an output directory inside which new files will be written can be specified with the output_dir keyword.
import ytree
ap = ytree.AnalysisPipeline(output_dir="my_analysis")
The output directory will be created automatically if it does not already exist.
Creating Pipeline Operations¶
An analysis pipeline is assembled by creating functions that accept a single TreeNode as an argument.
def say_hello(node):
    print(f"This is node {node}! I will now be analyzed.")
This function can now be added to an existing pipeline with the add_operation function.
ap.add_operation(say_hello)
Now, when the process_target function is called with a TreeNode object, the say_hello function will be called with that TreeNode. Any additional calls to add_operation will result in those functions also being called with that TreeNode, in the order they were added.
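As a quick sketch of the ordering (using a throwaway pipeline so as not to alter "ap", and assuming the arbor "a" from the first example is available), operations run in the order they were added:

def say_goodbye(node):
    # hypothetical second operation, defined for illustration only
    print(f"Finished with node {node}.")

demo = ytree.AnalysisPipeline()
demo.add_operation(say_hello)
demo.add_operation(say_goodbye)

# say_hello is called first, then say_goodbye
demo.process_target(a[0])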
Adding Extra Function Arguments¶
Functions can take additional arguments and keyword arguments as well.
def print_field_value(node, field, units=None):
    val = node[field]
    if units is not None:
        val.convert_to_units(units)
    print(f"Value of {field} for node {node} is {val}.")
The additional arguments and keyword arguments are then provided when calling add_operation.
ap.add_operation(print_field_value, "mass")
ap.add_operation(print_field_value, "virial_radius", units="kpc/h")
Organizing File Output by Operation¶
In the same way that the AnalysisPipeline object accepts an output_dir keyword, analysis functions can also accept an output_dir keyword.
import os

def save_something(node, output_dir=None):
    # make an HDF5 file named by the unique node ID
    filename = f"node_{node.uid}.h5"
    if output_dir is not None:
        filename = os.path.join(output_dir, filename)
    # do some stuff...

# meanwhile, back in the pipeline...
ap.add_operation(save_something, output_dir="images")
This output_dir keyword will be intercepted by the AnalysisPipeline object to ensure that the directory gets created if it does not already exist. Additionally, if an output_dir keyword was given when the AnalysisPipeline was created, as in the example above, the directory associated with the function will be appended to it. Following the examples here, the resulting directory would be "my_analysis/images", and the code above will correctly save to that location.
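For instance, if a node's uid happened to be 42 (a made-up value), the file written by save_something would end up at a path like this:

import os

# pipeline output_dir + operation output_dir + per-node filename
print(os.path.join("my_analysis", "images", "node_42.h5"))
# my_analysis/images/node_42.h5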
Using a Function as a Filter¶
Making an analysis function return True or False allows it to act as a filter. If a function returns False, then any additional operations defined in the pipeline will not be performed. For example, we might create a mass filter like this:
def minimum_mass(node, value):
    return node["mass"] >= value

# later, in the pipeline
ap.add_operation(minimum_mass, a.quan(1e11, "Msun"))
The pipeline will interpret any return value from an operation that is not None as a boolean and use it as a filter.
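Filters can be combined simply by adding more of them; each is just another operation. For example, a redshift cut (a sketch assuming the arbor provides a "redshift" field, as in the examples above) might look like this:

def maximum_redshift(node, value):
    return node["redshift"] <= value

# nodes must now pass both the mass and redshift filters
ap.add_operation(maximum_redshift, 1.0)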
Adding Operations that Always Run¶
As discussed above in Using a Function as a Filter, returning False from an operation will prevent all further operations in the pipeline from being performed on that node. However, there may be operations that you want to always run, regardless of previous filters. For example, there may be clean up operations, like freeing up memory, that should run for every node, no matter what. To accomplish this, the always_do keyword can be set to True in the call to add_operation.
def delete_attributes(node, attributes):
    for attr in attributes:
        if hasattr(node, attr):
            delattr(node, attr)

# later, in the pipeline
ap.add_operation(delete_attributes, ["ds", "sphere"], always_do=True)
Modifying a Node¶
There may be occasions where you want to pass local variables or objects around from one function to the next. The easiest way to do this is by attaching them to the TreeNode object itself as an attribute. For example, say we have a function that returns a simulation snapshot loaded with yt as a function of redshift. We might do something like the following to then pass it to another function which creates a yt sphere.
def get_yt_dataset(node):
    # assume you have something like this
    filename = get_filename_from_redshift(node["redshift"])
    # attach it to the node for later use
    node.ds = yt.load(filename)

def get_yt_sphere(node):
    # this works if get_yt_dataset has been called first
    ds = node.ds

    center = node["position"].to("unitary")
    radius = node["virial_radius"].to("unitary")
    node.sphere = ds.sphere((center, "unitary"), (radius, "unitary"))
Then, we can add these to the pipeline such that a later function can use the sphere.
ap.add_operation(get_yt_dataset)
ap.add_operation(get_yt_sphere)
To clean things up, we can make a function to remove attributes and add it to the end of the pipeline.
def delete_attributes(node, attributes):
    for attr in attributes:
        if hasattr(node, attr):
            delattr(node, attr)

# later, in the pipeline
ap.add_operation(delete_attributes, ["ds", "sphere"], always_do=True)
See Adding Operations that Always Run for a discussion of the always_do option.
Running the Pipeline¶
Once the pipeline has been defined through calls to add_operation, it is now only a matter of looping over the nodes we want to analyze and calling process_target with them.
for tree in trees:
    for node in tree["forest"]:
        ap.process_target(node)
Depending on what you want to do, you may want to call process_target with an entire tree and skip the inner loop. After all, a tree in this context is just another TreeNode object, only one that has no descendant.
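For example, to run the pipeline once per tree rather than once per node:

# a tree is just the root TreeNode, so it can be passed directly
for tree in trees:
    ap.process_target(tree)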
Creating an Analysis Recipe¶
Through the previous examples, we have designed a workflow by defining functions and adding them to our pipeline in the order we want them to be called. Has it resulted in fewer lines of code? No. But it has allowed us to construct a workflow out of a series of reusable parts, so the creation of future pipelines will certainly involve fewer lines of code. It is also possible to define a more complex series of operations as a "recipe" that can be added in one go to the pipeline using the add_recipe function. A recipe should be a function that, minimally, accepts an AnalysisPipeline object as the first argument, but can also accept more. Below, we will define a recipe for calculating the gas mass for a halo. For our purposes, assume the functions we created earlier exist here.
def calculate_gas_mass(node):
    sphere = node.sphere
    node["gas_mass"] = sphere.quantities.total_quantity(("gas", "mass"))

def gas_mass_recipe(pipeline):
    pipeline.add_operation(get_yt_dataset)
    pipeline.add_operation(get_yt_sphere)
    pipeline.add_operation(calculate_gas_mass)
    pipeline.add_operation(delete_attributes, ["ds", "sphere"])
Now, our entire analysis pipeline design can look like this.
ap = ytree.AnalysisPipeline()
ap.add_recipe(gas_mass_recipe)
See the add_recipe docstring for an example of including additional function arguments.
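As a sketch of how that might look (assuming add_recipe passes extra arguments through to the recipe function, and reusing the functions defined above), the minimum mass could be made an argument of the recipe:

def gas_mass_recipe(pipeline, min_mass):
    pipeline.add_operation(minimum_mass, min_mass)
    pipeline.add_operation(get_yt_dataset)
    pipeline.add_operation(get_yt_sphere)
    pipeline.add_operation(calculate_gas_mass)
    # always_do ensures cleanup even for nodes removed by the mass filter
    pipeline.add_operation(delete_attributes, ["ds", "sphere"], always_do=True)

ap.add_recipe(gas_mass_recipe, a.quan(1e11, "Msun"))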
Putting it all Together: Parallel Analysis¶
To unleash the true power of the AnalysisPipeline, run it in parallel using one of the Parallel Iterators. See Parallel Computing with ytree for more information on using ytree on multiple processors.
import ytree

a = ytree.load("arbor/arbor.h5")
if "gas_mass" not in a.field_list:
    a.add_analysis_field("gas_mass", default=-1, units="Msun")

ap = ytree.AnalysisPipeline()
ap.add_recipe(gas_mass_recipe)

trees = list(a[:])
for node in ytree.parallel_nodes(trees):
    ap.process_target(node)
If you need some inspiration, have a look at some Example Applications.