Module ephemeris_library.eclipse
Expand source code
import ephemeris_library.common as common
import ephemeris_library.database as database
import datetime
from typing import Union
import numpy
from numpy.lib import recfunctions
# Human-readable eclipse status names, keyed by the integer eclipse id that
# the database rows carry (see _get_eclipse, which performs the lookup).
ECLIPSE_STATUS_STRING = {
    1: 'Unknown',
    2: 'Sunlit',
    3: 'Penumbra',
    4: 'Umbra',
}
def get_eclipse(start_time: Union[datetime.datetime, str, int, float],
                end_time: Union[datetime.datetime, str, int, float],
                range_times: bool = True):
    '''
    Get eclipse state information.

    Args:
        start_time: int, float, or string representing met time.
            datetime object or iso formatted date string.
            Time will be rounded down to start of second.
        end_time: int, float, or string representing met time.
            datetime object or iso formatted date string.
        range_times: bool. Default True. If True return eclipse information
            as time ranges. If False, return data at 1Hz cadence.

    Return:
        data: Numpy Structured array.
            if range_times == True:
                names=[met_start, met_end, eclipse_status]
                dtypes=[float64, float64, Unicode[8]]
            if range_times == False:
                names=[met, eclipse_status]
                dtypes=[float64, Unicode[8]]
            When no eclipse records overlap the requested window the array
            is empty, but still carries the field names listed above.

    Raises:
        ValueError: if end_time is before the (rounded down) start_time.
    '''
    # Check version
    common._check_version()

    # Verify time inputs.
    start_time = common._verify_time(start_time)
    end_time = common._verify_time(end_time)

    # Round start time down to start of second.
    start_time = int(start_time)

    if end_time < start_time:
        raise ValueError('Error: end_time before start_time.')

    hertz_dtype = [('met', 'float64'), ('eclipse_status', 'U8')]

    data = _get_eclipse(start_time, end_time)
    if len(data) == 0:
        if range_times:
            return data
        # Keep the documented 1Hz layout even when there is nothing to report,
        # so callers can index result['met'] without special-casing.
        return numpy.empty(0, dtype=hertz_dtype)

    # Trim times to requested start/end times: the first/last record may
    # extend beyond the requested window.
    if data['met_start'][0] < start_time:
        data['met_start'][0] = start_time
    if data['met_end'][-1] > end_time:
        data['met_end'][-1] = end_time

    if range_times:
        return data

    # Expand every range into one sample per whole second. Each range's end
    # is excluded here so that back-to-back ranges do not produce duplicates.
    samples = []
    for span in data:
        status = span['eclipse_status']
        samples.extend(
            (second, status)
            for second in range(int(span['met_start']), int(span['met_end'])))

    # Close the series with a sample at end_time itself, carrying the status
    # of the last range.
    samples.append((end_time, data['eclipse_status'][-1]))
    return numpy.array(samples, dtype=hertz_dtype)
def interpolate_eclipse(data, time_list):
    '''
    Interpolate eclipse data onto a new list of times.

    For each requested time the pair of consecutive input samples that
    brackets it (met <= time < next met) is looked up. If both samples agree
    the shared status is used; otherwise the first of 'Unknown', 'Umbra',
    'Penumbra' that appears in the pair is reported.

    Arguments:
        data: Structured Array. Only considers 'met' & 'eclipse_status'
            columns; any other columns are ignored.
        time_list: list of MET-representation of time.

    Return:
        numpy structured array: [ met, eclipse_status]
            dtypes=[float64, Unicode[8]]
        Will discard all other columns.
        Exception: if time_list is identical to data['met'], `data` itself is
        returned unchanged.
        Times with no bracketing pair of samples (before the first sample, or
        at/after the last one) are reported as 'Unknown'.

    Raises:
        ValueError: if data lacks a 'met' or 'eclipse_status' column.
    '''
    # dtype.names is None for non-structured arrays; treat that as "no columns"
    # so the caller gets the descriptive error below.
    columns = data.dtype.names or ()
    if not ('met' in columns and 'eclipse_status' in columns):
        raise ValueError(
            "intepolate_eclipse given data arg must contain the following: met, eclipse_status")

    # Shortcut if time_list is the same as existing met times.
    if len(data) == len(time_list) and numpy.all(data['met'] == time_list):
        return data

    # Work only on the two relevant columns, so extra columns in `data`
    # cannot interfere.
    met = numpy.asarray(data['met'], dtype='float64')
    status = numpy.asarray(data['eclipse_status'], dtype='U8')

    # Pair each sample with its successor: (met, status) -> (met2, status2).
    span_start, span_end = met[:-1], met[1:]
    status_first, status_second = status[:-1], status[1:]

    # When the two bracketing samples disagree, the first of these present wins.
    precedence = ('Unknown', 'Umbra', 'Penumbra')

    new_data = []
    for time in time_list:
        matches = numpy.flatnonzero((span_start <= time) & (span_end > time))
        if matches.size == 0:
            # No bracketing pair. Tested explicitly rather than through the
            # truth value of an empty array, which recent NumPy rejects.
            new_status = 'Unknown'
        else:
            # With increasing met values there is exactly one match; take the
            # first one otherwise.
            index = matches[0]
            pair = (str(status_first[index]), str(status_second[index]))
            if pair[0] == pair[1]:
                new_status = pair[0]
            else:
                new_status = next(
                    (name for name in precedence if name in pair), 'Unknown')
        new_data.append((time, new_status))

    return numpy.array(new_data, dtype=[('met', 'float64'), ('eclipse_status', 'U8')])
def _get_eclipse(start_met: int,
                 end_met: int):
    '''
    Get eclipse state from database.

    Selects every eclipse record whose [met_start, met_end] range overlaps
    the requested window, ordered by met_start.

    Arguments:
        start_met: float representing met time.
        end_met: float representing met time.

    Return:
        numpy structured array: [(met_start, met_end, eclipse_status),
            dtype=(float64, float64, unicode[8])]

    Raises:
        KeyError: if a row carries an eclipse id that is not a key of
            ECLIPSE_STATUS_STRING.
    '''
    fields = ['met_start', 'met_end', 'eclipse_id']
    database_fields = ['`' + f + '`' for f in fields]

    statement = ('SELECT ' + ', '.join(database_fields)
                 + ' FROM `cassiope_ephemeris`.`eclipse`'
                 + ' WHERE `met_start` <= ' + str(end_met) + ' AND `met_end` >=' + str(start_met)
                 + ' ORDER BY `met_start` ASC')

    # Connect to database and fetch data. The cursor and the connection are
    # released even when the query raises, so a failed query does not leak them.
    connection = database._connect_to_database()
    try:
        cursor = connection.cursor()
        try:
            cursor.execute(statement)
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        connection.close()

    # Convert eclipse_id to eclipse_status
    # (eclipse_id is an internal database reference and has no meaning)
    records = [(met_start, met_end, ECLIPSE_STATUS_STRING[eclipse_id])
               for (met_start, met_end, eclipse_id) in rows]

    dtype_list = [('met_start', 'float64'),
                  ('met_end', 'float64'),
                  ('eclipse_status', 'U8'),
                  ]
    return numpy.array(records, dtype=dtype_list)
Functions
def get_eclipse(start_time: Union[datetime.datetime, str, int, float], end_time: Union[datetime.datetime, str, int, float], range_times: bool = True)
-
Get eclipse state information.
Args
start_time
- int, float, or string representing met time. datetime object or iso formatted date string. Time will be rounded down to start of second.
end_time
- int, float, or string representing met time. datetime object or iso formatted date string.
range_times
- bool. Default True. If true return eclipse information as time ranges. If False, return data at 1Hz cadence.
Return
data: Numpy Structured array. if range_times == True: names=[met_start, met_end, eclipse_status] dtypes=[float64, float64, Unicode[8]]
if range_times == False: names=[met, eclipse_status] dtypes=[float64, Unicode[8]]
Expand source code
def get_eclipse(start_time: Union[datetime.datetime, str, int, float],
                end_time: Union[datetime.datetime, str, int, float],
                range_times: bool = True):
    '''
    Get eclipse state information.

    Args:
        start_time: int, float, or string representing met time.
            datetime object or iso formatted date string.
            Time will be rounded down to start of second.
        end_time: int, float, or string representing met time.
            datetime object or iso formatted date string.
        range_times: bool. Default True. If True return eclipse information
            as time ranges. If False, return data at 1Hz cadence.

    Return:
        data: Numpy Structured array.
            if range_times == True:
                names=[met_start, met_end, eclipse_status]
                dtypes=[float64, float64, Unicode[8]]
            if range_times == False:
                names=[met, eclipse_status]
                dtypes=[float64, Unicode[8]]
            When no eclipse records overlap the requested window the array
            is empty, but still carries the field names listed above.

    Raises:
        ValueError: if end_time is before the (rounded down) start_time.
    '''
    # Check version
    common._check_version()

    # Verify time inputs.
    start_time = common._verify_time(start_time)
    end_time = common._verify_time(end_time)

    # Round start time down to start of second.
    start_time = int(start_time)

    if end_time < start_time:
        raise ValueError('Error: end_time before start_time.')

    hertz_dtype = [('met', 'float64'), ('eclipse_status', 'U8')]

    data = _get_eclipse(start_time, end_time)
    if len(data) == 0:
        if range_times:
            return data
        # Keep the documented 1Hz layout even when there is nothing to report,
        # so callers can index result['met'] without special-casing.
        return numpy.empty(0, dtype=hertz_dtype)

    # Trim times to requested start/end times: the first/last record may
    # extend beyond the requested window.
    if data['met_start'][0] < start_time:
        data['met_start'][0] = start_time
    if data['met_end'][-1] > end_time:
        data['met_end'][-1] = end_time

    if range_times:
        return data

    # Expand every range into one sample per whole second. Each range's end
    # is excluded here so that back-to-back ranges do not produce duplicates.
    samples = []
    for span in data:
        status = span['eclipse_status']
        samples.extend(
            (second, status)
            for second in range(int(span['met_start']), int(span['met_end'])))

    # Close the series with a sample at end_time itself, carrying the status
    # of the last range.
    samples.append((end_time, data['eclipse_status'][-1]))
    return numpy.array(samples, dtype=hertz_dtype)
def interpolate_eclipse(data, time_list)
-
Interpolate eclipse data onto a new list of times. In the returned structure the 'met' column holds the requested times.
Arguments
data: Structured array. Only the 'met' and 'eclipse_status' columns are considered.
time_list: list of MET-representation of time.
Return
numpy structured array: [ met, eclipse_status] dtypes=[float64, Unicode[8]] Will discard all other columns
Expand source code
def interpolate_eclipse(data, time_list):
    '''
    Interpolate eclipse data onto a new list of times.

    For each requested time the pair of consecutive input samples that
    brackets it (met <= time < next met) is looked up. If both samples agree
    the shared status is used; otherwise the first of 'Unknown', 'Umbra',
    'Penumbra' that appears in the pair is reported.

    Arguments:
        data: Structured Array. Only considers 'met' & 'eclipse_status'
            columns; any other columns are ignored.
        time_list: list of MET-representation of time.

    Return:
        numpy structured array: [ met, eclipse_status]
            dtypes=[float64, Unicode[8]]
        Will discard all other columns.
        Exception: if time_list is identical to data['met'], `data` itself is
        returned unchanged.
        Times with no bracketing pair of samples (before the first sample, or
        at/after the last one) are reported as 'Unknown'.

    Raises:
        ValueError: if data lacks a 'met' or 'eclipse_status' column.
    '''
    # dtype.names is None for non-structured arrays; treat that as "no columns"
    # so the caller gets the descriptive error below.
    columns = data.dtype.names or ()
    if not ('met' in columns and 'eclipse_status' in columns):
        raise ValueError(
            "intepolate_eclipse given data arg must contain the following: met, eclipse_status")

    # Shortcut if time_list is the same as existing met times.
    if len(data) == len(time_list) and numpy.all(data['met'] == time_list):
        return data

    # Work only on the two relevant columns, so extra columns in `data`
    # cannot interfere.
    met = numpy.asarray(data['met'], dtype='float64')
    status = numpy.asarray(data['eclipse_status'], dtype='U8')

    # Pair each sample with its successor: (met, status) -> (met2, status2).
    span_start, span_end = met[:-1], met[1:]
    status_first, status_second = status[:-1], status[1:]

    # When the two bracketing samples disagree, the first of these present wins.
    precedence = ('Unknown', 'Umbra', 'Penumbra')

    new_data = []
    for time in time_list:
        matches = numpy.flatnonzero((span_start <= time) & (span_end > time))
        if matches.size == 0:
            # No bracketing pair. Tested explicitly rather than through the
            # truth value of an empty array, which recent NumPy rejects.
            new_status = 'Unknown'
        else:
            # With increasing met values there is exactly one match; take the
            # first one otherwise.
            index = matches[0]
            pair = (str(status_first[index]), str(status_second[index]))
            if pair[0] == pair[1]:
                new_status = pair[0]
            else:
                new_status = next(
                    (name for name in precedence if name in pair), 'Unknown')
        new_data.append((time, new_status))

    return numpy.array(new_data, dtype=[('met', 'float64'), ('eclipse_status', 'U8')])