"""
RockstarArbor class and member functions
"""
# -----------------------------------------------------------------------------
# Copyright (c) ytree development team. All rights reserved.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file COPYING.txt, distributed with this software.
# -----------------------------------------------------------------------------
import numpy as np
import os
import re
from unyt.exceptions import UnitParseError
from ytree.data_structures.arbor import CatalogArbor
from ytree.frontends.rockstar.fields import RockstarFieldInfo, setup_field_groups
from ytree.frontends.rockstar.io import RockstarDataFile
[docs]
class RockstarArbor(CatalogArbor):
"""
Class for Arbors created from Rockstar out_*.list files.
Use only descendent IDs to determine tree relationship.
"""
_field_info_class = RockstarFieldInfo
_data_file_class = RockstarDataFile
_default_dtype = np.float32
def _parse_parameter_file(self):
fgroups = setup_field_groups()
rems = [
f"{s[0]}{t}{s[1]}"
for s in [("(", ")"), ("", "")]
for t in ["physical, peculiar", "comoving", "physical"]
]
f = open(self.filename, "r")
# Read the first line as a list of all fields.
fields = f.readline()[1:].strip().split()
# Get box size, cosmological parameters, and units.
while True:
line = f.readline()
if line is None or not line.startswith("#"):
break
elif line.startswith("#Om = "):
pars = line[1:].split(";")
for j, par in enumerate(
["omega_matter", "omega_lambda", "hubble_constant"]
):
v = float(pars[j].split(" = ")[1])
setattr(self, par, v)
elif line.startswith("#Box size:"):
pars = line.split(":")[1].strip().split()
self.box_size = self.quan(float(pars[0]), pars[1])
# Looking for <quantities> in <units>
elif line.startswith("#Units:"):
if " in " not in line:
continue
quan, punits = line[8:].strip().split(" in ", 2)
for rem in rems:
while rem in punits:
pre, mid, pos = punits.partition(rem)
punits = pre + pos
try:
self.quan(1, punits)
except UnitParseError:
punits = ""
for group in fgroups:
if group.in_group(quan):
group.units = punits
break
f.close()
fi = {}
for i, field in enumerate(fields):
for group in fgroups:
units = ""
if group.in_group(field):
units = getattr(group, "units", "")
break
fi[field] = {"column": i, "units": units}
# the scale factor comes from the catalog file header
fields.append("scale_factor")
fi["scale_factor"] = {"source": "header", "units": ""}
self.field_list = fields
self.field_info.update(fi)
def _get_data_files(self):
"""
Get all out_*.list files and sort them in reverse order.
"""
reg = re.search(r"_\d+[_\.]", self.basename)
prefix = self.basename[: reg.start() + 1]
suffix = self.basename[reg.end() - 1 :]
freg = re.compile(rf"{prefix}\d+{suffix}")
my_files = [
os.path.join(self.directory, f)
for f in os.listdir(self.directory)
if freg.match(f)
]
# sort by catalog number
my_files.sort(
key=lambda x: self._get_file_index(x, prefix, suffix), reverse=True
)
self.data_files = [self._data_file_class(f, self) for f in my_files]
def _get_file_index(self, f, prefix, suffix):
return int(f[f.find(prefix) + len(prefix) : f.rfind(suffix)])
@classmethod
def _is_valid(self, *args, **kwargs):
"""
File should end in .list.
"""
fn = args[0]
if not os.path.basename(fn).startswith("out") or not fn.endswith(".list"):
return False
# filter out non-integers
reg = re.search(r"_\d+[_\.]\D", fn)
if reg is None:
return False
return True