"""Copyright 2018 Equinor ASA and The Netherlands Organisation for
Applied Scientific Research TNO.
Licensed under the MIT license.
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the conditions stated in the LICENSE file in the project root for
details.
The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.
"""
import copy
import configsuite
import collections
from .schema import assert_valid_schema
from .meta_keys import MetaKeys as MK
[docs]class ConfigSuite(object):
"""A `Suite` exposing the functionality of Config Suite in a unified manner.
The intended usage of Config Suite is via this immutable suite. It is constructed
with a `schema` describing the structure of a configuration, together with
a `raw_config` and possibly additional `layers`.
Parameters
----------
raw_config
The configuration taking precedence.
schema
A description of the structure of a valid configuration, together with
actions that are to be carried out.
layers: iterable of layers, optional
Additional layers of configuration. A layer takes precedence over all
other layers following it in the given sequence. Note that `raw_config`
takes precedence over all elements of `layers`.
extract_validation_context: callable, optional
Callable that extracts the context used for validation. The callable is
given a snapshot of the configuration as argument. Defaults to the
constant function always returning `None`.
extract_transformation_context: callable, optional
Callable that extracts the context used for transformations. The
callable is given a snapshot of the configuration as argument. Defaults
to the constant function always returning `None`.
deduce_required: bool, optional
Boolean that enables future behaviour of deducing whether a schema
entry is `required` by inspecting `allow_none` and `default`. In
particular, using `required` in schemas as well as not setting
`deduce_required=True` is deprecated.
Raises
------
TypeError, KeyError, ValueError
Approperiate errors are raised if provided with an invalid schema. Note
that errors are independent of `raw_config` and `layers` and hence not
user input responsive.
"""
def __init__(
self,
raw_config,
schema,
layers=(),
extract_validation_context=lambda snapshot: None,
extract_transformation_context=lambda snapshot: None,
deduce_required=False,
):
assert_valid_schema(schema, deduce_required=deduce_required)
self._layers = tuple(
[copy.deepcopy(layer) for layer in tuple(layers) + (raw_config,)]
)
self._schema = copy.deepcopy(schema)
self._extract_validation_context = extract_validation_context
self._extract_transformation_context = extract_transformation_context
self._readable = True
self._valid = True
self._errors = ()
self._snapshot = None
self._deduce_required = deduce_required
self._cached_merged_config = self._build_merged_config()
if self._readable:
self._validate_final()
self._assert_state()
@property
def valid(self):
"""A boolean indicating whether the resulting configuration was deemed
valid according to the schema."""
return self._valid
@property
def errors(self):
"""An iterable of `configsuite.errors` that occurred during validation. Is always
empty if `valid` is `True` and non-empty otherwise."""
return self._errors
@property
def readable(self):
"""A boolean indicating whether the resulting configuration was deemed
readable. Is always `True` if `valid` is `True`. For the consequences
of being readable, see the documentation of `snapshot`."""
return self._readable
@property
def snapshot(self):
"""A complete, immutable representation of the resulting configuration.
Raises
------
AssertionError
Raised on access if `suite` is not `readable`.
"""
if not self.readable:
err_msg = "Cannot build snapshot of unreadable configuration."
raise AssertionError(err_msg)
if self._snapshot is None:
self._snapshot = self._build_snapshot(self._merged_config, self._schema)
return self._snapshot
@property
def _validation_context(self):
return self._extract_validation_context(self.snapshot)
[docs] def push(self, raw_config):
"""Builds a new suite with `raw_config` on top of the current suite.
Builds a new suite with the same schema, but with `raw_config` on top
of the layers in the current suite.
Parameters
----------
raw_config:
A configuration that is to take precedence over all layers in the
current suite.
Returns
-------
A new `ConfigSuite` with `raw_config` as the first layer.
"""
return ConfigSuite(
raw_config,
self._schema,
layers=self._layers,
extract_validation_context=self._extract_validation_context,
extract_transformation_context=self._extract_transformation_context,
deduce_required=self._deduce_required,
)
@property
def _merged_config(self):
"""The merged config cannot be built and will also not be utilized if the
config is not readable. Due to this it is built lazily.
"""
if not self.readable:
err_msg = (
"Internal error: "
"Cannot build merged_config of unreadable configuration."
)
raise AssertionError(err_msg)
if self._cached_merged_config is None:
err_msg = (
"Internal error: "
"Merged config cannot be accessed before cache is built."
)
raise AssertionError(err_msg)
return self._cached_merged_config
def _build_merged_config(self):
layers = self._build_transformed_layers()
self._validate_readability(layers)
if not self.readable:
return None
merged_config = self._build_initial_merged_config(layers, self._schema)
merged_config = self._apply_transformations(merged_config)
self._validate_readability((merged_config,))
if not self.readable:
return None
merged_config = self._apply_context_transformations(merged_config)
self._validate_readability((merged_config,))
if not self.readable:
return None
return merged_config
def _build_transformed_layers(self):
layer_transformer = configsuite.Transformer(
self._schema, MK.LayerTransformation, (), bottom_up=False
)
layers = []
for layer in self._layers:
trans_layer = layer_transformer.transform(layer)
self._errors += trans_layer.errors
layers.append(trans_layer.result)
self._valid &= len(self._errors) == 0
return layers
def _build_initial_named_dict_merged_config(self, layers, schema):
rec = self._build_initial_merged_config
content_schema = schema[MK.Content]
def is_collection(schema_type):
return isinstance(schema_type, configsuite.types.Collection)
def is_defaultable(schema_item):
return MK.Default in schema_item or is_collection(schema_item[MK.Type])
layer_keys = set((key for layer in layers for key in layer.keys()))
defaultable_keys = set(
key for key, val in content_schema.items() if is_defaultable(val)
)
config = {}
for key in layer_keys | defaultable_keys:
child_layers = tuple([layer[key] for layer in layers if key in layer])
if key in defaultable_keys:
key_type = content_schema[key][MK.Type]
if is_collection(key_type):
default = key_type.create_empty()
else:
default = content_schema[key].get(MK.Default)
child_layers = (default,) + child_layers
if len(child_layers) == 0:
continue
if key in content_schema:
config[key] = rec(child_layers, content_schema[key])
else:
config[key] = child_layers[-1]
return config
def _build_initial_dict_merged_config(self, layers, schema):
rec = self._build_initial_merged_config
content_schema = schema[MK.Content]
config = {}
all_keys = (key for layer in layers for key in layer.keys())
for key in all_keys:
child_layers = tuple([layer[key] for layer in layers if key in layer])
if len(child_layers) == 0:
continue
config[key] = rec(child_layers, content_schema[MK.Value])
return config
def _build_initial_list_merged_config(self, layers, schema):
rec = self._build_initial_merged_config
item_schema = schema[MK.Content][MK.Item]
config = []
for layer in layers:
config += [rec((item,), item_schema) for item in layer]
return tuple(config)
def _build_initial_merged_config(self, layers, schema):
data_type = schema[MK.Type]
if isinstance(data_type, configsuite.types.BasicType):
return layers[-1]
elif data_type == configsuite.types.List:
return self._build_initial_list_merged_config(layers, schema)
elif data_type == configsuite.types.NamedDict:
return self._build_initial_named_dict_merged_config(layers, schema)
elif data_type == configsuite.types.Dict:
return self._build_initial_dict_merged_config(layers, schema)
else:
msg = "Encountered unknown type {} while building raw config"
raise TypeError(msg.format(str(data_type)))
def _apply_transformations(self, config):
transformer = configsuite.Transformer(self._schema, MK.Transformation, ())
trans_res = transformer.transform(config)
self._errors += trans_res.errors
self._valid &= len(trans_res.errors) == 0
return trans_res.result
def _apply_context_transformations(self, config):
prelim_snapshot = self._build_snapshot(config, self._schema)
try:
context = self._extract_transformation_context(prelim_snapshot)
# pylint: disable=broad-except
except Exception as e:
self._valid = False
self._errors += (configsuite.ContextExtractionError(str(e), ()),)
return config
context_transformer = configsuite.Transformer(
self._schema, MK.ContextTransformation, (context,)
)
trans_res = context_transformer.transform(config)
self._errors += trans_res.errors
self._valid &= len(trans_res.errors) == 0
return trans_res.result
def _build_named_dict_snapshot(self, config, schema):
data_type = schema[MK.Type]
content_schema = schema[MK.Content]
dict_name = data_type.name
dict_keys = sorted(content_schema.keys())
dict_collection = collections.namedtuple(dict_name, dict_keys)
return dict_collection(
**{
key: self._build_snapshot(config.get(key), content_schema[key])
for key in content_schema
}
)
def _build_list_snapshot(self, config, schema):
item_schema = schema[MK.Content][MK.Item]
return tuple([self._build_snapshot(elem, item_schema) for elem in config])
def _build_dict_snapshot(self, config, schema):
key_schema = schema[MK.Content][MK.Key]
value_schema = schema[MK.Content][MK.Value]
Pair = collections.namedtuple("KeyValuePair", ["key", "value"])
return tuple(
[
Pair(
self._build_snapshot(key, key_schema),
self._build_snapshot(value, value_schema),
)
for key, value in config.items()
]
)
def _build_snapshot(self, config, schema):
if config is None:
return None
data_type = schema[MK.Type]
if isinstance(data_type, configsuite.BasicType):
return config
elif data_type == configsuite.types.NamedDict:
return self._build_named_dict_snapshot(config, schema)
elif data_type == configsuite.types.List:
return self._build_list_snapshot(config, schema)
elif data_type == configsuite.types.Dict:
return self._build_dict_snapshot(config, schema)
else:
msg = "Encountered unknown type {} while building snapshot"
raise TypeError(msg.format(str(data_type)))
def _validate_readability(self, layers):
readable_errors = (configsuite.UnknownKeyError, configsuite.MissingKeyError)
def _not_container(schema):
return not isinstance(schema[MK.Type], configsuite.types.Collection)
container_validator = configsuite.Validator(
self._schema, stop_condition=_not_container, apply_validators=False
)
container_errors = []
for idx, layer in enumerate(layers):
val_res = container_validator.validate(layer)
container_errors += [
error.create_layer_error(idx)
for error in val_res.errors
if not isinstance(error, readable_errors)
]
self._readable &= len(container_errors) == 0
self._valid &= self._readable
self._errors += tuple(container_errors)
def _validate_final(self):
if not self.readable:
err_msg = "Internal error: Not readable when doing final validation"
raise AssertionError(err_msg)
self._assert_state()
validator = configsuite.Validator(self._schema)
val_res = validator.validate(self._merged_config, self._validation_context)
self._valid &= val_res.valid
self._errors += val_res.errors
def _assert_state(self):
"""Asserts that the internal state is consistent. In particular we will
verify that:
- not readable => not valid
- valid <=> no errors
"""
if not self.readable and self.valid:
err_msg = "Internal error: Config is valid, but not readable"
raise AssertionError(err_msg)
if self.valid and len(self.errors) > 0:
err_msg = "Internal error: Config is valid, but has errors"
raise AssertionError(err_msg)
if not self.valid and len(self.errors) == 0:
err_msg = "Internal error: Config is not valid, but has no errors"
raise AssertionError(err_msg)