Module ephemeris_library.quaternions

Source code
DOCUMENTATION_WARNING = """
=== DOCUMENTATION WARNING ===
=============================
All Quaternions provided in this library are described in the 'Right-Hand' notation method.
Such that: vB = qB<-A * vA = qB<-A* . [0,vA] . qB<-A == q*.v.q

Many documentation sources, such as Wikipedia's 'Quaternion' article are 'Left-Hand' notated,
Such that: vB = qB<-A * vA = qB<-A . [0,vA] . qB<-A* == q.v.q*

Note that this simply means the two conventions' quaternions are
conjugates (inverses) of one another.

All rotation chaining is best shown using 'Right-Hand' arrow notation
qA->B . qB->C = qA->C   # Right-Arrow notation.
qB<-A . qC<-B = qC<-A   # Left-arrow notation.

All processing is done with Scalar-First quaternions. q = [r,i,j,k]

For the simplest usage of the library, work only with the 'get_pointing_dir_geo'
  unit-vector output in the GEO/ITRF frame, and rotate those vectors directly.
"""

import ephemeris_library.database as database
import ephemeris_library.common as common
import ephemeris_library.rotation_libraries.lib_quaternion as lib_quaternion
from ephemeris_library.common import add_utc_column

import datetime
from typing import Optional, Union
import numpy
from numpy.lib import recfunctions


TELEMETRY_HOLE_OVERRIDE_THRESHOLD = 600  # Seconds. Attitude telemetry holes bound by this constant
# Pulled from ephemeris_attitude_processor/./c_Attitude_Processor/AttitudeProcessor.py


def get_attitude_quaternions(start_time: Union[datetime.datetime, str, int, float],
                             end_time: Union[datetime.datetime, str, int, float],
                             minimum_quality: Optional[str] = None,
                             detail: bool = False):
  '''
  Fetches the CASSIOPE 'ITRF<-CRF' quaternions.
  Scalar First, right-hand notation. (vB = q*.vA.q )

  Args:
    start_time: int, float, or string representing MET time;
                or a datetime object or ISO-formatted date string.
    end_time: int, float, or string representing MET time;
              or a datetime object or ISO-formatted date string.
    minimum_quality: String which matches common.ATTITUDE_DATA_SOURCE key representing the minimum quality to return.
                    Default None, which returns all results without filtering.
    detail: (bool) Default False. If True, includes the data_source_* & seconds_to_* columns in the return.

  Return:
    data: A numpy structured array with columns representing the requested quaternion & metadata columns.

    | Column                       | Type    | Condition |
    |:------                       |:----    |:--------- |
    | met                          | float64 | Always    |
    | r                            | float64 | Always    |
    | i                            | float64 | Always    |
    | j                            | float64 | Always    |
    | k                            | float64 | Always    |
    | quality                      | int32   | Always    |
    | data_source_previous         | int32   | if detail |
    | data_source_next             | int32   | if detail |
    | seconds_to_previous_solution | float64 | if detail |
    | seconds_to_next_solution     | float64 | if detail |

  '''
  # Check version
  common._check_version()

  # Verify time inputs.
  start_time = common._verify_time(start_time)
  end_time   = common._verify_time(end_time)
  # Verify the minimum quality.
  minimum_quality_id = common._verify_quality( minimum_quality )

  if end_time < start_time:
    raise ValueError('Error: end_time before start_time.')
    
  # Fetch the data!
  data = _get_quaternions_from_database(start_time, end_time, minimum_quality_id, detail)

  return data

def interpolate_quaternions(data,
                            time_list,
                            ignore_dropout=False):
  '''
  Interpolate the given attitude data onto the given timestamps.
  Independent of frame and handedness: the output is returned in the same
  frame and handedness as its inputs.
  
  Arguments:
    data: numpy structured array generated by get_attitude_quaternions, using detail=True.
          Structure: Must include the following:
          [met, r, i, j, k, data_source_previous, data_source_next,
          seconds_to_previous_solution, seconds_to_next_solution]

    time_list: the list of timestamps that the data is to be interpolated to.
          Must be a list of MET (int/float), datetime, or ISO-8601 strings.

    ignore_dropout: bool, default False.
        Normally any region with a telemetry dropout greater than the global
        constant TELEMETRY_HOLE_OVERRIDE_THRESHOLD is overridden with zero-quaternions.
        Setting this True disregards that limit, and interpolates through anything.

  Return:
    data: numpy structured array.
      Names: [met, r, i, j, k, quality, data_source_previous, data_source_next,
              seconds_to_previous_solution, seconds_to_next_solution]
      dtypes:
        float64: met, r, i, j, k, seconds_to_previous_solution, seconds_to_next_solution
        int32:   quality, data_source_previous, data_source_next

  Note: 1Hz masking of the telemetry may lead to hidden datapoints on
    the rebuilding of the 'definitive' timestamps used to build the 'time_to_*' values.
  '''
  # Preamble sanity checks:
  # Make sure all required columns are present.
  _required_columns = ['met','r','i','j','k','data_source_previous','data_source_next','seconds_to_previous_solution','seconds_to_next_solution']
  
  for _column in _required_columns:
    if _column not in data.dtype.names:
      raise ValueError("interpolate_quaternions - Missing a required column in data: {:s}".format(_column))
  
  # And pre-process the sought timestamps into MET.
  time_list = numpy.array([common._verify_time(time) for time in time_list])

  # Quick sanity checks:
  # If interpolating to the same timestamps as the input, just return the input dataset.
  if len( data ) == len( time_list ) and numpy.all( data['met'] == time_list ) and not ignore_dropout:
    return data
  
  # And if given an empty dataset, return a null dataset.
  if len( data ) == 0:
    empty_dataset = numpy.zeros( len(time_list), dtype = data.dtype )
    empty_dataset['met'] = time_list
    empty_dataset['seconds_to_previous_solution'] = numpy.inf
    empty_dataset['seconds_to_next_solution']     = numpy.inf
    return empty_dataset

  # Extract the Quaternion component.
  quats = recfunctions.structured_to_unstructured(data[['met', 'r', 'i', 'j', 'k']], dtype=float)
  # Recast as an unstructured array for use by the lib_quaternion library, as it does not yet accept structured arrays.
  # Format is of < MET, qR, qI, qJ, qK >, all of floats.
  
  # And remove the quaternions, as well as the derived 'quality' column, from the main dataset.
  data = data[['met', 'data_source_previous', 'data_source_next',
               'seconds_to_previous_solution', 'seconds_to_next_solution']]
  
  ###########################################
  ###########################################
  # Pre-processing:
  # Remove any zero-quaternions to allow for clean interpolation.
  _nonzero_mask = ~lib_quaternion._Is_Zero_Quat(quats)
  data  = data [_nonzero_mask]
  quats = quats[_nonzero_mask]

  #####################################
  #####################################
  # Derive the 'Definitive' metadata from the input data information,
  #  to allow for re-derivation of the time-to & quality entries.
  definitive_prev_met = data['met'] - data['seconds_to_previous_solution']
  definitive_prev_src = data['data_source_previous']

  definitive_next_met = data['met'] + data['seconds_to_next_solution']
  definitive_next_src = data['data_source_next']

  # Merge the lists
  definitive_met = numpy.concatenate((definitive_prev_met, definitive_next_met))
  definitive_src = numpy.concatenate((definitive_prev_src, definitive_next_src))

  # Sort both by time.
  _sort_order = definitive_met.argsort()
  definitive_met = definitive_met[_sort_order]
  definitive_src = definitive_src[_sort_order]

  # Remove any duplicate solutions. Use dMET <= 0.2 s, as that's the minimum cadence before either merging or filtering on the initial generation step.
  _same_time = numpy.diff(definitive_met) <= 0.2
  _same_type = numpy.diff(definitive_src) == 0
  _valid_mask = ~numpy.logical_and(_same_time, _same_type)  # Note the NOT prefix.
  # And prepend a 'True' as to match the array sizes and keep the first solution.
  _valid_mask = numpy.concatenate(([True], _valid_mask))
  # And apply the mask.
  definitive_met = definitive_met[_valid_mask]
  definitive_src = definitive_src[_valid_mask]
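  # e.g. definitive_met = [10.0, 10.1, 12.0] with definitive_src = [3, 3, 3]
  #      drops the 10.1 entry (dMET = 0.1 <= 0.2 with the same source), leaving [10.0, 12.0].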

  # And finally, when interpolating through dropouts, remove any solutions that
  # mark the start/end of a telemetry hole (src=0), so they don't cap the interpolation.
  
  if ignore_dropout:
    _dropout_mask = definitive_src == 0
    definitive_met = definitive_met[~_dropout_mask]
    definitive_src = definitive_src[~_dropout_mask]

  # And that's a list of all 'definitive' solution timestamps within the published telemetry!
  # Barring any hidden masks, of course.

  ###########################################
  ###########################################
  
  # Split the requested interpolation points to allow for extrapolation / use of bad data.
  # Interpolation requires at least two entries.
  if len( quats ) >= 2: # And any valid times must be between two valid given times.
    _interpolation_mask = numpy.logical_and( numpy.min( data['met'] ) <= time_list, time_list <= numpy.max( data['met'] ) )
  else: # If there's 1 or 0 points, everything will be extrapolated.
    _interpolation_mask = numpy.zeros( len( time_list ), dtype=bool )
    
  time_list_interpolation = time_list[ _interpolation_mask ]
  time_list_extrapolation = time_list[~_interpolation_mask ]
 
  ###########################################
  ###########################################
  
  # The quaternion interpolation itself is easy enough.
  input_met   = quats[:, 0 ]
  input_quat  = quats[:, 1:]
  met_interpolation  = time_list_interpolation
  quat_interpolation = lib_quaternion.Quaternion_Interpolation(input_met, input_quat, met_interpolation)
  # Returns an unstructured object of <qR, qI, qJ, qK>

  # Then create a list of zero-quaternions for the extrapolated dataset.
  met_extrapolation  = time_list_extrapolation
  quat_extrapolation = lib_quaternion.qZero( len( met_extrapolation ))
  
  # And merge the arrays
  output_met  = numpy.concatenate(( met_interpolation,  met_extrapolation  ))
  output_quat = numpy.concatenate(( quat_interpolation, quat_extrapolation ))
  # And sort by the timestamp.
  _sort_order = output_met.argsort()
  output_met  = output_met [_sort_order]
  output_quat = output_quat[_sort_order]
  
  ###########################################
  ###########################################
  
  # And now using the output dataset, derive the Time-To & Data_Source entries:

  # Pre-populate solutions for overriding.
  # Time & Source entries set at +/- infinity, and zeroes.
  met_prev, src_prev = numpy.full(len(output_met), -numpy.inf), numpy.zeros(len(output_met))
  met_next, src_next = numpy.full(len(output_met),  numpy.inf), numpy.zeros(len(output_met))

  # Use numpy to find the 'Definitive' solutions prior to & following each requested timestamp.
  _insert_indices_left  = numpy.searchsorted(definitive_met, output_met, 'left')  # If output_met is in definitive_met, returns the index of the definitive that matches. 1-> [0,1,2] = 1
  _insert_indices_right = numpy.searchsorted(definitive_met, output_met, 'right') # If output_met is in definitive_met, returns the following index.                      1-> [0,1,2] = 2
  # If output_met is NOT in definitive_met, both return the same index:  1.5 -> [0,1,2] = 2
  
  # If the insert points do not match, then the requested 'output' met is in the Definitive list.
  _output_in_definitive_mask  = _insert_indices_left != _insert_indices_right
  # If both are ==0     , then the solution is prior to the first definitive point.
  # If both are ==len(x), then the solution is after the last definitive solution.
  _extrapolation_prior = numpy.logical_and( _insert_indices_left == 0,                   _insert_indices_right == 0 )
  _extrapolation_after = numpy.logical_and( _insert_indices_left == len(definitive_met), _insert_indices_right == len(definitive_met) )
  # And if not extrapolating in either direction, and not already in the Definitive list, then have to override both next & prior metadata.
  _interpolation_mask = ~ numpy.logical_or.reduce(( _extrapolation_prior, _extrapolation_after, _output_in_definitive_mask ))
  
  _has_next_sol_mask = numpy.logical_or( _extrapolation_prior, _interpolation_mask )
  _has_prev_sol_mask = numpy.logical_or( _extrapolation_after, _interpolation_mask )
  
  # There's some weird referencing/np-views here, so make a copy of the relevant arrays just for sanity.
  met_prev, src_prev = met_prev.copy(), src_prev.copy()
  met_next, src_next = met_next.copy(), src_next.copy()

  # Set the prev/next sols to their corresponding definitive solutions, if needed
  if numpy.any(_output_in_definitive_mask):
    met_prev[_output_in_definitive_mask] = met_next[_output_in_definitive_mask] = definitive_met[_insert_indices_left[_output_in_definitive_mask]]
    src_prev[_output_in_definitive_mask] = src_next[_output_in_definitive_mask] = definitive_src[_insert_indices_left[_output_in_definitive_mask]]
  
  if numpy.any( _has_prev_sol_mask ):
    met_prev[_has_prev_sol_mask] = definitive_met[_insert_indices_left[_has_prev_sol_mask]-1]
    src_prev[_has_prev_sol_mask] = definitive_src[_insert_indices_left[_has_prev_sol_mask]-1]
  
  if numpy.any( _has_next_sol_mask ):
    met_next[_has_next_sol_mask] = definitive_met[_insert_indices_left[_has_next_sol_mask]]
    src_next[_has_next_sol_mask] = definitive_src[_insert_indices_left[_has_next_sol_mask]]
  
  # Then convert the absolute MET timestamps into the Time-To-Next/Prev time-deltas.
  time_to_next_solution = met_next - output_met # Guaranteed positive. May be infinite due to using np.inf.
  time_to_prev_solution = output_met - met_prev # Guaranteed positive. May be infinite due to using np.inf.
  # And rename the Source for consistency.
  data_source_next_solution = src_next
  data_source_prev_solution = src_prev
  
  ##############################
  ##############################
  
  # For each of the Interpolation points, rederive the 'Quality' flag.
  # Time-to solutions are just the absolute delta between the solution times & the sought times.
  TT_P = time_to_prev_solution
  TT_N = time_to_next_solution
  DS_P = data_source_prev_solution
  DS_N = data_source_next_solution

  TBT = time_between_solutions = TT_P + TT_N
  TBT[ numpy.isnan( TBT ) ] = numpy.inf # Guard against NaN from inf arithmetic, just in case.
  
  # And then re-derive the quality column
  # If both solutions in ADM-7/SSA/SSB/Fused, and time delta < 10s, then fine solution.
  _qual_fine = numpy.logical_and.reduce([DS_P >= 3, DS_N >= 3, 0  <= TBT, TBT < 10])
  # If both solutions in ADM-7/SSA/SSB/Fused, and time delta < 30s, then Moderate solution.
  _qual_mod  = numpy.logical_and.reduce([DS_P >= 3, DS_N >= 3, 10 <= TBT, TBT < 30])
  # If both solutions in ADM-7/SSA/SSB/Fused, and time delta < 120s, then Coarse solution.
  _qual_coarse = numpy.logical_and.reduce([DS_P >= 3, DS_N >= 3, 30 <= TBT, TBT < 120])
  
  # If Star-Tracker based, but spanning a very large gap, mark it as bad as CSS-based.
  _qual_awful = numpy.logical_and.reduce([DS_P >= 3, DS_N >= 3, 120 <= TBT])
  
  # If either Solution is CSS,   then it's a CSS solution.
  _qual_css   = numpy.logical_or(DS_P == 2, DS_N == 2)
  # If either Solution is ADM-2, then it's an ADM-2 solution.
  _qual_adm2  = numpy.logical_or(DS_P == 1, DS_N == 1)
  # And if either sol is a dropout mask, or the interpolation is longer than the global threshold value, then mark it as a dropout.
  _qual_drop = numpy.logical_or.reduce(( DS_P == 0, DS_N == 0, TBT >= TELEMETRY_HOLE_OVERRIDE_THRESHOLD ))
  
  output_quality = numpy.full(len( output_met ), 10, dtype=int)  # Default sentinel; overridden by the masks below.
  output_quality[ _qual_fine  ] = 4
  output_quality[ _qual_mod   ] = 3
  output_quality[ _qual_coarse] = 2
  output_quality[ _qual_awful ] = 1
  output_quality[ _qual_css   ] = 1
  output_quality[ _qual_adm2  ] = 1
  output_quality[ _qual_drop  ] = 0
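  # e.g. star-tracker-class solutions on both sides (DS >= 3) with TBT = 5 s fall into
  #      _qual_fine and yield quality 4; any bounding dropout (DS == 0) forces quality 0,
  #      as _qual_drop is applied last.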

  ###########
  # If not forcing interpolation through the dropout regions, override the quaternions with zero-quaternions, if needed.
  if not ignore_dropout:
    output_quat[ output_quality == 0 ] = 0

  #######################################
  #######################################
  # And finally, rebuild the structured array.

  # Data format: [met, r, i, j, k, quality, data_source_previous, data_source_next,
  #               seconds_to_previous_solution, seconds_to_next_solution]

  output_dtype = [('met', 'float64'),
                  ('r',   'float64'),
                  ('i',   'float64'),
                  ('j',   'float64'),
                  ('k',   'float64'),
                  ('quality', 'int32'),
                  ('data_source_previous', 'int32'),
                  ('data_source_next', 'int32'),
                  ('seconds_to_previous_solution', 'float64'),
                  ('seconds_to_next_solution', 'float64')]

  # As this can't be built type-safely using numpy.stack or numpy.column_stack,
  # build an empty structured array, then assign each array to its column.

  output_data = numpy.empty(len(output_met), dtype=output_dtype)

  # Then assign each column.
  output_data['met'] = output_met
  output_data['r']   = output_quat[:, 0]
  output_data['i']   = output_quat[:, 1]
  output_data['j']   = output_quat[:, 2]
  output_data['k']   = output_quat[:, 3]
  output_data['quality'] = output_quality
  output_data['data_source_previous']         = data_source_prev_solution
  output_data['data_source_next']             = data_source_next_solution
  output_data['seconds_to_previous_solution'] = time_to_prev_solution
  output_data['seconds_to_next_solution']     = time_to_next_solution

  # And to make sure we return a new copy, not a view, copy if the array shares a base:
  if output_data.base is not None:
    output_data = output_data.copy()

  return output_data

def get_pointing_dir_geo(data,
                         vector: Union[str, list]):
  '''
  Given a quaternion dataset, append columns to indicate the spacecraft CRF/Body vectors
    in the GEO/ITRF frame.
    Only considers the 'r','i','j','k' columns pulled from get_attitude_quaternions/interpolate_quaternions.
    Given quaternions -MUST- be in the 'ITRF<-CRF' frame-rotation as published.
    Zero-quaternions produce NaN outputs.

  Available Default CRF/Body Vectors:
    x = +x = Ram        = [1,0,0]
    y = +y = Starboard  = [0,1,0]
    z = +z = Baseplate  = [0,0,1]
        -x = Anti-ram   = [-1,0,0]
        -y = Port       = [0,-1,0]
        -z = Main Panel = [0,0,-1]

  Args:
    data: numpy structured array generated by get_attitude_quaternions/interpolate_quaternions.
          Structure: Must include the following: [r, i, j, k]
    vector: String or len-3 list/tuple of floats.
          A string must name one of the default body-vectors above.
          A list/tuple must contain 3 floats, which are vector-normalized and used as the body-vector.

  Returns:
    data: numpy structured array.
      Appends to the given data object.

      Names:  [geo_x, geo_y, geo_z]
      dtypes: [float, float, float]
      Units:  [ Arbitrary unit vector in GEO frame ]
  '''
  # Parse the body-vector into a sane unit vector.
  crf_vector = _parse_body_vector( vector )

  # Make sure the given list contains all required columns.
  if any(column not in data.dtype.names for column in ['r', 'i', 'j', 'k']):
    raise ValueError("Given data object does not contain at least one of the requisite columns: 'r','i','j','k'")
  # Remove any pre-existing output columns from the structured array.
  data = data[[ col for col in data.dtype.names if col not in ['geo_x', 'geo_y', 'geo_z']]]
  
  # Extract the quaternions into an unstructured array for simpler processing.
  qITRF_from_CRF = recfunctions.structured_to_unstructured( data[['r', 'i', 'j', 'k']] ) # qITRF<-CRF, Scalar-First, Right-Hand notation.

  # Pre-populate a body frame using the given vector.
  crf_vector = numpy.full((len( qITRF_from_CRF ), 3), crf_vector)  # Nx3 repeating the crf_vector

  # Then rotate the CRF/body vector into the GEO/ITRF frame.
  # If given a Zero-quaternion, will return a Zero-vector. [0,0,0]
  geo_vector = lib_quaternion.Vector_Rotate( qITRF_from_CRF , crf_vector )

  # And then override any rows generated with zero-quaternions with NaN values.
  _Is_Zero_Quat = lib_quaternion._Is_Zero_Quat( qITRF_from_CRF )
  geo_vector[_Is_Zero_Quat] = numpy.nan

  # And then split into 1-D columns for ingest by recfunctions.
  geo_x, geo_y, geo_z = geo_vector.T

  # And then append the new vectors to the given dataset.
  _names = ['geo_x', 'geo_y', 'geo_z']
  _type = float
  data = recfunctions.append_fields(data, _names, [geo_x, geo_y, geo_z], dtypes=_type, usemask=False)

  return data

def add_string_columns( data: numpy.ndarray, use_short_names=False ):
  '''
  Adds string-columns to the given quaternion dataset.

  Arguments:
    data: numpy structured array generated by get_attitude_quaternions.
      Must include at least one of ['quality','data_source_previous', 'data_source_next']

    use_short_names: Optional Bool. Defaults False. 
      If True,  encodes 'quality' with common.QUALITY_FLAG_STR_SHORT.
      If False, encodes 'quality' with common.QUALITY_FLAG_STR_DETAIL.
    
  Return:
    data: numpy structured array.
      Appends whichever of the following columns apply:
      'quality_string'
      'data_source_previous_string'
      'data_source_next_string'
      
      dtypes are Unicode-32
  '''  
  if 'quality' in data.dtype.names:
    if use_short_names:
      data = common.add_string_column( data, 'quality', 'quality_string', common.QUALITY_FLAG_STR_SHORT )
    else:
      data = common.add_string_column( data, 'quality', 'quality_string', common.QUALITY_FLAG_STR_DETAIL )
    
  if 'data_source_previous' in data.dtype.names:
    data = common.add_string_column( data, 'data_source_previous', 'data_source_previous_string', common.ATTITUDE_DATA_SOURCE_STRING )
  if 'data_source_next' in data.dtype.names:
    data = common.add_string_column( data, 'data_source_next', 'data_source_next_string', common.ATTITUDE_DATA_SOURCE_STRING )
    
  return data

def _get_quaternions_from_database(start_met: float,
                                   end_met: float,
                                   minimum_quality: int = 0,
                                   detail: bool = False):
  '''
  Fetch attitude quaternions and metadata from database.
  Quaternions are of the 'ITRF<-CRF' rotation. (Right-hand notation, Scalar-First)

  Arguments:
    start_met: float representing met time.
    end_met: float representing met time.
    minimum_quality: int. Default 0. Minimum quality to return from the database.
    detail: bool. Default False. Extends returned structure with attitude metadata.

  Return:
    numpy structured array: [ names=(met, r, i, j, k, quality),
                              dtype=(float64, float64, float64, float64, float64, int32)]
    If detail:  [names+=(data_source_previous, data_source_next, seconds_to_previous_solution, seconds_to_next_solution),
                 dtype+=(int32, int32, float64, float64)]

  '''
  fields = ['met', 'r', 'i', 'j', 'k', 'quality']
  
  if detail:
    fields += ['data_source_previous', 'data_source_next',
               'seconds_to_previous_solution', 'seconds_to_next_solution']
              
  database_fields = ['`' + f + '`' for f in fields] # Wrap the field names in backticks for use in MySQL.

  # connect to database and fetch data
  statement = ('SELECT ' + ', '.join(database_fields)
               + ' FROM `cassiope_ephemeris`.`quaternion`'
               + ' WHERE `met` BETWEEN ' + str(start_met) + ' AND ' + str(end_met)
               + ' AND `quality` >= ' + str(minimum_quality)
               + ' ORDER BY `met` ASC')
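  # e.g. for start_met=100.0, end_met=200.0, minimum_quality=0, this renders as:
  #   SELECT `met`, `r`, `i`, `j`, `k`, `quality` FROM `cassiope_ephemeris`.`quaternion`
  #   WHERE `met` BETWEEN 100.0 AND 200.0 AND `quality` >= 0 ORDER BY `met` ASC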

  connection = database._connect_to_database()
  cursor = connection.cursor()
  cursor.execute(statement)
  data = cursor.fetchall()
  cursor.close()
  connection.close()

  dtype_list = [('met', 'float64'),
                ('r',   'float64' ),
                ('i',   'float64' ),
                ('j',   'float64' ),
                ('k',   'float64' ),
                ('quality', 'int32') ]
  if detail:
    dtype_list += [('data_source_previous', 'int32'),
                   ('data_source_next', 'int32'),
                   ('seconds_to_previous_solution', 'float64'),
                   ('seconds_to_next_solution', 'float64')]

  results = numpy.array( data, dtype=dtype_list )
  return results

def _parse_body_vector(body_vector):
  '''
  Input:
    body_vector: string, tuple/list/nd.array.
      If Str: Must decode as a body vector string.
      else:   Must map to a valid vector in the body-frame.

  Returns:
    list: a len-3 list containing a normalized vector in the body-frame.
  '''
  if isinstance(body_vector, (list, tuple, numpy.ndarray)):
    if len(body_vector) != 3:
      raise ValueError("Given a body_vector list of length != 3!")
    # Check that the contents are valid.
    try:
      _temp = [float(val) for val in body_vector]
    except Exception as E:
      raise ValueError("Could not parse the contents of the body_vector list as floats!") from E

    # If we got here, it's a valid 3-element body vector.
    # Normalize and return it.
    x, y, z = body_vector
    vector_len = (x**2 + y**2 + z**2)**0.5
    # If given zeros, then raise an exception
    if vector_len < 1e-10:
      raise ValueError("Can't parse a body_vector of zeros!")

    # Otherwise, return the normalized vector:
    x /= vector_len
    y /= vector_len
    z /= vector_len

    return [x, y, z]

  # If not bound in a list, test if it falls under a default string.
  if isinstance(body_vector, str):
    if body_vector.lower() in ['x', '+x', 'ram']:
      return [1., 0, 0]
    if body_vector.lower() in ['y', '+y', 'starboard']:
      return [0, 1., 0]
    if body_vector.lower() in ['z', '+z', 'baseplate']:
      return [0, 0, 1.]
    if body_vector.lower() in ['-x', 'anti-ram']:
      return [-1., 0, 0]
    if body_vector.lower() in ['-y', 'port']:
      return [0, -1., 0]
    if body_vector.lower() in ['-z', 'main panel']:
      return [0, 0, -1.]

  # And if got here, then didn't recognize what was given.
  raise ValueError("Could not recognize the given input as a valid body_vector!")

Functions

def add_string_columns(data: numpy.ndarray, use_short_names=False)

Adds string-columns to the given quaternion dataset.

Arguments

data: numpy structured array generated by get_attitude_quaternions. Must include at least one of ['quality','data_source_previous', 'data_source_next']

use_short_names: Optional Bool. Defaults False. If True, encodes 'quality' with common.QUALITY_FLAG_STR_SHORT. If False, encodes 'quality' with common.QUALITY_FLAG_STR_DETAIL.

Return

data: numpy structured array. Appends whichever of the following columns apply: 'quality_string', 'data_source_previous_string', 'data_source_next_string'.

dtypes are Unicode-32
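
A minimal usage sketch (the time range is a placeholder; a configured database connection is assumed):

    from ephemeris_library.quaternions import get_attitude_quaternions, add_string_columns

    data = get_attitude_quaternions('2021-03-01T00:00:00', '2021-03-01T00:10:00', detail=True)
    data = add_string_columns(data)
    print(data['quality_string'][0], data['data_source_previous_string'][0])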

def get_attitude_quaternions(start_time: Union[datetime.datetime, str, int, float], end_time: Union[datetime.datetime, str, int, float], minimum_quality: Optional[str] = None, detail: bool = False)

Fetches the CASSIOPE 'ITRF<-CRF' quaternions. Scalar First, right-hand notation. (vB = q*.vA.q )

Args

start_time
int, float, or string representing MET time; or a datetime object or ISO-formatted date string.
end_time
int, float, or string representing MET time; or a datetime object or ISO-formatted date string.
minimum_quality
String which matches common.ATTITUDE_DATA_SOURCE key representing the minimum quality to return. Default None, which returns all results without filtering.
detail
(bool) Default False. If True, includes the data_source_* & seconds_to_* columns in the return.

Return

data: A numpy structured array with columns representing the requested quaternion & metadata columns.

| Column                       | Type    | Condition |
|:------                       |:----    |:--------- |
| met                          | float64 | Always    |
| r                            | float64 | Always    |
| i                            | float64 | Always    |
| j                            | float64 | Always    |
| k                            | float64 | Always    |
| quality                      | int32   | Always    |
| data_source_previous         | int32   | if detail |
| data_source_next             | int32   | if detail |
| seconds_to_previous_solution | float64 | if detail |
| seconds_to_next_solution     | float64 | if detail |
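
A minimal usage sketch (the time range is a placeholder; a configured database connection is assumed):

    from ephemeris_library.quaternions import get_attitude_quaternions

    data = get_attitude_quaternions('2021-03-01T00:00:00', '2021-03-01T01:00:00', detail=True)
    print(data.dtype.names)   # ('met', 'r', 'i', 'j', 'k', 'quality', 'data_source_previous', ...)
    print(data[['met', 'r', 'i', 'j', 'k']][:3])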
def get_pointing_dir_geo(data, vector: Union[str, list])

Given a quaternion dataset, append columns giving the spacecraft CRF/Body vectors in the GEO/ITRF frame. Only considers the 'r','i','j','k' columns pulled from get_attitude_quaternions/interpolate_quaternions. Given quaternions MUST be in the 'ITRF<-CRF' frame-rotation as published. Zero-quaternions produce NaN outputs.

Available Default CRF/Body Vectors:

    x = +x = Ram        = [1,0,0]
    y = +y = Starboard  = [0,1,0]
    z = +z = Baseplate  = [0,0,1]
        -x = Anti-ram   = [-1,0,0]
        -y = Port       = [0,-1,0]
        -z = Main Panel = [0,0,-1]

Args

data
numpy structured array generated by get_quaternions/interpolate_quaternions. Structure: Must include the following: [r, i, j, k]
vector
String or len-3 list/tuple of floats. A string must name one of the default body-vectors above. A list/tuple must contain 3 floats, which are vector-normalized and used as the body-vector.

Returns

data

numpy structured array. Appends to the given data object.

Names:  [geo_x, geo_y, geo_z]
dtypes: [float, float, float]
Units:  [ Arbitrary unit vector in GEO frame ]
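
A minimal usage sketch (the time range is a placeholder; 'ram' is equivalent to passing [1.0, 0.0, 0.0]):

    from ephemeris_library.quaternions import get_attitude_quaternions, get_pointing_dir_geo

    data = get_attitude_quaternions('2021-03-01T00:00:00', '2021-03-01T01:00:00')
    data = get_pointing_dir_geo(data, 'ram')
    print(data[['geo_x', 'geo_y', 'geo_z']][:3])   # unit vectors in GEO/ITRF; NaN rows mark zero-quaternions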

def interpolate_quaternions(data, time_list, ignore_dropout=False)

Interpolate the given attitude data onto the given timestamps. Independent of frame and handedness: the output is returned in the same frame and handedness as its inputs.

Arguments

data: numpy structured array generated by get_attitude_quaternions, using detail=True. Structure: Must include the following: [met, r, i, j, k, data_source_previous, data_source_next, seconds_to_previous_solution, seconds_to_next_solution]

time_list: the list of timestamps that the data is to be interpolated to. Must be a list of MET (int/float), datetime, or ISO-8601 strings.

ignore_dropout: bool, default False. Normally any region with a telemetry dropout greater than the global constant TELEMETRY_HOLE_OVERRIDE_THRESHOLD is overridden with zero-quaternions. Setting this True disregards that limit, and interpolates through anything.

Return

data: numpy structured array. Names: [met, r, i, j, k, quality, data_source_previous, data_source_next, seconds_to_previous_solution, seconds_to_next_solution] dtypes: float64: met, r, i, j, k, seconds_to_previous_solution, seconds_to_next_solution; int32: quality, data_source_previous, data_source_next

Note: 1Hz masking of the telemetry may lead to hidden datapoints on the rebuilding of the 'definitive' timestamps used to build the 'time_to_*' values.
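
A minimal usage sketch (the time range is a placeholder; detail=True is required so the metadata columns are present):

    import numpy
    from ephemeris_library.quaternions import get_attitude_quaternions, interpolate_quaternions

    raw  = get_attitude_quaternions('2021-03-01T00:00:00', '2021-03-01T01:00:00', detail=True)
    mets = numpy.arange(raw['met'][0], raw['met'][-1], 0.5)   # resample onto a 2 Hz grid
    data = interpolate_quaternions(raw, mets)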
