Module ephemeris_library.attitude

Expand source code
import ephemeris_library.database as database
import ephemeris_library.common as common
from ephemeris_library.common import add_utc_column

import datetime
import numpy

from typing import Union
from numpy.lib import recfunctions


def get_yaw_pitch_roll(start_time: Union[datetime.datetime, str, int, float],
                       end_time: Union[datetime.datetime, str, int, float],
                       minimum_quality: str = None,
                       detail=False):
  '''
  Fetch yaw, pitch and roll records for a time window, optionally with the
  previous/next data-source metadata.

  Arguments:
    start_time: int, float, or string representing met time,
                or a datetime object / iso formatted date string.
                The verified value is truncated with int(), i.e. rounded
                down to the start of the second.
    end_time: int, float, or string representing met time,
              or a datetime object / iso formatted date string.
              Not truncated.
    minimum_quality: String which matches attitude.ATTITUDE_DATA_SOURCE key representing
              the minimum quality to return.
              Default None, which returns all results without filtering.
    detail: bool. Appends data_source_* & seconds_to_*_solution columns.

  Return: A numpy structured array, exactly as built by _get_ypr.

    | Column                        | Type    | Condition |
    |:------                        |:----    |:--------- |
    | met                           | float64 | Always    |
    | yaw                           | float64 | Always    |
    | pitch                         | float64 | Always    |
    | roll                          | float64 | Always    |
    | quality                       | int32   | Always    |
    | data_source_previous          | int32   | if detail |
    | data_source_next              | int32   | if detail |
    | seconds_to_previous_solution  | float64 | if detail |
    | seconds_to_next_solution      | float64 | if detail |

  Raises:
    ValueError: if the verified end_time is earlier than the truncated start_time.
  '''
  common._check_version()

  # Normalise both window bounds first; only the lower bound is then
  # truncated to a whole second.
  verified_start = common._verify_time(start_time)
  verified_end = common._verify_time(end_time)
  first_second = int(verified_start)

  # Translate the requested quality name into the id used by the query.
  quality_floor = common._verify_quality(minimum_quality)

  # Reject an inverted window before touching the database.
  if verified_end < first_second:
    raise ValueError('Error: end_time before start_time.')

  return _get_ypr(first_second, verified_end, quality_floor, detail)


def interpolate_ypr(data, time_list, ignore_dropout=False):
  '''
  Interpolate the given YPR data into the given timestamps.
  Interfaces with the quaternion library for the interpolation:
  YPR -> quaternions -> quaternions.interpolate_quaternions -> YPR.

  Arguments:
    data: numpy structured array generated by get_yaw_pitch_roll with detail=True.
          Structure: Must include the following:
          [met, yaw, pitch, roll, quality, data_source_previous, data_source_next,
          seconds_to_previous_solution, seconds_to_next_solution]

    time_list: a time list that data is to be interpolated to.
          Must be a list of MET (int/float), datetime, or ISO-8601 strings.

    ignore_dropout: bool, default False.
        Normally overrides any region with a telemetry dropout greater than the global
          constant quaternions.TELEMETRY_HOLE_OVERRIDE_THRESHOLD with zero-quaternions.
        Setting this true disregards this limit, and interpolates through anything.

  Return:
    data: numpy structured array.
      Names: [met, yaw, pitch, roll, quality, data_source_previous, data_source_next,
              seconds_to_previous_solution, seconds_to_next_solution]
      dtypes:
        float64: met, yaw, pitch, roll, seconds_to_previous_solution, seconds_to_next_solution
        int32:   quality, data_source_previous, data_source_next
      Rows whose interpolated quaternion is a zero-quaternion carry 999.0 in yaw, pitch and roll.

  Raises:
    ValueError: if data is missing any required column (this includes an
                unstructured array, which has no named columns at all).

  Note: 1Hz masking of the telemetry may lead to hidden datapoints on
    the rebuilding of the 'definitive' timestamps used to build the 'time_to_*' values.
  '''
  # Preamble & sanity checks, done before the deferred imports below so that a
  # malformed dataset is rejected without loading the interpolation libraries.
  _required_columns = ('met', 'yaw', 'pitch', 'roll', 'quality', 'data_source_previous',
                       'data_source_next', 'seconds_to_previous_solution', 'seconds_to_next_solution')
  # dtype.names is None for an unstructured array; treat that as "no columns"
  # so the caller gets the ValueError below instead of an opaque TypeError.
  _available_columns = data.dtype.names or ()
  for _column in _required_columns:
    if _column not in _available_columns:
      raise ValueError("interpolate_ypr - Missing a required column in the data structure: {:s}".format(_column))

  import ephemeris_library.quaternions as quaternions    # Does the actual interpolation work
  # Translates the YPR to and from Quaternions.
  import ephemeris_library.rotation_libraries.lib_quaternion as lib_quaternion

  # Extract the YPR for translation into Quaternions.
  # Recast as an unstructured array for the use by lib_quaternion library, as it does not (yet) allow a structured set.
  # Format is of < Yaw, Pitch, Roll >, all of floats.
  YPR_data = recfunctions.structured_to_unstructured(data[['yaw', 'pitch', 'roll']], dtype=float)

  # Translate the YPR data into the CRF->Body quaternions.
  quat_data = lib_quaternion.YPR_to_Quaternion(YPR_data)  # <Y,P,R> -> <r,i,j,k> Does not require MET.
  # The YPR->Quat function accepts 999-YPR placeholder values and returns meaningless quaternions for them.
  # The met/metadata rows are still needed for extrapolation purposes, so those rows are kept
  # and their quaternions overridden with zero-quaternions.
  # The test is yaw > 360 rather than yaw == 999 to avoid float equality.
  quat_data[YPR_data[:, 0] > 360.] = 0

  # Merge the Quaternions into the data variable.
  # First strip the data into the minimum viable dataset,
  data = data[['met', 'quality', 'data_source_previous', 'data_source_next',
               'seconds_to_previous_solution', 'seconds_to_next_solution']]
  data = data.copy()  # And make sure it's a static object, not a view.

  # And then add the new columns to the dataset:
  _names = ['r', 'i', 'j', 'k']
  _types = [float, float, float, float]
  r, i, j, k = quat_data.T  # Split into individual columns for use below.
  data = recfunctions.append_fields(data, _names, [r, i, j, k], _types, usemask=False)

  # Re-organize the dataset for consistency.
  data = data[['met', 'r', 'i', 'j', 'k', 'quality', 'data_source_previous',
               'data_source_next', 'seconds_to_previous_solution', 'seconds_to_next_solution']]

  # Feed the intermediate 'quaternion+metadata' structure into the quaternion interpolator:
  interpolated_data = quaternions.interpolate_quaternions(data, time_list, ignore_dropout)

  # Extract the interpolated quaternions:
  output_quat = recfunctions.structured_to_unstructured(interpolated_data[['r', 'i', 'j', 'k']], dtype=float)
  output_YPR = lib_quaternion.Quaternion_to_YPR(output_quat)  # <r,i,j,k> -> <Y,P,R> - Does not require MET.

  # And if any of the returned interpolated quaternions was a zero-quat, this was within a period of dropout or extrapolation.
  # Override all zero-quaternions with a YPR value of 999.
  output_YPR[lib_quaternion._Is_Zero_Quat(output_quat)] = 999.0
  # Note the 'ignore_dropout' arg is not applied here, as it's guaranteed by the interpolator function to return
  # non-zero quaternions in non-extrapolated regions

  output_dtype = [('met', 'float64'),
                  ('yaw', 'float64'),
                  ('pitch', 'float64'),
                  ('roll', 'float64'),
                  ('quality', 'int32'),
                  ('data_source_previous', 'int32'),
                  ('data_source_next', 'int32'),
                  ('seconds_to_previous_solution', 'float64'),
                  ('seconds_to_next_solution', 'float64')]

  # As can't build type-safely using numpy.stack or numpy.column_stack,
  # build an empty structured array, then copy each array into its column.
  output_data = numpy.empty(len(output_YPR), dtype=output_dtype)

  output_data['yaw'] = output_YPR[:, 0]
  output_data['pitch'] = output_YPR[:, 1]
  output_data['roll'] = output_YPR[:, 2]

  # The timestamp and metadata columns are carried over from the interpolator output unchanged.
  for _column in ('met', 'quality', 'data_source_previous', 'data_source_next',
                  'seconds_to_previous_solution', 'seconds_to_next_solution'):
    output_data[_column] = interpolated_data[_column]

  # And return!
  return output_data


def add_string_columns(data: numpy.ndarray, use_short_names=False):
  '''
  Append human-readable string columns next to the numeric flag columns.

  Arguments:
    data: numpy structured array (originally documented as the output of
      get_attitude_quaternions). Only the columns
      ['quality', 'data_source_previous', 'data_source_next'] are looked at;
      any that are absent are simply skipped.

    use_short_names: Optional Bool. Defaults False.
      If True,  'quality' is encoded with common.QUALITY_FLAG_STR_SHORT.
      If False, 'quality' is encoded with common.QUALITY_FLAG_STR_DETAIL.

  Return:
    data: numpy structured array, with one '<column>_string' column appended
      by common.add_string_column for each source column that is present:
      'quality_string'
      'data_source_previous_string'
      'data_source_next_string'

      dtypes are documented as Unicode-32 (produced by common.add_string_column,
      which is not visible here).
      If none of the source columns exist, the input object is returned as-is.
  '''
  if 'quality' in data.dtype.names:
    # Pick the short or the detailed lookup table for the quality flag.
    quality_lookup = common.QUALITY_FLAG_STR_SHORT if use_short_names else common.QUALITY_FLAG_STR_DETAIL
    data = common.add_string_column(data, 'quality', 'quality_string', quality_lookup)

  # Both data-source columns share the same lookup table and naming rule.
  for source_column in ('data_source_previous', 'data_source_next'):
    if source_column in data.dtype.names:
      data = common.add_string_column(data, source_column, source_column + '_string',
                                      common.ATTITUDE_DATA_SOURCE_STRING)

  return data


def _get_ypr(start_met: int,
             end_met: int,
             minimum_data_source_id: int,
             detail: bool = False):
  '''
  Get yaw, pitch, and roll information from database.

  Arguments:
    start_met: lower bound of the met window (inclusive, SQL BETWEEN).
    end_met: upper bound of the met window (inclusive, SQL BETWEEN).
    minimum_data_source_id: rows with `quality` below this value are excluded.
    detail: (bool) Default False. When True, also select data_source_previous,
            data_source_next, seconds_to_previous_solution, and seconds_to_next_solution.

  Return:
    numpy structured array, ordered by met ascending:
                [(met, yaw, pitch, roll, quality),
                 dtype=(float64, float64, float64, float64, int32)]
    If detail:  [(met, yaw, pitch, roll, quality,
                  data_source_previous, data_source_next, seconds_to_previous_solution, seconds_to_next_solution),
                dtype=(float64, float64, float64, float64, int32,
                  int32, int32, float64, float64)]

  The cursor and the connection are closed even if the query raises.
  '''
  # Single source of truth for both the selected columns and the output dtype,
  # so the SELECT list and the structured array can never drift apart.
  dtype_list = [('met', 'float64'),
                ('yaw', 'float64'),
                ('pitch', 'float64'),
                ('roll', 'float64'),
                ('quality', 'int32')
                ]
  if detail:
    dtype_list.extend([('data_source_previous', 'int32'),
                       ('data_source_next', 'int32'),
                       ('seconds_to_previous_solution', 'float64'),
                       ('seconds_to_next_solution', 'float64')])

  database_fields = ['`' + name + '`' for name, _ in dtype_list]

  # NOTE(review): the bounds are interpolated into the SQL text with str().
  # The only caller visible in this file passes values returned by
  # common._verify_time / common._verify_quality; if this is ever reachable with
  # unvalidated input, switch to the driver's parameter binding.
  statement = ('SELECT ' + ', '.join(database_fields)
               + ' FROM `cassiope_ephemeris`.`ypr`'
               + ' WHERE `met` BETWEEN ' + str(start_met) + ' AND ' + str(end_met)
               + ' AND `quality` >= ' + str(minimum_data_source_id)
               + ' ORDER BY `met` ASC')

  # Connect to the database and fetch the data; release the cursor and the
  # connection on every path, including a failing execute()/fetchall().
  connection = database._connect_to_database()
  try:
    cursor = connection.cursor()
    try:
      cursor.execute(statement)
      rows = cursor.fetchall()
    finally:
      cursor.close()
  finally:
    connection.close()

  return numpy.array(rows, dtype=dtype_list)

Functions

def add_string_columns(data: numpy.ndarray, use_short_names=False)

Adds string-columns to the given quaternion dataset.

Arguments

data: numpy structured array generated by get_attitude_quaternions. Must include at least one of ['quality','data_source_previous', 'data_source_next']

use_short_names: Optional Bool. Defaults False. If True, encodes 'quality' with common.QUALITY_FLAG_STR_SHORT. if False, encodes 'quality' with common.QUALITY_FLAG_STR_DETAIL.

Return

data: numpy structured array. Appends the following columns if given. 'quality_string' 'data_source_previous_string' 'data_source_next_string'

dtypes are Unicode-32

Expand source code
def add_string_columns(data: numpy.ndarray, use_short_names=False):
  '''
  Append human-readable string columns next to the numeric flag columns.

  Arguments:
    data: numpy structured array (originally documented as the output of
      get_attitude_quaternions). Only the columns
      ['quality', 'data_source_previous', 'data_source_next'] are looked at;
      any that are absent are simply skipped.

    use_short_names: Optional Bool. Defaults False.
      If True,  'quality' is encoded with common.QUALITY_FLAG_STR_SHORT.
      If False, 'quality' is encoded with common.QUALITY_FLAG_STR_DETAIL.

  Return:
    data: numpy structured array, with one '<column>_string' column appended
      by common.add_string_column for each source column that is present:
      'quality_string'
      'data_source_previous_string'
      'data_source_next_string'

      dtypes are documented as Unicode-32 (produced by common.add_string_column,
      which is not visible here).
      If none of the source columns exist, the input object is returned as-is.
  '''
  if 'quality' in data.dtype.names:
    # Pick the short or the detailed lookup table for the quality flag.
    quality_lookup = common.QUALITY_FLAG_STR_SHORT if use_short_names else common.QUALITY_FLAG_STR_DETAIL
    data = common.add_string_column(data, 'quality', 'quality_string', quality_lookup)

  # Both data-source columns share the same lookup table and naming rule.
  for source_column in ('data_source_previous', 'data_source_next'):
    if source_column in data.dtype.names:
      data = common.add_string_column(data, source_column, source_column + '_string',
                                      common.ATTITUDE_DATA_SOURCE_STRING)

  return data
def get_yaw_pitch_roll(start_time: Union[datetime.datetime, str, int, float], end_time: Union[datetime.datetime, str, int, float], minimum_quality: str = None, detail=False)

Get yaw pitch and roll information, with data source previous and next information also.

Arguments

start_time: int, float, or string representing met time. datetime object or iso formatted date string. Time will be rounded down to start of second. end_time: int, float, or string representing met time. datetime object or iso formatted date string. minimum_quality: String which matches attitude.ATTITUDE_DATA_SOURCE key representing the minimum quality to return. Default None, which returns all results without filtering. detail: bool. Appends data_source_* & seconds_to_*_solution columns.

Return: A numpy structured array with columns representing the requested YPR & metadata columns.

Column Type Condition
met float64 Always
yaw float64 Always
pitch float64 Always
roll float64 Always
quality int32 Always
data_source_previous int32 if detail
data_source_next int32 if detail
seconds_to_previous_solution float64 if detail
seconds_to_next_solution float64 if detail
Expand source code
def get_yaw_pitch_roll(start_time: Union[datetime.datetime, str, int, float],
                       end_time: Union[datetime.datetime, str, int, float],
                       minimum_quality: str = None,
                       detail=False):
  '''
  Fetch yaw, pitch and roll records for a time window, optionally with the
  previous/next data-source metadata.

  Arguments:
    start_time: int, float, or string representing met time,
                or a datetime object / iso formatted date string.
                The verified value is truncated with int(), i.e. rounded
                down to the start of the second.
    end_time: int, float, or string representing met time,
              or a datetime object / iso formatted date string.
              Not truncated.
    minimum_quality: String which matches attitude.ATTITUDE_DATA_SOURCE key representing
              the minimum quality to return.
              Default None, which returns all results without filtering.
    detail: bool. Appends data_source_* & seconds_to_*_solution columns.

  Return: A numpy structured array, exactly as built by _get_ypr.

    | Column                        | Type    | Condition |
    |:------                        |:----    |:--------- |
    | met                           | float64 | Always    |
    | yaw                           | float64 | Always    |
    | pitch                         | float64 | Always    |
    | roll                          | float64 | Always    |
    | quality                       | int32   | Always    |
    | data_source_previous          | int32   | if detail |
    | data_source_next              | int32   | if detail |
    | seconds_to_previous_solution  | float64 | if detail |
    | seconds_to_next_solution      | float64 | if detail |

  Raises:
    ValueError: if the verified end_time is earlier than the truncated start_time.
  '''
  common._check_version()

  # Normalise both window bounds first; only the lower bound is then
  # truncated to a whole second.
  verified_start = common._verify_time(start_time)
  verified_end = common._verify_time(end_time)
  first_second = int(verified_start)

  # Translate the requested quality name into the id used by the query.
  quality_floor = common._verify_quality(minimum_quality)

  # Reject an inverted window before touching the database.
  if verified_end < first_second:
    raise ValueError('Error: end_time before start_time.')

  return _get_ypr(first_second, verified_end, quality_floor, detail)
def interpolate_ypr(data, time_list, ignore_dropout=False)

Interpolate the given YPR data into the given timestamps. Interfaces with the quaternion library for the interpolation.

Arguments

data: numpy structured array generated by get_yaw_pitch_roll with detail=True. Structure: Must include the following: [met, yaw, pitch, roll, quality, data_source_previous, data_source_next, seconds_to_previous_solution, seconds_to_next_solution]

time_list: a time list that data is to be interpolated to. Must be a list of MET (int/float), datetime, or ISO-8601 strings.

ignore_dropout: bool, default False. Normally overrides any region with a telemetry dropout greater than the global constant quaternions.TELEMETRY_HOLE_OVERRIDE_THRESHOLD with zero-quaternions. Setting this true disregards this limit, and interpolates through anything.

Return

data: numpy structured array. Names: [met, yaw, pitch, roll, quality, data_source_previous, data_source_next, seconds_to_previous_solution, seconds_to_next_solution] dtypes: float64: met, yaw, pitch, roll, seconds_to_previous_solution, seconds_to_next_solution int32: quality, data_source_previous, data_source_next

Note: 1Hz masking of the telemetry may lead to hidden datapoints on the rebuilding of the 'definitive' timestamps used to build the 'time_to_*' values.

Expand source code
def interpolate_ypr(data, time_list, ignore_dropout=False):
  '''
  Interpolate the given YPR data into the given timestamps.
  Interfaces with the quaternion library for the interpolation:
  YPR -> quaternions -> quaternions.interpolate_quaternions -> YPR.

  Arguments:
    data: numpy structured array generated by get_yaw_pitch_roll with detail=True.
          Structure: Must include the following:
          [met, yaw, pitch, roll, quality, data_source_previous, data_source_next,
          seconds_to_previous_solution, seconds_to_next_solution]

    time_list: a time list that data is to be interpolated to.
          Must be a list of MET (int/float), datetime, or ISO-8601 strings.

    ignore_dropout: bool, default False.
        Normally overrides any region with a telemetry dropout greater than the global
          constant quaternions.TELEMETRY_HOLE_OVERRIDE_THRESHOLD with zero-quaternions.
        Setting this true disregards this limit, and interpolates through anything.

  Return:
    data: numpy structured array.
      Names: [met, yaw, pitch, roll, quality, data_source_previous, data_source_next,
              seconds_to_previous_solution, seconds_to_next_solution]
      dtypes:
        float64: met, yaw, pitch, roll, seconds_to_previous_solution, seconds_to_next_solution
        int32:   quality, data_source_previous, data_source_next
      Rows whose interpolated quaternion is a zero-quaternion carry 999.0 in yaw, pitch and roll.

  Raises:
    ValueError: if data is missing any required column (this includes an
                unstructured array, which has no named columns at all).

  Note: 1Hz masking of the telemetry may lead to hidden datapoints on
    the rebuilding of the 'definitive' timestamps used to build the 'time_to_*' values.
  '''
  # Preamble & sanity checks, done before the deferred imports below so that a
  # malformed dataset is rejected without loading the interpolation libraries.
  _required_columns = ('met', 'yaw', 'pitch', 'roll', 'quality', 'data_source_previous',
                       'data_source_next', 'seconds_to_previous_solution', 'seconds_to_next_solution')
  # dtype.names is None for an unstructured array; treat that as "no columns"
  # so the caller gets the ValueError below instead of an opaque TypeError.
  _available_columns = data.dtype.names or ()
  for _column in _required_columns:
    if _column not in _available_columns:
      raise ValueError("interpolate_ypr - Missing a required column in the data structure: {:s}".format(_column))

  import ephemeris_library.quaternions as quaternions    # Does the actual interpolation work
  # Translates the YPR to and from Quaternions.
  import ephemeris_library.rotation_libraries.lib_quaternion as lib_quaternion

  # Extract the YPR for translation into Quaternions.
  # Recast as an unstructured array for the use by lib_quaternion library, as it does not (yet) allow a structured set.
  # Format is of < Yaw, Pitch, Roll >, all of floats.
  YPR_data = recfunctions.structured_to_unstructured(data[['yaw', 'pitch', 'roll']], dtype=float)

  # Translate the YPR data into the CRF->Body quaternions.
  quat_data = lib_quaternion.YPR_to_Quaternion(YPR_data)  # <Y,P,R> -> <r,i,j,k> Does not require MET.
  # The YPR->Quat function accepts 999-YPR placeholder values and returns meaningless quaternions for them.
  # The met/metadata rows are still needed for extrapolation purposes, so those rows are kept
  # and their quaternions overridden with zero-quaternions.
  # The test is yaw > 360 rather than yaw == 999 to avoid float equality.
  quat_data[YPR_data[:, 0] > 360.] = 0

  # Merge the Quaternions into the data variable.
  # First strip the data into the minimum viable dataset,
  data = data[['met', 'quality', 'data_source_previous', 'data_source_next',
               'seconds_to_previous_solution', 'seconds_to_next_solution']]
  data = data.copy()  # And make sure it's a static object, not a view.

  # And then add the new columns to the dataset:
  _names = ['r', 'i', 'j', 'k']
  _types = [float, float, float, float]
  r, i, j, k = quat_data.T  # Split into individual columns for use below.
  data = recfunctions.append_fields(data, _names, [r, i, j, k], _types, usemask=False)

  # Re-organize the dataset for consistency.
  data = data[['met', 'r', 'i', 'j', 'k', 'quality', 'data_source_previous',
               'data_source_next', 'seconds_to_previous_solution', 'seconds_to_next_solution']]

  # Feed the intermediate 'quaternion+metadata' structure into the quaternion interpolator:
  interpolated_data = quaternions.interpolate_quaternions(data, time_list, ignore_dropout)

  # Extract the interpolated quaternions:
  output_quat = recfunctions.structured_to_unstructured(interpolated_data[['r', 'i', 'j', 'k']], dtype=float)
  output_YPR = lib_quaternion.Quaternion_to_YPR(output_quat)  # <r,i,j,k> -> <Y,P,R> - Does not require MET.

  # And if any of the returned interpolated quaternions was a zero-quat, this was within a period of dropout or extrapolation.
  # Override all zero-quaternions with a YPR value of 999.
  output_YPR[lib_quaternion._Is_Zero_Quat(output_quat)] = 999.0
  # Note the 'ignore_dropout' arg is not applied here, as it's guaranteed by the interpolator function to return
  # non-zero quaternions in non-extrapolated regions

  output_dtype = [('met', 'float64'),
                  ('yaw', 'float64'),
                  ('pitch', 'float64'),
                  ('roll', 'float64'),
                  ('quality', 'int32'),
                  ('data_source_previous', 'int32'),
                  ('data_source_next', 'int32'),
                  ('seconds_to_previous_solution', 'float64'),
                  ('seconds_to_next_solution', 'float64')]

  # As can't build type-safely using numpy.stack or numpy.column_stack,
  # build an empty structured array, then copy each array into its column.
  output_data = numpy.empty(len(output_YPR), dtype=output_dtype)

  output_data['yaw'] = output_YPR[:, 0]
  output_data['pitch'] = output_YPR[:, 1]
  output_data['roll'] = output_YPR[:, 2]

  # The timestamp and metadata columns are carried over from the interpolator output unchanged.
  for _column in ('met', 'quality', 'data_source_previous', 'data_source_next',
                  'seconds_to_previous_solution', 'seconds_to_next_solution'):
    output_data[_column] = interpolated_data[_column]

  # And return!
  return output_data