# Source code for ytree.utilities.parallel

"""
parallel utilities



"""

#-----------------------------------------------------------------------------
# Copyright (c) ytree development team. All rights reserved.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file COPYING.txt, distributed with this software.
#-----------------------------------------------------------------------------

import numpy as np

from yt.funcs import is_root
from yt.utilities.parallel_tools.parallel_analysis_interface import \
    _get_comm, \
    parallel_objects

from ytree.data_structures.load import load as ytree_load

def regenerate_node(arbor, node, new_index=None):
    """
    Regenerate the TreeNode using the provided arbor.

    This is to be used when the original arbor associated with the
    TreeNode no longer exists.

    If new_index is None, assume the arbor has the same structure
    as it did before: the root is looked up in the provided arbor at
    the arbor index of the node's old root, and the node is then
    selected from that root's forest by its tree_id. If new_index is
    not None, assume the node is now a root located at that index.

    Parameters
    ----------
    arbor : indexable collection of root nodes
        The arbor from which the node is regenerated.
    node : TreeNode
        The node to be regenerated.
    new_index : optional, int
        Index within ``arbor`` at which the node now lives as a root.
        Default: None

    Returns
    -------
    The node retrieved from ``arbor`` corresponding to ``node``.
    """

    if new_index is not None:
        # The node has become a root of the provided arbor.
        return arbor[new_index]

    # Only the position information (arbor index and tree_id) is taken
    # from the old node; the returned node always comes from ``arbor``.
    old_root = node.find_root()
    new_root = arbor[old_root._arbor_index]
    return new_root.get_node("forest", node.tree_id)

def _get_analysis_fields(arbor):
    """
    Return the names of all fields in the arbor's field_info whose
    "type" entry is either "analysis" or "analysis_saved".

    Field names are returned in the iteration order of field_info.
    """

    analysis_types = ("analysis", "analysis_saved")
    field_info = arbor.field_info

    selected = []
    for name in field_info:
        if field_info[name].get("type") in analysis_types:
            selected.append(name)
    return selected

def parallel_trees(trees, save_every=None, filename=None,
                   njobs=0, dynamic=False):
    """
    Iterate over a list of trees in parallel.

    Trees are divided up between the available processor groups. Analysis
    field values can then be assigned to halos within the tree. The trees
    will be saved either at the end of the loop or after a number of trees
    given by the ``save_every`` keyword are completed.

    This uses the yt
    :func:`~yt.utilities.parallel_tools.parallel_analysis_interface.parallel_objects`
    function, which is parallelized with MPI underneath and so is suitable
    for parallelism across compute nodes.

    Parameters
    ----------
    trees : list of :class:`~ytree.data_structures.tree_node.TreeNode` objects
        The trees to be iterated over in parallel. If empty, nothing is
        yielded and nothing is saved.
    save_every : optional, int or False
        Number of trees to be completed before results are saved. This is
        used to save intermediate results in case scripts need to be restarted.
        If None, save will only occur after iterating over all trees. If False,
        no saving will be done.
        Default: None
    filename : optional, string
        The name of the new arbor to be saved. If None, the naming convention
        will follow the filename keyword of the
        :func:`~ytree.data_structures.arbor.Arbor.save_arbor` function.
        Default: None
    njobs : optional, int
        The number of process groups for parallel iteration. Set to 0 to make
        the same number of process groups as available processors. Hence,
        each tree will be allocated to a single processor. Set to a number
        less than the total number of processors to create groups with multiple
        processors, which will allow for further parallelization within a tree.
        For example, running with 8 processors and setting njobs to 4 will result
        in 4 groups of 2 processors each.
        Default: 0
    dynamic : optional, bool
        Set to False to divide iterations evenly among process groups. Set to
        True to allocate iterations with a task queue. If True, the number of
        processors available will be one fewer than the total as one will act
        as the task queue server.
        Default: False

    Raises
    ------
    ValueError
        If ``save_every`` is a number less than 1.

    Examples
    --------

    >>> import ytree
    >>> a = ytree.load("arbor/arbor.h5")
    >>> a.add_analysis_field("test_field", default=-1, units="Msun")
    >>> trees = list(a[:])
    >>> for tree in ytree.parallel_trees(trees):
    ...     for node in tree["forest"]:
    ...         node["test_field"] = 2 * node["mass"] # some analysis

    See Also
    --------
    parallel_tree_nodes, parallel_nodes

    """

    nt = len(trees)

    save = True
    if save_every is None:
        save_every = nt
    elif save_every is False:
        save_every = nt
        save = False
    elif save_every < 1:
        # Zero would divide by zero and a negative value would silently
        # skip every tree, so reject both explicitly.
        raise ValueError(
            f"save_every must be a positive integer, None, or False: "
            f"{save_every}.")

    # Nothing to iterate over; also avoids indexing an empty list below.
    if nt == 0:
        return

    arbor = trees[0].arbor
    afields = _get_analysis_fields(arbor)

    # Work through the trees in batches of save_every, combining (and
    # optionally saving) results after each batch.
    for start in range(0, nt, save_every):
        end = min(start + save_every, nt)

        arbor_storage = {}
        for tree_store, itree in parallel_objects(
                range(start, end), storage=arbor_storage,
                njobs=njobs, dynamic=dynamic):

            my_tree = trees[itree]
            yield my_tree

            if is_root():
                my_root = my_tree.find_root()
                tree_store.result_id = (my_root._arbor_index, my_tree.tree_id)

                # If the tree is not a root, only save the "tree" selection
                # as we could overwrite other trees in the forest.
                if my_tree.is_root:
                    selection = "forest"
                else:
                    selection = "tree"

                tree_store.result = {field: my_tree[selection, field]
                                     for field in afields}

            else:
                tree_store.result_id = None

        # combine results for all trees
        if is_root():
            for itree in range(start, end):
                my_tree = trees[itree]
                my_root = my_tree.find_root()
                key = (my_root._arbor_index, my_tree.tree_id)
                data = arbor_storage[key]

                if my_tree.is_root:
                    indices = slice(None)
                else:
                    indices = [my_tree._tree_field_indices]

                for field in afields:
                    if field not in my_root.field_data:
                        arbor._node_io._initialize_analysis_field(my_root, field)
                    my_root.field_data[field][indices] = data[field]

            if save:
                # The saved arbor replaces the current one; every tree in
                # the list becomes a root of the reloaded arbor at its
                # position in the list.
                fn = arbor.save_arbor(filename=filename, trees=trees)
                arbor = ytree_load(fn)
                trees = [regenerate_node(arbor, tree, new_index=i)
                         for i, tree in enumerate(trees)]
def parallel_tree_nodes(tree, group="forest",
                        njobs=0, dynamic=False):
    """
    Iterate over nodes in a single tree in parallel.

    Nodes are divided up between the available processor groups. Analysis
    field values can then be assigned to each node (halo).

    Note, unlike the parallel_trees and parallel_nodes function, no saving
    is performed internally. Results saving with the
    :func:`~ytree.data_structures.arbor.Arbor.save_arbor` must be done
    manually.

    This uses the yt
    :func:`~yt.utilities.parallel_tools.parallel_analysis_interface.parallel_objects`
    function, which is parallelized with MPI underneath and so is suitable
    for parallelism across compute nodes.

    Parameters
    ----------
    tree : :class:`~ytree.data_structures.tree_node.TreeNode`
        The tree whose nodes will be iterated over.
    group : optional, str ("forest", "tree", or "prog")
        Determines the nodes to be iterated over in the tree: "forest" for
        all nodes in the forest, "tree" for all nodes in the tree, or "prog"
        for all nodes in the line of main progenitors.
        Default: "forest"
    njobs : optional, int
        The number of process groups for parallel iteration. Set to 0 to make
        the same number of process groups as available processors. Hence,
        each node will be allocated to a single processor. Set to a number
        less than the total number of processors to create groups with multiple
        processors, which will allow for further parallelization. For example,
        running with 8 processors and setting njobs to 4 will result in 4
        groups of 2 processors each.
        Default: 0
    dynamic : optional, bool
        Set to False to divide iterations evenly among process groups. Set to
        True to allocate iterations with a task queue. If True, the number of
        processors available will be one fewer than the total as one will act
        as the task queue server.
        Default: False

    Examples
    --------

    >>> import ytree
    >>> a = ytree.load("arbor/arbor.h5")
    >>> a.add_analysis_field("test_field", default=-1, units="Msun")
    >>> trees = list(a[:])
    >>> for tree in trees:
    ...     for node in ytree.parallel_tree_nodes(tree):
    ...         node["test_field"] = 2 * node["mass"] # some analysis

    See Also
    --------
    parallel_trees, parallel_nodes

    """

    analysis_fields = _get_analysis_fields(tree.arbor)
    halos = list(tree[group])

    storage = {}
    work = parallel_objects(
        range(len(halos)), storage=storage,
        njobs=njobs, dynamic=dynamic)

    for store, index in work:
        halo = halos[index]
        yield halo

        if not is_root():
            # -1 marks an entry to be skipped when results are combined.
            store.result_id = -1
            continue

        store.result_id = halo.tree_id
        store.result = {name: halo[name] for name in analysis_fields}

    # Only the root process combines results for this tree.
    if not is_root():
        return

    for tree_id, result in sorted(storage.items()):
        if tree_id != -1:
            target = tree.get_node("forest", tree_id)
            for name, value in result.items():
                target[name] = value
def parallel_nodes(trees, group="forest", save_every=None,
                   filename=None, njobs=None, dynamic=None):
    """
    Iterate over all nodes in a list of trees in parallel.

    Both trees and/or nodes within a tree are divided up between available
    process groups using multi-level parallelism. Analysis field values can
    then be assigned to all nodes (halos). Trees will be saved either at the
    end of the loop over all trees or after a number of trees given by the
    ``save_every`` keyword are completed.

    This uses the yt
    :func:`~yt.utilities.parallel_tools.parallel_analysis_interface.parallel_objects`
    function, which is parallelized with MPI underneath and so is suitable
    for parallelism across compute nodes.

    Parameters
    ----------
    trees : list of :class:`~ytree.data_structures.tree_node.TreeNode` objects
        The trees to be iterated over in parallel.
    group : optional, str ("forest", "tree", or "prog")
        Determines the nodes to be iterated over in the tree: "forest" for
        all nodes in the forest, "tree" for all nodes in the tree, or "prog"
        for all nodes in the line of main progenitors.
        Default: "forest"
    save_every : optional, int or False
        Number of trees to be completed before results are saved. This is
        used to save intermediate results in case scripts need to be restarted.
        If None, save will only occur after iterating over all trees. If False,
        no saving will be done.
        Default: None
    filename : optional, string
        The name of the new arbor to be saved. If None, the naming convention
        will follow the filename keyword of the
        :func:`~ytree.data_structures.arbor.Arbor.save_arbor` function.
        Default: None
    njobs : optional, tuple of ints
        The number of process groups for parallel iteration over trees and
        nodes within each tree. The first value sets behavior for iteration
        over trees and the second for iteration over nodes in a tree. For
        example, set to (1, 0) to parallelize only over nodes in a tree and
        (0, 1) to parallelize only over trees. For multi-level parallelism
        set the first value to a number less than the total number of
        processors and the second to 0. For example, if running with 8
        processors, set njobs to (2, 0) to iterate over each tree with a
        group of 4 processors. Within each tree, each of the 4 processors in
        the group will work on a single node. If set to None, njobs will be
        set to (0, 1) if there are more trees than processors (tree parallel)
        and (1, 0) otherwise (node parallel).
        Default: None
    dynamic : optional, tuple of bools
        Toggles task queue on/off for parallelism over trees (first value)
        and nodes within a tree (second). Set to a value False to divide
        iterations evenly among process groups. Set to True to allocate
        iterations with a task queue. If True, the number of processors
        available will be one fewer than the total as one will act as the
        task queue server. Yes, this can be set to (True, True). Try it.
        Default: (False, False)

    Raises
    ------
    ValueError
        If njobs or dynamic is given and is not a tuple or list of length 2.

    Examples
    --------

    >>> import ytree
    >>> a = ytree.load("arbor/arbor.h5")
    >>> a.add_analysis_field("test_field", default=-1, units="Msun")
    >>> trees = list(a[:])
    >>> for node in ytree.parallel_nodes(trees):
    ...     node["test_field"] = 2 * node["mass"] # some analysis

    See Also
    --------
    parallel_trees, parallel_tree_nodes

    """

    if njobs is not None:
        if not (isinstance(njobs, (tuple, list)) and len(njobs) == 2):
            raise ValueError(f"njobs must be a tuple of length 2: {njobs}.")
    else:
        comm = _get_comm(())
        # parallelize over trees if more trees than cores
        njobs = (0, 1) if len(trees) > comm.size else (1, 0)

    if dynamic is not None:
        if not (isinstance(dynamic, (tuple, list)) and len(dynamic) == 2):
            raise ValueError(f"dynamic must be a tuple of length 2: {dynamic}.")
    else:
        dynamic = (False, False)

    # First entries drive the loop over trees, second entries the loop
    # over nodes within each tree.
    tree_njobs, node_njobs = njobs
    tree_dynamic, node_dynamic = dynamic

    tree_iterator = parallel_trees(
        trees, save_every=save_every, filename=filename,
        njobs=tree_njobs, dynamic=tree_dynamic)

    for tree in tree_iterator:
        for node in parallel_tree_nodes(
                tree, group=group,
                njobs=node_njobs, dynamic=node_dynamic):
            yield node