"""
Arbor field-related classes
"""
# -----------------------------------------------------------------------------
# Copyright (c) ytree development team. All rights reserved.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file COPYING.txt, distributed with this software.
# -----------------------------------------------------------------------------
from collections import defaultdict
import numpy as np
import re
import weakref
from ytree.data_structures.detection import FieldDetector
from ytree.utilities.exceptions import (
ArborFieldAlreadyExists,
ArborFieldCircularDependency,
ArborFieldDependencyNotFound,
ArborDerivedFieldException,
ArborFieldNotFound,
)
from ytree.utilities.logger import ytreeLogger as mylog
def _redshift(field, data):
return 1.0 / data["scale_factor"] - 1.0
def _time(field, data):
co = data.arbor.cosmology
if co is None:
raise ArborDerivedFieldException(field["name"], data.arbor)
return data.arbor.cosmology.t_from_z(data["redshift"])
def _vector_func(field, data):
name = field["name"]
cfields = field.get("vector_components", [f"{name}_{ax}" for ax in "xyz"])
field_data = data.arbor.arr([data[cfield] for cfield in cfields])
field_data = np.rollaxis(field_data, 1)
return field_data
def _magnitude_func(field, data):
name = field["name"][: -len("_magnitude")]
return np.sqrt((data[name] ** 2).sum(axis=1))
[docs]
class FieldInfoContainer(dict):
"""
A container for information about fields.
"""
alias_fields = ()
known_fields = ()
vector_fields = ("position", "velocity", "angular_momentum")
data_types = ()
[docs]
def __init__(self, arbor):
self.arbor = weakref.proxy(arbor)
self._data_types = dict(self.data_types)
def setup_known_fields(self):
"""
Add units for fields on disk as defined in the known_fields
tuple.
"""
for field, units in self.known_fields:
if field not in self:
continue
funits = self[field].get("units")
if funits is None:
self[field]["units"] = units
def add_analysis_field(self, name, units, dtype=None, default=0):
"""
Add an analysis field.
"""
if name in self:
raise ArborFieldAlreadyExists(name, arbor=self)
if dtype is None:
dtype = self.arbor._default_dtype
self.arbor.analysis_field_list.append(name)
self[name] = {
"type": "analysis",
"default": default,
"dtype": dtype,
"units": units,
}
def add_alias_field(self, alias, field, units=None, force_add=True):
"""
Add an alias field.
"""
if alias in self:
if force_add:
ftype = self[alias].get("type", "on-disk")
if ftype in ["alias", "derived"]:
fl = self.arbor.derived_field_list
else:
fl = self.arbor.field_list
mylog.warning(
f'Overriding field "{alias}" that already exists as {ftype} field.'
)
fl.pop(fl.index(alias))
else:
return
if field not in self:
if force_add:
raise ArborFieldDependencyNotFound(field, alias, arbor=self)
else:
return
if units is None:
units = self[field].get("units")
self.arbor.derived_field_list.append(alias)
self[alias] = {"type": "alias", "units": units, "dependencies": [field]}
if "aliases" not in self[field]:
self[field]["aliases"] = []
self[field]["aliases"].append(alias)
def setup_aliases(self):
"""
Add aliases defined in the alias_fields tuple for each frontend.
"""
for alias in self.alias_fields:
aliasname, fieldname, units = alias
if not isinstance(fieldname, tuple):
fieldname = (fieldname,)
for fname in fieldname:
self.arbor.add_alias_field(
aliasname, fname, units=units, force_add=False
)
# Fields with "/" in the name don't play well with hdf5.
for field in self.arbor.field_list:
if "/" not in field:
continue
alias = field.replace("/", "_")
self.arbor.add_alias_field(alias, field)
def add_derived_field(
self,
name,
function,
units=None,
dtype=None,
description=None,
vector_components=None,
force_add=True,
):
"""
Add a derived field.
"""
if name in self:
if force_add:
ftype = self[name].get("type", "on-disk")
if ftype in ["alias", "derived"]:
fl = self.arbor.derived_field_list
else:
fl = self.arbor.field_list
mylog.warning(
f'Overriding field "{name}" that already exists as {ftype} field.'
)
fl.pop(fl.index(name))
else:
return
if units is None:
units = ""
if dtype is None:
dtype = self.arbor._default_dtype
info = {
"name": name,
"type": "derived",
"function": function,
"units": units,
"dtype": dtype,
"description": description,
}
if vector_components is not None:
info["vector_components"] = vector_components
fc = FieldDetector(self.arbor, name=name)
try:
rv = function(info, fc)
except ArborDerivedFieldException:
return
except TypeError as e:
raise RuntimeError(
"""
Field function syntax in ytree has changed. Field functions must
now take two arguments, as in the following:
def my_field(field, data):
return data['mass']
Check the TypeError exception above for more details.
"""
)
raise e
except ArborFieldDependencyNotFound as e:
if force_add:
raise e
else:
return
rv.convert_to_units(units)
info["dependencies"] = list(fc.keys())
self.arbor.derived_field_list.append(name)
self[name] = info
def setup_derived_fields(self):
"""
Add stock derived fields.
"""
self.arbor.add_derived_field("redshift", _redshift, units="", force_add=False)
self.arbor.add_derived_field("time", _time, units="Myr", force_add=False)
def add_vector_field(self, fieldname, cfields=None):
"""
Add vector and magnitude fields for a field with
x/y/z components.
If component fields not provided (with cfields kwarg), assume a
naming convention of <fieldname>_<xyz>.
"""
if cfields is None:
cfields = [f"{fieldname}_{ax}" for ax in "xyz"]
exists = all([field in self for field in cfields])
if not exists:
return None
for field in cfields:
self[field]["vector_fieldname"] = fieldname
units = self[cfields[0]].get("units", None)
self.arbor.add_derived_field(
fieldname, _vector_func, vector_components=cfields, units=units
)
self.arbor.add_derived_field(
f"{fieldname}_magnitude", _magnitude_func, units=units
)
self.vector_fields += (fieldname,)
return fieldname
def setup_vector_fields(self):
"""
Add vector and magnitude fields.
"""
vfields = set()
candidates = defaultdict(dict)
# match anything, followed by one xyz (case-insensitive),
# optionally followed by more characters.
vreg = re.compile(r"^(.+)([xyzXYZ]{1})(.*)$")
for field in self:
match = vreg.match(field)
if match is None:
continue
pre, comp, suf = match.groups()
# remove any trailing hyphens/underscores
pre = re.sub("[-_]{1}$", "", pre)
vfield = pre + suf
candidates[vfield][comp.lower()] = field
for vfield in candidates:
if len(candidates[vfield]) == 3:
vfields.add(vfield)
added_fields = []
for field in vfields.union(self.vector_fields):
comp_fields = (
sorted(candidates[field].values()) if field in candidates else None
)
field = self.add_vector_field(field, cfields=comp_fields)
if field is None:
continue
added_fields.append(field)
self.vector_fields = tuple(added_fields)
def resolve_field_dependencies(self, fields, fcache=None, fsize=None):
"""
Divide fields into those to be read and those to generate.
"""
if fcache is None:
fcache = {}
fields_to_read = []
fields_to_generate = []
fields_to_resolve = fields[:]
while len(fields_to_resolve) > 0:
field = fields_to_resolve.pop(0)
if field in fcache:
# Check that the field array is the size we want.
# It might not be if it was previously gotten just
# for the root and now we want it for the whole tree.
if fsize is None or fcache[field].shape[0] == fsize:
continue
del fcache[field]
if field not in self:
raise ArborFieldNotFound(field, self.arbor)
ftype = self[field].get("type")
if ftype == "derived" or ftype == "alias":
deps = self[field]["dependencies"]
if field in deps:
raise ArborFieldCircularDependency(field, self.arbor)
fields_to_resolve.extend(set(deps).difference(set(fields_to_resolve)))
if field not in fields_to_generate:
fields_to_generate.append(field)
elif ftype == "analysis":
fields_to_generate.append(field)
else:
if field not in fields_to_read:
fields_to_read.append(field)
return fields_to_read, fields_to_generate
[docs]
class FieldContainer(dict):
"""
A container for field data.
"""
[docs]
def __init__(self, arbor):
self.arbor = weakref.proxy(arbor)