import warnings
import typing
import zipfile
import tinydb
import slippy
import os
import numpy as np
from collections.abc import Sequence
if slippy.CUDA:
import cupy as cp
__all__ = ['OutputRequest', 'OutputSaver', 'OutputReader', 'read_output']
class OutputSaver:
"""
A context manager for saving model output
Parameters
----------
model_name: str
The name of the model, to be used for the file names of the outputs
Notes
-----
This writes the outputs to a database file (*.sdb). The arrays cannot be stored in the database and are instead
stored in binary in a zip repository (*.sar). The entries in the data base are replaced with the file names in the
zip repository, the shape of the output and the data type.
See Also
--------
OutputReader - A class for reading output files back in
"""
def __init__(self, model_name: str):
self.model_name = model_name
self.in_context = False
self.database_file = None
self.array_file = None
self.array_number = 0
def write(self, output: dict):
"""Write outputs to file,
Parameters
----------
output: dict
The outputs to be added to the database
"""
if not self.in_context:
raise ValueError("Output saver is not in context cannot save")
clean_dict = dict()
for key, value in output.items():
if slippy.CUDA and isinstance(value, cp.ndarray):
value = cp.asnumpy(value)
if isinstance(value, np.ndarray) and value.size == 1:
try:
value = float(value)
except TypeError:
pass
if isinstance(value, np.ndarray):
name = 'a' + str(self.array_number)
if value.flags.f_contiguous:
value = np.ascontiguousarray(value)
self.array_file.writestr(name, value.tobytes())
value = '**array**#' + str(self.array_number) + '#' + str(value.shape) + '#' + str(value.dtype)
self.array_number += 1
clean_dict[key] = value
self.data_file.insert(clean_dict)
def __enter__(self):
db_filename = os.path.join(slippy.OUTPUT_DIR, self.model_name + '.sdb')
if os.path.exists(db_filename):
os.remove(db_filename)
self.data_file = tinydb.TinyDB(db_filename, 'w')
self.array_file = zipfile.ZipFile(os.path.join(slippy.OUTPUT_DIR, self.model_name + '.sar'), 'w')
self.in_context = True
return self
def __exit__(self, err_type, value, traceback):
self.data_file.close()
self.array_file.close()
self.in_context = False
[docs]class OutputRequest:
"""An output request for a multi step contact model
Parameters
----------
name: str
The name of the output request, used for debugging
parameters: list[str]
A list of parameters to be saved as part of the output, check step and sub-model descriptions for valid
names, alternatively ['all'] will save the entire model state
timing_mode: {'interval', 'time_interval', 'n_outputs_per_step', 'step_times', 'global_times'},
optional ('interval')
The timing mode used to control when this output is written to file, the value parameter controls how each
method is executed:
* 'interval' the output is written every n time points during execution, using this with value set to 1
(as is the default behaviour) writes the output on every sub step / time point.
* 'time_interval' the output is written every n units of time, for example if value is set to 0.25 a step
which is 1 unit of time long will output 4 times [t = 0.25, 0.5, 0.75, 1]. This is reset at the
beginning of each model step.
* 'n_outputs_per_step' the output is written n times in each step it is active in regardless of the length
of each step, the time points are evenly spaced. value is the number of outputs to be recorded
* 'step times' the output is written at specific times measured from the beginning of the steps which the
output is active in, this resets for every model step which uses this output. value should be a sorted
list of relative times
* 'global_times' the output is written at specific times measured from the start of model execution, no
output will be written from steps where this output is not active. value should be a sorted list of
absolute times
value: Union[list, float], optional (1)
Either a float or list of floats, as defined above
Notes
-----
Outputs cannot be requested from the start of a step.
Outputs will only be written from steps which have the output added, this can be done through the individual
steps or the contact model.
See Also
--------
OutputReader - A class for easily reading output files
Examples
--------
An output request that will save everything at all time points:
>>> OutputRequest('output-0', ['all'])
An output request that will save 'contact_nodes' and normal load for every time step:
>>> OutputRequest('output-1', ['contact_nodes', 'loads_z'])
"""
parameters: typing.Sequence[str]
[docs] def __init__(self, name: str, parameters: typing.Sequence[str], timing_mode: str = 'interval',
value: typing.Union[typing.Sequence, float] = 1):
self.name = name
valid_modes = ('interval', 'time_interval', 'n_outputs_per_step', 'step_times', 'global_times')
if timing_mode not in valid_modes:
raise ValueError(f"Unrecognised timing mode: {timing_mode}, should be one of {', '.join(valid_modes)}")
if not isinstance(parameters, Sequence) or isinstance(parameters, str):
raise ValueError("Parameters must be a sequence of strings")
self.timing = timing_mode
if timing_mode == 'interval':
self.interval = value
self._interval_count = value
if timing_mode == 'time_interval':
self.interval = value
self.next_time_point = 0
if timing_mode == 'n_outputs_per_step':
self.interval = value
if timing_mode == 'step_times':
self.interval = None
self.step_times = value
if timing_mode == 'global_times':
self.interval = None
self.time_points = iter(value)
try:
self.next_time_point = next(self.time_points)
except StopIteration:
raise ValueError("Global times must have length of at least 1")
self.parameters = [str(p) for p in parameters]
self.start_time_this_step = 0
self.end_time_this_step = None
def new_step(self, current_time):
"""
Called by the model when a new step is started
"""
self.start_time_this_step = current_time
self.end_time_this_step = None
if self.timing == 'time_interval':
self.next_time_point = current_time
def is_active(self, current_time: float, step_max_time: float):
"""True if the output should be written on this time step
"""
if self.timing == 'interval':
if self._interval_count < self.interval:
self._interval_count += 1
return False
else:
self._interval_count = 1
return True
if self.timing == 'time_interval':
if current_time >= self.next_time_point:
self.next_time_point += self.interval
return True
return False
if self.end_time_this_step is None: # then new_step was just called
self.end_time_this_step = self.start_time_this_step + step_max_time
if self.timing == 'n_outputs_per_step':
self.time_points = iter(np.linspace(self.start_time_this_step, self.end_time_this_step, self.interval))
if self.timing == 'step_times':
self.time_points = iter([self.start_time_this_step + st for st in self.step_times])
try:
self.next_time_point = next(self.time_points)
except StopIteration:
self.next_time_point = None
if self.timing in ['global_times', 'n_outputs_per_step', 'step_times']:
if self.next_time_point is not None and current_time >= self.next_time_point:
try:
self.next_time_point = next(self.time_points)
except StopIteration:
self.next_time_point = None
return True
return False
warnings.warn(f"Output {self.name}, could not be written, output timing not recognised")
return False
[docs]class OutputReader:
"""
A class for reading and querying output files (.sdb) and array files (.sar)
Parameters
----------
file_name: str
The path to the .sdb file with or without the extension, there should be a corresponding .sar file with the
same name in the same directory
Attributes
----------
fields: set
A set of all the fields which appear in the output database
time_points: list
A list of the all the time points in the output file
See Also
--------
OutputSaver
OutputRequest
Notes
-----
All arrays in the output will be lazily read from the array file. These will not be read until the data from the
array is requested. However, the lazy array objects are written so that numpy functions can use them as if they are
arrays and they can be indexed as if they are arrays. Indexing the array will read the entire array into memory.
For many uses this is good enough, however sometimes it is necessary to explicitly convert the lazy array into a
numpy array using numpy.asarray(lazy_array).
Examples
--------
Making an output file to test:
>>> import slippy.contact as c
>>> with c.OutputSaver('test') as output_s:
>>> output_s.write({'time':0,'a':5, 'b':np.array([1,2,3,4]), 'c':np.array([1,2,3,4])})
>>> output_s.write({'time':1,'a':10, 'c':np.array([1,2,3,4])})
>>> output_s.write({'time':2,'a':15, 'b':np.array([1,2,3,4]), })
>>> output_s.write({'time':3,'a':20, 'b':np.array([1,2,3,4]), 'c':np.array([1,2,3,4])})
The file can then be read in using an output reader:
>>> output = c.OutputReader('test')
We can find the time points and fields saved:
>>> output.time_points
[0, 1, 2, 3]
>>> output.fields
{'a', 'b', 'c', 'time'}
The output reader can be indexed by a time point or a field:
>>> output[1]
{'time': 1, 'a': 10, 'c': array, shape:(4,), dtype:int32}
>>> output['a']
{0: 5, 1: 10, 2: 15, 3: 20}
The result of this index is a dictionary of the results from a single time point or all the results from a field,
indexing again will give the result for a specific field at a specific time point.
>>> output['a'][1] == output[1]['a']
True
For plotting etc. all of the values can be accessed:
>>> all_times = list(output['time'].values())
You can also query the results data base directly using tinydb, note that this will not work with array items.
>>> from tinydb import Query
>>> result = Query()
>>> output.search(result.a == 10)
[{'time': 1, 'a': 10, 'c': array, shape:(4,), dtype:int32}]
The output reader can also be use to iterate through the results from each time point:
>>> for result_from_time_point in output:
>>> print(result_from_time_point['time'])
0
1
2
3
"""
[docs] def __init__(self, file_name):
if not file_name.endswith('.sdb'):
file_name += '.sdb'
self.file_name = file_name
with tinydb.TinyDB(file_name) as db:
self.all_entries = db.search(tinydb.Query().time >= 0)
self.fields = set()
self.time_points = []
for entry in self.all_entries:
self.fields.update(entry)
self.time_points.append(entry['time'])
def __getitem__(self, time_point_or_field):
with tinydb.TinyDB(self.file_name) as db:
if time_point_or_field in self.time_points:
query = tinydb.Query()
raw_dict = db.search(query.time == time_point_or_field)[0]
elif time_point_or_field in self.fields:
f = time_point_or_field
raw_dict = {entry['time']: (entry[f] if f in entry else None) for entry in self.all_entries}
else:
raise IndexError("Time point or field not recognised")
return _array_gen(raw_dict, self.file_name[:-4] + '.sar')
def search(self, query):
"""Wraps TinyDB search method replacing array codes in database with lazy arrays
Parameters
----------
query: tinydb.Query
A valid tinydb Query
Returns
-------
list of results that match the query
Notes
-----
Queries cannot match against arrays as these are not stored directly in the database
Examples
--------
>>> with OutputSaver('test') as output:
>>> output.write({'time':0,'a':5, 'b':np.array([1,2,3,4]), 'c':np.array([1,2,3,4])})
>>> output.write({'time':1,'a':10, 'c':np.array([1,2,3,4])})
>>> output.write({'time':2,'a':15, 'b':np.array([1,2,3,4]), })
>>> output.write({'time':3,'a':20, 'b':np.array([1,2,3,4]), 'c':np.array([1,2,3,4])})
>>> output = OutputReader('test')
>>> from tinydb import Query
>>> result = Query()
>>> output.search(result.a == 10)
[{'time': 1, 'a': 10, 'c': array, shape:(4,), dtype:int32}]
"""
with tinydb.TinyDB(self.file_name) as db:
results_list = db.search(query)
filled_results = []
for result_dict in results_list:
filled_results.append(_array_gen(result_dict, self.file_name[:-4] + '.sar'))
return filled_results
def __iter__(self):
current = 0
while current < len(self.time_points):
yield self[self.time_points[current]]
current += 1
def __repr__(self):
return (f"OutputReader \n"
f"fields: {', '.join(self.fields)} \n"
f"time points {', '.join([str(tp) for tp in self.time_points])}")
def _array_gen(output_dict, array_file_name):
"""
Helper function to replace string codes for _ArrayReader objects in the output of a query
Parameters
----------
output_dict: dict
A dict, normally the result of a query on the database
array_file_name: str
The file name of the array file which goes with the output database
Returns
-------
output_dict: dict
The input dict modified by changing any array code values to _ArrayReader objects (lazily read arrays)
"""
for key, value in output_dict.items():
if isinstance(value, str) and value.startswith('**array**'):
output_dict[key] = _ArrayReader(array_file_name, value)
return output_dict
class _ArrayReader:
"""
Helper class for reading arrays from zip file, should act like an array but
lazily loads from the zipfile.
Parameters
----------
file_name: str
The full path to the array file (.sar)
entry_string: str
The string from the corresponding entry in the output database
Examples
--------
>>> with OutputSaver('test') as output_files:
>>> output_files.write({'my_array':np.array([1,2,3,4])})
>>> lazy_array = _ArrayReader('test.sar', '**array**#0#(4,)#int32')
>>> # ^ file name in zip repo
>>> # ^ shape of array
>>> # ^ dtype of array
>>> np.max(lazy_array)
4
>>> lazy_array[0]
1
"""
def __init__(self, file_name: str, entry_string: str):
self.file_name = file_name
hashes = [n for n, v in enumerate(entry_string) if v == '#']
self.file_num = entry_string[hashes[0] + 1:hashes[1]]
shape = entry_string[hashes[1] + 1:hashes[2]]
if any([s not in '1234567890 ,()' for s in shape]):
raise ValueError("Data in this file has been modified, unable to read")
self.shape = eval(shape)
self.dtype_name = entry_string[hashes[2] + 1:]
# noinspection PyUnresolvedReferences
self.dtype = np.__getattribute__(self.dtype_name)
self._array = None
@property
def array(self):
if self._array is None:
self._fill_array()
return self._array
def _fill_array(self):
with zipfile.ZipFile(self.file_name, 'r') as file:
self._array = np.frombuffer(file.read('a' + self.file_num), dtype=self.dtype).reshape(self.shape)
def __array__(self):
return self.array
def __getitem__(self, *items):
return self.array.__getitem__(*items)
def __repr__(self):
return f"array, shape:{self.shape}, dtype:{self.dtype_name}"
def read_output(file_name: str) -> OutputReader:
"""Read a pair of output files
Parameters
----------
file_name: str
The file name or full path to the database file (.sdb)
Returns
-------
OutputReader object
Notes
-----
The returned OutputReader object has attributes time_points and fields, these are all the time points for which
an output was recorded (for single step static analysis there will be one time point at t=1)
Indexing the OutputReader object with a time point will return all of the results from that time point as a
dictionary
Indexing the OutputReader with a field will give a dict of the results from that field for every time point.
With the keys being the time values for each measurement, missing values will be replaced by None
The result is that indexing with a time point then a field or a field then a time point gives the result for that
field at the specific time.
Any arrays in the results will be lazily read when they are first accessed, these can be converted to numpy arrays
by array = np.array(lazy_array). However they should work with numpy functions and indexing without conversion.
Examples
--------
>>> with OutputSaver('test') as output:
>>> output.write({'time':0,'a':5, 'b':np.array([1,2,3,4]), 'c':np.array([1,2,3,4])})
>>> output.write({'time':1,'a':10, 'c':np.array([1,2,3,4])})
>>> output.write({'time':2,'a':15, 'b':np.array([1,2,3,4]), })
>>> output.write({'time':3,'a':20, 'b':np.array([1,2,3,4]), 'c':np.array([1,2,3,4])})
>>> output = read_output('test')
>>> output.time_points
[0,1,2,3]
>>> output[1]
{'time': 1, 'a': 10, 'c': array, shape:(4,), dtype:int32} # dict of results at a specific time
>>> output.fields
{'a', 'b', 'c', 'time'}
>>> output['a']
{0: 5, 1: 10, 2: 15, 3: 20} # dict with keys of time points and values of 'a' at each time point
>>> output['a'][1] == output[1]['a']
True
"""
return OutputReader(file_name)