Module ephemeris_library.ephemeris

Expand source code
import ephemeris_library.database as database
import ephemeris_library.common as common
from ephemeris_library.common import add_utc_column

import datetime
from typing import Union
import sys
import math

import numpy
from numpy.lib import recfunctions


# User-space functions:

def get_geo_ephemeris(start_time: Union[datetime.datetime, str, int, float],
                      end_time: Union[datetime.datetime, str, int, float],
                      position: bool = True,
                      velocity: bool = True):
  '''
  Get position and velocity information in geo coordinates.

  Args:
    start_time (int, float, str, datetime): int, float, or string representing met time.
                datetime object or iso formatted date string.
                Time will be rounded down to start of second.

    end_time (int, float, str, datetime): int, float, or string representing met time.
              datetime object or iso formatted date string.

    position (bool): Return position information. Default True

    velocity (bool): Return velocity information. Default True

  Returns: A numpy structured array with columns representing the requested position and/or velocity columns.

    | Column  | Type           | Condition |
    |:------  |:----           |:--------- |
    | met     | float64        | Always |
    | geo_x   | float64        | if position |
    | geo_y   | float64        | if position |
    | geo_z   | float64        | if position |
    | geo_vx  | float64        | if velocity |
    | geo_vy  | float64        | if velocity |
    | geo_vz  | float64        | if velocity |


  '''
  # Check version
  common._check_version()

  # verify time inputs
  start_time = common._verify_time(start_time)
  end_time = common._verify_time(end_time)

  # Round start time down to start of second
  start_time = int(start_time)

  if end_time < start_time:
    raise ValueError('Error: end_time before start_time.')

  data = _get_ephemeris(start_time, end_time, 'geo', position=position, velocity=velocity)

  return data


def get_j2k_ephemeris(start_time: Union[datetime.datetime, str, int, float],
                      end_time: Union[datetime.datetime, str, int, float],
                      position: bool = True,
                      velocity: bool = True):
  '''
  Get position and velocity information in j2k coordinates.

  Args:
    start_time (int, float, str, datetime): int, float, or string representing met time.
                datetime object or iso formatted date string.
                Time will be rounded down to start of second.

    end_time (int, float, str, datetime): int, float, or string representing met time.
              datetime object or iso formatted date string.

    position (bool): Return position information. Default True

    velocity (bool): Return velocity information. Default True

  Returns: A numpy structured array with columns representing the requested position and/or velocity columns.

    | Column  | Type           | Condition |
    |:------  |:----           |:--------- |
    | met     | float64        | Always |
    | j2k_x   | float64        | if position |
    | j2k_y   | float64        | if position |
    | j2k_z   | float64        | if position |
    | j2k_vx  | float64        | if velocity |
    | j2k_vy  | float64        | if velocity |
    | j2k_vz  | float64        | if velocity |


  '''
  # Check version
  common._check_version()

  # verify time inputs
  start_time = common._verify_time(start_time)
  end_time = common._verify_time(end_time)

  # Round start time down to start of second
  start_time = int(start_time)

  if end_time < start_time:
    raise ValueError('Error: end_time before start_time.')

  data = _get_ephemeris(start_time, end_time, 'j2k', position=position, velocity=velocity)

  return data


def get_predicted_ephemeris(start_time: Union[datetime.datetime, str, int, float],
                            end_time: Union[datetime.datetime, str, int, float],
                            position: bool = True,
                            velocity: bool = True):
  '''
  Get predicted position and velocity information.
  Predicted information is only availabile in geo coordinates.

  Args:
    start_time (int, float, str, datetime): int, float, or string representing met time.
                datetime object or iso formatted date string.
                Time will be rounded down to start of second.

    end_time (int, float, str, datetime): int, float, or string representing met time.
              datetime object or iso formatted date string.

    position (bool): Return position information. Default True

    velocity (bool): Return velocity information. Default True

  Returns: A numpy structured array with columns representing the requested position and/or velocity columns.

    | Column  | Type           | Condition |
    |:------  |:----           |:--------- |
    | met     | float64        | Always |
    | j2k_x   | float64        | if position |
    | j2k_y   | float64        | if position |
    | j2k_z   | float64        | if position |
    | j2k_vx  | float64        | if velocity |
    | j2k_vy  | float64        | if velocity |
    | j2k_vz  | float64        | if velocity |

  '''
  # Check version
  common._check_version()

  # verify time inputs
  start_time = common._verify_time(start_time)
  end_time = common._verify_time(end_time)

  # Round start time down to start of second.
  start_time = int(start_time)

  if end_time < start_time:
    raise ValueError('Error: end_time before start_time.')

  data = _get_ephemeris(start_time, end_time, 'predicted', position=position, velocity=velocity)

  return data


def get_ephemeris_range(get_definitive: bool = True,
                        get_predicted: bool = True,
                        return_met: bool = False):
  '''
  Get time-availability information from the ephemeris database.
  Pulls from the 

  Args:
    get_definitive: bool. Fetches the start/stop pair for the Definitive Ephemeris range.
    get_predicted:  bool. Fetches the start/stop pair for the Predicted Ephemeris range.
    return_met:     bool. If True, returns timestamps in MET. If False, datetime.

  Returns:
    if get_definitive & get_predicted: returns [ Def_Start, Def_Stop, Pred_Start, Pred_Stop ]
    if get_definitive: returns [ Def_Start,  Def_Stop ]
    if get_predicted:  returns [ Pred_Start, Pred_Stop ]
  '''
  if not(get_definitive or get_predicted):
    raise ValueError("One of get_definitive or get_predicted arg must be True!")

  query_list = []
  if get_definitive:
    query_list.append("SELECT `met` from `cassiope_ephemeris`.`geo` ORDER BY `met` ASC  LIMIT 1;")
    query_list.append("SELECT `met` from `cassiope_ephemeris`.`geo` ORDER BY `met` DESC LIMIT 1;")
  if get_predicted:
    query_list.append("SELECT `met` from `cassiope_ephemeris`.`predicted_geo` ORDER BY `met` ASC  LIMIT 1;")
    query_list.append("SELECT `met` from `cassiope_ephemeris`.`predicted_geo` ORDER BY `met` DESC LIMIT 1;")

  return_data = []

  # Fetch the data.
  connection = database._connect_to_database()
  cursor = connection.cursor()
  for query in query_list:
    cursor.execute(query)
    return_data.append(cursor.fetchall()[0][0])  # List of tuple -> Tuple -> Object.

  cursor.close()
  connection.close()

  if return_met == False:  # User wants datetime:
    return_data = [datetime.datetime(1968, 5, 24) + datetime.timedelta(seconds=met) for met in return_data]

  return return_data


def add_altitude_column(data: numpy.array, altitude_above_earth: bool = False):
  '''
  Add altitude column to numpy structured array.
  Altitude is returned in meters.

  Args:
    data (numpy.array): Input data, must include position *_x/y/z columns for calculation (geo, j2k, or both).

    altitude_above_earth (bool): Subtracts the radius of earth from altitude Calculated from WGS84. (Default False)

  Returns:
    Input array with the altitude(km) column added.

    If geo position columns are present, the returned structure has a column 'geo_altitude'
    If j2k position columns are present, the returned structure has a column 'j2k_altitude'
    If both geo and j2k columns are present, two altitude columns are appended.

  '''

  input_columns = data.dtype.names + tuple()  # Make a copy of the initial dataframe columns.

  data_has_geo = ('geo_x' in input_columns
                  and 'geo_y' in input_columns
                  and 'geo_z' in input_columns)

  data_has_j2k = ('j2k_x' in input_columns
                  and 'j2k_y' in input_columns
                  and 'j2k_z' in input_columns)

  if not (data_has_geo or data_has_j2k):
    raise ValueError('Input data expected one or both sets of geo_x/y/z or j2k_x/y/z inputs.')

  # Calculate altitude from position.
  if data_has_geo:
    geo_altitude = (data['geo_x']**2 + data['geo_y']**2 + data['geo_z']**2) ** 0.5
  if data_has_j2k:
    j2k_altitude = (data['j2k_x']**2 + data['j2k_y']**2 + data['j2k_z']**2) ** 0.5

  # If needed, calcuate & subtract the WGS84 spherioid.
  if altitude_above_earth:
    # Pre-calculate the WGS84 altitudes.
    if data_has_geo:  # Prefer GEO, as is what defines WGS84.
      wgs84_radius = common.get_wgs84_earth_radius_meters(data['geo_x'], data['geo_y'], data['geo_z'])
    else:
      # J2K itself is also fine, as effective latitude will only be effected by Polar-Tilt, Precession & Nutation, which are relatively tiny.
      # Should the accuracy of the J2K/GEO conversion be improved, pre-rotating into GEO may improve results.
      wgs84_radius = common.get_wgs84_earth_radius_meters(data['j2k_x'], data['j2k_y'], data['j2k_z'])

    if data_has_geo:
      geo_altitude -= wgs84_radius
    if data_has_j2k:
      j2k_altitude -= wgs84_radius

  # And then attach the data to the given set.
  # First purge the new column from the old data, if needed,
  # Then append the new column.
  if data_has_geo:
    data = data[[column for column in data.dtype.names if column != 'geo_altitude']]
    data = recfunctions.append_fields(data, 'geo_altitude', geo_altitude, dtypes=['float64'], usemask=False)

  if data_has_j2k:
    data = data[[column for column in data.dtype.names if column != 'j2k_altitude']]
    data = recfunctions.append_fields(data, 'j2k_altitude', j2k_altitude, dtypes=['float64'], usemask=False)

  # And then make sure to return a nominal copy, rather than a view.
  if data.base is not None:
    data = data.copy()

  return data


def add_apogee_perigee_column(data: numpy.ndarray):
  '''
  Add apogee/perigee column to numpy structured array.

  Args:
    data (numpy.array): Input data, must include altitude column for calculation (geo_altitude, or j2k_altitude).
      If both are present, geo_altitude is used for the calculation.

  Returns:
    Input array with the apogee_perigee column added. Values in the column are ['ascending', 'descending', 'apogee', 'perigee', 'None']
    'None' values are used for the first and last element of the array, because those values are not deterministic.
  '''

  input_columns = data.dtype.names + tuple()  # Make a copy of the initial dataframe columns.

  data_has_geo = 'geo_altitude' in input_columns
  data_has_j2k = 'j2k_altitude' in input_columns

  if data_has_geo:
    altitude_column = 'geo_altitude'
  elif data_has_j2k:
    altitude_column = 'j2k_altitude'
  else:
    raise ValueError('Altitude column not found. Expected at least one of [geo_altitude, j2k_altitude].')

  if len(data[altitude_column]) > 2:
    altitude_delta = numpy.diff(data[altitude_column])
    delta_to_prev = altitude_delta[:-1]
    delta_to_next = altitude_delta[1:]

    is_ascending = delta_to_prev >= 0
    is_descending = delta_to_next <= 0

    is_apogee = numpy.logical_and(is_ascending, is_descending)
    is_perigee = numpy.logical_and(~is_ascending, ~is_descending)

    apogee_perigee_col = numpy.empty(len(data[altitude_column]), dtype='U10')
    apogee_perigee_col[0] = 'None'
    apogee_perigee_col[-1] = 'None'

    apogee_perigee_col[1:-1][is_ascending] = 'ascending'
    apogee_perigee_col[1:-1][is_descending] = 'descending'
    apogee_perigee_col[1:-1][is_apogee] = 'apogee'
    apogee_perigee_col[1:-1][is_perigee] = 'perigee'

  else:
    apogee_perigee_col = numpy.empty(len(data[altitude_column]), dtype='U10')
    apogee_perigee_col[:] = 'None'

  data = recfunctions.append_fields(data, 'apogee_perigee', apogee_perigee_col, dtypes=['U10'], usemask=False)

  # And then make sure to return a nominal copy, rather than a view.
  if data.base is not None:
    data = data.copy()

  return data


def interpolate_ephemeris(data, time_list):
  '''
  Interpolate position/velocity data.
  Return data structure replaces 'met' column.
  Will decimate if overfitting is detected.

  Arguments:
    data: Structrued Array. Only considers 'met' & [geo|j2k]_[x|y|z|vx|vy|vz] columns
    time_list: list of MET-representation of time.

  Return:
    numpy structured array: [ met, [*_x], [*_y], [*_z], [*_vx], [*_vy], [*_vz]]
      dtypes=[float64, float64, float64, float64, float64, float64, float64 ]
      Will only return each column if it was included in the initial data.
      Will discard all other
  '''

  # Import the interpolation function.
  from scipy.interpolate import CubicSpline

  # To make the inteprolation easier on the overfitting checks,
  # For each frame given, if only given one of Position or Velocity,
  # Populate the other with zeroes. These false columns will be filtered later.

  input_columns = data.dtype.names + tuple()  # Make a copy of the initial dataframe columns.

  _data_has_geo = 'geo_x' in input_columns or 'geo_vx' in input_columns
  _data_has_j2k = 'j2k_x' in input_columns or 'j2k_vx' in input_columns

  # Sanity check: Does not accept both.
  if _data_has_geo and _data_has_j2k:
    raise ValueError("intepolate_ephemeris only accepts one frame of either 'geo' or 'j2k'! Cannot accept both.")
  if not (_data_has_geo or _data_has_j2k):
    raise ValueError(
        "intepolate_ephemeris given data arg must contain one of the following: geo_x, geo_vx, j2k_x, j2k_vx as to tell the interpolation what frame to interpolate!")

  if _data_has_geo:
    frame = 'geo'
  else:
    frame = 'j2k'

  col_names = "{:s}_x {:s}_y {:s}_z {:s}_vx {:s}_vy {:s}_vz".format(frame, frame, frame, frame, frame, frame).split()

  input_met = data['met']
  # Pre-populate the dataset with zeros, override as needed.
  input_eph = numpy.zeros((len(data['met']), 6), dtype=float)  # -> Builds a Nx6 array, one for each pos & vel x/y/z.

  if col_names[0] in input_columns:
    input_eph[:, 0] = data[col_names[0]]
  if col_names[1] in input_columns:
    input_eph[:, 1] = data[col_names[1]]
  if col_names[2] in input_columns:
    input_eph[:, 2] = data[col_names[2]]
  if col_names[3] in input_columns:
    input_eph[:, 3] = data[col_names[3]]
  if col_names[4] in input_columns:
    input_eph[:, 4] = data[col_names[4]]
  if col_names[5] in input_columns:
    input_eph[:, 5] = data[col_names[5]]

  # Run the initial interpolation.
  output_met = time_list
  output_eph = CubicSpline(input_met, input_eph)(output_met)
  # <MET> <pX,pY,pX,vX,vY,vZ> -> <pX,pY,pX,vX,vY,vZ>

  # Fix overfitting:
  # If any geocentric altitude > 9,000 km, or velocity over 9 km/s, then halve the input ephemeris data.

  while numpy.any(numpy.sum(output_eph[:, :3]**2, axis=1)**0.5 > 9e6) \
          or numpy.any(numpy.sum(output_eph[:, 3:]**2, axis=1)**0.5 > 9e3):
    print("Warning - Ephemeris Interpolation overfit. Halving input data.", file=sys.stderr)

    # Halve the inputs, keeping the first & last entries to avoid extrapolation.
    input_met = numpy.concatenate((input_met[:1], input_met[2:-2:2], input_met[-1:]))
    input_eph = numpy.concatenate((input_eph[:1], input_eph[2:-2:2], input_eph[-1:]))

    output_eph = CubicSpline(input_met, input_eph)(output_met)

  # Then prepend the MET column to the data set
  output_data = numpy.column_stack((output_met, output_eph))

  # Then rebuild a structured array, with a fully populated column set
  output_data = recfunctions.unstructured_to_structured(output_data, names=['met'] + col_names)

  # And then filter any columns that weren't in the initial dataset.
  output_data = output_data[[col for col in output_data.dtype.names if col in input_columns]]

  # And to make sure it's returning a new copy, not a view, test if it's not a unique base:
  if output_data.base is not None:
    output_data = output_data.copy()

  return output_data

######################
# Systems-space functions.


def _get_ephemeris(start_met: int,
                   end_met: int,
                   frame: str,
                   position: bool = False,
                   velocity: bool = False):
  '''
  Get position and velocity information from database for a specific frame.

  Arguments:
    start_met: float representing met time.
    end_met: float representing met time.
    frame: 'geo' or 'j2k' or 'predicted'
    position: (bool) Get position information. Default False.
    velocity: (bool) Get velocity information. Default False.

  Return:
    numpy structured array: [(met, <frame>_x, <frame>_y, <frame>_z, <frame>_vx, <frame>_vy, <frame>_vz),
                              dtype=(float64, float64, float64, float64, float64, float64, float64)]
  '''
  frame = frame.lower()

  if frame == 'geo':
    db_table = 'geo'
    prefix = 'geo'

  elif frame == 'j2k':
    db_table = 'j2k'
    prefix = 'j2k'

  elif frame == 'predicted':
    db_table = 'predicted_geo'
    prefix = 'geo'

  else:
    raise ValueError("Expected frame values are ['geo', 'j2k', 'predicted'].")

  fields = ['met']
  if position:
    fields.extend(['x', 'y', 'z'])
  if velocity:
    fields.extend(['vx', 'vy', 'vz'])

  database_fields = ['`' + f + '`' for f in fields]

  # connect to database and fetch data
  statement = ('SELECT ' + ', '.join(database_fields)
               + ' FROM `cassiope_ephemeris`.`' + db_table + '`'
               + ' WHERE `met` BETWEEN ' + str(start_met) + ' AND ' + str(end_met)
               + ' ORDER BY `met` ASC')
  connection = database._connect_to_database()
  cursor = connection.cursor()
  cursor.execute(statement)
  data = cursor.fetchall()
  cursor.close()
  connection.close()

  # return results
  dtype_list = [('met', 'float64')]
  if position:
    dtype_list.extend([('_'.join([prefix, 'x']), 'float64'),
                       ('_'.join([prefix, 'y']), 'float64'),
                       ('_'.join([prefix, 'z']), 'float64')])
  if velocity:
    dtype_list.extend([('_'.join([prefix, 'vx']), 'float64'),
                       ('_'.join([prefix, 'vy']), 'float64'),
                       ('_'.join([prefix, 'vz']), 'float64')])

  results = numpy.array(data, dtype=dtype_list)
  return results

Functions

def add_altitude_column(data: , altitude_above_earth: bool = False)

Add altitude column to numpy structured array. Altitude is returned in meters.

Args

data : numpy.array
Input data, must include position *_x/y/z columns for calculation (geo, j2k, or both).
altitude_above_earth : bool
Subtracts the radius of earth from altitude Calculated from WGS84. (Default False)

Returns

Input array with the altitude(km) column added.

If geo position columns are present, the returned structure has a column 'geo_altitude' If j2k position columns are present, the returned structure has a column 'j2k_altitude' If both geo and j2k columns are present, two altitude columns are appended.

Expand source code
def add_altitude_column(data: numpy.array, altitude_above_earth: bool = False):
  '''
  Add altitude column to numpy structured array.
  Altitude is returned in meters.

  Args:
    data (numpy.array): Input data, must include position *_x/y/z columns for calculation (geo, j2k, or both).

    altitude_above_earth (bool): Subtracts the radius of earth from altitude Calculated from WGS84. (Default False)

  Returns:
    Input array with the altitude(km) column added.

    If geo position columns are present, the returned structure has a column 'geo_altitude'
    If j2k position columns are present, the returned structure has a column 'j2k_altitude'
    If both geo and j2k columns are present, two altitude columns are appended.

  '''

  input_columns = data.dtype.names + tuple()  # Make a copy of the initial dataframe columns.

  data_has_geo = ('geo_x' in input_columns
                  and 'geo_y' in input_columns
                  and 'geo_z' in input_columns)

  data_has_j2k = ('j2k_x' in input_columns
                  and 'j2k_y' in input_columns
                  and 'j2k_z' in input_columns)

  if not (data_has_geo or data_has_j2k):
    raise ValueError('Input data expected one or both sets of geo_x/y/z or j2k_x/y/z inputs.')

  # Calculate altitude from position.
  if data_has_geo:
    geo_altitude = (data['geo_x']**2 + data['geo_y']**2 + data['geo_z']**2) ** 0.5
  if data_has_j2k:
    j2k_altitude = (data['j2k_x']**2 + data['j2k_y']**2 + data['j2k_z']**2) ** 0.5

  # If needed, calcuate & subtract the WGS84 spherioid.
  if altitude_above_earth:
    # Pre-calculate the WGS84 altitudes.
    if data_has_geo:  # Prefer GEO, as is what defines WGS84.
      wgs84_radius = common.get_wgs84_earth_radius_meters(data['geo_x'], data['geo_y'], data['geo_z'])
    else:
      # J2K itself is also fine, as effective latitude will only be effected by Polar-Tilt, Precession & Nutation, which are relatively tiny.
      # Should the accuracy of the J2K/GEO conversion be improved, pre-rotating into GEO may improve results.
      wgs84_radius = common.get_wgs84_earth_radius_meters(data['j2k_x'], data['j2k_y'], data['j2k_z'])

    if data_has_geo:
      geo_altitude -= wgs84_radius
    if data_has_j2k:
      j2k_altitude -= wgs84_radius

  # And then attach the data to the given set.
  # First purge the new column from the old data, if needed,
  # Then append the new column.
  if data_has_geo:
    data = data[[column for column in data.dtype.names if column != 'geo_altitude']]
    data = recfunctions.append_fields(data, 'geo_altitude', geo_altitude, dtypes=['float64'], usemask=False)

  if data_has_j2k:
    data = data[[column for column in data.dtype.names if column != 'j2k_altitude']]
    data = recfunctions.append_fields(data, 'j2k_altitude', j2k_altitude, dtypes=['float64'], usemask=False)

  # And then make sure to return a nominal copy, rather than a view.
  if data.base is not None:
    data = data.copy()

  return data
def add_apogee_perigee_column(data: numpy.ndarray)

Add apogee/perigee column to numpy structured array.

Args

data : numpy.array
Input data, must include altitude column for calculation (geo_altitude, or j2k_altitude). If both are present, geo_altitude is used for the calculation.

Returns

Input array with the apogee_perigee column added. Values in the column are ['ascending', 'descending', 'apogee', 'perigee', 'None'] 'None' values are used for the first and last element of the array, because those values are not deterministic.

Expand source code
def add_apogee_perigee_column(data: numpy.ndarray):
  '''
  Add apogee/perigee column to numpy structured array.

  Args:
    data (numpy.array): Input data, must include altitude column for calculation (geo_altitude, or j2k_altitude).
      If both are present, geo_altitude is used for the calculation.

  Returns:
    Input array with the apogee_perigee column added. Values in the column are ['ascending', 'descending', 'apogee', 'perigee', 'None']
    'None' values are used for the first and last element of the array, because those values are not deterministic.
  '''

  input_columns = data.dtype.names + tuple()  # Make a copy of the initial dataframe columns.

  data_has_geo = 'geo_altitude' in input_columns
  data_has_j2k = 'j2k_altitude' in input_columns

  if data_has_geo:
    altitude_column = 'geo_altitude'
  elif data_has_j2k:
    altitude_column = 'j2k_altitude'
  else:
    raise ValueError('Altitude column not found. Expected at least one of [geo_altitude, j2k_altitude].')

  if len(data[altitude_column]) > 2:
    altitude_delta = numpy.diff(data[altitude_column])
    delta_to_prev = altitude_delta[:-1]
    delta_to_next = altitude_delta[1:]

    is_ascending = delta_to_prev >= 0
    is_descending = delta_to_next <= 0

    is_apogee = numpy.logical_and(is_ascending, is_descending)
    is_perigee = numpy.logical_and(~is_ascending, ~is_descending)

    apogee_perigee_col = numpy.empty(len(data[altitude_column]), dtype='U10')
    apogee_perigee_col[0] = 'None'
    apogee_perigee_col[-1] = 'None'

    apogee_perigee_col[1:-1][is_ascending] = 'ascending'
    apogee_perigee_col[1:-1][is_descending] = 'descending'
    apogee_perigee_col[1:-1][is_apogee] = 'apogee'
    apogee_perigee_col[1:-1][is_perigee] = 'perigee'

  else:
    apogee_perigee_col = numpy.empty(len(data[altitude_column]), dtype='U10')
    apogee_perigee_col[:] = 'None'

  data = recfunctions.append_fields(data, 'apogee_perigee', apogee_perigee_col, dtypes=['U10'], usemask=False)

  # And then make sure to return a nominal copy, rather than a view.
  if data.base is not None:
    data = data.copy()

  return data
def get_ephemeris_range(get_definitive: bool = True, get_predicted: bool = True, return_met: bool = False)

Get time-availability information from the ephemeris database. Pulls from the

Args

get_definitive
bool. Fetches the start/stop pair for the Definitive Ephemeris range.
get_predicted
bool. Fetches the start/stop pair for the Predicted Ephemeris range.
return_met

bool. If True, returns timestamps in MET. If False, datetime.

Returns

if get_definitive & get_predicted: returns [ Def_Start, Def_Stop, Pred_Start, Pred_Stop ]
if get_definitive
returns [ Def_Start, Def_Stop ]
if get_predicted
returns [ Pred_Start, Pred_Stop ]
Expand source code
def get_ephemeris_range(get_definitive: bool = True,
                        get_predicted: bool = True,
                        return_met: bool = False):
  '''
  Get time-availability information from the ephemeris database.
  Pulls from the 

  Args:
    get_definitive: bool. Fetches the start/stop pair for the Definitive Ephemeris range.
    get_predicted:  bool. Fetches the start/stop pair for the Predicted Ephemeris range.
    return_met:     bool. If True, returns timestamps in MET. If False, datetime.

  Returns:
    if get_definitive & get_predicted: returns [ Def_Start, Def_Stop, Pred_Start, Pred_Stop ]
    if get_definitive: returns [ Def_Start,  Def_Stop ]
    if get_predicted:  returns [ Pred_Start, Pred_Stop ]
  '''
  if not(get_definitive or get_predicted):
    raise ValueError("One of get_definitive or get_predicted arg must be True!")

  query_list = []
  if get_definitive:
    query_list.append("SELECT `met` from `cassiope_ephemeris`.`geo` ORDER BY `met` ASC  LIMIT 1;")
    query_list.append("SELECT `met` from `cassiope_ephemeris`.`geo` ORDER BY `met` DESC LIMIT 1;")
  if get_predicted:
    query_list.append("SELECT `met` from `cassiope_ephemeris`.`predicted_geo` ORDER BY `met` ASC  LIMIT 1;")
    query_list.append("SELECT `met` from `cassiope_ephemeris`.`predicted_geo` ORDER BY `met` DESC LIMIT 1;")

  return_data = []

  # Fetch the data.
  connection = database._connect_to_database()
  cursor = connection.cursor()
  for query in query_list:
    cursor.execute(query)
    return_data.append(cursor.fetchall()[0][0])  # List of tuple -> Tuple -> Object.

  cursor.close()
  connection.close()

  if return_met == False:  # User wants datetime:
    return_data = [datetime.datetime(1968, 5, 24) + datetime.timedelta(seconds=met) for met in return_data]

  return return_data
def get_geo_ephemeris(start_time: Union[datetime.datetime, str, int, float], end_time: Union[datetime.datetime, str, int, float], position: bool = True, velocity: bool = True)

Get position and velocity information in geo coordinates.

Args

start_time : int, float, str, datetime
int, float, or string representing met time. datetime object or iso formatted date string. Time will be rounded down to start of second.
end_time : int, float, str, datetime
int, float, or string representing met time. datetime object or iso formatted date string.
position : bool
Return position information. Default True
velocity : bool
Return velocity information. Default True

Returns: A numpy structured array with columns representing the requested position and/or velocity columns.

Column Type Condition
met float64 Always
geo_x float64 if position
geo_y float64 if position
geo_z float64 if position
geo_vx float64 if velocity
geo_vy float64 if velocity
geo_vz float64 if velocity
Expand source code
def get_geo_ephemeris(start_time: Union[datetime.datetime, str, int, float],
                      end_time: Union[datetime.datetime, str, int, float],
                      position: bool = True,
                      velocity: bool = True):
  '''
  Get position and velocity information in geo coordinates.

  Args:
    start_time (int, float, str, datetime): int, float, or string representing met time.
                datetime object or iso formatted date string.
                Time will be rounded down to start of second.

    end_time (int, float, str, datetime): int, float, or string representing met time.
              datetime object or iso formatted date string.

    position (bool): Return position information. Default True

    velocity (bool): Return velocity information. Default True

  Returns: A numpy structured array with columns representing the requested position and/or velocity columns.

    | Column  | Type           | Condition |
    |:------  |:----           |:--------- |
    | met     | float64        | Always |
    | geo_x   | float64        | if position |
    | geo_y   | float64        | if position |
    | geo_z   | float64        | if position |
    | geo_vx  | float64        | if velocity |
    | geo_vy  | float64        | if velocity |
    | geo_vz  | float64        | if velocity |


  '''
  # Check version
  common._check_version()

  # verify time inputs
  start_time = common._verify_time(start_time)
  end_time = common._verify_time(end_time)

  # Round start time down to start of second
  start_time = int(start_time)

  if end_time < start_time:
    raise ValueError('Error: end_time before start_time.')

  data = _get_ephemeris(start_time, end_time, 'geo', position=position, velocity=velocity)

  return data
def get_j2k_ephemeris(start_time: Union[datetime.datetime, str, int, float], end_time: Union[datetime.datetime, str, int, float], position: bool = True, velocity: bool = True)

Get position and velocity information in j2k coordinates.

Args

start_time : int, float, str, datetime
int, float, or string representing met time. datetime object or iso formatted date string. Time will be rounded down to start of second.
end_time : int, float, str, datetime
int, float, or string representing met time. datetime object or iso formatted date string.
position : bool
Return position information. Default True
velocity : bool
Return velocity information. Default True

Returns: A numpy structured array with columns representing the requested position and/or velocity columns.

Column Type Condition
met float64 Always
j2k_x float64 if position
j2k_y float64 if position
j2k_z float64 if position
j2k_vx float64 if velocity
j2k_vy float64 if velocity
j2k_vz float64 if velocity
Expand source code
def get_j2k_ephemeris(start_time: Union[datetime.datetime, str, int, float],
                      end_time: Union[datetime.datetime, str, int, float],
                      position: bool = True,
                      velocity: bool = True):
  '''
  Get position and velocity information in j2k coordinates.

  Args:
    start_time (int, float, str, datetime): int, float, or string representing met time.
                datetime object or iso formatted date string.
                Time will be rounded down to start of second.

    end_time (int, float, str, datetime): int, float, or string representing met time.
              datetime object or iso formatted date string.

    position (bool): Return position information. Default True

    velocity (bool): Return velocity information. Default True

  Returns: A numpy structured array with columns representing the requested position and/or velocity columns.

    | Column  | Type           | Condition |
    |:------  |:----           |:--------- |
    | met     | float64        | Always |
    | j2k_x   | float64        | if position |
    | j2k_y   | float64        | if position |
    | j2k_z   | float64        | if position |
    | j2k_vx  | float64        | if velocity |
    | j2k_vy  | float64        | if velocity |
    | j2k_vz  | float64        | if velocity |


  '''
  # Check version
  common._check_version()

  # verify time inputs
  start_time = common._verify_time(start_time)
  end_time = common._verify_time(end_time)

  # Round start time down to start of second
  start_time = int(start_time)

  if end_time < start_time:
    raise ValueError('Error: end_time before start_time.')

  data = _get_ephemeris(start_time, end_time, 'j2k', position=position, velocity=velocity)

  return data
def get_predicted_ephemeris(start_time: Union[datetime.datetime, str, int, float], end_time: Union[datetime.datetime, str, int, float], position: bool = True, velocity: bool = True)

Get predicted position and velocity information. Predicted information is only availabile in geo coordinates.

Args

start_time : int, float, str, datetime
int, float, or string representing met time. datetime object or iso formatted date string. Time will be rounded down to start of second.
end_time : int, float, str, datetime
int, float, or string representing met time. datetime object or iso formatted date string.
position : bool
Return position information. Default True
velocity : bool
Return velocity information. Default True

Returns: A numpy structured array with columns representing the requested position and/or velocity columns.

Column Type Condition
met float64 Always
j2k_x float64 if position
j2k_y float64 if position
j2k_z float64 if position
j2k_vx float64 if velocity
j2k_vy float64 if velocity
j2k_vz float64 if velocity
Expand source code
def get_predicted_ephemeris(start_time: Union[datetime.datetime, str, int, float],
                            end_time: Union[datetime.datetime, str, int, float],
                            position: bool = True,
                            velocity: bool = True):
  '''
  Get predicted position and velocity information.
  Predicted information is only availabile in geo coordinates.

  Args:
    start_time (int, float, str, datetime): int, float, or string representing met time.
                datetime object or iso formatted date string.
                Time will be rounded down to start of second.

    end_time (int, float, str, datetime): int, float, or string representing met time.
              datetime object or iso formatted date string.

    position (bool): Return position information. Default True

    velocity (bool): Return velocity information. Default True

  Returns: A numpy structured array with columns representing the requested position and/or velocity columns.

    | Column  | Type           | Condition |
    |:------  |:----           |:--------- |
    | met     | float64        | Always |
    | j2k_x   | float64        | if position |
    | j2k_y   | float64        | if position |
    | j2k_z   | float64        | if position |
    | j2k_vx  | float64        | if velocity |
    | j2k_vy  | float64        | if velocity |
    | j2k_vz  | float64        | if velocity |

  '''
  # Check version
  common._check_version()

  # verify time inputs
  start_time = common._verify_time(start_time)
  end_time = common._verify_time(end_time)

  # Round start time down to start of second.
  start_time = int(start_time)

  if end_time < start_time:
    raise ValueError('Error: end_time before start_time.')

  data = _get_ephemeris(start_time, end_time, 'predicted', position=position, velocity=velocity)

  return data
def interpolate_ephemeris(data, time_list)

Interpolate position/velocity data. Return data structure replaces 'met' column. Will decimate if overfitting is detected.

Arguments

data: Structrued Array. Only considers 'met' & [geo|j2k]_[x|y|z|vx|vy|vz] columns time_list: list of MET-representation of time.

Return

numpy structured array: [ met, [_x], [_y], [_z], [_vx], [_vy], [_vz]] dtypes=[float64, float64, float64, float64, float64, float64, float64 ] Will only return each column if it was included in the initial data. Will discard all other

Expand source code
def interpolate_ephemeris(data, time_list):
  '''
  Interpolate position/velocity data.
  Return data structure replaces 'met' column.
  Will decimate if overfitting is detected.

  Arguments:
    data: Structrued Array. Only considers 'met' & [geo|j2k]_[x|y|z|vx|vy|vz] columns
    time_list: list of MET-representation of time.

  Return:
    numpy structured array: [ met, [*_x], [*_y], [*_z], [*_vx], [*_vy], [*_vz]]
      dtypes=[float64, float64, float64, float64, float64, float64, float64 ]
      Will only return each column if it was included in the initial data.
      Will discard all other
  '''

  # Import the interpolation function.
  from scipy.interpolate import CubicSpline

  # To make the inteprolation easier on the overfitting checks,
  # For each frame given, if only given one of Position or Velocity,
  # Populate the other with zeroes. These false columns will be filtered later.

  input_columns = data.dtype.names + tuple()  # Make a copy of the initial dataframe columns.

  _data_has_geo = 'geo_x' in input_columns or 'geo_vx' in input_columns
  _data_has_j2k = 'j2k_x' in input_columns or 'j2k_vx' in input_columns

  # Sanity check: Does not accept both.
  if _data_has_geo and _data_has_j2k:
    raise ValueError("intepolate_ephemeris only accepts one frame of either 'geo' or 'j2k'! Cannot accept both.")
  if not (_data_has_geo or _data_has_j2k):
    raise ValueError(
        "intepolate_ephemeris given data arg must contain one of the following: geo_x, geo_vx, j2k_x, j2k_vx as to tell the interpolation what frame to interpolate!")

  if _data_has_geo:
    frame = 'geo'
  else:
    frame = 'j2k'

  col_names = "{:s}_x {:s}_y {:s}_z {:s}_vx {:s}_vy {:s}_vz".format(frame, frame, frame, frame, frame, frame).split()

  input_met = data['met']
  # Pre-populate the dataset with zeros, override as needed.
  input_eph = numpy.zeros((len(data['met']), 6), dtype=float)  # -> Builds a Nx6 array, one for each pos & vel x/y/z.

  if col_names[0] in input_columns:
    input_eph[:, 0] = data[col_names[0]]
  if col_names[1] in input_columns:
    input_eph[:, 1] = data[col_names[1]]
  if col_names[2] in input_columns:
    input_eph[:, 2] = data[col_names[2]]
  if col_names[3] in input_columns:
    input_eph[:, 3] = data[col_names[3]]
  if col_names[4] in input_columns:
    input_eph[:, 4] = data[col_names[4]]
  if col_names[5] in input_columns:
    input_eph[:, 5] = data[col_names[5]]

  # Run the initial interpolation.
  output_met = time_list
  output_eph = CubicSpline(input_met, input_eph)(output_met)
  # <MET> <pX,pY,pX,vX,vY,vZ> -> <pX,pY,pX,vX,vY,vZ>

  # Fix overfitting:
  # If any geocentric altitude > 9,000 km, or velocity over 9 km/s, then halve the input ephemeris data.

  while numpy.any(numpy.sum(output_eph[:, :3]**2, axis=1)**0.5 > 9e6) \
          or numpy.any(numpy.sum(output_eph[:, 3:]**2, axis=1)**0.5 > 9e3):
    print("Warning - Ephemeris Interpolation overfit. Halving input data.", file=sys.stderr)

    # Halve the inputs, keeping the first & last entries to avoid extrapolation.
    input_met = numpy.concatenate((input_met[:1], input_met[2:-2:2], input_met[-1:]))
    input_eph = numpy.concatenate((input_eph[:1], input_eph[2:-2:2], input_eph[-1:]))

    output_eph = CubicSpline(input_met, input_eph)(output_met)

  # Then prepend the MET column to the data set
  output_data = numpy.column_stack((output_met, output_eph))

  # Then rebuild a structured array, with a fully populated column set
  output_data = recfunctions.unstructured_to_structured(output_data, names=['met'] + col_names)

  # And then filter any columns that weren't in the initial dataset.
  output_data = output_data[[col for col in output_data.dtype.names if col in input_columns]]

  # And to make sure it's returning a new copy, not a view, test if it's not a unique base:
  if output_data.base is not None:
    output_data = output_data.copy()

  return output_data