Source code for ytree.utilities.parallel

"""
parallel utilities



"""

#-----------------------------------------------------------------------------
# Copyright (c) ytree development team. All rights reserved.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file COPYING.txt, distributed with this software.
#-----------------------------------------------------------------------------

import numpy as np

from yt.funcs import \
    get_pbar, \
    is_root as yt_is_root
from yt.utilities.parallel_tools.parallel_analysis_interface import \
    _get_comm, \
    parallel_objects

from ytree.data_structures.load import load as ytree_load

def regenerate_node(arbor, node, new_index=None):
    """
    Regenerate the TreeNode using the provided arbor.

    This is to be used when the original arbor associated with the
    TreeNode no longer exists.

    If new_index is None, assume the arbor has the same structure
    as it did before. If new_index is not None, assume the node
    is now a root.
    """

    if new_index is None:
        root_node = node.find_root()
        new_node = root_node.get_node("forest", node.tree_id)
    else:
        root_node = arbor[new_index]
        new_node = root_node

    return new_node

[docs] def parallel_trees(trees, collect_results=True, save_every=None, save_in_place=None, save_roots_only=False, filename=None, njobs=0, dynamic=False): """ Iterate over a list of trees in parallel. Trees are divided up between the available processor groups. Analysis field values can then be assigned to halos within the tree. The trees will be saved either at the end of the loop or after a number of trees given by the ``save_every`` keyword are completed. This uses the yt :func:`~yt.utilities.parallel_tools.parallel_analysis_interface.parallel_objects` function, which is parallelized with MPI underneath and so is suitable for parallelism across compute nodes. Parameters ---------- trees : list of :class:`~ytree.data_structures.tree_node.TreeNode` objects The trees to be iterated over in parallel. collect_results : optional, bool If True, then results stored in analysis fields will be collected by the root process. This must be set to True if saving is to be done. If False, results collection is ignored. This will result in a significant speedup. If you have no intention of altering analysis fields or do not need results to be recollected or saved, then this is the best option. Setting this to False will automatically set save_every to False as well. Default: True save_every : optional, int or False Number of trees to be completed before results are saved. This is used to save intermediate results in case scripts need to be restarted. This parameter results in different behavior depending on the value of the collect_results keyword. If save_every is set to: - integer: if collect_trees is True, the number of trees to complete before saving. If collect_trees is False, a ValueError exception will be raised. - False: no saving will be done. Results will still be collected if collect_results is True. - None: if collect_results if True, save will occur after iterating over all trees. If collect_results is False, no saving will be done. Default: None save_in_place : optional, bool or None If True, analysis fields will be saved to the original arbor, even if only a subset of all trees is provided with the trees keyword. If False and only a subset of all trees is provided, a new arbor will be created containing only the trees provided. If set to None, behavior is determined by the type of arbor loaded. If the arbor is a YTreeArbor (i.e., saved with save_arbor), save_in_place will be set to True. If not of this type, it will be set to False. Default: None save_roots_only : optional, bool If True, only field values of each node are saved. If False, field data for the entire tree stemming from that node are saved. Default: False. filename : optional, string The name of the new arbor to be saved. If None, the naming convention will follow the filename keyword of the :func:`~ytree.data_structures.arbor.Arbor.save_arbor` function. Default: None njobs : optional, int The number of process groups for parallel iteration. Set to 0 to make the same number of process groups as available processors. Hence, each tree will be allocated to a single processor. Set to a number less than the total number of processors to create groups with multiple processors, which will allow for further parallelization within a tree. For example, running with 8 processors and setting njobs to 4 will result in 4 groups of 2 processors each. Default: 0 dynamic : optional, bool Set to False to divide iterations evenly among process groups. Set to True to allocate iterations with a task queue. If True, the number of processors available will be one fewer than the total as one will act as the task queue server. Default: False Examples -------- >>> import ytree >>> a = ytree.load("arbor/arbor.h5") >>> a.add_analysis_field("test_field", default=-1, units="Msun") >>> trees = list(a[:]) >>> for tree in ytree.parallel_trees(trees): ... for node in tree["forest"]: ... node["test_field"] = 2 * node["mass"] # some analysis See Also -------- parallel_tree_nodes, parallel_nodes """ comm = _get_comm(()) # This is the root process of the whole operation. # We may split into process groups later, in which case there # will be other root processes and we will uses calls to yt's # is_root to identify them. is_global_root = comm.comm is None or comm.comm.rank == 0 if dynamic: if is_global_root: nt = len(trees) else: nt = None nt = comm.mpi_bcast(nt, root=0) else: nt = len(trees) if nt < 1: return arbor = trees[0].arbor afields = arbor.analysis_field_list if save_in_place is None: from ytree.frontends.ytree.arbor import YTreeArbor save_in_place = isinstance(arbor, YTreeArbor) # are we actually going to save anything? do_save = True if isinstance(save_every, (int, np.integer)): if collect_results is False: raise ValueError( "collect_results must be True if save_every is set to a number.") elif save_every is False: save_every = nt do_save = False elif save_every is None: do_save = collect_results save_every = nt nb = int(np.ceil(nt / save_every)) for ib in range(nb): start = ib * save_every end = min(start + save_every, nt) my_items = range(start, end) arbor_storage = {} for tree_store, my_item in parallel_objects( my_items, storage=arbor_storage, njobs=njobs, dynamic=dynamic): my_tree = trees[my_item] yield my_tree # We use yt_is_root here because we want the root of this # workgroup running this iteration, not the global root. # It is this process's job to round up the results for this # iteration and place them in the storage object. This is the # the slowest part as we will copy field data for all of the # analysis fields for the entire forest or tree. In the future, # it may be worth trying to identify the nodes and analysis fields # which were actually altered and copying just them. if yt_is_root(): if not collect_results: continue # this is fast my_root = my_tree.find_root() tree_store.result_id = (my_root._arbor_index, my_tree.tree_id) # If the tree is not a root, only save the "tree" selection # as we could overwrite other trees in the forest. if save_roots_only: pass elif my_tree.is_root: selection = "forest" else: selection = "tree" if save_roots_only: tree_store.result = {field: my_tree[field] for field in afields} else: # this is the slow part as we are grabbing all analysis # fields for the entire tree and copying them. tree_store.result = {field: my_tree[selection, field] for field in afields} else: tree_store.result_id = None # Use the global root to combine all results. # Both parts of this are fairly slow. if is_global_root and collect_results: my_trees = [] pbar = get_pbar("Combining results", len(my_items)) for i, my_item in enumerate(my_items): my_tree = trees[my_item] my_trees.append(my_tree) my_root = my_tree.find_root() key = (my_root._arbor_index, my_tree.tree_id) data = arbor_storage[key] if save_roots_only: indices = my_tree.tree_id elif my_tree.is_root: indices = slice(None) else: indices = [my_tree._tree_field_indices] for field in afields: if field not in my_root.field_data: arbor._node_io._initialize_analysis_field(my_root, field) my_root.field_data[field][indices] = data[field] pbar.update(i+1) pbar.finish() if do_save: if save_in_place: save_trees = my_trees else: save_trees = trees fn = arbor.save_arbor(filename=filename, trees=save_trees, save_in_place=save_in_place, save_roots_only=save_roots_only) new_arbor = ytree_load(fn) add_fields = set(arbor.derived_field_list).difference( new_arbor.derived_field_list) for field in add_fields: fi = arbor.field_info[field].copy() ftype = fi.pop("type") # skip aliases as they will have been saved as the field if ftype == "alias": continue name = fi.pop("name") function = fi.pop("function") del fi["dependencies"] new_arbor.add_derived_field(name, function, **fi) arbor = new_arbor trees = [regenerate_node(arbor, tree, new_index=i) for i, tree in enumerate(trees)] comm.barrier()
[docs] def parallel_tree_nodes(tree, group="forest", nodes=None, njobs=0, dynamic=False): """ Iterate over nodes in a single tree in parallel. Nodes are divided up between the available processor groups. Analysis field values can then be assigned to each node (halo). Note, unlike the parallel_trees and parallel_nodes function, no saving is performed internally. Results saving with the :func:`~ytree.data_structures.arbor.Arbor.save_arbor` must be done manually. This uses the yt :func:`~yt.utilities.parallel_tools.parallel_analysis_interface.parallel_objects` function, which is parallelized with MPI underneath and so is suitable for parallelism across compute nodes. Parameters ---------- tree : :class:`~ytree.data_structures.tree_node.TreeNode` The tree whose nodes will be iterated over. group : optional, str ("forest", "tree", or "prog") Determines the nodes to be iterated over in the tree: "forest" for all nodes in the forest, "tree" for all nodes in the tree, or "prog" for all nodes in the line of main progenitors. Default: "forest" nodes: optional, list A list of nodes to iterate over instead of using a forest, tree, or prog selection. If provided, this will supersede the value of the "group" keyword. Note, all nodes must be members of the tree given in the "tree" argument. njobs : optional, int The number of process groups for parallel iteration. Set to 0 to make the same number of process groups as available processors. Hence, each node will be allocated to a single processor. Set to a number less than the total number of processors to create groups with multiple processors, which will allow for further parallelization. For example, running with 8 processors and setting njobs to 4 will result in 4 groups of 2 processors each. Default: 0 dynamic : optional, bool Set to False to divide iterations evenly among process groups. Set to True to allocate iterations with a task queue. If True, the number of processors available will be one fewer than the total as one will act as the task queue server. Default: False Examples -------- >>> import ytree >>> a = ytree.load("arbor/arbor.h5") >>> a.add_analysis_field("test_field", default=-1, units="Msun") >>> trees = list(a[:]) >>> for tree in trees: ... for node in ytree.parallel_tree_nodes(tree): ... node["test_field"] = 2 * node["mass"] # some analysis See Also -------- parallel_trees, parallel_nodes """ afields = tree.arbor.analysis_field_list if nodes is None: my_halos = list(tree[group]) else: my_halos = nodes tree_storage = {} for halo_store, ihalo in parallel_objects( range(len(my_halos)), storage=tree_storage, njobs=njobs, dynamic=dynamic): my_halo = my_halos[ihalo] yield my_halo if yt_is_root(): halo_store.result_id = my_halo.tree_id halo_store.result = {field: my_halo[field] for field in afields} else: halo_store.result_id = -1 # combine results for this tree if yt_is_root(): for tree_id, result in sorted(tree_storage.items()): if tree_id == -1: continue my_halo = tree.get_node("forest", tree_id) for field, value in result.items(): my_halo[field] = value
[docs] def parallel_nodes(trees, group="forest", collect_results=True, save_every=None, save_in_place=None, filename=None, njobs=None, dynamic=None): """ Iterate over all nodes in a list of trees in parallel. Both trees and/or nodes within a tree are divided up between available process groups using multi-level parallelism. Analysis field values can then be assigned to all nodes (halos). Trees will be saved either at the end of the loop over all trees or after a number of trees given by the ``save_every`` keyword are completed. This uses the yt :func:`~yt.utilities.parallel_tools.parallel_analysis_interface.parallel_objects` function, which is parallelized with MPI underneath and so is suitable for parallelism across compute nodes. Parameters ---------- trees : list of :class:`~ytree.data_structures.tree_node.TreeNode` objects The trees to be iterated over in parallel. group : optional, str ("forest", "tree", or "prog") Determines the nodes to be iterated over in the tree: "forest" for all nodes in the forest, "tree" for all nodes in the tree, or "prog" for all nodes in the line of main progenitors. Default: "forest" collect_results : optional, bool If True, then results stored in analysis fields will be collected by the root process. This must be set to True if saving is to be done. If False, results collection is ignored. This will result in a significant speedup. If you have no intention of altering analysis fields or do not need results to be recollected or saved, then this is the best option. Setting this to False will automatically set save_every to False as well. Default: True save_every : optional, int or False Number of trees to be completed before results are saved. This is used to save intermediate results in case scripts need to be restarted. This parameter results in different behavior depending on the value of the collect_results keyword. If save_every is set to: - integer: if collect_trees is True, the number of trees to complete before saving. If collect_trees is False, a ValueError exception will be raised. - False: no saving will be done. Results will still be collected if collect_results is True. - None: if collect_results if True, save will occur after iterating over all trees. If collect_results is False, no saving will be done. Default: None save_in_place : optional, bool or None If True, analysis fields will be saved to the original arbor, even if only a subset of all trees is provided with the trees keyword. This will essentially "update" the arbor in place. If False and only a subset of all trees is provided, a new arbor will be created containing only the trees provided. If set to None, behavior is determined by the type of arbor loaded. If the arbor is a YTreeArbor (i.e., saved with save_arbor), save_in_place will be set to True. If not of this type, it will be set to False. Default: None filename : optional, string The name of the new arbor to be saved. If None, the naming convention will follow the filename keyword of the :func:`~ytree.data_structures.arbor.Arbor.save_arbor` function. Default: None njobs : optional, tuple of ints The number of process groups for parallel iteration over trees and nodes within each tree. The first value sets behavior for iteration over trees and the second for iteration over nodes in a tree. For example, set to (1, 0) to parallelize only over nodes in a tree and (0, 1) to parallelize only over trees. For multi-level parallelism set the first value to a number less than the total number of processors and the second to 0. For example, if running with 8 processors, set njobs to (2, 0) to iterate over each tree with a group of 4 processors. Within each tree, each of the 4 processors in the group will work on a single node. If set to None, njobs will be set to (0, 1) if there are most trees than processors (tree parallel) and (1, 0) otherwise (node parallel). Default: None dynamic : optional, tuples of bools Toggles task queue on/off for parallelism over trees (first value) and nodes within a tree (second). Set to a value False to divide iterations evenly among process groups. Set to True to allocate iterations with a task queue. If True, the number of processors available will be one fewer than the total as one will act as the task queue server. Yes, this can be set to (True, True). Try it. Default: (False, False) Examples -------- >>> import ytree >>> a = ytree.load("arbor/arbor.h5") >>> a.add_analysis_field("test_field", default=-1, units="Msun") >>> trees = list(a[:]) >>> for node in ytree.parallel_nodes(trees): ... node["test_field"] = 2 * node["mass"] # some analysis See Also -------- parallel_trees, parallel_tree_nodes """ if njobs is None: comm = _get_comm(()) # parallelize over trees if more trees than cores if len(trees) > comm.size: njobs = (0, 1) else: njobs = (1, 0) else: if not isinstance(njobs, (tuple, list)) or len(njobs) != 2: raise ValueError(f"njobs must be a tuple of length 2: {njobs}.") if dynamic is None: dynamic = (False, False) else: if not isinstance(dynamic, (tuple, list)) or len(dynamic) != 2: raise ValueError(f"dynamic must be a tuple of length 2: {dynamic}.") for tree in parallel_trees( trees, collect_results=collect_results, save_every=save_every, save_in_place=save_in_place, filename=filename, njobs=njobs[0], dynamic=dynamic[0]): for node in parallel_tree_nodes( tree, group=group, njobs=njobs[1], dynamic=dynamic[1]): yield node