About the ANTARES Pipeline

ANTARES processes alerts using a multi-stage pipeline. The pipeline consists of both internal ANTARES modules and user-submitted filters. Filters can be thought of as plugins which allow researchers to run their own computational logic inside ANTARES. This page documents the structure of the pipeline and includes the source code of the filters that are currently running.

Inputs

Currently, ANTARES receives alerts from the ZTF survey in real time. In the future it will process data from additional sources, such as LIGO. Its ultimate purpose is to serve as a real-time alert broker for LSST.


Level 1 Filters

Every alert that ANTARES receives is sent through Level 1 (L1) filters. L1 filters exist to slim down the immense input data stream by throwing out alerts that are likely bogus (see the Bad Detection filter) or that were collected in poor conditions (see the Bad Seeing filter). If an alert doesn't meet the criteria imposed by an L1 filter, it is discarded and isn't stored in our database.

L1 filters do not have access to object history or catalog associations.
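The halt-and-discard behavior can be sketched with a minimal stand-in for the `locus_data` interface. The `Halt` class, `FakeLocusData` stub, and `run_l1` driver below are hypothetical illustrations of the pattern the filters on this page use, not the real ANTARES internals:

```python
class Halt(Exception):
    """Raised to stop the pipeline and discard the current alert."""


class FakeLocusData:
    """Hypothetical stub mimicking the small slice of the API used here."""

    def __init__(self, properties):
        self._properties = properties

    def get_properties(self):
        return self._properties

    def get_halt(self):
        return Halt()


def bad_detection(locus_data):
    # Same shape as the Bad Detection filter below: discard likely-bogus alerts.
    p = locus_data.get_properties()
    if p.get('ztf_rb') is not None and p['ztf_rb'] < 0.55:
        raise locus_data.get_halt()


def run_l1(filters, locus_data):
    """Run L1 filters in order; return True if the alert survives."""
    try:
        for f in filters:
            f(locus_data)
    except Halt:
        return False
    return True


print(run_l1([bad_detection], FakeLocusData({'ztf_rb': 0.9})))  # True: alert continues
print(run_l1([bad_detection], FakeLocusData({'ztf_rb': 0.2})))  # False: alert discarded
```

An alert that lacks a property simply skips that check, which is why the real filters guard every comparison with `p.get(...) is not None`.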

Bad Detection v2

Discard alerts that are likely bogus, have bad pixels, or show a large difference between aperture and PSF magnitudes.

def bad_detection(locus_data):
    p = locus_data.get_properties()
    try:
        if p.get('ztf_rb') is not None:
            assert p['ztf_rb'] >= 0.55
        if p.get('ztf_nbad') is not None:
            assert p['ztf_nbad'] == 0
        if p.get('ztf_magdiff') is not None:
            assert -1.0 <= p['ztf_magdiff'] <= 1.0
    except AssertionError:
        raise locus_data.get_halt()

SSO

Detect alerts which ZTF has flagged as known solar system objects. These objects are cross-matched with JPL's Horizons ephemerides using astroquery and sent to output streams.

import numpy as np
from astroquery.jplhorizons import Horizons
from requests.exceptions import ConnectionError, SSLError


def translate_identifier(ztf_name):
    targetname = ztf_name

    # provisional asteroid designation, e.g. '2019AB12' -> '2019 AB12'
    if len(ztf_name) > 4 and ztf_name[4].isalpha():
        if float(ztf_name[6:]) == 0:
            targetname = ztf_name[:4] + ' ' + ztf_name[4:6]
        else:
            targetname = (ztf_name[:4] + ' ' + ztf_name[4:6] +
                          str(int(float(ztf_name[6:]))))

    # comet designation
    if '/' in ztf_name:
        targetname = ztf_name[:6] + ' ' + ztf_name[6:]
    return targetname


def ztf_known_solar_system(ld):
    p = ld.get_properties()

    # Check for required parameters
    if p.get('ztf_magzpsci') is None or p.get('ztf_nmatches') is None:
        return
    if (p.get('ztf_isdiffpos') is None or p.get('ztf_nbad') is None or
            p.get('ztf_rb') is None or p.get('ztf_ssdistnr') is None):
        return
    if p.get('ztf_ssnamenr') is None or p.get('ztf_jd') is None:
        return

    # reject unreliable detections
    if (p['ztf_isdiffpos'] == 'f' or
        p['ztf_isdiffpos'] == 0 or
        p['ztf_nbad'] > 0 or
        p['ztf_rb'] < 0.65 or
        p['ztf_ssdistnr'] == -999):
        return  # Skip this Alert

    # Send to ztf_sso_candidates
    ld.send_to_stream('ztf_sso_candidates')

    # check positional agreement and positional uncertainties
    targetname = translate_identifier(p['ztf_ssnamenr'])
    epoch = p['ztf_jd']
    hor = Horizons(id=targetname,
                   location='I41',
                   epochs=[epoch + 30/2/86400])
    try:
        eph = hor.ephemerides()
    except ValueError:
        return  # Skip this Alert
    except (ConnectionError, SSLError, Exception):
        return  # Failed to connect to JPL

    # Save Horizons properties in Antares
    ld.set_property('horizons_targetname', targetname)
    new_properties = [
        ('horizons_absolutemag', 'H'),
        ('horizons_slopeparameter', 'G'),
        ('horizons_predictedmagnitude', 'V'),
        ('horizons_heliodist_au', 'r'),
        ('horizons_observerdist_au', 'delta'),
        ('horizons_solarphaseangle', 'alpha'),
        ('horizons_solarelongation', 'elong'),
        ('horizons_ra_posunc_3sig_arcsec', 'RA_3sigma'),
        ('horizons_dec_posunc_3sig_arcsec', 'DEC_3sigma'),
        ('horizons_true_anomaly', 'true_anom'),
    ]
    for prop, var in new_properties:
        try:
            ld.set_property(prop, float(eph[var][0]))
        except (IndexError, ValueError, KeyError):
            pass

    if (np.sqrt((eph['RA'][0] - p['ra']) ** 2 +
                (eph['DEC'][0] - p['dec']) ** 2) * 3600 > 1):
        return  # Skip this Alert

    if np.sqrt(eph['RA_3sigma'][0] ** 2 + eph['DEC_3sigma'][0] ** 2) > 1:
        return  # Skip this Alert

    # Send to ztf_sso_confirmed
    ld.send_to_stream('ztf_sso_confirmed')

Bad Seeing

Discard alerts with poor seeing or elongated sources.

def bad_seeing(locus_data):
    p = locus_data.get_properties()
    try:
        if p.get('ztf_fwhm') is not None:
            assert p['ztf_fwhm'] <= 5.0
        if p.get('ztf_elong') is not None:
            assert p['ztf_elong'] <= 1.2
    except AssertionError:
        raise locus_data.get_halt()

Ingestion and Aggregation

Alerts which pass the L1 filters, or are sent to a stream by an L1 filter, are ingested and stored in the ANTARES alert database. We also ingest the history of past measurements if it is included in the alert.

After ingestion we associate each measurement with a known locus of past alerts within 1", and with any nearby objects in our catalogs. The search radius depends on the specific catalog.
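The 1" locus association can be sketched as a nearest-neighbor cone search. This is a self-contained illustration with invented helper names; the production pipeline matches against the alert database with indexed spatial queries rather than a linear scan:

```python
import math

ARCSEC = 1.0 / 3600.0  # one arcsecond, in degrees


def angular_separation_deg(ra1, dec1, ra2, dec2):
    """Great-circle separation in degrees (haversine formula; inputs in degrees)."""
    ra1, dec1, ra2, dec2 = map(math.radians, (ra1, dec1, ra2, dec2))
    sin_ddec = math.sin((dec2 - dec1) / 2.0)
    sin_dra = math.sin((ra2 - ra1) / 2.0)
    a = sin_ddec**2 + math.cos(dec1) * math.cos(dec2) * sin_dra**2
    return math.degrees(2.0 * math.asin(math.sqrt(a)))


def associate(alert, loci, radius_deg=1.0 * ARCSEC):
    """Return the nearest known locus within radius_deg of the alert, or None."""
    best, best_sep = None, radius_deg
    for locus in loci:
        sep = angular_separation_deg(alert['ra'], alert['dec'],
                                     locus['ra'], locus['dec'])
        if sep <= best_sep:
            best, best_sep = locus, sep
    return best


loci = [{'locus_id': 1, 'ra': 10.0000, 'dec': 41.0000},
        {'locus_id': 2, 'ra': 150.123, 'dec': -5.678}]
alert = {'ra': 10.0001, 'dec': 41.0001}  # about 0.45" from locus 1
print(associate(alert, loci)['locus_id'])  # prints 1
```

An alert with no locus within the radius would start a new locus; catalog association works the same way but with a per-catalog radius instead of a fixed 1".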


Level 2 Filters

After alerts are ingested, aggregated, and associated with catalogs, ANTARES runs the L2 filters. The purpose of the L2 filters is to flag scientifically interesting alerts based on custom criteria.

Require 2+ Measurements

Halt the pipeline now if this is the first Alert on this Locus.

def require_2_measurements(ld):
    # Require 2+ ztf_magpsf measurements in the lightcurve.
    # This will exclude upper-limits from the count, which do not have magpsf.
    mags = ld.get_time_series('ztf_magpsf')[-1]
    n = sum(1 for m in mags if m is not None)
    if n < 2:
        raise ld.get_halt()

Extragalactic v2

Send to stream `extragalactic` if the Alert is associated with a known galaxy from a catalog.

def extragalactic(locus):
    """
    Send alert to stream 'extragalactic' if it matches any extended source catalogs.
    """
    matching_catalog_names = locus.get_astro_object_matches().keys()

    # These are the catalogs (Antares-based names) with extended sources
    xsc_cats = ['2mass_xsc', 'ned', 'nyu_valueadded_gals', 'sdss_gals', 'veron_agn_qso', 'RC3']

    if set(matching_catalog_names) & set(xsc_cats):
        locus.send_to_stream('extragalactic')

Gravitational Wave EM Counterpart Filter

Test loci against any available LVC skymaps to identify possible EM counterparts.

import astropy.time as atime
import datetime
import numpy as np
import healpy as hp
import ligo.skymap.io
from ligo.skymap.postprocess import find_greedy_credible_levels
from antares.services import ligo
import zlib


def gw_em_counterpart_search(locus_data):
    '''
    Test if a locus is a plausible counterpart to an active GW source 
    - Gautham Narayan (github: gnarayan)
      20190614
    '''

    # get the list of currently active GW alerts ("active" as defined by the
    # ANTARES team); this should be a list of dicts
    active_gw_alerts = ligo.get_events_cached()

    
    # get the current UTC time
    tnow = atime.Time(datetime.datetime.utcnow()).mjd
    
    # get the event time for each of the GW events
    tgw_alerts = np.array([atime.Time(gw_alert['event_timestamp']).mjd for gw_alert in active_gw_alerts])
    
    # event has to be within the last seven days -  source is unlikely to be bright enough to find beyond that
    if not np.any(np.abs(tgw_alerts - tnow) < 7.):
        return
    
    time_series = locus_data.get_time_series('ztf_fid', 'ztf_magpsf', 'ztf_sigmapsf')
    _, mjd, _, _, _ = time_series
    mjd = np.array(mjd)
    
    # we only care about the newest alert
    mjd_latest = mjd.max()
    
    # indices of GW events that are within 7 days of this alert AND within the last 7 days
    plausible_events = np.abs(tgw_alerts - mjd_latest) < 7.
    gw_event_names = np.array([gw_alert['event_name'] for gw_alert in active_gw_alerts])
    
    plausible_event_names = gw_event_names[plausible_events]
    plausible_event_times = tgw_alerts[plausible_events]
    if len(plausible_event_names) == 0:
        # there are no plausible events that this alert could be a counterpart for
        return 
    
    mjd_earliest = mjd.min()
    history_predates_gw_event = []
    for _, event_timestamp in zip(plausible_event_names, plausible_event_times):
        # the earliest alert at this locus is more than a week before the GW trigger
        if (event_timestamp - mjd_earliest) >= 7:
            history_predates_gw_event.append(True)
        else:
            history_predates_gw_event.append(False)
    history_predates_gw_event = np.array(history_predates_gw_event)
    
    # get the subset of events that do not have any locus prehistory,
    # are within 7 days of this alert AND within the last 7 days
    plausible_event_names = plausible_event_names[~history_predates_gw_event]
    if len(plausible_event_names) == 0:
        # there are no plausible events that this alert could be a counterpart for
        return 
    
    # we don't need to worry about the other events
    useful_gw_event_inds = [np.where(gw_event_names == event)[0][0] for event in plausible_event_names]
    useful_gw_alerts = [active_gw_alerts[ind] for ind in useful_gw_event_inds]

    # look for any previous variability at the astro object catalog level
    matching_catalog_names = locus_data.get_astro_object_matches().keys()
    
    var_cats = ['veron_agn_qso', 'asassn_variable_catalog']
    if set(matching_catalog_names) & set(var_cats):
        # this locus is associated with a previously variable source, even if we don't have history
        return
    
    # check if this source is nuclear - the BNS/BBH/NS-BH events we care about shouldn't be
    streams = locus_data.get_locus_streams()
    streams = set(streams)
    bad_cats = set(['nuclear_transient', 'high_amplitude_variable_stars', 'ztf_known_solar_system'])
    if not streams.isdisjoint(bad_cats):
        return

    
    # if we get here we have to look at the map
    locus_properties = locus_data.get_properties()
    ra = locus_properties['ra']
    dec = locus_properties['dec']
        
    # we want the integral of the probability in some circle around the position
    radius = 2. # degrees
    # coordinate conversions
    theta = 0.5 * np.pi - np.deg2rad(dec)
    phi = np.deg2rad(ra)
    radius = np.deg2rad(radius)
    xyz = hp.ang2vec(theta, phi)
    
    # check the skymap for probability
    for gw_event in useful_gw_alerts:
        
        this_event_name = gw_event['event_name']
        this_stream_name = this_event_name + ' possible GW Counterpart'
        this_stream_name = this_stream_name.lower().replace(' ', '_')

        
        # if the false alarm rate indicates that this event is more common
        # than one every 10 years, then skip it
        # this isn't the LVC threshold, but it's probably realistic
        # because more common events are really poorly constrained on the sky
        print(1./(gw_event['event_far']*31536000), this_event_name)
        if 1./(gw_event['event_far']*31536000) < 10.:
            continue
        
        gw_event_class = gw_event['event_classification']
        # check if Terrestrial is > 0.5
        if gw_event_class['Terrestrial'] > 0.5:
            continue 
        
        # check if terrestrial is most likely, even if not 0.5
        event_class, event_prob = zip(*(gw_event_class.items()))
        event_class = np.array(event_class)
        event_prob = np.array(event_prob)
        ind_max = event_prob.argmax()
        if event_class[ind_max] == 'Terrestrial':
            continue 
            
        # require that the most likely class involves a neutron star - not
        # going to have an EM counterpart otherwise
        # could disable this check to consider all events
        if event_class[ind_max] not in ('BNS', 'NSBH'):
            continue
                            
        # get the NSIDE of this map, and the pixel index of the locus position, as well as area per pixel
        nside = gw_event['event_skymap_fits_nside'] # don't assume the skymaps have the same NSIDE
        ipix = hp.ang2pix(nside, theta, phi)
        area_norm = hp.nside2pixarea(nside, degrees=True)
                
        # convert the skymap from bytes string to numpy array
        gw_event_skymap = np.frombuffer(zlib.decompress(gw_event['event_skymap_fits_data_gzip']),\
                                            dtype=[('PROB', '>f8')])['PROB'].copy()
        
        # note that we're only using flat prob density on the sky, not distances
        # this is because the redshift catalogs are incomplete
        # can potentially check for locus galaxy matches and see if they do have a redshift
        # and if yes, if that redshift corresponds to the right distance given concordance cosmology
        # this is straightforward, but lets get a few more secure EM counterparts before doing this
             
        
        # what's the probability at this location relative to maximum
        ipix_good = np.isfinite(gw_event_skymap)
        ipix_max = np.argmax(gw_event_skymap[ipix_good])
        max_prob = gw_event_skymap[ipix_good][ipix_max]
        if np.isnan(max_prob):
            continue 
                
        this_prob = gw_event_skymap[ipix]
        if not np.isfinite(this_prob):
            continue 
        
        # what's the total probability in an error region around this position
        ipix_disc = hp.query_disc(nside, xyz, radius)
        total_prob = gw_event_skymap[ipix_disc].sum()
        if not np.isfinite(total_prob):
            continue
            
          
        # are we within the 90% contour or the 50% contour?
        credible_levels = find_greedy_credible_levels(gw_event_skymap)
        within90 = (credible_levels[ipix] <= 0.9)
        within50 = (credible_levels[ipix] <= 0.5)
        # being within the 50% region by definition also puts us within the
        # 90% region; this mostly sets annotations, because these are the two
        # contours the LVC usually shows
        
        
        # demand that the probability in the circle exceeds some threshold per sq. degree
        # this value is just based on playing with a few skymaps of credible events
        # this could probably be updated
        threshold = 0.05
        threshold_area = 1e-7
                       
        # note that you can take ratios of probability density because HEALpix is equal area
        # if it were not, you'd have to get probability density per sq degree and take the ratio of that
        
        # require high probability density relative to the map peak, membership
        # in the LVC credible contours, and nonnegligible integrated probability
        if this_prob/area_norm >= threshold and \
            (this_prob/max_prob) > 0.5 and \
            (within50 or within90) and \
            (total_prob > threshold_area):
            locus_data.send_to_stream(this_stream_name)
        
    return

High Amp v2

Updated to use the new magnitude correction code.

from statsmodels.stats.weightstats import DescrStatsW


def high_amplitude(ld):
    threshold = 1.0  # in magnitude units, and the same for all filters
    p = ld.get_properties()
    fid = p['ztf_fid']

    _, mjd = ld.get_time_series(filters={'ztf_fid': fid})
    if len(mjd) < 2:
        return

    mag, sigmag, mjd, is_var_star, corrected = ld.get_varstar_corrected_mags(fid=fid)
    if is_var_star and corrected:
        stream_name = 'high_amplitude_variable_stars'
    else:
        stream_name = 'high_amplitude'

    W = 1.0 / sigmag**2.0
    des = DescrStatsW(mag, weights=W)
    if des.std > threshold:
        ld.send_to_stream(stream_name)

High Flux

Detect alerts with an unusually large (or small) ratio of science flux to reference flux relative to the nearest source in the ZTF reference image.

def high_flux_ratio_stream_v2(locus):
    ref_zps = {1: 26.325, 2: 26.275, 3: 25.660}
    # These thresholds are expected to flag ~3% of the total alerts in the
    # respective fids. No i-band alerts were found.
    T_pos_neg = {"pos": {1: 10.5, 2: 10.3}, "neg": {1: 0.15, 2: 0.14}}
    alert_base_props = locus.get_properties()
    product_sign = 2.0 * (alert_base_props['ztf_isdiffpos'] == 't') - 1

    if product_sign > 0.0:
        threshold = T_pos_neg['pos']
    else:
        threshold = T_pos_neg['neg']

    fid = alert_base_props['ztf_fid']
    if (0.0 <= alert_base_props['ztf_distnr'] <= 1.0
            and 'ztf_magzpsci' in alert_base_props
            and fid in threshold):
        ref_flux = 10 ** (0.4 * (ref_zps[fid] - alert_base_props['ztf_magnr']))
        difference_flux = 10 ** (0.4 * (alert_base_props['ztf_magzpsci'] -
                                        alert_base_props['ztf_magpsf']))
        sci_flux = ref_flux + product_sign * difference_flux
        flux_ratio = sci_flux / ref_flux

        if ((product_sign < 0.0 and flux_ratio < threshold[fid]) or
                (product_sign > 0.0 and flux_ratio > threshold[fid])):
            locus.send_to_stream("high_flux_ratio_wrt_nn")

High SNR

Detect alerts with an unusually high signal-to-noise ratio.

def high_snr(locus):
    # Should flag ~2-3% of alerts. Again, no i-band alerts were found.
    snr_threshold = {1: 50.0, 2: 55.0}

    p = locus.get_properties()
    alert_snr = 1.0 / p['ztf_sigmapsf']
    if p['ztf_fid'] in snr_threshold \
    and alert_snr > snr_threshold[p['ztf_fid']]:
        locus.send_to_stream('high_snr')

M31

Detect alerts within a square box around M31.

def in_m31(locus):
    ra_max = 11.434793
    ra_min = 9.934793
    dec_max = 42.269065
    dec_min = 40.269065

    alert_props = locus.get_properties()
    ra = alert_props['ra']
    dec = alert_props['dec']

    if ra_max > ra > ra_min \
    and dec_max > dec > dec_min:
        locus.send_to_stream("in_m31")

Nuclear Transient

def nuclear_transient(locus):
    """
    Send alert to stream 'Nuclear Transient' if it is within 0.6 arcseconds of a
    source in the ZTF reference frame. It is also required that a match within
    1" of a known Pan-STARRS galaxy (ztf_distpsnr1 < 1. and ztf_sgscore1<0.3).
    To further remove small flux fluctuaion, we also require magpsf (alert PSF
    photometry) - magnr (PSF photometry of the nearby source in the reference
    image) > 1.5. The selection criteria are from Sjoert van Velzen et al.
    (2018, arXiv:1809.02608), section 2.1.
    """
    alert_props = locus.get_properties()
    sgscore = alert_props['ztf_sgscore1']
    distpsnr = alert_props['ztf_distpsnr1']
    magpsf = alert_props['ztf_magpsf']
    magnr = alert_props['ztf_magnr']
    distnr = alert_props['ztf_distnr']

    if None in (distnr, distpsnr, sgscore, magpsf, magnr):
        return
    if distnr < 0.6 and distpsnr < 1. and sgscore < 0.3 and magpsf - magnr < 1.5:
        locus.send_to_stream("nuclear_transient")

siena_mag_coord_cut2

Filter alerts by magnitude and coordinates for Siena College's 0.7 m telescope (near Albany, NY). Updated to use the new magnitude correction for variable stars.

def siena_mag_coord_cut2(ld):
    """
    Send alerts to stream 'Siena_mag_coord_cut2' if Alert is less than
    17 apparent magnitude in r, and if RA/Dec limits are met. 
    """
    mag_max = 17
    dec_min = 0
    dec_max = 90
    ra_min = 75
    ra_max = 180
    p = ld.get_properties()
    ra = p['ra']
    dec = p['dec']

    # Get arrays of corrected mag and sigmag, if possible.
    # This function only works on variable stars, otherwise it returns
    # ZTF's `magpsf` and `sigmagpsf` without modification.
    mag, sigmag, mjd, is_var_star, corrected = ld.get_varstar_corrected_mags()
    alert_mag = mag[-1]

    if alert_mag < mag_max \
        and dec_max > dec > dec_min \
        and ra_max > ra > ra_min:
        ld.send_to_stream("siena_mag_coord_cut2")

Output

When an alert exits the pipeline, it has been flagged with catalog matches, new data properties generated by the filters, and stream associations. At this point we check alerts for association with user-submitted watched objects and send Slack notifications accordingly.

Finally, we output the alert to Kafka streams if it was associated with a stream. Downstream systems and users connect to the streams in real time using the ANTARES client library.
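Downstream consumption can be sketched as a dispatch loop over (topic, payload) pairs. The topic names, payload schema, and `handle_stream_messages` helper below are illustrative assumptions, not the actual ANTARES wire format; in production the pairs would come from a Kafka consumer or the ANTARES client library rather than a hard-coded list:

```python
import json


def handle_stream_messages(messages, handlers):
    """Decode alert payloads and dispatch them to per-stream handler functions.

    `messages` is any iterable of (topic, raw_bytes) pairs; in production this
    would be fed by a Kafka consumer subscribed to ANTARES output topics.
    Returns a list of (topic, locus_id) pairs for everything processed.
    """
    seen = []
    for topic, raw in messages:
        alert = json.loads(raw.decode('utf-8'))
        handler = handlers.get(topic)
        if handler is not None:
            handler(alert)
        seen.append((topic, alert.get('locus_id')))
    return seen


# Hypothetical payloads; real ANTARES messages carry the full locus record.
fake_messages = [
    ('extragalactic', b'{"locus_id": "ANT2019abc", "ra": 10.68, "dec": 41.27}'),
    ('high_snr', b'{"locus_id": "ANT2019xyz", "ra": 150.1, "dec": -5.6}'),
]
handled = handle_stream_messages(
    fake_messages,
    {'extragalactic': lambda alert: print('extragalactic:', alert['locus_id'])},
)
```

Topics without a registered handler are still recorded, which mirrors the fact that an alert can belong to several streams at once and consumers subscribe only to the ones they care about.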