Module ephemeris_library.ephemeris
Expand source code
import ephemeris_library.database as database
import ephemeris_library.common as common
from ephemeris_library.common import add_utc_column
import datetime
from typing import Union
import sys
import math
import numpy
from numpy.lib import recfunctions
# User-space functions:
def get_geo_ephemeris(start_time: Union[datetime.datetime, str, int, float],
end_time: Union[datetime.datetime, str, int, float],
position: bool = True,
velocity: bool = True):
'''
Get position and velocity information in geo coordinates.
Args:
start_time (int, float, str, datetime): int, float, or string representing met time.
datetime object or iso formatted date string.
Time will be rounded down to start of second.
end_time (int, float, str, datetime): int, float, or string representing met time.
datetime object or iso formatted date string.
position (bool): Return position information. Default True
velocity (bool): Return velocity information. Default True
Returns: A numpy structured array with columns representing the requested position and/or velocity columns.
| Column | Type | Condition |
|:------ |:---- |:--------- |
| met | float64 | Always |
| geo_x | float64 | if position |
| geo_y | float64 | if position |
| geo_z | float64 | if position |
| geo_vx | float64 | if velocity |
| geo_vy | float64 | if velocity |
| geo_vz | float64 | if velocity |
'''
# Check version
common._check_version()
# verify time inputs
start_time = common._verify_time(start_time)
end_time = common._verify_time(end_time)
# Round start time down to start of second
start_time = int(start_time)
if end_time < start_time:
raise ValueError('Error: end_time before start_time.')
data = _get_ephemeris(start_time, end_time, 'geo', position=position, velocity=velocity)
return data
def get_j2k_ephemeris(start_time: Union[datetime.datetime, str, int, float],
end_time: Union[datetime.datetime, str, int, float],
position: bool = True,
velocity: bool = True):
'''
Get position and velocity information in j2k coordinates.
Args:
start_time (int, float, str, datetime): int, float, or string representing met time.
datetime object or iso formatted date string.
Time will be rounded down to start of second.
end_time (int, float, str, datetime): int, float, or string representing met time.
datetime object or iso formatted date string.
position (bool): Return position information. Default True
velocity (bool): Return velocity information. Default True
Returns: A numpy structured array with columns representing the requested position and/or velocity columns.
| Column | Type | Condition |
|:------ |:---- |:--------- |
| met | float64 | Always |
| j2k_x | float64 | if position |
| j2k_y | float64 | if position |
| j2k_z | float64 | if position |
| j2k_vx | float64 | if velocity |
| j2k_vy | float64 | if velocity |
| j2k_vz | float64 | if velocity |
'''
# Check version
common._check_version()
# verify time inputs
start_time = common._verify_time(start_time)
end_time = common._verify_time(end_time)
# Round start time down to start of second
start_time = int(start_time)
if end_time < start_time:
raise ValueError('Error: end_time before start_time.')
data = _get_ephemeris(start_time, end_time, 'j2k', position=position, velocity=velocity)
return data
def get_predicted_ephemeris(start_time: Union[datetime.datetime, str, int, float],
end_time: Union[datetime.datetime, str, int, float],
position: bool = True,
velocity: bool = True):
'''
Get predicted position and velocity information.
Predicted information is only availabile in geo coordinates.
Args:
start_time (int, float, str, datetime): int, float, or string representing met time.
datetime object or iso formatted date string.
Time will be rounded down to start of second.
end_time (int, float, str, datetime): int, float, or string representing met time.
datetime object or iso formatted date string.
position (bool): Return position information. Default True
velocity (bool): Return velocity information. Default True
Returns: A numpy structured array with columns representing the requested position and/or velocity columns.
| Column | Type | Condition |
|:------ |:---- |:--------- |
| met | float64 | Always |
| j2k_x | float64 | if position |
| j2k_y | float64 | if position |
| j2k_z | float64 | if position |
| j2k_vx | float64 | if velocity |
| j2k_vy | float64 | if velocity |
| j2k_vz | float64 | if velocity |
'''
# Check version
common._check_version()
# verify time inputs
start_time = common._verify_time(start_time)
end_time = common._verify_time(end_time)
# Round start time down to start of second.
start_time = int(start_time)
if end_time < start_time:
raise ValueError('Error: end_time before start_time.')
data = _get_ephemeris(start_time, end_time, 'predicted', position=position, velocity=velocity)
return data
def get_ephemeris_range(get_definitive: bool = True,
get_predicted: bool = True,
return_met: bool = False):
'''
Get time-availability information from the ephemeris database.
Pulls from the
Args:
get_definitive: bool. Fetches the start/stop pair for the Definitive Ephemeris range.
get_predicted: bool. Fetches the start/stop pair for the Predicted Ephemeris range.
return_met: bool. If True, returns timestamps in MET. If False, datetime.
Returns:
if get_definitive & get_predicted: returns [ Def_Start, Def_Stop, Pred_Start, Pred_Stop ]
if get_definitive: returns [ Def_Start, Def_Stop ]
if get_predicted: returns [ Pred_Start, Pred_Stop ]
'''
if not(get_definitive or get_predicted):
raise ValueError("One of get_definitive or get_predicted arg must be True!")
query_list = []
if get_definitive:
query_list.append("SELECT `met` from `cassiope_ephemeris`.`geo` ORDER BY `met` ASC LIMIT 1;")
query_list.append("SELECT `met` from `cassiope_ephemeris`.`geo` ORDER BY `met` DESC LIMIT 1;")
if get_predicted:
query_list.append("SELECT `met` from `cassiope_ephemeris`.`predicted_geo` ORDER BY `met` ASC LIMIT 1;")
query_list.append("SELECT `met` from `cassiope_ephemeris`.`predicted_geo` ORDER BY `met` DESC LIMIT 1;")
return_data = []
# Fetch the data.
connection = database._connect_to_database()
cursor = connection.cursor()
for query in query_list:
cursor.execute(query)
return_data.append(cursor.fetchall()[0][0]) # List of tuple -> Tuple -> Object.
cursor.close()
connection.close()
if return_met == False: # User wants datetime:
return_data = [datetime.datetime(1968, 5, 24) + datetime.timedelta(seconds=met) for met in return_data]
return return_data
def add_altitude_column(data: numpy.array, altitude_above_earth: bool = False):
'''
Add altitude column to numpy structured array.
Altitude is returned in meters.
Args:
data (numpy.array): Input data, must include position *_x/y/z columns for calculation (geo, j2k, or both).
altitude_above_earth (bool): Subtracts the radius of earth from altitude Calculated from WGS84. (Default False)
Returns:
Input array with the altitude(km) column added.
If geo position columns are present, the returned structure has a column 'geo_altitude'
If j2k position columns are present, the returned structure has a column 'j2k_altitude'
If both geo and j2k columns are present, two altitude columns are appended.
'''
input_columns = data.dtype.names + tuple() # Make a copy of the initial dataframe columns.
data_has_geo = ('geo_x' in input_columns
and 'geo_y' in input_columns
and 'geo_z' in input_columns)
data_has_j2k = ('j2k_x' in input_columns
and 'j2k_y' in input_columns
and 'j2k_z' in input_columns)
if not (data_has_geo or data_has_j2k):
raise ValueError('Input data expected one or both sets of geo_x/y/z or j2k_x/y/z inputs.')
# Calculate altitude from position.
if data_has_geo:
geo_altitude = (data['geo_x']**2 + data['geo_y']**2 + data['geo_z']**2) ** 0.5
if data_has_j2k:
j2k_altitude = (data['j2k_x']**2 + data['j2k_y']**2 + data['j2k_z']**2) ** 0.5
# If needed, calcuate & subtract the WGS84 spherioid.
if altitude_above_earth:
# Pre-calculate the WGS84 altitudes.
if data_has_geo: # Prefer GEO, as is what defines WGS84.
wgs84_radius = common.get_wgs84_earth_radius_meters(data['geo_x'], data['geo_y'], data['geo_z'])
else:
# J2K itself is also fine, as effective latitude will only be effected by Polar-Tilt, Precession & Nutation, which are relatively tiny.
# Should the accuracy of the J2K/GEO conversion be improved, pre-rotating into GEO may improve results.
wgs84_radius = common.get_wgs84_earth_radius_meters(data['j2k_x'], data['j2k_y'], data['j2k_z'])
if data_has_geo:
geo_altitude -= wgs84_radius
if data_has_j2k:
j2k_altitude -= wgs84_radius
# And then attach the data to the given set.
# First purge the new column from the old data, if needed,
# Then append the new column.
if data_has_geo:
data = data[[column for column in data.dtype.names if column != 'geo_altitude']]
data = recfunctions.append_fields(data, 'geo_altitude', geo_altitude, dtypes=['float64'], usemask=False)
if data_has_j2k:
data = data[[column for column in data.dtype.names if column != 'j2k_altitude']]
data = recfunctions.append_fields(data, 'j2k_altitude', j2k_altitude, dtypes=['float64'], usemask=False)
# And then make sure to return a nominal copy, rather than a view.
if data.base is not None:
data = data.copy()
return data
def add_apogee_perigee_column(data: numpy.ndarray):
'''
Add apogee/perigee column to numpy structured array.
Args:
data (numpy.array): Input data, must include altitude column for calculation (geo_altitude, or j2k_altitude).
If both are present, geo_altitude is used for the calculation.
Returns:
Input array with the apogee_perigee column added. Values in the column are ['ascending', 'descending', 'apogee', 'perigee', 'None']
'None' values are used for the first and last element of the array, because those values are not deterministic.
'''
input_columns = data.dtype.names + tuple() # Make a copy of the initial dataframe columns.
data_has_geo = 'geo_altitude' in input_columns
data_has_j2k = 'j2k_altitude' in input_columns
if data_has_geo:
altitude_column = 'geo_altitude'
elif data_has_j2k:
altitude_column = 'j2k_altitude'
else:
raise ValueError('Altitude column not found. Expected at least one of [geo_altitude, j2k_altitude].')
if len(data[altitude_column]) > 2:
altitude_delta = numpy.diff(data[altitude_column])
delta_to_prev = altitude_delta[:-1]
delta_to_next = altitude_delta[1:]
is_ascending = delta_to_prev >= 0
is_descending = delta_to_next <= 0
is_apogee = numpy.logical_and(is_ascending, is_descending)
is_perigee = numpy.logical_and(~is_ascending, ~is_descending)
apogee_perigee_col = numpy.empty(len(data[altitude_column]), dtype='U10')
apogee_perigee_col[0] = 'None'
apogee_perigee_col[-1] = 'None'
apogee_perigee_col[1:-1][is_ascending] = 'ascending'
apogee_perigee_col[1:-1][is_descending] = 'descending'
apogee_perigee_col[1:-1][is_apogee] = 'apogee'
apogee_perigee_col[1:-1][is_perigee] = 'perigee'
else:
apogee_perigee_col = numpy.empty(len(data[altitude_column]), dtype='U10')
apogee_perigee_col[:] = 'None'
data = recfunctions.append_fields(data, 'apogee_perigee', apogee_perigee_col, dtypes=['U10'], usemask=False)
# And then make sure to return a nominal copy, rather than a view.
if data.base is not None:
data = data.copy()
return data
def interpolate_ephemeris(data, time_list):
'''
Interpolate position/velocity data.
Return data structure replaces 'met' column.
Will decimate if overfitting is detected.
Arguments:
data: Structrued Array. Only considers 'met' & [geo|j2k]_[x|y|z|vx|vy|vz] columns
time_list: list of MET-representation of time.
Return:
numpy structured array: [ met, [*_x], [*_y], [*_z], [*_vx], [*_vy], [*_vz]]
dtypes=[float64, float64, float64, float64, float64, float64, float64 ]
Will only return each column if it was included in the initial data.
Will discard all other
'''
# Import the interpolation function.
from scipy.interpolate import CubicSpline
# To make the inteprolation easier on the overfitting checks,
# For each frame given, if only given one of Position or Velocity,
# Populate the other with zeroes. These false columns will be filtered later.
input_columns = data.dtype.names + tuple() # Make a copy of the initial dataframe columns.
_data_has_geo = 'geo_x' in input_columns or 'geo_vx' in input_columns
_data_has_j2k = 'j2k_x' in input_columns or 'j2k_vx' in input_columns
# Sanity check: Does not accept both.
if _data_has_geo and _data_has_j2k:
raise ValueError("intepolate_ephemeris only accepts one frame of either 'geo' or 'j2k'! Cannot accept both.")
if not (_data_has_geo or _data_has_j2k):
raise ValueError(
"intepolate_ephemeris given data arg must contain one of the following: geo_x, geo_vx, j2k_x, j2k_vx as to tell the interpolation what frame to interpolate!")
if _data_has_geo:
frame = 'geo'
else:
frame = 'j2k'
col_names = "{:s}_x {:s}_y {:s}_z {:s}_vx {:s}_vy {:s}_vz".format(frame, frame, frame, frame, frame, frame).split()
input_met = data['met']
# Pre-populate the dataset with zeros, override as needed.
input_eph = numpy.zeros((len(data['met']), 6), dtype=float) # -> Builds a Nx6 array, one for each pos & vel x/y/z.
if col_names[0] in input_columns:
input_eph[:, 0] = data[col_names[0]]
if col_names[1] in input_columns:
input_eph[:, 1] = data[col_names[1]]
if col_names[2] in input_columns:
input_eph[:, 2] = data[col_names[2]]
if col_names[3] in input_columns:
input_eph[:, 3] = data[col_names[3]]
if col_names[4] in input_columns:
input_eph[:, 4] = data[col_names[4]]
if col_names[5] in input_columns:
input_eph[:, 5] = data[col_names[5]]
# Run the initial interpolation.
output_met = time_list
output_eph = CubicSpline(input_met, input_eph)(output_met)
# <MET> <pX,pY,pX,vX,vY,vZ> -> <pX,pY,pX,vX,vY,vZ>
# Fix overfitting:
# If any geocentric altitude > 9,000 km, or velocity over 9 km/s, then halve the input ephemeris data.
while numpy.any(numpy.sum(output_eph[:, :3]**2, axis=1)**0.5 > 9e6) \
or numpy.any(numpy.sum(output_eph[:, 3:]**2, axis=1)**0.5 > 9e3):
print("Warning - Ephemeris Interpolation overfit. Halving input data.", file=sys.stderr)
# Halve the inputs, keeping the first & last entries to avoid extrapolation.
input_met = numpy.concatenate((input_met[:1], input_met[2:-2:2], input_met[-1:]))
input_eph = numpy.concatenate((input_eph[:1], input_eph[2:-2:2], input_eph[-1:]))
output_eph = CubicSpline(input_met, input_eph)(output_met)
# Then prepend the MET column to the data set
output_data = numpy.column_stack((output_met, output_eph))
# Then rebuild a structured array, with a fully populated column set
output_data = recfunctions.unstructured_to_structured(output_data, names=['met'] + col_names)
# And then filter any columns that weren't in the initial dataset.
output_data = output_data[[col for col in output_data.dtype.names if col in input_columns]]
# And to make sure it's returning a new copy, not a view, test if it's not a unique base:
if output_data.base is not None:
output_data = output_data.copy()
return output_data
######################
# Systems-space functions.
def _get_ephemeris(start_met: int,
end_met: int,
frame: str,
position: bool = False,
velocity: bool = False):
'''
Get position and velocity information from database for a specific frame.
Arguments:
start_met: float representing met time.
end_met: float representing met time.
frame: 'geo' or 'j2k' or 'predicted'
position: (bool) Get position information. Default False.
velocity: (bool) Get velocity information. Default False.
Return:
numpy structured array: [(met, <frame>_x, <frame>_y, <frame>_z, <frame>_vx, <frame>_vy, <frame>_vz),
dtype=(float64, float64, float64, float64, float64, float64, float64)]
'''
frame = frame.lower()
if frame == 'geo':
db_table = 'geo'
prefix = 'geo'
elif frame == 'j2k':
db_table = 'j2k'
prefix = 'j2k'
elif frame == 'predicted':
db_table = 'predicted_geo'
prefix = 'geo'
else:
raise ValueError("Expected frame values are ['geo', 'j2k', 'predicted'].")
fields = ['met']
if position:
fields.extend(['x', 'y', 'z'])
if velocity:
fields.extend(['vx', 'vy', 'vz'])
database_fields = ['`' + f + '`' for f in fields]
# connect to database and fetch data
statement = ('SELECT ' + ', '.join(database_fields)
+ ' FROM `cassiope_ephemeris`.`' + db_table + '`'
+ ' WHERE `met` BETWEEN ' + str(start_met) + ' AND ' + str(end_met)
+ ' ORDER BY `met` ASC')
connection = database._connect_to_database()
cursor = connection.cursor()
cursor.execute(statement)
data = cursor.fetchall()
cursor.close()
connection.close()
# return results
dtype_list = [('met', 'float64')]
if position:
dtype_list.extend([('_'.join([prefix, 'x']), 'float64'),
('_'.join([prefix, 'y']), 'float64'),
('_'.join([prefix, 'z']), 'float64')])
if velocity:
dtype_list.extend([('_'.join([prefix, 'vx']), 'float64'),
('_'.join([prefix, 'vy']), 'float64'),
('_'.join([prefix, 'vz']), 'float64')])
results = numpy.array(data, dtype=dtype_list)
return results
Functions
def add_altitude_column(data:
, altitude_above_earth: bool = False) -
Add altitude column to numpy structured array. Altitude is returned in meters.
Args
data
:numpy.array
- Input data, must include position *_x/y/z columns for calculation (geo, j2k, or both).
altitude_above_earth
:bool
- Subtracts the radius of earth from altitude Calculated from WGS84. (Default False)
Returns
Input array with the altitude(km) column added.
If geo position columns are present, the returned structure has a column 'geo_altitude' If j2k position columns are present, the returned structure has a column 'j2k_altitude' If both geo and j2k columns are present, two altitude columns are appended.
Expand source code
def add_altitude_column(data: numpy.array, altitude_above_earth: bool = False): ''' Add altitude column to numpy structured array. Altitude is returned in meters. Args: data (numpy.array): Input data, must include position *_x/y/z columns for calculation (geo, j2k, or both). altitude_above_earth (bool): Subtracts the radius of earth from altitude Calculated from WGS84. (Default False) Returns: Input array with the altitude(km) column added. If geo position columns are present, the returned structure has a column 'geo_altitude' If j2k position columns are present, the returned structure has a column 'j2k_altitude' If both geo and j2k columns are present, two altitude columns are appended. ''' input_columns = data.dtype.names + tuple() # Make a copy of the initial dataframe columns. data_has_geo = ('geo_x' in input_columns and 'geo_y' in input_columns and 'geo_z' in input_columns) data_has_j2k = ('j2k_x' in input_columns and 'j2k_y' in input_columns and 'j2k_z' in input_columns) if not (data_has_geo or data_has_j2k): raise ValueError('Input data expected one or both sets of geo_x/y/z or j2k_x/y/z inputs.') # Calculate altitude from position. if data_has_geo: geo_altitude = (data['geo_x']**2 + data['geo_y']**2 + data['geo_z']**2) ** 0.5 if data_has_j2k: j2k_altitude = (data['j2k_x']**2 + data['j2k_y']**2 + data['j2k_z']**2) ** 0.5 # If needed, calcuate & subtract the WGS84 spherioid. if altitude_above_earth: # Pre-calculate the WGS84 altitudes. if data_has_geo: # Prefer GEO, as is what defines WGS84. wgs84_radius = common.get_wgs84_earth_radius_meters(data['geo_x'], data['geo_y'], data['geo_z']) else: # J2K itself is also fine, as effective latitude will only be effected by Polar-Tilt, Precession & Nutation, which are relatively tiny. # Should the accuracy of the J2K/GEO conversion be improved, pre-rotating into GEO may improve results. wgs84_radius = common.get_wgs84_earth_radius_meters(data['j2k_x'], data['j2k_y'], data['j2k_z']) if data_has_geo: geo_altitude -= wgs84_radius if data_has_j2k: j2k_altitude -= wgs84_radius # And then attach the data to the given set. # First purge the new column from the old data, if needed, # Then append the new column. if data_has_geo: data = data[[column for column in data.dtype.names if column != 'geo_altitude']] data = recfunctions.append_fields(data, 'geo_altitude', geo_altitude, dtypes=['float64'], usemask=False) if data_has_j2k: data = data[[column for column in data.dtype.names if column != 'j2k_altitude']] data = recfunctions.append_fields(data, 'j2k_altitude', j2k_altitude, dtypes=['float64'], usemask=False) # And then make sure to return a nominal copy, rather than a view. if data.base is not None: data = data.copy() return data
def add_apogee_perigee_column(data: numpy.ndarray)
-
Add apogee/perigee column to numpy structured array.
Args
data
:numpy.array
- Input data, must include altitude column for calculation (geo_altitude, or j2k_altitude). If both are present, geo_altitude is used for the calculation.
Returns
Input array with the apogee_perigee column added. Values in the column are ['ascending', 'descending', 'apogee', 'perigee', 'None'] 'None' values are used for the first and last element of the array, because those values are not deterministic.
Expand source code
def add_apogee_perigee_column(data: numpy.ndarray): ''' Add apogee/perigee column to numpy structured array. Args: data (numpy.array): Input data, must include altitude column for calculation (geo_altitude, or j2k_altitude). If both are present, geo_altitude is used for the calculation. Returns: Input array with the apogee_perigee column added. Values in the column are ['ascending', 'descending', 'apogee', 'perigee', 'None'] 'None' values are used for the first and last element of the array, because those values are not deterministic. ''' input_columns = data.dtype.names + tuple() # Make a copy of the initial dataframe columns. data_has_geo = 'geo_altitude' in input_columns data_has_j2k = 'j2k_altitude' in input_columns if data_has_geo: altitude_column = 'geo_altitude' elif data_has_j2k: altitude_column = 'j2k_altitude' else: raise ValueError('Altitude column not found. Expected at least one of [geo_altitude, j2k_altitude].') if len(data[altitude_column]) > 2: altitude_delta = numpy.diff(data[altitude_column]) delta_to_prev = altitude_delta[:-1] delta_to_next = altitude_delta[1:] is_ascending = delta_to_prev >= 0 is_descending = delta_to_next <= 0 is_apogee = numpy.logical_and(is_ascending, is_descending) is_perigee = numpy.logical_and(~is_ascending, ~is_descending) apogee_perigee_col = numpy.empty(len(data[altitude_column]), dtype='U10') apogee_perigee_col[0] = 'None' apogee_perigee_col[-1] = 'None' apogee_perigee_col[1:-1][is_ascending] = 'ascending' apogee_perigee_col[1:-1][is_descending] = 'descending' apogee_perigee_col[1:-1][is_apogee] = 'apogee' apogee_perigee_col[1:-1][is_perigee] = 'perigee' else: apogee_perigee_col = numpy.empty(len(data[altitude_column]), dtype='U10') apogee_perigee_col[:] = 'None' data = recfunctions.append_fields(data, 'apogee_perigee', apogee_perigee_col, dtypes=['U10'], usemask=False) # And then make sure to return a nominal copy, rather than a view. if data.base is not None: data = data.copy() return data
def get_ephemeris_range(get_definitive: bool = True, get_predicted: bool = True, return_met: bool = False)
-
Get time-availability information from the ephemeris database. Pulls from the
Args
get_definitive
- bool. Fetches the start/stop pair for the Definitive Ephemeris range.
get_predicted
- bool. Fetches the start/stop pair for the Predicted Ephemeris range.
return_met
-
bool. If True, returns timestamps in MET. If False, datetime.
Returns
- if get_definitive & get_predicted: returns [ Def_Start, Def_Stop, Pred_Start, Pred_Stop ]
if get_definitive
- returns [ Def_Start, Def_Stop ]
if get_predicted
- returns [ Pred_Start, Pred_Stop ]
Expand source code
def get_ephemeris_range(get_definitive: bool = True, get_predicted: bool = True, return_met: bool = False): ''' Get time-availability information from the ephemeris database. Pulls from the Args: get_definitive: bool. Fetches the start/stop pair for the Definitive Ephemeris range. get_predicted: bool. Fetches the start/stop pair for the Predicted Ephemeris range. return_met: bool. If True, returns timestamps in MET. If False, datetime. Returns: if get_definitive & get_predicted: returns [ Def_Start, Def_Stop, Pred_Start, Pred_Stop ] if get_definitive: returns [ Def_Start, Def_Stop ] if get_predicted: returns [ Pred_Start, Pred_Stop ] ''' if not(get_definitive or get_predicted): raise ValueError("One of get_definitive or get_predicted arg must be True!") query_list = [] if get_definitive: query_list.append("SELECT `met` from `cassiope_ephemeris`.`geo` ORDER BY `met` ASC LIMIT 1;") query_list.append("SELECT `met` from `cassiope_ephemeris`.`geo` ORDER BY `met` DESC LIMIT 1;") if get_predicted: query_list.append("SELECT `met` from `cassiope_ephemeris`.`predicted_geo` ORDER BY `met` ASC LIMIT 1;") query_list.append("SELECT `met` from `cassiope_ephemeris`.`predicted_geo` ORDER BY `met` DESC LIMIT 1;") return_data = [] # Fetch the data. connection = database._connect_to_database() cursor = connection.cursor() for query in query_list: cursor.execute(query) return_data.append(cursor.fetchall()[0][0]) # List of tuple -> Tuple -> Object. cursor.close() connection.close() if return_met == False: # User wants datetime: return_data = [datetime.datetime(1968, 5, 24) + datetime.timedelta(seconds=met) for met in return_data] return return_data
def get_geo_ephemeris(start_time: Union[datetime.datetime, str, int, float], end_time: Union[datetime.datetime, str, int, float], position: bool = True, velocity: bool = True)
-
Get position and velocity information in geo coordinates.
Args
start_time
:int, float, str, datetime
- int, float, or string representing met time. datetime object or iso formatted date string. Time will be rounded down to start of second.
end_time
:int, float, str, datetime
- int, float, or string representing met time. datetime object or iso formatted date string.
position
:bool
- Return position information. Default True
velocity
:bool
- Return velocity information. Default True
Returns: A numpy structured array with columns representing the requested position and/or velocity columns.
Column Type Condition met float64 Always geo_x float64 if position geo_y float64 if position geo_z float64 if position geo_vx float64 if velocity geo_vy float64 if velocity geo_vz float64 if velocity Expand source code
def get_geo_ephemeris(start_time: Union[datetime.datetime, str, int, float], end_time: Union[datetime.datetime, str, int, float], position: bool = True, velocity: bool = True): ''' Get position and velocity information in geo coordinates. Args: start_time (int, float, str, datetime): int, float, or string representing met time. datetime object or iso formatted date string. Time will be rounded down to start of second. end_time (int, float, str, datetime): int, float, or string representing met time. datetime object or iso formatted date string. position (bool): Return position information. Default True velocity (bool): Return velocity information. Default True Returns: A numpy structured array with columns representing the requested position and/or velocity columns. | Column | Type | Condition | |:------ |:---- |:--------- | | met | float64 | Always | | geo_x | float64 | if position | | geo_y | float64 | if position | | geo_z | float64 | if position | | geo_vx | float64 | if velocity | | geo_vy | float64 | if velocity | | geo_vz | float64 | if velocity | ''' # Check version common._check_version() # verify time inputs start_time = common._verify_time(start_time) end_time = common._verify_time(end_time) # Round start time down to start of second start_time = int(start_time) if end_time < start_time: raise ValueError('Error: end_time before start_time.') data = _get_ephemeris(start_time, end_time, 'geo', position=position, velocity=velocity) return data
def get_j2k_ephemeris(start_time: Union[datetime.datetime, str, int, float], end_time: Union[datetime.datetime, str, int, float], position: bool = True, velocity: bool = True)
-
Get position and velocity information in j2k coordinates.
Args
start_time
:int, float, str, datetime
- int, float, or string representing met time. datetime object or iso formatted date string. Time will be rounded down to start of second.
end_time
:int, float, str, datetime
- int, float, or string representing met time. datetime object or iso formatted date string.
position
:bool
- Return position information. Default True
velocity
:bool
- Return velocity information. Default True
Returns: A numpy structured array with columns representing the requested position and/or velocity columns.
Column Type Condition met float64 Always j2k_x float64 if position j2k_y float64 if position j2k_z float64 if position j2k_vx float64 if velocity j2k_vy float64 if velocity j2k_vz float64 if velocity Expand source code
def get_j2k_ephemeris(start_time: Union[datetime.datetime, str, int, float], end_time: Union[datetime.datetime, str, int, float], position: bool = True, velocity: bool = True): ''' Get position and velocity information in j2k coordinates. Args: start_time (int, float, str, datetime): int, float, or string representing met time. datetime object or iso formatted date string. Time will be rounded down to start of second. end_time (int, float, str, datetime): int, float, or string representing met time. datetime object or iso formatted date string. position (bool): Return position information. Default True velocity (bool): Return velocity information. Default True Returns: A numpy structured array with columns representing the requested position and/or velocity columns. | Column | Type | Condition | |:------ |:---- |:--------- | | met | float64 | Always | | j2k_x | float64 | if position | | j2k_y | float64 | if position | | j2k_z | float64 | if position | | j2k_vx | float64 | if velocity | | j2k_vy | float64 | if velocity | | j2k_vz | float64 | if velocity | ''' # Check version common._check_version() # verify time inputs start_time = common._verify_time(start_time) end_time = common._verify_time(end_time) # Round start time down to start of second start_time = int(start_time) if end_time < start_time: raise ValueError('Error: end_time before start_time.') data = _get_ephemeris(start_time, end_time, 'j2k', position=position, velocity=velocity) return data
def get_predicted_ephemeris(start_time: Union[datetime.datetime, str, int, float], end_time: Union[datetime.datetime, str, int, float], position: bool = True, velocity: bool = True)
-
Get predicted position and velocity information. Predicted information is only availabile in geo coordinates.
Args
start_time
:int, float, str, datetime
- int, float, or string representing met time. datetime object or iso formatted date string. Time will be rounded down to start of second.
end_time
:int, float, str, datetime
- int, float, or string representing met time. datetime object or iso formatted date string.
position
:bool
- Return position information. Default True
velocity
:bool
- Return velocity information. Default True
Returns: A numpy structured array with columns representing the requested position and/or velocity columns.
Column Type Condition met float64 Always j2k_x float64 if position j2k_y float64 if position j2k_z float64 if position j2k_vx float64 if velocity j2k_vy float64 if velocity j2k_vz float64 if velocity Expand source code
def get_predicted_ephemeris(start_time: Union[datetime.datetime, str, int, float], end_time: Union[datetime.datetime, str, int, float], position: bool = True, velocity: bool = True): ''' Get predicted position and velocity information. Predicted information is only availabile in geo coordinates. Args: start_time (int, float, str, datetime): int, float, or string representing met time. datetime object or iso formatted date string. Time will be rounded down to start of second. end_time (int, float, str, datetime): int, float, or string representing met time. datetime object or iso formatted date string. position (bool): Return position information. Default True velocity (bool): Return velocity information. Default True Returns: A numpy structured array with columns representing the requested position and/or velocity columns. | Column | Type | Condition | |:------ |:---- |:--------- | | met | float64 | Always | | j2k_x | float64 | if position | | j2k_y | float64 | if position | | j2k_z | float64 | if position | | j2k_vx | float64 | if velocity | | j2k_vy | float64 | if velocity | | j2k_vz | float64 | if velocity | ''' # Check version common._check_version() # verify time inputs start_time = common._verify_time(start_time) end_time = common._verify_time(end_time) # Round start time down to start of second. start_time = int(start_time) if end_time < start_time: raise ValueError('Error: end_time before start_time.') data = _get_ephemeris(start_time, end_time, 'predicted', position=position, velocity=velocity) return data
def interpolate_ephemeris(data, time_list)
-
Interpolate position/velocity data. Return data structure replaces 'met' column. Will decimate if overfitting is detected.
Arguments
data: Structrued Array. Only considers 'met' & [geo|j2k]_[x|y|z|vx|vy|vz] columns time_list: list of MET-representation of time.
Return
numpy structured array: [ met, [_x], [_y], [_z], [_vx], [_vy], [_vz]] dtypes=[float64, float64, float64, float64, float64, float64, float64 ] Will only return each column if it was included in the initial data. Will discard all other
Expand source code
def interpolate_ephemeris(data, time_list): ''' Interpolate position/velocity data. Return data structure replaces 'met' column. Will decimate if overfitting is detected. Arguments: data: Structrued Array. Only considers 'met' & [geo|j2k]_[x|y|z|vx|vy|vz] columns time_list: list of MET-representation of time. Return: numpy structured array: [ met, [*_x], [*_y], [*_z], [*_vx], [*_vy], [*_vz]] dtypes=[float64, float64, float64, float64, float64, float64, float64 ] Will only return each column if it was included in the initial data. Will discard all other ''' # Import the interpolation function. from scipy.interpolate import CubicSpline # To make the inteprolation easier on the overfitting checks, # For each frame given, if only given one of Position or Velocity, # Populate the other with zeroes. These false columns will be filtered later. input_columns = data.dtype.names + tuple() # Make a copy of the initial dataframe columns. _data_has_geo = 'geo_x' in input_columns or 'geo_vx' in input_columns _data_has_j2k = 'j2k_x' in input_columns or 'j2k_vx' in input_columns # Sanity check: Does not accept both. if _data_has_geo and _data_has_j2k: raise ValueError("intepolate_ephemeris only accepts one frame of either 'geo' or 'j2k'! Cannot accept both.") if not (_data_has_geo or _data_has_j2k): raise ValueError( "intepolate_ephemeris given data arg must contain one of the following: geo_x, geo_vx, j2k_x, j2k_vx as to tell the interpolation what frame to interpolate!") if _data_has_geo: frame = 'geo' else: frame = 'j2k' col_names = "{:s}_x {:s}_y {:s}_z {:s}_vx {:s}_vy {:s}_vz".format(frame, frame, frame, frame, frame, frame).split() input_met = data['met'] # Pre-populate the dataset with zeros, override as needed. input_eph = numpy.zeros((len(data['met']), 6), dtype=float) # -> Builds a Nx6 array, one for each pos & vel x/y/z. if col_names[0] in input_columns: input_eph[:, 0] = data[col_names[0]] if col_names[1] in input_columns: input_eph[:, 1] = data[col_names[1]] if col_names[2] in input_columns: input_eph[:, 2] = data[col_names[2]] if col_names[3] in input_columns: input_eph[:, 3] = data[col_names[3]] if col_names[4] in input_columns: input_eph[:, 4] = data[col_names[4]] if col_names[5] in input_columns: input_eph[:, 5] = data[col_names[5]] # Run the initial interpolation. output_met = time_list output_eph = CubicSpline(input_met, input_eph)(output_met) # <MET> <pX,pY,pX,vX,vY,vZ> -> <pX,pY,pX,vX,vY,vZ> # Fix overfitting: # If any geocentric altitude > 9,000 km, or velocity over 9 km/s, then halve the input ephemeris data. while numpy.any(numpy.sum(output_eph[:, :3]**2, axis=1)**0.5 > 9e6) \ or numpy.any(numpy.sum(output_eph[:, 3:]**2, axis=1)**0.5 > 9e3): print("Warning - Ephemeris Interpolation overfit. Halving input data.", file=sys.stderr) # Halve the inputs, keeping the first & last entries to avoid extrapolation. input_met = numpy.concatenate((input_met[:1], input_met[2:-2:2], input_met[-1:])) input_eph = numpy.concatenate((input_eph[:1], input_eph[2:-2:2], input_eph[-1:])) output_eph = CubicSpline(input_met, input_eph)(output_met) # Then prepend the MET column to the data set output_data = numpy.column_stack((output_met, output_eph)) # Then rebuild a structured array, with a fully populated column set output_data = recfunctions.unstructured_to_structured(output_data, names=['met'] + col_names) # And then filter any columns that weren't in the initial dataset. output_data = output_data[[col for col in output_data.dtype.names if col in input_columns]] # And to make sure it's returning a new copy, not a view, test if it's not a unique base: if output_data.base is not None: output_data = output_data.copy() return output_data