Module ephemeris_library.eclipse

Expand source code
import ephemeris_library.common as common
import ephemeris_library.database as database

import datetime
from typing import Union
import numpy
from numpy.lib import recfunctions

# Status label for each `eclipse_id` value stored in the database.
# Labels are at most 8 characters so they fit the 'U8' status columns.
ECLIPSE_STATUS_STRING = {
  1: 'Unknown',
  2: 'Sunlit',
  3: 'Penumbra',
  4: 'Umbra',
}


def get_eclipse(start_time: Union[datetime.datetime, str, int, float],
                end_time: Union[datetime.datetime, str, int, float],
                range_times: bool = True):
  '''
  Get eclipse state information.

  Args:
    start_time: int, float, or string representing met time.
                datetime object or iso formatted date string.
                Time will be rounded down to start of second.

    end_time: int, float, or string representing met time.
              datetime object or iso formatted date string.

    range_times: bool. Default True. If true return eclipse information as time ranges.
                 If False, return data at 1Hz cadence.

  Return:
    data: Numpy Structured array.
      if range_times == True:
        names=[met_start, met_end, eclipse_status]
        dtypes=[float64, float64,  Unicode[8]]

      if range_times == False:
        names=[met, eclipse_status]
        dtypes=[float64,  Unicode[8]]

      When no eclipse rows overlap the requested window the array is empty,
      but it still carries the column names listed above for the chosen mode.

  Raises:
    ValueError: if end_time is before the (rounded down) start_time.
  '''
  hertz_dtype = [('met', 'float64'), ('eclipse_status', 'U8')]

  # Check version
  common._check_version()

  # verify time inputs
  start_time = common._verify_time(start_time)
  end_time = common._verify_time(end_time)

  # Round start time down to start of second.
  start_time = int(start_time)

  if end_time < start_time:
    raise ValueError('Error: end_time before start_time.')

  data = _get_eclipse(start_time, end_time)

  if len(data) == 0:
    if range_times:
      return data
    # Keep the documented 1Hz column names even when there is nothing to
    # report, so callers can index result['met'] without special-casing.
    return numpy.array([], dtype=hertz_dtype)

  # Trim times to requested start/end times.
  # Rows are ordered by met_start, so only the first start and the last end
  # can stick out of the requested window.
  if data['met_start'][0] < start_time:
    data['met_start'][0] = start_time

  if data['met_end'][-1] > end_time:
    data['met_end'][-1] = end_time

  if range_times:
    return data

  # Expand every range into one sample per whole second in [met_start, met_end).
  hertz_data = []
  for timespan in data:
    status = timespan['eclipse_status']
    hertz_data.extend(
        (met, status)
        for met in range(int(timespan['met_start']), int(timespan['met_end'])))

  # The ranges are half-open, so close the series with one sample at end_time
  # that carries the status of the last range.
  hertz_data.append((end_time, data['eclipse_status'][-1]))
  return numpy.array(hertz_data, dtype=hertz_dtype)


def interpolate_eclipse(data, time_list):
  '''
  Interpolate eclipse data onto a new list of times.

  Every requested time is matched against the pairs of consecutive input
  samples (met, met2) that satisfy met <= time < met2, and gets:
    - the shared status, when both samples of the pair have the same status;
    - otherwise the first of 'Unknown', 'Umbra', 'Penumbra' found in the pair
      ('Unknown' if none of them is present);
    - 'Unknown' when no pair encloses the time (before the first sample, or
      at/after the last one).

  Arguments:
    data: Structured Array. Only considers 'met' & 'eclipse_status' columns.
          Samples are paired in stored order (presumably increasing 'met').
    time_list: list of MET-representation of time.

  Return:
    numpy structured array: [ met, eclipse_status]
      dtypes=[float64,  Unicode[8]]
      Will discard all other columns
    Exception: when time_list already equals data['met'], `data` itself is
    returned untouched (all of its columns are kept).

  Raises:
    ValueError: if data has no 'met' or no 'eclipse_status' column.
  '''
  input_columns = data.dtype.names

  if not ('met' in input_columns and 'eclipse_status' in input_columns):
    raise ValueError(
        "intepolate_eclipse given data arg must contain the following: met, eclipse_status")

  # Shortcut if time_list is the same as existing met times.
  if len(data) == len(time_list) and numpy.all(data['met'] == time_list):
    return data

  # Work on the two relevant columns only, so inputs that carry additional
  # columns (in any order) are handled.
  met = data['met'].astype('float64')
  status = data['eclipse_status'].astype('U8')

  # Pair each sample with its successor: interval i spans met[i] .. met[i + 1].
  interval_start, interval_end = met[:-1], met[1:]
  status_before, status_after = status[:-1], status[1:]

  # When the two ends of an interval disagree, the first entry found wins.
  severity = ('Unknown', 'Umbra', 'Penumbra')

  new_data = []
  for time in time_list:
    enclosing = (interval_start <= time) & (interval_end > time)

    # Collect the endpoint statuses as a plain set: this stays well defined
    # for zero matches (time outside the data) and for several matches
    # (duplicate samples), where truth-testing an array comparison would fail.
    statuses = set(status_before[enclosing].tolist())
    statuses.update(status_after[enclosing].tolist())

    if len(statuses) == 1:
      new_status = statuses.pop()
    else:
      new_status = next((label for label in severity if label in statuses), 'Unknown')

    new_data.append((time, new_status))

  return numpy.array(new_data, dtype=[('met', 'float64'), ('eclipse_status', 'U8')])


def _get_eclipse(start_met: int,
                 end_met: int):
  '''
  Get eclipse state from database.

  Selects every row of `cassiope_ephemeris`.`eclipse` that overlaps the
  requested window (met_start <= end_met and met_end >= start_met), ordered by
  met_start, and replaces the internal eclipse_id by its status string.

  Arguments:
    start_met: number representing met time (start of the window).
    end_met: number representing met time (end of the window).

  Return:
    numpy structured array: [(met_start, met_end, eclipse_status),
                              dtype=(float64, float64, unicode[8])]
    The array is empty when no row overlaps the window.

  Raises:
    KeyError: if a row holds an eclipse_id missing from ECLIPSE_STATUS_STRING.
  '''
  fields = ['met_start', 'met_end', 'eclipse_id']
  database_fields = ['`' + f + '`' for f in fields]

  # NOTE(review): the bounds are concatenated into the SQL text. The visible
  # caller passes values that went through common._verify_time (presumably
  # numeric); switch to driver-side parameters if this is ever fed raw input.
  statement = ('SELECT ' + ', '.join(database_fields)
               + ' FROM `cassiope_ephemeris`.`eclipse`'
               + ' WHERE `met_start` <= ' + str(end_met) + ' AND  `met_end` >=' + str(start_met)
               + ' ORDER BY `met_start` ASC')

  # connect to database and fetch data
  # try/finally makes sure cursor and connection are released even when the
  # query itself fails.
  connection = database._connect_to_database()
  try:
    cursor = connection.cursor()
    try:
      cursor.execute(statement)
      rows = cursor.fetchall()
    finally:
      cursor.close()
  finally:
    connection.close()

  # Convert eclipse_id to eclipse_status
  # (eclipse_id is an internal database reference and has no meaning)
  records = [(met_start, met_end, ECLIPSE_STATUS_STRING[eclipse_id])
             for (met_start, met_end, eclipse_id) in rows]

  # return results
  dtype_list = [('met_start', 'float64'),
                ('met_end', 'float64'),
                ('eclipse_status', 'U8')
                ]
  return numpy.array(records, dtype=dtype_list)

Functions

def get_eclipse(start_time: Union[datetime.datetime, str, int, float], end_time: Union[datetime.datetime, str, int, float], range_times: bool = True)

Get eclipse state information.

Args

start_time
int, float, or string representing met time. datetime object or iso formatted date string. Time will be rounded down to start of second.
end_time
int, float, or string representing met time. datetime object or iso formatted date string.
range_times
bool. Default True. If true return eclipse information as time ranges. If False, return data at 1Hz cadence.

Return

data: Numpy Structured array. if range_times == True: names=[met_start, met_end, eclipse_status] dtypes=[float64, float64, Unicode[8]]

if range_times == False: names=[met, eclipse_status] dtypes=[float64, Unicode[8]]

Expand source code
def get_eclipse(start_time: Union[datetime.datetime, str, int, float],
                end_time: Union[datetime.datetime, str, int, float],
                range_times: bool = True):
  '''
  Get eclipse state information.

  Args:
    start_time: int, float, or string representing met time.
                datetime object or iso formatted date string.
                Time will be rounded down to start of second.

    end_time: int, float, or string representing met time.
              datetime object or iso formatted date string.

    range_times: bool. Default True. If true return eclipse information as time ranges.
                 If False, return data at 1Hz cadence.

  Return:
    data: Numpy Structured array.
      if range_times == True:
        names=[met_start, met_end, eclipse_status]
        dtypes=[float64, float64,  Unicode[8]]

      if range_times == False:
        names=[met, eclipse_status]
        dtypes=[float64,  Unicode[8]]

      When no eclipse rows overlap the requested window the array is empty,
      but it still carries the column names listed above for the chosen mode.

  Raises:
    ValueError: if end_time is before the (rounded down) start_time.
  '''
  hertz_dtype = [('met', 'float64'), ('eclipse_status', 'U8')]

  # Check version
  common._check_version()

  # verify time inputs
  start_time = common._verify_time(start_time)
  end_time = common._verify_time(end_time)

  # Round start time down to start of second.
  start_time = int(start_time)

  if end_time < start_time:
    raise ValueError('Error: end_time before start_time.')

  data = _get_eclipse(start_time, end_time)

  if len(data) == 0:
    if range_times:
      return data
    # Keep the documented 1Hz column names even when there is nothing to
    # report, so callers can index result['met'] without special-casing.
    return numpy.array([], dtype=hertz_dtype)

  # Trim times to requested start/end times.
  # Rows are ordered by met_start, so only the first start and the last end
  # can stick out of the requested window.
  if data['met_start'][0] < start_time:
    data['met_start'][0] = start_time

  if data['met_end'][-1] > end_time:
    data['met_end'][-1] = end_time

  if range_times:
    return data

  # Expand every range into one sample per whole second in [met_start, met_end).
  hertz_data = []
  for timespan in data:
    status = timespan['eclipse_status']
    hertz_data.extend(
        (met, status)
        for met in range(int(timespan['met_start']), int(timespan['met_end'])))

  # The ranges are half-open, so close the series with one sample at end_time
  # that carries the status of the last range.
  hertz_data.append((end_time, data['eclipse_status'][-1]))
  return numpy.array(hertz_data, dtype=hertz_dtype)
def interpolate_eclipse(data, time_list)

Interpolate eclipse data onto a new list of times. In the returned structure the 'met' column holds the requested times from time_list.

Arguments

data: Structured Array. Only the 'met' & 'eclipse_status' columns are considered. time_list: list of times in MET representation.

Return

numpy structured array: [ met, eclipse_status] dtypes=[float64, Unicode[8]] Will discard all other columns

Expand source code
def interpolate_eclipse(data, time_list):
  '''
  Interpolate eclipse data onto a new list of times.

  Every requested time is matched against the pairs of consecutive input
  samples (met, met2) that satisfy met <= time < met2, and gets:
    - the shared status, when both samples of the pair have the same status;
    - otherwise the first of 'Unknown', 'Umbra', 'Penumbra' found in the pair
      ('Unknown' if none of them is present);
    - 'Unknown' when no pair encloses the time (before the first sample, or
      at/after the last one).

  Arguments:
    data: Structured Array. Only considers 'met' & 'eclipse_status' columns.
          Samples are paired in stored order (presumably increasing 'met').
    time_list: list of MET-representation of time.

  Return:
    numpy structured array: [ met, eclipse_status]
      dtypes=[float64,  Unicode[8]]
      Will discard all other columns
    Exception: when time_list already equals data['met'], `data` itself is
    returned untouched (all of its columns are kept).

  Raises:
    ValueError: if data has no 'met' or no 'eclipse_status' column.
  '''
  input_columns = data.dtype.names

  if not ('met' in input_columns and 'eclipse_status' in input_columns):
    raise ValueError(
        "intepolate_eclipse given data arg must contain the following: met, eclipse_status")

  # Shortcut if time_list is the same as existing met times.
  if len(data) == len(time_list) and numpy.all(data['met'] == time_list):
    return data

  # Work on the two relevant columns only, so inputs that carry additional
  # columns (in any order) are handled.
  met = data['met'].astype('float64')
  status = data['eclipse_status'].astype('U8')

  # Pair each sample with its successor: interval i spans met[i] .. met[i + 1].
  interval_start, interval_end = met[:-1], met[1:]
  status_before, status_after = status[:-1], status[1:]

  # When the two ends of an interval disagree, the first entry found wins.
  severity = ('Unknown', 'Umbra', 'Penumbra')

  new_data = []
  for time in time_list:
    enclosing = (interval_start <= time) & (interval_end > time)

    # Collect the endpoint statuses as a plain set: this stays well defined
    # for zero matches (time outside the data) and for several matches
    # (duplicate samples), where truth-testing an array comparison would fail.
    statuses = set(status_before[enclosing].tolist())
    statuses.update(status_after[enclosing].tolist())

    if len(statuses) == 1:
      new_status = statuses.pop()
    else:
      new_status = next((label for label in severity if label in statuses), 'Unknown')

    new_data.append((time, new_status))

  return numpy.array(new_data, dtype=[('met', 'float64'), ('eclipse_status', 'U8')])