Source code for ytree.frontends.rockstar.arbor

"""
RockstarArbor class and member functions



"""

#-----------------------------------------------------------------------------
# Copyright (c) ytree development team. All rights reserved.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file COPYING.txt, distributed with this software.
#-----------------------------------------------------------------------------

import numpy as np
import os
import re

from unyt.exceptions import \
    UnitParseError

from ytree.data_structures.arbor import \
    CatalogArbor
from ytree.frontends.rockstar.fields import \
    RockstarFieldInfo, \
    setup_field_groups
from ytree.frontends.rockstar.io import \
    RockstarDataFile

class RockstarArbor(CatalogArbor):
    """
    Class for Arbors created from Rockstar out_*.list files.
    Use only descendent IDs to determine tree relationship.
    """

    _field_info_class = RockstarFieldInfo
    _data_file_class = RockstarDataFile
    _default_dtype = np.float32

    def _parse_parameter_file(self):
        """
        Read the commented header of ``self.filename``.

        The first line is taken as the list of field names. The
        following lines that start with "#" are scanned for the
        cosmological parameters ("#Om = ..."), the box size
        ("#Box size: ...") and unit declarations ("#Units: <quantities>
        in <units>"). Results are stored in ``omega_matter``,
        ``omega_lambda``, ``hubble_constant``, ``box_size``,
        ``field_list`` and ``field_info``.
        """
        fgroups = setup_field_groups()
        # Qualifiers stripped out of a unit string before it is parsed,
        # parenthesized forms first.
        rems = [f"{s[0]}{t}{s[1]}"
                for s in [("(", ")"), ("", "")]
                for t in ["physical, peculiar", "comoving", "physical"]]

        # A context manager guarantees the file is closed even if one
        # of the header lines fails to parse.
        with open(self.filename, "r") as f:
            # Read the first line as a list of all fields.
            fields = f.readline()[1:].strip().split()

            # Get box size, cosmological parameters, and units.
            # The header ends at the first line not starting with "#"
            # (or at end of file).
            for line in f:
                if not line.startswith("#"):
                    break

                if line.startswith("#Om = "):
                    pars = line[1:].split(";")
                    for j, par in enumerate(
                            ["omega_matter", "omega_lambda",
                             "hubble_constant"]):
                        v = float(pars[j].split(" = ")[1])
                        setattr(self, par, v)

                elif line.startswith("#Box size:"):
                    pars = line.split(":")[1].strip().split()
                    self.box_size = self.quan(float(pars[0]), pars[1])

                # Looking for <quantities> in <units>
                elif line.startswith("#Units:"):
                    # Split on the first " in " only, so a unit string
                    # that itself contains " in " cannot break the
                    # unpacking; skip lines without a separator.
                    quan, sep, punits = \
                        line[8:].strip().partition(" in ")
                    if not sep:
                        continue

                    for rem in rems:
                        while rem in punits:
                            punits = punits.replace(rem, "", 1)

                    # Fall back to dimensionless if the remaining
                    # string is not a parsable unit.
                    try:
                        self.quan(1, punits)
                    except UnitParseError:
                        punits = ""

                    for group in fgroups:
                        if group.in_group(quan):
                            group.units = punits
                            break

        fi = {}
        for i, field in enumerate(fields):
            # Default to dimensionless when no group claims the field
            # (including the case of an empty group list).
            units = ""
            for group in fgroups:
                if group.in_group(field):
                    units = getattr(group, "units", "")
                    break
            fi[field] = {"column": i, "units": units}

        # the scale factor comes from the catalog file header
        fields.append("scale_factor")
        fi["scale_factor"] = {"source": "header", "units": ""}

        self.field_list = fields
        self.field_info.update(fi)

    def _get_data_files(self):
        """
        Get all out_*.list files and sort them in reverse order.

        The naming pattern is derived from ``self.basename``: the text
        up to and including the underscore before the catalog number is
        the prefix, and everything from the delimiter after the number
        onward is the suffix. Files in ``self.directory`` named
        <prefix><digits><suffix> are collected into ``self.data_files``,
        highest catalog number first.

        Raises ValueError if ``self.basename`` contains no catalog
        number.
        """
        reg = re.search(r"_\d+[_\.]", self.basename)
        if reg is None:
            raise ValueError(
                f"Cannot find a catalog number in {self.basename!r}.")

        prefix = self.basename[:reg.start()+1]
        suffix = self.basename[reg.end()-1:]
        # Escape the literal parts (the suffix usually contains ".")
        # and require the whole name to match so that files such as
        # "<name>.bak" are not picked up.
        freg = re.compile(
            rf"{re.escape(prefix)}\d+{re.escape(suffix)}")
        my_files = [os.path.join(self.directory, f)
                    for f in os.listdir(self.directory)
                    if freg.fullmatch(f)]

        # sort by catalog number
        my_files.sort(
            key=lambda x: self._get_file_index(x, prefix, suffix),
            reverse=True)
        self.data_files = \
            [self._data_file_class(f, self) for f in my_files]

    def _get_file_index(self, f, prefix, suffix):
        """
        Return the integer found between prefix and suffix in the
        file name of f.

        Only the final path component is inspected, so a directory
        name that happens to contain the prefix or suffix cannot
        shift the slice.
        """
        name = os.path.basename(f)
        return int(name[name.find(prefix)+len(prefix):name.rfind(suffix)])

    @classmethod
    def _is_valid(cls, *args, **kwargs):
        """
        File should end in .list.

        The file name must also start with "out" and contain an
        integer delimited by "_" before and "_" or "." after, followed
        by a non-digit character.
        """
        fn = args[0]
        name = os.path.basename(fn)
        if not name.startswith("out") or \
          not fn.endswith(".list"):
            return False

        # filter out non-integers
        # Search the file name only: a match in a directory component
        # would accept a file that has no catalog number.
        return re.search(r"_\d+[_\.]\D", name) is not None