Module ephemeris_library.quaternions
Expand source code
DOCUMENTATION_WARNING = """
=== DOCUMENTATION WARNING ===
=============================
All Quaternions provided in this library are described in the 'Right-Hand' notation method.
Such that: vB = qB<-A * vA = qB<-A* . [0,vA] . qB<-A == q*.v.q
Many documentation sources, such as Wikipedia's 'Quaternion' article are 'Left-Hand' notated,
Such that: vB = qB<-A * vA = qB<-A . [0,vA] . qB<-A* == q.v.q*
Note that this just means that the notation methodology just means
the quaternions are conjugates (inversions) of one another.
All rotation chaining is best shown using 'Right-Hand' arrow notation
qA->B . qB->C = qA->C # Right-Arrow notation.
qB<-A . qC<-B = qC<-A # Left-arrow notation.
All processing is done with Scalar-First quaternions. q = [r,i,j,k]
For simplest usage of the library, please only regard the 'get_pointing_dir_geo'
unit-vector output in the GEO/ITRF frame, rotating these.
"""
import ephemeris_library.database as database
import ephemeris_library.common as common
import ephemeris_library.rotation_libraries.lib_quaternion as lib_quaternion
from ephemeris_library.common import add_utc_column
import datetime
from typing import Union
import numpy
from numpy.lib import recfunctions
TELEMETRY_HOLE_OVERRIDE_THRESHOLD = 600 # Seconds. Attitude telemetry holes bound by this constant
# Pull from ephemeris_attitude_processor/./c_Attitude_Processor/AttitudeProcessor.py
def get_attitude_quaternions(start_time: Union[datetime.datetime, str, int, float],
end_time: Union[datetime.datetime, str, int, float],
minimum_quality: str = None,
detail: bool = False):
'''
Fetches the CASSIOPE 'ITRF<-CRF' quaternions.
Scalar First, right-hand notation. (vB = q*.vA.q )
Args:
start_time: int, float, or string representing met time.
datetime object or iso formatted date string.
end_time: int, float, or string representing met time.
datetime object or iso formatted date string.
minimum_quality: String which matches common.ATTITUDE_DATA_SOURCE key representing the minimum quality to return.
Default None, which returns all results without filtering.
detail: (bool) Default False. If True includes data_souce_* & seconds_to_* columns in the return.
Return:
data: A numpy structured array with columns representing the requested quaternion & metadata columns.
| Column | Type | Condition |
|:------ |:---- |:--------- |
| met | float64 | Always |
| r | float64 | Always |
| i | float64 | Always |
| j | float64 | Always |
| k | float64 | Always |
| quality | float64 | Always |
| data_source_previous | int | if detail |
| data_source_next | int | if detail |
| seconds_to_previous_solution | float64 | if detail |
| seconds_to_next_solution | float64 | if detail |
'''
# Check version
common._check_version()
# Verify time inputs.
start_time = common._verify_time(start_time)
end_time = common._verify_time(end_time)
# Verify the minimum quality.
minimum_quality_id = common._verify_quality( minimum_quality )
if end_time < start_time:
raise ValueError('Error: end_time before start_time.')
# Fetch the data!
data = _get_quaternions_from_database(start_time, end_time, minimum_quality_id, detail)
return data
def interpolate_quaternions(data,
time_list,
ignore_dropout=False):
'''
Interpolate the given attitude data into the given timestamps.
Independant of frame or handedness.
Will return the frame system and handedness as it's inputs.
Arguments:
data: numpy structured array generated by get_quaternions, using detail=True.
Structure: Must include the following:
[met, r, i, j, k, data_source_previous, data_source_next,
seconds_to_previous_solution, seconds_to_next_solution]
time_list: an time list that data is to be interpolated to.
Must be a list of MET (int/float), datetime, or ISO-8601 strings.
ignore_dropout: bool, default False.
Normally overrides any region with a telemetry droout greater than
the global constant TELEMETRY_HOLE_OVERRIDE_THRESHOLD with zero-quaternions.
Setting this true disregards this limit, and interpolates through anything.
Return:
data: numpy structured array.
Names: [met, qITRF<-CRF, quality, data_source_previous, data_source_next,
seconds_to_previous_solution, seconds_to_next_solution]
dtypes:
float64: met, r, i, j, k, seconds_to_previous_solution, seconds_to_next_solution
int32: quality, data_source_previous, data_source_next
Note: 1Hz masking of the telemetry may lead to hidden datapoints on
the rebuilding of the 'definitive' timestamps used to build the 'time_to_*' values.
'''
# Preamble sanity checks:
# Make sure all column are present.
_required_columns = ['met','r','i','j','k','data_source_previous','data_source_next','seconds_to_previous_solution','seconds_to_next_solution']
for _column in _required_columns:
if _column not in data.dtype.names:
raise ValueError("interpolate_quaternions - Missing a required column in data: {:s}".format(_column))
# And pre-process the saught timestamps into MET.
time_list = numpy.array([common._verify_time(time) for time in time_list])
# Quick sanity checks:
# If interpolating to the same timestamps as the input, just return the input dataset.
if len( data ) == len( time_list ) and numpy.all( data['met'] == time_list ) and ignore_dropout == False:
return data
# And if given an empty dataset, return a null dataset.
if len( data ) == 0:
empty_dataset = numpy.array( [0] * len(time_list), dtype = data.dtype )
empty_dataset['met'] = time_list
empty_dataset['seconds_to_previous_solution'] = numpy.inf
empty_dataset['seconds_to_next_solution'] = numpy.inf
return empty_dataset
# Extract the Quaternion component.
quats = recfunctions.structured_to_unstructured(data[['met', 'r', 'i', 'j', 'k']], dtype=float)
# Recast as an unstructured array for the use by lib_quaternion library, as it yet does not allow a structured set.
# Format is of < MET, qR, qI, qJ, qK >, all of floats.
# And remove the quaternions, as well as the derived 'quality' column, from the main dataset.
data = data[['met', 'data_source_previous', 'data_source_next',
'seconds_to_previous_solution', 'seconds_to_next_solution']]
###########################################
###########################################
# Pre-processing:
# Remove any zero-quaternions to allow for clean interpolation.
_nonzero_mask = ~lib_quaternion._Is_Zero_Quat(quats)
data = data [_nonzero_mask]
quats = quats[_nonzero_mask]
#####################################
#####################################
# Derive the 'Definitive' metadata from the input data information.
# as to allow for re-derivation of the time-to & quality entries.
definitive_prev_met = data['met'] - data['seconds_to_previous_solution']
definitive_prev_src = data['data_source_previous']
definitive_next_met = data['met'] + data['seconds_to_next_solution']
definitive_next_src = data['data_source_next']
# Merge the lists
definitive_met = numpy.concatenate((definitive_prev_met, definitive_next_met))
definitive_src = numpy.concatenate((definitive_prev_src, definitive_next_src))
# Sort both by time.
_sort_order = definitive_met.argsort()
definitive_met = definitive_met[_sort_order]
definitive_src = definitive_src[_sort_order]
# Remove any duplicate solutions, use dMET <= 0.2s, as that's the minimum cadence before either merging or filtering on the initial generation step.
_same_time = numpy.diff(definitive_met) <= 0.2
_same_type = numpy.diff(definitive_src) == 0
_valid_mask = ~numpy.logical_and(_same_time, _same_type) # Note the NOT prefix.
# And prepend a 'True' as to match the array sizes and keep the first solution.
_valid_mask = numpy.concatenate(([True], _valid_mask))
# And apply the mask.
definitive_met = definitive_met[_valid_mask]
definitive_src = definitive_src[_valid_mask]
# And finally, remove any solutions that are the start/end of a telemetry hole (src=0)
# This Shouldn't hit, as the zero-quats are removed above, but this is to just make sure.
if ignore_dropout:
_dropout_mask = definitive_src == 0
definitive_met = definitive_met[~_dropout_mask]
definitive_src = definitive_src[~_dropout_mask]
# And that's a list of all 'definitive' solution timestamps within the published telemetry!
# Barring any hidden masks, of course.
###########################################
###########################################
# Split the requested interpolation points to allow for extrapolation / use of bad data.
# Interpolation requires at least two entires.
if len( quats ) > 2: # And any valid times must be between two valid given times.
_interpolation_mask = numpy.logical_and( numpy.min( data['met'] ) <= time_list, time_list <= numpy.max( data['met'] ) )
else: # If there's 1 or 0 points, everything will be extrapolated.
_interpolation_mask = numpy.zeros( len( time_list ), dtype=bool )
time_list_interpolation = time_list[ _interpolation_mask ]
time_list_extrapolation = time_list[~_interpolation_mask ]
###########################################
###########################################
# The quaternion interpolation itself is easy enough.
input_met = quats[:, 0 ]
input_quat = quats[:, 1:]
met_interpolation = time_list_interpolation
quat_interpolation = lib_quaternion.Quaternion_Interpolation(input_met, input_quat, met_interpolation)
# Returns an unstructured object of <qR, qI, qJ, qK>
# Then create a list of zero-quaternions for the extrapolated dataset.
met_extrapolation = time_list_extrapolation
quat_extrapolation = lib_quaternion.qZero( len( met_extrapolation ))
# And merge the arrays
output_met = numpy.concatenate(( met_interpolation, met_extrapolation ))
output_quat = numpy.concatenate(( quat_interpolation, quat_extrapolation ))
# And sort by the timestamp.
_sort_order = output_met.argsort()
output_met = output_met [_sort_order]
output_quat = output_quat[_sort_order]
###########################################
###########################################
# And now using the output dataset, derive the Time-To & Data_Source entries:
# Pre-populate solutions for overriding.
# Time & Source entries set at +/- infinity, and zeroes.
met_prev, src_prev = numpy.full(len(output_met), -numpy.inf), numpy.zeros(len(output_met))
met_next, src_next = numpy.full(len(output_met), numpy.inf), numpy.zeros(len(output_met))
# Use numpy to find the 'Definitive' solutions prior & following to each requested timestamp.
_insert_indecies_left = numpy.searchsorted(definitive_met, output_met, 'left') # If output_met is in definitive_met, returns the index of the definitive that matches. 1-> [0,1,2] = 1
_insert_indecies_right = numpy.searchsorted(definitive_met, output_met, 'right') # If output_met is in definitive_met, returns the following index. 1-> [0,1,2] = 2
# If met_interpolation is NOT in definitive_met, will both return the same index: 1.5 -> [0,1,2] = 2
# If the insert points are do not match, then the requested 'output' met is in the Definitive list.
_output_in_definitive_mask = _insert_indecies_left != _insert_indecies_right
# If both are ==0 , then the solution is prior to the first definitive point.
# If both are ==len(x), then the solution is after the last definitive solution.
_extrapolation_prior = numpy.logical_and( _insert_indecies_left == 0, _insert_indecies_right == 0 )
_extrapolation_after = numpy.logical_and( _insert_indecies_left == len(definitive_met), _insert_indecies_right == len(definitive_met) )
# And if not extrapolating in either direction, and not already in the Definitive list, then have to override both next & prior metadata.
_interpolation_mask = ~ numpy.logical_or.reduce(( _extrapolation_prior, _extrapolation_after, _output_in_definitive_mask ))
_has_next_sol_mask = numpy.logical_or( _extrapolation_prior, _interpolation_mask )
_has_prev_sol_mask = numpy.logical_or( _extrapolation_after, _interpolation_mask )
# There's some weird referencing/np-views here, so make a copy of the relevant arrays just for sanity.
met_prev, src_prev = met_prev.copy(), src_prev.copy()
met_next, src_next = met_next.copy(), src_next.copy()
# Set the prev/next sols to their corresponding definitive solutions, if needed
if numpy.any(_output_in_definitive_mask):
met_prev[_output_in_definitive_mask] = met_next[_output_in_definitive_mask] = definitive_met[_insert_indecies_left[_output_in_definitive_mask]]
src_prev[_output_in_definitive_mask] = src_next[_output_in_definitive_mask] = definitive_src[_insert_indecies_left[_output_in_definitive_mask]]
if numpy.any( _has_prev_sol_mask ):
met_prev[_has_prev_sol_mask] = definitive_met[_insert_indecies_left[_has_prev_sol_mask]-1]
src_prev[_has_prev_sol_mask] = definitive_src[_insert_indecies_left[_has_prev_sol_mask]-1]
if numpy.any( _has_next_sol_mask ):
met_next[_has_next_sol_mask] = definitive_met[_insert_indecies_left[_has_next_sol_mask]]
src_next[_has_next_sol_mask] = definitive_src[_insert_indecies_left[_has_next_sol_mask]]
# Then the absolute MET timestamps to the Time-To-Next/Prev time-deltas.
time_to_next_solution = met_next - output_met # Guaranteed positive. May be infininte due to using np.inf.
time_to_prev_solution = output_met - met_prev # Guaranteed positive. May be infininte due to using np.inf.
# And rename the Source for consistency.
data_source_next_solution = src_next
data_source_prev_solution = src_prev
##############################
##############################
# For each of the Interpolation points, rederive the 'Quality' flag.
# Time-to solutions are just the absolute delta between the times & the saught times.
TT_P = time_to_prev_solution
TT_N = time_to_next_solution
DS_P = data_source_prev_solution
DS_N = data_source_next_solution
TBT = time_between_solutions = TT_P + TT_N
TBT[ numpy.isnan( TBT ) ] = numpy.inf # Inf-Inf = NaN, so override.
# And then re-derive the quality column
# If both solutions in ADM-7/SSA/SSB/Fused, and time delta < 10s, then fine solution.
_qual_fine = numpy.logical_and.reduce([DS_P >= 3, DS_N >= 3, 0 <= TBT, TBT < 10])
# If both solutions in ADM-7/SSA/SSB/Fused, and time delta < 30s, then Moderate solution.
_qual_mod = numpy.logical_and.reduce([DS_P >= 3, DS_N >= 3, 10 <= TBT, TBT < 30])
# If both solutions in ADM-7/SSA/SSB/Fused, and time delta < 120, then Coarse solution.
_qual_coarse = numpy.logical_and.reduce([DS_P >= 3, DS_N >= 3, 30 <= TBT, TBT < 120])
# If Star-Tracker, but a massive spline, mark it as bad as CSS-based.
_qual_awful = numpy.logical_and.reduce([DS_P >= 3, DS_N >= 3, 120 <= TBT])
# If either Solution is CSS, then it's a CSS solution.
_qual_css = numpy.logical_or(DS_P == 2, DS_N == 2)
# If either Solution is ADM-2, then it's a CSS solution.
_qual_adm2 = numpy.logical_or(DS_P == 1, DS_N == 1)
# And if either sol is a dropout mask, or the interpolation is longer than the global threshold value, then mark it as a dropout.
_qual_drop = numpy.logical_or.reduce(( DS_P == 0, DS_N == 0, TBT >= TELEMETRY_HOLE_OVERRIDE_THRESHOLD ))
output_quality = numpy.zeros(len( output_met ), dtype=int) + 10
output_quality[ _qual_fine ] = 4
output_quality[ _qual_mod ] = 3
output_quality[ _qual_coarse] = 2
output_quality[ _qual_awful ] = 1
output_quality[ _qual_css ] = 1
output_quality[ _qual_adm2 ] = 1
output_quality[ _qual_drop ] = 0
###########
# If not forcing interpolation through the dropout regions, override the quaternions with zero-quaternions, if needed.
if not ignore_dropout:
output_quat[ output_quality == 0 ] = 0
#######################################
#######################################
# And finally, rebuild the structured array.
# Data format: [met, r, i, j, k, quality, data_source_previous, data_source_next,
# seconds_to_previous_solution, seconds_to_next_solution]
output_dtype = [('met', 'float64'),
('r', 'float64'),
('i', 'float64'),
('j', 'float64'),
('k', 'float64'),
('quality', 'int32'),
('data_source_previous', 'int32'),
('data_source_next', 'int32'),
('seconds_to_previous_solution', 'float64'),
('seconds_to_next_solution', 'float64')]
# As can't build type-safely using numpy.stack or numpy.column_stack,
# build an empty structured array, then assign each array to the column.
output_data = numpy.empty(len(output_met), dtype=output_dtype)
# Then assign pointers.
output_data['met'] = output_met
output_data['r'] = output_quat[:, 0]
output_data['i'] = output_quat[:, 1]
output_data['j'] = output_quat[:, 2]
output_data['k'] = output_quat[:, 3]
output_data['quality'] = output_quality
output_data['data_source_previous'] = data_source_prev_solution
output_data['data_source_next'] = data_source_next_solution
output_data['seconds_to_previous_solution'] = time_to_prev_solution
output_data['seconds_to_next_solution'] = time_to_next_solution
# And to make sure it's returning a new copy, not a view, test if it's not a unique base:
if output_data.base is not None:
output_data = output_data.copy()
return output_data
def get_pointing_dir_geo(data,
vector: Union[str, list]):
'''
Given a quaternion dataset, append columns to indicate the spacecraft CRF/Body vectors
in the GEO/ITRF frame.
Only considers the 'r','i','j','k' columns pulled from get_quaternions/interpolate_quaternions.
Given quaternions -MUST- be in the 'ITRF<-CRF' frame-rotation as published.
Zero-quaternions are returned as 'NaN'
Available Default CRF/Body Vectors:
x = +x = Ram = [1,0,0]
y = +y = Starboard = [0,1,0]
z = +y = Baseplate = [0,0,1]
-x = Anti-ram = [-1,0,0]
-y = Port = [0,-1,0]
-z = Main Panel = [0,0,-1]
Args:
data: numpy structured array generated by get_quaternions/interpolate_quaternions.
Structure: Must include the following: [r, i, j, k]
body_vector: String or len-3 list/tuple of floats.
String must be bound to a body-vector, which specify a 3-tuple body-vector.
List/Tuple must be of 3 floats, which is vector-normalized, and used as a body-vector
Returns:
data: numpy structured array.
Appends to the given data object.
Names: [geo_x, geo_y, geo_z]
dtypes: [float, float, float]
Units: [ Arbitrary unit vector in GEO frame ]
'''
# Parse the body-vector into a sane unit vector.
crf_vector = _parse_body_vector( vector )
# Make sure the given list contains all required columns.
if any([column not in data.dtype.names for column in ['r', 'i', 'j', 'k']]):
raise ValueError("Given data object does not contain at least one of the requisite columns: 'r','i','j','k'")
# Remove any of the output frames in the structured array
data = data[[ col for col in data.dtype.names if col not in ['geo_x', 'geo_y', 'geo_z']]]
# Extract the quaternions for simpler processing.
# Extract the unstructured array
qITRF_from_CRF = recfunctions.structured_to_unstructured( data[['r', 'i', 'j', 'k']] ) # qITRF<-CRF, Scalar-First, Right-Hand notation.
# Pre-populate a body frame using the given vector.
crf_vector = numpy.full((len( qITRF_from_CRF ), 3), crf_vector) # Nx3 repeating the crf_vector
# Then rotate each GEO/ITRF basis vector into the Body frame.
# If given a Zero-quaternion, will return a Zero-vector. [0,0,0]
geo_vector = lib_quaternion.Vector_Rotate( qITRF_from_CRF , crf_vector )
# And then override any rows generated with zero-quaternions with NaN values.
_Is_Zero_Quat = lib_quaternion._Is_Zero_Quat( qITRF_from_CRF )
geo_vector[_Is_Zero_Quat] = numpy.nan
# And then split the columns for ingest by recfunctions
geo_x, geo_y, geo_z = numpy.hsplit(geo_vector, 3)
# And then append the new vectors to the given dataset.
_names = ['geo_x', 'geo_y', 'geo_z']
_type = float
data = recfunctions.append_fields(data, _names, [geo_x, geo_y, geo_z], dtypes=_type, usemask=False)
return data
def add_string_columns( data: numpy.ndarray, use_short_names=False ):
'''
Adds string-columns to the given quaternion dataset.
Arguments:
data: numpy structured array generated by get_attitude_quaternions.
Must include at least one of ['quality','data_source_previous', 'data_source_next']
use_short_names: Optional Bool. Defaults False.
If True, encodes 'quality' with common.QUALITY_FLAG_STR_SHORT.
if False, encodes 'quality' with common.QUALITY_FLAG_STR_DETAIL.
Return:
data: numpy structured array.
Appends the following columns if given.
'quality_string'
'data_source_previous_string'
'data_source_next_string'
dtypes are Unicode-32
'''
if 'quality' in data.dtype.names:
if use_short_names:
data = common.add_string_column( data, 'quality', 'quality_string', common.QUALITY_FLAG_STR_SHORT )
else:
data = common.add_string_column( data, 'quality', 'quality_string', common.QUALITY_FLAG_STR_DETAIL )
if 'data_source_previous' in data.dtype.names:
data = common.add_string_column( data, 'data_source_previous', 'data_source_previous_string', common.ATTITUDE_DATA_SOURCE_STRING )
if 'data_source_next' in data.dtype.names:
data = common.add_string_column( data, 'data_source_next', 'data_source_next_string', common.ATTITUDE_DATA_SOURCE_STRING )
return data
def _get_quaternions_from_database(start_met: float,
end_met: float,
minimum_quality: int = 0,
detail: bool = False):
'''
Fetch attitude quaternions and metadata from database.
Quaternions are of the 'ITRF<-CRF' rotation. (Right-hand notation, Scalar-First)
Arguments:
start_met: float representing met time.
end_met: float representing met time.
minimum_quality: int. Default 0. Minimum quality to return from the database.
detail: bool. Default False. Extends returned structure with attitude metadata.
Return:
numpy structured array: [ names=(met, r, i, j, k, quality),
dtype=(float64, float64, float64, float64, float64, int32)]
If detail: [names+=(data_source_previous, data_source_next, seconds_to_previous_solution, seconds_to_next_solution),
dtype+=(int32, int32, float64, float64)]
'''
fields = ['met', 'r', 'i', 'j', 'k', 'quality']
if detail:
fields += ['data_source_previous', 'data_source_next',
'seconds_to_previous_solution', 'seconds_to_next_solution']
database_fields = ['`' + f + '`' for f in fields] # Pre- and append the strings with backticks for use in MySQL.
# connect to database and fetch data
statement = ('SELECT ' + ', '.join(database_fields)
+ ' FROM `cassiope_ephemeris`.`quaternion`'
+ ' WHERE `met` BETWEEN ' + str(start_met) + ' AND ' + str(end_met)
+ ' AND `quality` >= ' + str(minimum_quality)
+ ' ORDER BY `met` ASC')
connection = database._connect_to_database()
cursor = connection.cursor()
cursor.execute(statement)
data = cursor.fetchall()
cursor.close()
connection.close()
dtype_list = [('met', 'float64'),
('r', 'float64' ),
('i', 'float64' ),
('j', 'float64' ),
('k', 'float64' ),
('quality', 'int32') ]
if detail:
dtype_list += [('data_source_previous', 'int32'),
('data_source_next', 'int32'),
('seconds_to_previous_solution', 'float64'),
('seconds_to_next_solution', 'float64')]
results = numpy.array( data, dtype=dtype_list )
return results
def _parse_body_vector(body_vector):
'''
Input:
body_vector: string, tuple/list/nd.array.
If Str: Must decode as a body vector string.
else: Must map to a valid vector in the body-frame.
Returns:
list: a len-3 list containing a normalized vector in the body-frame.
'''
if isinstance(body_vector, (list, tuple, numpy.ndarray)):
if len(body_vector) != 3:
raise ValueError("Given a body_vector list of length != 3!")
# Check that the contents are valid.
try:
_temp = [float(val) for val in body_vector]
except Exception as E:
raise ValueError("Could not parse the contents of the body_vector list as floats!") from E
# Got here, then it's a valid 3-tuple body vector.
# Normalize, return it.
x, y, z = body_vector
vector_len = (x**2 + y**2 + z**2)**0.5
# If given zeros, then raise an exception
if vector_len < 1e-10:
raise ValueError("Can't parse a body_vector of zeros!")
# Otherwise, return the normalzied vector:
x /= vector_len
y /= vector_len
z /= vector_len
return [x, y, z]
# If not bound in a list, test if it falls under a default string.
if isinstance(body_vector, str):
if body_vector.lower() in ['x', '+x', 'ram']:
return [1., 0, 0]
if body_vector.lower() in ['y', '+y', 'starboard']:
return [0, 1., 0]
if body_vector.lower() in ['z', '+z', 'baseplate']:
return [0, 0, 1.]
if body_vector.lower() in ['-x', 'anti-ram']:
return [-1., 0, 0]
if body_vector.lower() in ['-y', 'port']:
return [0, -1., 0]
if body_vector.lower() in ['-z', 'main panel']:
return [0, 0, -1.]
# And if got here, then didn't recognize what was given.
raise ValueError("Could not recognize the given input as a valid body_vector!")
Functions
def add_string_columns(data: numpy.ndarray, use_short_names=False)
-
Adds string-columns to the given quaternion dataset.
Arguments
data: numpy structured array generated by get_attitude_quaternions. Must include at least one of ['quality','data_source_previous', 'data_source_next']
use_short_names: Optional Bool. Defaults False. If True, encodes 'quality' with common.QUALITY_FLAG_STR_SHORT. if False, encodes 'quality' with common.QUALITY_FLAG_STR_DETAIL.
Return
data: numpy structured array. Appends the following columns if given. 'quality_string' 'data_source_previous_string' 'data_source_next_string'
dtypes are Unicode-32
Expand source code
def add_string_columns( data: numpy.ndarray, use_short_names=False ): ''' Adds string-columns to the given quaternion dataset. Arguments: data: numpy structured array generated by get_attitude_quaternions. Must include at least one of ['quality','data_source_previous', 'data_source_next'] use_short_names: Optional Bool. Defaults False. If True, encodes 'quality' with common.QUALITY_FLAG_STR_SHORT. if False, encodes 'quality' with common.QUALITY_FLAG_STR_DETAIL. Return: data: numpy structured array. Appends the following columns if given. 'quality_string' 'data_source_previous_string' 'data_source_next_string' dtypes are Unicode-32 ''' if 'quality' in data.dtype.names: if use_short_names: data = common.add_string_column( data, 'quality', 'quality_string', common.QUALITY_FLAG_STR_SHORT ) else: data = common.add_string_column( data, 'quality', 'quality_string', common.QUALITY_FLAG_STR_DETAIL ) if 'data_source_previous' in data.dtype.names: data = common.add_string_column( data, 'data_source_previous', 'data_source_previous_string', common.ATTITUDE_DATA_SOURCE_STRING ) if 'data_source_next' in data.dtype.names: data = common.add_string_column( data, 'data_source_next', 'data_source_next_string', common.ATTITUDE_DATA_SOURCE_STRING ) return data
def get_attitude_quaternions(start_time: Union[datetime.datetime, str, int, float], end_time: Union[datetime.datetime, str, int, float], minimum_quality: str = None, detail: bool = False)
-
Fetches the CASSIOPE 'ITRF<-CRF' quaternions. Scalar First, right-hand notation. (vB = q*.vA.q )
Args
start_time
- int, float, or string representing met time. datetime object or iso formatted date string.
end_time
- int, float, or string representing met time. datetime object or iso formatted date string.
minimum_quality
- String which matches common.ATTITUDE_DATA_SOURCE key representing the minimum quality to return. Default None, which returns all results without filtering.
detail
- (bool) Default False. If True includes data_souce_ & seconds_to_ columns in the return.
Return
data: A numpy structured array with columns representing the requested quaternion & metadata columns.
Column Type Condition met float64 Always r float64 Always i float64 Always j float64 Always k float64 Always quality float64 Always data_source_previous int if detail data_source_next int if detail seconds_to_previous_solution float64 if detail seconds_to_next_solution float64 if detail Expand source code
def get_attitude_quaternions(start_time: Union[datetime.datetime, str, int, float], end_time: Union[datetime.datetime, str, int, float], minimum_quality: str = None, detail: bool = False): ''' Fetches the CASSIOPE 'ITRF<-CRF' quaternions. Scalar First, right-hand notation. (vB = q*.vA.q ) Args: start_time: int, float, or string representing met time. datetime object or iso formatted date string. end_time: int, float, or string representing met time. datetime object or iso formatted date string. minimum_quality: String which matches common.ATTITUDE_DATA_SOURCE key representing the minimum quality to return. Default None, which returns all results without filtering. detail: (bool) Default False. If True includes data_souce_* & seconds_to_* columns in the return. Return: data: A numpy structured array with columns representing the requested quaternion & metadata columns. | Column | Type | Condition | |:------ |:---- |:--------- | | met | float64 | Always | | r | float64 | Always | | i | float64 | Always | | j | float64 | Always | | k | float64 | Always | | quality | float64 | Always | | data_source_previous | int | if detail | | data_source_next | int | if detail | | seconds_to_previous_solution | float64 | if detail | | seconds_to_next_solution | float64 | if detail | ''' # Check version common._check_version() # Verify time inputs. start_time = common._verify_time(start_time) end_time = common._verify_time(end_time) # Verify the minimum quality. minimum_quality_id = common._verify_quality( minimum_quality ) if end_time < start_time: raise ValueError('Error: end_time before start_time.') # Fetch the data! data = _get_quaternions_from_database(start_time, end_time, minimum_quality_id, detail) return data
def get_pointing_dir_geo(data, vector: Union[str, list])
-
Given a quaternion dataset, append columns to indicate the spacecraft CRF/Body vectors in the GEO/ITRF frame. Only considers the 'r','i','j','k' columns pulled from get_quaternions/interpolate_quaternions. Given quaternions -MUST- be in the 'ITRF<-CRF' frame-rotation as published. Zero-quaternions are returned as 'NaN'
Available Default CRF/Body Vectors: x = +x = Ram = [1,0,0] y = +y = Starboard = [0,1,0] z = +y = Baseplate = [0,0,1] -x = Anti-ram = [-1,0,0] -y = Port = [0,-1,0] -z = Main Panel = [0,0,-1]
Args
data
- numpy structured array generated by get_quaternions/interpolate_quaternions. Structure: Must include the following: [r, i, j, k]
body_vector
- String or len-3 list/tuple of floats. String must be bound to a body-vector, which specify a 3-tuple body-vector. List/Tuple must be of 3 floats, which is vector-normalized, and used as a body-vector
Returns
data
-
numpy structured array. Appends to the given data object.
Names: [geo_x, geo_y, geo_z] dtypes: [float, float, float] Units: [ Arbitrary unit vector in GEO frame ]
Expand source code
def get_pointing_dir_geo(data, vector: Union[str, list]): ''' Given a quaternion dataset, append columns to indicate the spacecraft CRF/Body vectors in the GEO/ITRF frame. Only considers the 'r','i','j','k' columns pulled from get_quaternions/interpolate_quaternions. Given quaternions -MUST- be in the 'ITRF<-CRF' frame-rotation as published. Zero-quaternions are returned as 'NaN' Available Default CRF/Body Vectors: x = +x = Ram = [1,0,0] y = +y = Starboard = [0,1,0] z = +y = Baseplate = [0,0,1] -x = Anti-ram = [-1,0,0] -y = Port = [0,-1,0] -z = Main Panel = [0,0,-1] Args: data: numpy structured array generated by get_quaternions/interpolate_quaternions. Structure: Must include the following: [r, i, j, k] body_vector: String or len-3 list/tuple of floats. String must be bound to a body-vector, which specify a 3-tuple body-vector. List/Tuple must be of 3 floats, which is vector-normalized, and used as a body-vector Returns: data: numpy structured array. Appends to the given data object. Names: [geo_x, geo_y, geo_z] dtypes: [float, float, float] Units: [ Arbitrary unit vector in GEO frame ] ''' # Parse the body-vector into a sane unit vector. crf_vector = _parse_body_vector( vector ) # Make sure the given list contains all required columns. if any([column not in data.dtype.names for column in ['r', 'i', 'j', 'k']]): raise ValueError("Given data object does not contain at least one of the requisite columns: 'r','i','j','k'") # Remove any of the output frames in the structured array data = data[[ col for col in data.dtype.names if col not in ['geo_x', 'geo_y', 'geo_z']]] # Extract the quaternions for simpler processing. # Extract the unstructured array qITRF_from_CRF = recfunctions.structured_to_unstructured( data[['r', 'i', 'j', 'k']] ) # qITRF<-CRF, Scalar-First, Right-Hand notation. # Pre-populate a body frame using the given vector. crf_vector = numpy.full((len( qITRF_from_CRF ), 3), crf_vector) # Nx3 repeating the crf_vector # Then rotate each GEO/ITRF basis vector into the Body frame. # If given a Zero-quaternion, will return a Zero-vector. [0,0,0] geo_vector = lib_quaternion.Vector_Rotate( qITRF_from_CRF , crf_vector ) # And then override any rows generated with zero-quaternions with NaN values. _Is_Zero_Quat = lib_quaternion._Is_Zero_Quat( qITRF_from_CRF ) geo_vector[_Is_Zero_Quat] = numpy.nan # And then split the columns for ingest by recfunctions geo_x, geo_y, geo_z = numpy.hsplit(geo_vector, 3) # And then append the new vectors to the given dataset. _names = ['geo_x', 'geo_y', 'geo_z'] _type = float data = recfunctions.append_fields(data, _names, [geo_x, geo_y, geo_z], dtypes=_type, usemask=False) return data
def interpolate_quaternions(data, time_list, ignore_dropout=False)
-
Interpolate the given attitude data into the given timestamps. Independant of frame or handedness. Will return the frame system and handedness as it's inputs.
Arguments
data: numpy structured array generated by get_quaternions, using detail=True. Structure: Must include the following: [met, r, i, j, k, data_source_previous, data_source_next, seconds_to_previous_solution, seconds_to_next_solution]
time_list: an time list that data is to be interpolated to. Must be a list of MET (int/float), datetime, or ISO-8601 strings.
ignore_dropout: bool, default False. Normally overrides any region with a telemetry droout greater than the global constant TELEMETRY_HOLE_OVERRIDE_THRESHOLD with zero-quaternions. Setting this true disregards this limit, and interpolates through anything.
Return
data: numpy structured array. Names: [met, qITRF<-CRF, quality, data_source_previous, data_source_next, seconds_to_previous_solution, seconds_to_next_solution] dtypes: float64: met, r, i, j, k, seconds_to_previous_solution, seconds_to_next_solution int32: quality, data_source_previous, data_source_next
Note: 1Hz masking of the telemetry may lead to hidden datapoints on the rebuilding of the 'definitive' timestamps used to build the 'time_to_*' values.
Expand source code
def interpolate_quaternions(data, time_list, ignore_dropout=False): ''' Interpolate the given attitude data into the given timestamps. Independant of frame or handedness. Will return the frame system and handedness as it's inputs. Arguments: data: numpy structured array generated by get_quaternions, using detail=True. Structure: Must include the following: [met, r, i, j, k, data_source_previous, data_source_next, seconds_to_previous_solution, seconds_to_next_solution] time_list: an time list that data is to be interpolated to. Must be a list of MET (int/float), datetime, or ISO-8601 strings. ignore_dropout: bool, default False. Normally overrides any region with a telemetry droout greater than the global constant TELEMETRY_HOLE_OVERRIDE_THRESHOLD with zero-quaternions. Setting this true disregards this limit, and interpolates through anything. Return: data: numpy structured array. Names: [met, qITRF<-CRF, quality, data_source_previous, data_source_next, seconds_to_previous_solution, seconds_to_next_solution] dtypes: float64: met, r, i, j, k, seconds_to_previous_solution, seconds_to_next_solution int32: quality, data_source_previous, data_source_next Note: 1Hz masking of the telemetry may lead to hidden datapoints on the rebuilding of the 'definitive' timestamps used to build the 'time_to_*' values. ''' # Preamble sanity checks: # Make sure all column are present. _required_columns = ['met','r','i','j','k','data_source_previous','data_source_next','seconds_to_previous_solution','seconds_to_next_solution'] for _column in _required_columns: if _column not in data.dtype.names: raise ValueError("interpolate_quaternions - Missing a required column in data: {:s}".format(_column)) # And pre-process the saught timestamps into MET. time_list = numpy.array([common._verify_time(time) for time in time_list]) # Quick sanity checks: # If interpolating to the same timestamps as the input, just return the input dataset. if len( data ) == len( time_list ) and numpy.all( data['met'] == time_list ) and ignore_dropout == False: return data # And if given an empty dataset, return a null dataset. if len( data ) == 0: empty_dataset = numpy.array( [0] * len(time_list), dtype = data.dtype ) empty_dataset['met'] = time_list empty_dataset['seconds_to_previous_solution'] = numpy.inf empty_dataset['seconds_to_next_solution'] = numpy.inf return empty_dataset # Extract the Quaternion component. quats = recfunctions.structured_to_unstructured(data[['met', 'r', 'i', 'j', 'k']], dtype=float) # Recast as an unstructured array for the use by lib_quaternion library, as it yet does not allow a structured set. # Format is of < MET, qR, qI, qJ, qK >, all of floats. # And remove the quaternions, as well as the derived 'quality' column, from the main dataset. data = data[['met', 'data_source_previous', 'data_source_next', 'seconds_to_previous_solution', 'seconds_to_next_solution']] ########################################### ########################################### # Pre-processing: # Remove any zero-quaternions to allow for clean interpolation. _nonzero_mask = ~lib_quaternion._Is_Zero_Quat(quats) data = data [_nonzero_mask] quats = quats[_nonzero_mask] ##################################### ##################################### # Derive the 'Definitive' metadata from the input data information. # as to allow for re-derivation of the time-to & quality entries. definitive_prev_met = data['met'] - data['seconds_to_previous_solution'] definitive_prev_src = data['data_source_previous'] definitive_next_met = data['met'] + data['seconds_to_next_solution'] definitive_next_src = data['data_source_next'] # Merge the lists definitive_met = numpy.concatenate((definitive_prev_met, definitive_next_met)) definitive_src = numpy.concatenate((definitive_prev_src, definitive_next_src)) # Sort both by time. _sort_order = definitive_met.argsort() definitive_met = definitive_met[_sort_order] definitive_src = definitive_src[_sort_order] # Remove any duplicate solutions, use dMET <= 0.2s, as that's the minimum cadence before either merging or filtering on the initial generation step. _same_time = numpy.diff(definitive_met) <= 0.2 _same_type = numpy.diff(definitive_src) == 0 _valid_mask = ~numpy.logical_and(_same_time, _same_type) # Note the NOT prefix. # And prepend a 'True' as to match the array sizes and keep the first solution. _valid_mask = numpy.concatenate(([True], _valid_mask)) # And apply the mask. definitive_met = definitive_met[_valid_mask] definitive_src = definitive_src[_valid_mask] # And finally, remove any solutions that are the start/end of a telemetry hole (src=0) # This Shouldn't hit, as the zero-quats are removed above, but this is to just make sure. if ignore_dropout: _dropout_mask = definitive_src == 0 definitive_met = definitive_met[~_dropout_mask] definitive_src = definitive_src[~_dropout_mask] # And that's a list of all 'definitive' solution timestamps within the published telemetry! # Barring any hidden masks, of course. ########################################### ########################################### # Split the requested interpolation points to allow for extrapolation / use of bad data. # Interpolation requires at least two entires. if len( quats ) > 2: # And any valid times must be between two valid given times. _interpolation_mask = numpy.logical_and( numpy.min( data['met'] ) <= time_list, time_list <= numpy.max( data['met'] ) ) else: # If there's 1 or 0 points, everything will be extrapolated. _interpolation_mask = numpy.zeros( len( time_list ), dtype=bool ) time_list_interpolation = time_list[ _interpolation_mask ] time_list_extrapolation = time_list[~_interpolation_mask ] ########################################### ########################################### # The quaternion interpolation itself is easy enough. input_met = quats[:, 0 ] input_quat = quats[:, 1:] met_interpolation = time_list_interpolation quat_interpolation = lib_quaternion.Quaternion_Interpolation(input_met, input_quat, met_interpolation) # Returns an unstructured object of <qR, qI, qJ, qK> # Then create a list of zero-quaternions for the extrapolated dataset. met_extrapolation = time_list_extrapolation quat_extrapolation = lib_quaternion.qZero( len( met_extrapolation )) # And merge the arrays output_met = numpy.concatenate(( met_interpolation, met_extrapolation )) output_quat = numpy.concatenate(( quat_interpolation, quat_extrapolation )) # And sort by the timestamp. _sort_order = output_met.argsort() output_met = output_met [_sort_order] output_quat = output_quat[_sort_order] ########################################### ########################################### # And now using the output dataset, derive the Time-To & Data_Source entries: # Pre-populate solutions for overriding. # Time & Source entries set at +/- infinity, and zeroes. met_prev, src_prev = numpy.full(len(output_met), -numpy.inf), numpy.zeros(len(output_met)) met_next, src_next = numpy.full(len(output_met), numpy.inf), numpy.zeros(len(output_met)) # Use numpy to find the 'Definitive' solutions prior & following to each requested timestamp. _insert_indecies_left = numpy.searchsorted(definitive_met, output_met, 'left') # If output_met is in definitive_met, returns the index of the definitive that matches. 1-> [0,1,2] = 1 _insert_indecies_right = numpy.searchsorted(definitive_met, output_met, 'right') # If output_met is in definitive_met, returns the following index. 1-> [0,1,2] = 2 # If met_interpolation is NOT in definitive_met, will both return the same index: 1.5 -> [0,1,2] = 2 # If the insert points are do not match, then the requested 'output' met is in the Definitive list. _output_in_definitive_mask = _insert_indecies_left != _insert_indecies_right # If both are ==0 , then the solution is prior to the first definitive point. # If both are ==len(x), then the solution is after the last definitive solution. _extrapolation_prior = numpy.logical_and( _insert_indecies_left == 0, _insert_indecies_right == 0 ) _extrapolation_after = numpy.logical_and( _insert_indecies_left == len(definitive_met), _insert_indecies_right == len(definitive_met) ) # And if not extrapolating in either direction, and not already in the Definitive list, then have to override both next & prior metadata. _interpolation_mask = ~ numpy.logical_or.reduce(( _extrapolation_prior, _extrapolation_after, _output_in_definitive_mask )) _has_next_sol_mask = numpy.logical_or( _extrapolation_prior, _interpolation_mask ) _has_prev_sol_mask = numpy.logical_or( _extrapolation_after, _interpolation_mask ) # There's some weird referencing/np-views here, so make a copy of the relevant arrays just for sanity. met_prev, src_prev = met_prev.copy(), src_prev.copy() met_next, src_next = met_next.copy(), src_next.copy() # Set the prev/next sols to their corresponding definitive solutions, if needed if numpy.any(_output_in_definitive_mask): met_prev[_output_in_definitive_mask] = met_next[_output_in_definitive_mask] = definitive_met[_insert_indecies_left[_output_in_definitive_mask]] src_prev[_output_in_definitive_mask] = src_next[_output_in_definitive_mask] = definitive_src[_insert_indecies_left[_output_in_definitive_mask]] if numpy.any( _has_prev_sol_mask ): met_prev[_has_prev_sol_mask] = definitive_met[_insert_indecies_left[_has_prev_sol_mask]-1] src_prev[_has_prev_sol_mask] = definitive_src[_insert_indecies_left[_has_prev_sol_mask]-1] if numpy.any( _has_next_sol_mask ): met_next[_has_next_sol_mask] = definitive_met[_insert_indecies_left[_has_next_sol_mask]] src_next[_has_next_sol_mask] = definitive_src[_insert_indecies_left[_has_next_sol_mask]] # Then the absolute MET timestamps to the Time-To-Next/Prev time-deltas. time_to_next_solution = met_next - output_met # Guaranteed positive. May be infininte due to using np.inf. time_to_prev_solution = output_met - met_prev # Guaranteed positive. May be infininte due to using np.inf. # And rename the Source for consistency. data_source_next_solution = src_next data_source_prev_solution = src_prev ############################## ############################## # For each of the Interpolation points, rederive the 'Quality' flag. # Time-to solutions are just the absolute delta between the times & the saught times. TT_P = time_to_prev_solution TT_N = time_to_next_solution DS_P = data_source_prev_solution DS_N = data_source_next_solution TBT = time_between_solutions = TT_P + TT_N TBT[ numpy.isnan( TBT ) ] = numpy.inf # Inf-Inf = NaN, so override. # And then re-derive the quality column # If both solutions in ADM-7/SSA/SSB/Fused, and time delta < 10s, then fine solution. _qual_fine = numpy.logical_and.reduce([DS_P >= 3, DS_N >= 3, 0 <= TBT, TBT < 10]) # If both solutions in ADM-7/SSA/SSB/Fused, and time delta < 30s, then Moderate solution. _qual_mod = numpy.logical_and.reduce([DS_P >= 3, DS_N >= 3, 10 <= TBT, TBT < 30]) # If both solutions in ADM-7/SSA/SSB/Fused, and time delta < 120, then Coarse solution. _qual_coarse = numpy.logical_and.reduce([DS_P >= 3, DS_N >= 3, 30 <= TBT, TBT < 120]) # If Star-Tracker, but a massive spline, mark it as bad as CSS-based. _qual_awful = numpy.logical_and.reduce([DS_P >= 3, DS_N >= 3, 120 <= TBT]) # If either Solution is CSS, then it's a CSS solution. _qual_css = numpy.logical_or(DS_P == 2, DS_N == 2) # If either Solution is ADM-2, then it's a CSS solution. _qual_adm2 = numpy.logical_or(DS_P == 1, DS_N == 1) # And if either sol is a dropout mask, or the interpolation is longer than the global threshold value, then mark it as a dropout. _qual_drop = numpy.logical_or.reduce(( DS_P == 0, DS_N == 0, TBT >= TELEMETRY_HOLE_OVERRIDE_THRESHOLD )) output_quality = numpy.zeros(len( output_met ), dtype=int) + 10 output_quality[ _qual_fine ] = 4 output_quality[ _qual_mod ] = 3 output_quality[ _qual_coarse] = 2 output_quality[ _qual_awful ] = 1 output_quality[ _qual_css ] = 1 output_quality[ _qual_adm2 ] = 1 output_quality[ _qual_drop ] = 0 ########### # If not forcing interpolation through the dropout regions, override the quaternions with zero-quaternions, if needed. if not ignore_dropout: output_quat[ output_quality == 0 ] = 0 ####################################### ####################################### # And finally, rebuild the structured array. # Data format: [met, r, i, j, k, quality, data_source_previous, data_source_next, # seconds_to_previous_solution, seconds_to_next_solution] output_dtype = [('met', 'float64'), ('r', 'float64'), ('i', 'float64'), ('j', 'float64'), ('k', 'float64'), ('quality', 'int32'), ('data_source_previous', 'int32'), ('data_source_next', 'int32'), ('seconds_to_previous_solution', 'float64'), ('seconds_to_next_solution', 'float64')] # As can't build type-safely using numpy.stack or numpy.column_stack, # build an empty structured array, then assign each array to the column. output_data = numpy.empty(len(output_met), dtype=output_dtype) # Then assign pointers. output_data['met'] = output_met output_data['r'] = output_quat[:, 0] output_data['i'] = output_quat[:, 1] output_data['j'] = output_quat[:, 2] output_data['k'] = output_quat[:, 3] output_data['quality'] = output_quality output_data['data_source_previous'] = data_source_prev_solution output_data['data_source_next'] = data_source_next_solution output_data['seconds_to_previous_solution'] = time_to_prev_solution output_data['seconds_to_next_solution'] = time_to_next_solution # And to make sure it's returning a new copy, not a view, test if it's not a unique base: if output_data.base is not None: output_data = output_data.copy() return output_data