Module ephemeris_library.attitude
Expand source code
import ephemeris_library.database as database
import ephemeris_library.common as common
from ephemeris_library.common import add_utc_column
import datetime
import numpy
from typing import Union
from numpy.lib import recfunctions
def get_yaw_pitch_roll(start_time: Union[datetime.datetime, str, int, float],
end_time: Union[datetime.datetime, str, int, float],
minimum_quality: str = None,
detail=False):
'''
Get yaw, pitch, and roll information, optionally with previous/next data-source metadata.
Arguments:
start_time: int or float MET time, datetime object, or ISO-formatted date string.
Time will be rounded down to the start of the second.
end_time: int or float MET time, datetime object, or ISO-formatted date string.
minimum_quality: string matching an attitude.ATTITUDE_DATA_SOURCE key, giving the minimum quality to return.
Default None, which returns all results without filtering.
detail: bool. If True, appends the data_source_* and seconds_to_*_solution columns.
Return: A numpy structured array with columns representing the requested YPR & metadata columns.
| Column | Type | Condition |
|:------ |:---- |:--------- |
| met | float64 | Always |
| yaw | float64 | Always |
| pitch | float64 | Always |
| roll | float64 | Always |
| quality | int32 | Always |
| data_source_previous | int32 | if detail |
| data_source_next | int32 | if detail |
| seconds_to_previous_solution | float64 | if detail |
| seconds_to_next_solution | float64 | if detail |
'''
# Check version
common._check_version()
# Verify time inputs.
start_time = common._verify_time(start_time)
end_time = common._verify_time(end_time)
# Round start_time down to start of second.
start_time = int(start_time)
# Verify the minimum quality.
minimum_quality_id = common._verify_quality(minimum_quality)
if end_time < start_time:
raise ValueError('Error: end_time before start_time.')
data = _get_ypr(start_time, end_time, minimum_quality_id, detail)
return data
def interpolate_ypr(data, time_list, ignore_dropout=False):
'''
Interpolate the given YPR data into the given timestamps.
Interfaces with the quaternion library for the interpolation.
Arguments:
data: numpy structured array generated by get_yaw_pitch_roll with detail=True.
Must include the following columns:
[met, yaw, pitch, roll, quality, data_source_previous, data_source_next,
seconds_to_previous_solution, seconds_to_next_solution]
time_list: the timestamps to interpolate the data to.
Must be a list of MET values (int/float), datetime objects, or ISO-8601 strings.
ignore_dropout: bool, default False.
Normally any region with a telemetry dropout longer than the global constant
quaternions.TELEMETRY_HOLE_OVERRIDE_THRESHOLD is overridden with zero-quaternions.
Setting this to True disregards that limit and interpolates through any gap.
Return:
data: numpy structured array.
Names: [met, yaw, pitch, roll, quality, data_source_previous, data_source_next,
seconds_to_previous_solution, seconds_to_next_solution]
dtypes:
float64: met, yaw, pitch, roll, seconds_to_previous_solution, seconds_to_next_solution
int32: quality, data_source_previous, data_source_next
Note: 1 Hz masking of the telemetry may hide datapoints when the 'definitive'
timestamps used to build the 'seconds_to_*' values are rebuilt.
'''
import ephemeris_library.quaternions as quaternions # Does the actual interpolation work
# Translates the YPR to and from Quaternions.
import ephemeris_library.rotation_libraries.lib_quaternion as lib_quaternion
# Make a preliminary copy of the input dtype names, as they're manipulated in this function.
input_columns = data.dtype.names
# Preamble & sanity checks:
# Make sure all required columns are present.
_required_columns = ['met', 'yaw', 'pitch', 'roll', 'quality', 'data_source_previous',
'data_source_next', 'seconds_to_previous_solution', 'seconds_to_next_solution']
for _column in _required_columns:
if _column not in data.dtype.names:
raise ValueError("interpolate_ypr - Missing a required column in the data structure: {:s}".format(_column))
# Extract the YPR for translation into Quaternions.
YPR_data = recfunctions.structured_to_unstructured(data[['yaw', 'pitch', 'roll']], dtype=float)
# Recast as an unstructured array for use by the lib_quaternion library, as it does not (yet) accept a structured set.
# Format is of < Yaw, Pitch, Roll >, all of floats.
# Translate the YPR data into the CRF->Body quaternions.
quat_data = lib_quaternion.YPR_to_Quaternion(YPR_data) # <Y,P,R> -> <r,i,j,k> Does not require MET.
# The YPR->Quat function accepts 999-valued (fill) YPR rows, but returns garbage for them.
# We need to keep the met/metadata rows for extrapolation purposes, so override any 999-YPR rows with zero-quaternions.
quat_data[YPR_data[:, 0] > 360.] = 0 # Yaw > 360 deg catches the 999 fill value (exact float equality is unreliable).
# Merge the Quaternions into the data variable.
# First strip the Data into the minimum viable dataset,
data = data[['met', 'quality', 'data_source_previous', 'data_source_next',
'seconds_to_previous_solution', 'seconds_to_next_solution']]
data = data.copy() # And make sure it's a static object, not a view.
# And then add the new columns to the dataset:
_names = ['r', 'i', 'j', 'k']
_types = [float, float, float, float]
r, i, j, k = quat_data.T # Split into individual columns for use below.
data = recfunctions.append_fields(data, _names, [r, i, j, k], _types, usemask=False)
# Re-organize the dataset for consistency.
data = data[['met', 'r', 'i', 'j', 'k', 'quality', 'data_source_previous',
'data_source_next', 'seconds_to_previous_solution', 'seconds_to_next_solution']]
# Feed the intermediate 'quaternion+metadata' structure into the quaternion interpolator:
interpolated_data = quaternions.interpolate_quaternions(data, time_list, ignore_dropout)
# Extract the interpolated quaternions:
output_quat = recfunctions.structured_to_unstructured(interpolated_data[['r', 'i', 'j', 'k']], dtype=float)
output_YPR = lib_quaternion.Quaternion_to_YPR(output_quat) # <r,i,j,k> -> <Y,P,R> - Does not require MET.
# And if any of the returned interpolated quaternions was a zero-quat, this was within a period of dropout or extrapolation.
# Override all zero-quaternions with a YPR value of 999.
output_YPR[lib_quaternion._Is_Zero_Quat(output_quat)] = 999.0
# Note the 'ignore_dropout' arg is not applied here, as it's guaranteed by the interpolator function to return
# non-zero quaternions in non-extrapolated regions
output_dtype = [('met', 'float64'),
('yaw', 'float64'),
('pitch', 'float64'),
('roll', 'float64'),
('quality', 'int32'),
('data_source_previous', 'int32'),
('data_source_next', 'int32'),
('seconds_to_previous_solution', 'float64'),
('seconds_to_next_solution', 'float64')]
# As this can't be built type-safely using numpy.stack or numpy.column_stack,
# build an empty structured array, then assign each array to the column.
output_data = numpy.empty(len(output_YPR), dtype=output_dtype)
# Then copy each column into place.
output_data['met'] = interpolated_data['met']
output_data['yaw'] = output_YPR[:, 0]
output_data['pitch'] = output_YPR[:, 1]
output_data['roll'] = output_YPR[:, 2]
output_data['quality'] = interpolated_data['quality']
output_data['data_source_previous'] = interpolated_data['data_source_previous']
output_data['data_source_next'] = interpolated_data['data_source_next']
output_data['seconds_to_previous_solution'] = interpolated_data['seconds_to_previous_solution']
output_data['seconds_to_next_solution'] = interpolated_data['seconds_to_next_solution']
# And return!
return output_data
def add_string_columns(data: numpy.ndarray, use_short_names=False):
'''
Adds string columns to the given attitude dataset.
Arguments:
data: numpy structured array, e.g. as generated by get_yaw_pitch_roll or get_attitude_quaternions.
Must include at least one of ['quality', 'data_source_previous', 'data_source_next'].
use_short_names: Optional Bool. Defaults False.
If True, encodes 'quality' with common.QUALITY_FLAG_STR_SHORT.
if False, encodes 'quality' with common.QUALITY_FLAG_STR_DETAIL.
Return:
data: numpy structured array.
Appends the following columns where the corresponding source column is present:
'quality_string'
'data_source_previous_string'
'data_source_next_string'
dtypes are Unicode-32
'''
if 'quality' in data.dtype.names:
if use_short_names:
data = common.add_string_column(data, 'quality', 'quality_string', common.QUALITY_FLAG_STR_SHORT)
else:
data = common.add_string_column(data, 'quality', 'quality_string', common.QUALITY_FLAG_STR_DETAIL)
if 'data_source_previous' in data.dtype.names:
data = common.add_string_column(data, 'data_source_previous',
'data_source_previous_string', common.ATTITUDE_DATA_SOURCE_STRING)
if 'data_source_next' in data.dtype.names:
data = common.add_string_column(data, 'data_source_next', 'data_source_next_string',
common.ATTITUDE_DATA_SOURCE_STRING)
return data
def _get_ypr(start_met: int,
end_met: int,
minimum_data_source_id: int,
detail: bool = False):
'''
Get yaw, pitch, and roll information from database.
Arguments:
start_met: MET time in seconds.
end_met: MET time in seconds.
minimum_data_source_id: minimum quality / data-source ID to return from the database.
detail: bool, default False. If True, also return data_source_previous, data_source_next,
seconds_to_previous_solution, and seconds_to_next_solution.
Return:
numpy structured array: [(met, yaw, pitch, roll, quality),
dtype=(float64, float64, float64, float64, int32)]
If detail: [(met, yaw, pitch, roll, quality,
data_source_previous, data_source_next, seconds_to_previous_solution, seconds_to_next_solution),
dtype=(float64, float64, float64, float64, int32,
int32, int32, float64, float64)]
'''
fields = ['met', 'yaw', 'pitch', 'roll', 'quality']
if detail:
fields.extend(['data_source_previous', 'data_source_next',
'seconds_to_previous_solution', 'seconds_to_next_solution'])
database_fields = ['`' + f + '`' for f in fields]
# connect to database and fetch data
statement = ('SELECT ' + ', '.join(database_fields)
+ ' FROM `cassiope_ephemeris`.`ypr`'
+ ' WHERE `met` BETWEEN ' + str(start_met) + ' AND ' + str(end_met)
+ ' AND `quality` >= ' + str(minimum_data_source_id)
+ ' ORDER BY `met` ASC')
connection = database._connect_to_database()
cursor = connection.cursor()
cursor.execute(statement)
data = cursor.fetchall()
cursor.close()
connection.close()
# return results
dtype_list = [('met', 'float64'),
('yaw', 'float64'),
('pitch', 'float64'),
('roll', 'float64'),
('quality', 'int32')
]
if detail:
dtype_list.extend([('data_source_previous', 'int32'),
('data_source_next', 'int32'),
('seconds_to_previous_solution', 'float64'),
('seconds_to_next_solution', 'float64')])
results = numpy.array(data, dtype=dtype_list)
return results
Functions
def add_string_columns(data: numpy.ndarray, use_short_names=False)
-
Adds string columns to the given attitude dataset.
Arguments
data: numpy structured array, e.g. as generated by get_yaw_pitch_roll or get_attitude_quaternions. Must include at least one of ['quality', 'data_source_previous', 'data_source_next'].
use_short_names: Optional bool. Defaults to False. If True, encodes 'quality' with common.QUALITY_FLAG_STR_SHORT; if False, with common.QUALITY_FLAG_STR_DETAIL.
Return
data: numpy structured array. Appends the following columns where the corresponding source column is present: 'quality_string', 'data_source_previous_string', 'data_source_next_string'.
dtypes are Unicode-32
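A minimal usage sketch (the MET window below is a placeholder, and database access is assumed to be configured):

import ephemeris_library.attitude as attitude

# Placeholder MET window; any range covered by the ephemeris database would do.
ypr = attitude.get_yaw_pitch_roll(500000000, 500000600, detail=True)
ypr = attitude.add_string_columns(ypr, use_short_names=True)

print(ypr.dtype.names)            # now also contains 'quality_string', 'data_source_previous_string', ...
print(ypr['quality_string'][:5])  # human-readable quality flags for the first few rows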
def get_yaw_pitch_roll(start_time: Union[datetime.datetime, str, int, float], end_time: Union[datetime.datetime, str, int, float], minimum_quality: str = None, detail=False)
-
Get yaw, pitch, and roll information, optionally with previous/next data-source metadata.
Arguments
start_time: int or float MET time, datetime object, or ISO-formatted date string. Time will be rounded down to the start of the second.
end_time: int or float MET time, datetime object, or ISO-formatted date string.
minimum_quality: string matching an attitude.ATTITUDE_DATA_SOURCE key, giving the minimum quality to return. Default None, which returns all results without filtering.
detail: bool. If True, appends the data_source_* and seconds_to_*_solution columns.
Return: A numpy structured array with columns representing the requested YPR & metadata columns.
| Column | Type | Condition |
|:------ |:---- |:--------- |
| met | float64 | Always |
| yaw | float64 | Always |
| pitch | float64 | Always |
| roll | float64 | Always |
| quality | int32 | Always |
| data_source_previous | int32 | if detail |
| data_source_next | int32 | if detail |
| seconds_to_previous_solution | float64 | if detail |
| seconds_to_next_solution | float64 | if detail |
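A minimal usage sketch (the dates are placeholders and assume the database holds attitude data for that window):

import datetime
import ephemeris_library.attitude as attitude

# Times may be MET seconds, datetime objects, or ISO-formatted strings; these values are illustrative.
start = datetime.datetime(2021, 3, 1, 0, 0, 0)
end = '2021-03-01T00:10:00'

ypr = attitude.get_yaw_pitch_roll(start, end, detail=True)
for row in ypr[:3]:
    print(row['met'], row['yaw'], row['pitch'], row['roll'], row['quality'])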
def interpolate_ypr(data, time_list, ignore_dropout=False)
-
Interpolate the given YPR data into the given timestamps. Interfaces with the quaternion library for the interpolation.
Arguments
data: numpy structured array generated by get_yaw_pitch_roll with detail=True. Must include the following columns: [met, yaw, pitch, roll, quality, data_source_previous, data_source_next, seconds_to_previous_solution, seconds_to_next_solution]
time_list: the timestamps to interpolate the data to. Must be a list of MET values (int/float), datetime objects, or ISO-8601 strings.
ignore_dropout: bool, default False. Normally any region with a telemetry dropout longer than the global constant quaternions.TELEMETRY_HOLE_OVERRIDE_THRESHOLD is overridden with zero-quaternions. Setting this to True disregards that limit and interpolates through any gap.
Return
data: numpy structured array. Names: [met, yaw, pitch, roll, quality, data_source_previous, data_source_next, seconds_to_previous_solution, seconds_to_next_solution] dtypes: float64: met, yaw, pitch, roll, seconds_to_previous_solution, seconds_to_next_solution int32: quality, data_source_previous, data_source_next
Note: 1 Hz masking of the telemetry may hide datapoints when the 'definitive' timestamps used to build the 'seconds_to_*' values are rebuilt.
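A usage sketch of the typical workflow (the time window is a placeholder and assumed to contain attitude telemetry):

import numpy
import ephemeris_library.attitude as attitude

# detail=True ensures all metadata columns required by interpolate_ypr are present.
ypr = attitude.get_yaw_pitch_roll('2021-03-01T00:00:00', '2021-03-01T00:05:00', detail=True)

# Interpolate onto a 10 Hz MET grid spanning the returned data.
target_met = numpy.arange(ypr['met'][0], ypr['met'][-1], 0.1)
ypr_10hz = attitude.interpolate_ypr(ypr, list(target_met))

# Dropout/extrapolation regions come back with the 999.0 fill value.
valid = ypr_10hz['yaw'] < 360.0
print(valid.sum(), 'of', len(ypr_10hz), 'interpolated points are valid')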