About the ANTARES Pipeline

ANTARES processes alerts using a multi-stage pipeline. The pipeline consists of both internal ANTARES modules and user-submitted filters. Filters can be thought of as plugins which allow researchers to run their own computational logic inside ANTARES. This page documents the structure of the pipeline and includes the source code of the filters which are currently running.
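
To make the filter API concrete, here is a minimal, hypothetical sketch of a filter, using the same locus_data calls that the real filters below use (the property and stream names here are made up for illustration):

def example_filter(locus_data):
    # Read the properties of the newest alert and its locus
    p = locus_data.get_properties()

    # Arbitrary computational logic; ztf_magpsf is the ZTF PSF magnitude
    if p.get('ztf_magpsf') is not None and p['ztf_magpsf'] < 18.0:
        # Annotate the locus and divert the alert to an output stream
        locus_data.set_property('example_is_bright', 1)  # hypothetical property
        locus_data.send_to_stream('example_bright_alerts')  # hypothetical stream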

Inputs

Currently, ANTARES receives alerts from the ZTF survey in real time. In the future it will process data from additional sources, such as LIGO. Its ultimate purpose is to serve as a real-time alert broker for LSST.


Level 1 Filters

Every alert that ANTARES receives is sent through Level 1 (L1) filters. L1 filters exist to slim down the immense input data stream by throwing out alerts that are likely bogus (see the Bad Detection filter) or that were collected in poor conditions (see the Bad Seeing filter). If an alert doesn't meet the criteria imposed by an L1 filter, it is discarded and isn't stored in our database.

L1 filters do not have access to object history or catalog associations.

Bad Detection v2

Discard alerts that are likely bogus, contain bad pixels, or show a large difference between aperture and PSF magnitudes.

def bad_detection(locus_data):
    p = locus_data.get_properties()
    try:
        if p.get('ztf_rb') is not None:
            assert p['ztf_rb'] >= 0.55
        if p.get('ztf_nbad') is not None:
            assert p['ztf_nbad'] == 0
        if p.get('ztf_magdiff') is not None:
            assert -1.0 <= p['ztf_magdiff'] <= 1.0
    except AssertionError:
        raise locus_data.get_halt()

SSO

Detect alerts which ZTF has flagged as known solar system objects. These objects are cross-matched against JPL's Horizons database using astroquery and sent to output streams.

import numpy as np
from astroquery.jplhorizons import Horizons
from requests.exceptions import ConnectionError, SSLError


def translate_identifier(ztf_name):
    targetname = ztf_name

    # provisional designation, e.g. '2019AB12' -> '2019 AB12'
    if len(ztf_name) > 4 and ztf_name[4].isalpha():
        if float(ztf_name[6:]) == 0:
            targetname = ztf_name[:4] + ' ' + ztf_name[4:6]
        else:
            targetname = (ztf_name[:4] + ' ' + ztf_name[4:6] +
                          str(int(float(ztf_name[6:]))))

    # comet designation
    if '/' in ztf_name:
        targetname = ztf_name[:6] + ' ' + ztf_name[6:]
    return targetname


def ztf_known_solar_system(ld):
    p = ld.get_properties()

    # Check for required parameters
    required = ('ztf_magzpsci', 'ztf_nmatches', 'ztf_isdiffpos', 'ztf_nbad',
                'ztf_rb', 'ztf_ssdistnr', 'ztf_ssnamenr', 'ztf_jd')
    if any(p.get(key) is None for key in required):
        return

    # reject unreliable detections
    if (p['ztf_isdiffpos'] == 'f' or
        p['ztf_isdiffpos'] == 0 or
        p['ztf_nbad'] > 0 or
        p['ztf_rb'] < 0.65 or
        p['ztf_ssdistnr'] == -999):
        return  # Skip this Alert

    # Send to ztf_sso_candidates
    ld.send_to_stream('ztf_sso_candidates')

    # check positional agreement and positional uncertainties
    targetname = translate_identifier(p['ztf_ssnamenr'])
    epoch = p['ztf_jd']
    hor = Horizons(id=targetname,
                   location='I41',  # observatory code for ZTF at Palomar
                   epochs=[epoch + 30/2/86400])  # shift to mid-exposure (30 s exposure)
    try:
        eph = hor.ephemerides()
    except ValueError:
        return  # Skip this Alert
    except (ConnectionError, SSLError, Exception):
        return  # Failed to connect to JPL

    # Save Horizons properties in Antares
    ld.set_property('horizons_targetname', targetname)
    new_properties = [
        ('horizons_absolutemag', 'H'),
        ('horizons_slopeparameter', 'G'),
        ('horizons_predictedmagnitude', 'V'),
        ('horizons_heliodist_au', 'r'),
        ('horizons_observerdist_au', 'delta'),
        ('horizons_solarphaseangle', 'alpha'),
        ('horizons_solarelongation', 'elong'),
        ('horizons_ra_posunc_3sig_arcsec', 'RA_3sigma'),
        ('horizons_dec_posunc_3sig_arcsec', 'DEC_3sigma'),
        ('horizons_true_anomaly', 'true_anom'),
    ]
    for prop, var in new_properties:
        try:
            ld.set_property(prop, float(eph[var][0]))
        except (IndexError, ValueError, KeyError):
            pass

    if (np.sqrt((eph['RA'][0] - p['ra']) ** 2 +
                (eph['DEC'][0] - p['dec']) ** 2) * 3600 > 1):
        return  # Skip this Alert

    if np.sqrt(eph['RA_3sigma'][0] ** 2 + eph['DEC_3sigma'][0] ** 2) > 1:
        return  # Skip this Alert

    # Send to ztf_sso_confirmed
    ld.send_to_stream('ztf_sso_confirmed')

Bad Seeing

Discard alerts with poor seeing or elongated sources.

def bad_seeing(locus_data):
    p = locus_data.get_properties()
    try:
        if p.get('ztf_fwhm') is not None:
            assert p['ztf_fwhm'] <= 5.0
        if p.get('ztf_elong') is not None:
            assert p['ztf_elong'] <= 1.2
    except AssertionError:
        raise locus_data.get_halt()


Ingestion and Aggregation

Alerts which pass the L1 filters, or are sent to a stream by an L1 filter, are ingested and stored in the ANTARES alert database. We also ingest the history of past measurements when it is included in the alert.

After ingestion we associate each measurement with a known locus of past alerts within 1", and with any nearby objects in our catalogs. The catalog search radius depends on the specific catalog.
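
These aggregated data are what the L2 filters below operate on. As a sketch, using the same calls the filters below use (the property and catalog names appear in the filter sources on this page), a filter can retrieve the locus light curve and its catalog cross-matches like this:

def inspect_locus(locus_data):
    # Per-alert time series on this locus.
    # Returns a tuple: (alert_id, mjd, <one array per requested property>)
    alert_id, mjd, mag, magerr = locus_data.get_time_series(
        'ztf_magpsf', 'ztf_sigmapsf',
        require=['ztf_magpsf'],
    )

    # Catalog associations made at ingestion, keyed by catalog name
    matches = locus_data.get_astro_object_matches()
    if 'sdss_gals' in matches:
        redshift = matches['sdss_gals'][0]['z']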


Level 2 Filters

After alerts are ingested, aggregated, and associated with catalogs, ANTARES runs the L2 filters. The purpose of the L2 filters is to detect interesting science alerts based on custom criteria.

Require 2+ Measurements

Halt the pipeline now if this is the first Alert on this Locus.

def require_2_measurements(ld):
    # Require 2+ ztf_magpsf measurements in the lightcurve.
    # This will exclude upper-limits from the count, which do not have magpsf.
    mags = ld.get_time_series('ztf_magpsf')[-1]
    n = len([m for m in mags if m])
    if n < 2:
        raise ld.get_halt()

astrorapid v9
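
Classify transient candidates in real time with the astrorapid (RAPID) photometric classifier, storing class probabilities as locus properties and sending high-probability classifications to output streams.
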
import base64
import pickle
import traceback
import time
import numpy as np
from astrorapid.classify import Classify
from astroquery.irsa_dust import IrsaDust
import astropy.coordinates as coord
import astropy.units as u
import dustmaps.sfd


# Send error notifications to Slack
# ERROR_LOG_SLACK_CHANNEL = '#astrorapid'

# Run this filter only once at a time within each ANTARES process
# RUN_IN_MUTEX = True

# Classifiers indexed by bool value of `known_redshift`
classifiers = {}


def setup():
    # Reset Tensorflow
    from keras import backend as K
    K.clear_session()

    # Load models from disk
    print(classifiers)
    classifiers[True] = Classify(known_redshift=True)
    classifiers[True].model._make_predict_function()
    classifiers[False] = Classify(known_redshift=False)
    classifiers[False].model._make_predict_function()
    print(classifiers)

    # Load dustmaps
    from dustmaps.config import config
    config['data_dir'] = '/static_files/'  # production
    #config['data_dir'] = '/tmp/'  # datalab
    dustmaps.sfd.fetch()


def delete_indexes(deleteindexes, *args):
    newarrs = []
    for arr in args:
        newarr = np.delete(arr, deleteindexes)
        newarrs.append(newarr)

    return newarrs


def rapid_stage(locus_data):
    locus_properties = locus_data.get_properties()
    objid = locus_properties['alert_id']
    ra = locus_properties['ra']
    dec = locus_properties['dec']
    
    # Don't classify variable objects. Look for any previous variability at the astro object catalog level
    matching_catalog_names = locus_data.get_astro_object_matches().keys()
    var_cats = ['veron_agn_qso', 'asassn_variable_catalog']
    if set(matching_catalog_names) & set(var_cats):
        # this locus is associated with a previously variable source, even if we don't have history
        locus_data.set_property('astrorapid_skipped', 'obj is known variable')
        return
    
    # Don't classify objects within the galactic plane
    coo = coord.SkyCoord(ra * u.deg, dec * u.deg, frame='icrs')
    b = coo.galactic.b.value
    if abs(b) < 15:
        locus_data.set_property('astrorapid_skipped', 'ignore galactic plane')
        return
    
    # Get Milkyway extinction
    coo = coord.SkyCoord(ra * u.deg, dec * u.deg, frame='icrs')
    sfd = dustmaps.sfd.SFDQuery()
    mwebv = sfd(coo)

    # Get redshift from SDSS_gals
    catalog_matches = locus_data.get_astro_object_matches()
    if 'sdss_gals' in catalog_matches:
        redshift = catalog_matches['sdss_gals'][0]['z']
        known_redshift = True
    elif 'ned' in catalog_matches:
        redshift = catalog_matches['ned'][0]['Redshift_1']
        if redshift is None:
            redshift = catalog_matches['ned'][0]['redshift_2']
        if redshift is None:
            redshift = catalog_matches['ned'][0]['redshift_corrected']
        if redshift is None:
            known_redshift = False
        else:
            known_redshift = True
    else:
        known_redshift = False
        redshift = None
        

    # Get lightcurve data
    alert_id, mjd, passband, mag, magerr, zeropoint = \
        locus_data.get_time_series(
            'ztf_fid', 'ztf_magpsf', 'ztf_sigmapsf', 'ztf_magzpsci',
            require=['ztf_fid', 'ztf_magpsf', 'ztf_sigmapsf'],
        )

    # Require 2 unique passbands
    if len(np.unique(passband)) < 2:
        print("less than 2 bands")
        locus_data.set_property('astrorapid_skipped', '< 2 bands')
        return

    # Ignore lightcurves shorter than 3
    if len(mjd) < 3:
        print("less than 3 points")
        locus_data.set_property('astrorapid_skipped', '< 3 points')
        return

    # Fill in missing zeropoint values with the median of the good ones
    zeropoint = np.asarray(zeropoint, dtype=np.float64)  # None entries become NaN
    zpt_median = np.median(zeropoint[~np.isnan(zeropoint)])
    zeropoint[np.isnan(zeropoint)] = zpt_median
    if np.any(np.isnan(zeropoint)):
        log_id = locus_data.report_error(
            tag='astrorapid_zeropoint_contains_nan',
            data={
                'alert_id': objid,
            },
        )
        locus_data.set_property('astrorapid_error_log_id', log_id)
        locus_data.set_property('astrorapid_error', 'zeropoint_contains_nan')
        return

    # Compute flux
    mag = np.asarray(mag, dtype=np.float64)
    flux = 10. ** (-0.4 * (mag - zeropoint))
    fluxerr = np.abs(flux * magerr * (np.log(10.) / 2.5))

    # Set photflag: 4096 marks detections (S/N > 5), 6144 marks the earliest detection (the trigger)
    photflag = np.zeros(len(flux))
    photflag[flux / fluxerr > 5] = 4096
    photflag[np.where(mjd == min(mjd[photflag == 4096]))] = 6144

    # Filter out unwanted bands and convert ztf_fid to strings 'g', 'r'
    passband = np.where((passband == 1) | (passband == '1.0'), 'g', passband)
    passband = np.where((passband == 2) | (passband == '2.0'), 'r', passband)
    mjd, passband, flux, fluxerr, zeropoint, photflag = delete_indexes(
        np.where((passband == 3) | (passband == '3.0') | (np.isnan(mag))),
        mjd, passband, flux, fluxerr, zeropoint, photflag
    )

    # Do classification
    light_curves = [
        (mjd, flux, fluxerr, passband, photflag, ra, dec, objid, redshift, mwebv)
    ]
    classifier = classifiers[known_redshift]
    try:
        predictions, time_steps = classifier.get_predictions(light_curves, return_predictions_at_obstime=True)
    except ValueError:
        log_id = locus_data.report_error(
            tag='astrorapid_get_predictions_valueerror',
            data={
                'alert_id': objid,
                'traceback': traceback.format_exc(),
                'light_curves_pickle_b64': base64.b64encode(pickle.dumps(light_curves)).decode('ascii'),
                'known_redshift': known_redshift,
            },
        )
        locus_data.set_property('astrorapid_error_log_id', log_id)
        locus_data.set_property('astrorapid_error', 'get_predictions_valueerror')
        return
    locus_data.set_property('astrorapid_success', 1)

    # Output
    if predictions:
        for i, name in enumerate(classifier.class_names):
            # Store properties
            # p = predictions[0][-1][i]  # The probability at the last time-step
            p = max(predictions[0][:,i])  # The max probability at any point in the light curve
            locus_data.set_property('rapid_class_probability_{}'.format(name), p)

            # Send to output streams
            if name == 'Pre-explosion':
                continue
            if p > 0.6:
                stream = 'astrorapid_{}'.format(name.lower().replace('-', '_'))
                locus_data.send_to_stream(stream)

Extragalactic v2

Send to stream `extragalactic` if the Alert is associated with a known galaxy from a catalog.

def extragalactic(locus):
    """
    Send alert to stream 'extragalactic' if it matches any extended source catalogs.
    """
    matching_catalog_names = locus.get_astro_object_matches().keys()

    # These are the catalogs (Antares-based names) with extended sources
    xsc_cats = ['2mass_xsc', 'ned', 'nyu_valueadded_gals', 'sdss_gals', 'veron_agn_qso', 'RC3']

    if set(matching_catalog_names) & set(xsc_cats):
        locus.send_to_stream('extragalactic')

Gravitational Wave EM Counterpart Filter v2

Test loci against any available LVC skymaps to identify possible EM counterparts.

# this needs to go inside setup
import astropy.time as atime
import datetime
import numpy as np
import healpy as hp
import ligo.skymap.io
from ligo.skymap.postprocess import find_greedy_credible_levels
from antares.services import ligo
import zlib

def gw_em_counterpart_search(locus_data):
    '''
    Test if a locus is a plausible counterpart to an active GW source 
    - Gautham Narayan (github: gnarayan)
      20190614
    '''

    # get the list of currently active GW alerts ("active" as defined by the ANTARES team)
    # this should be a list of dicts
    active_gw_alerts = ligo.get_events_cached()

    
    # get the current UTC time
    tnow = atime.Time(datetime.datetime.utcnow()).mjd
    
    # get the event time for each of the GW events
    tgw_alerts = np.array([atime.Time(gw_alert['event_timestamp']).mjd for gw_alert in active_gw_alerts])
    
    # event has to be within the last seven days -  source is unlikely to be bright enough to find beyond that
    if not np.any(np.abs(tgw_alerts - tnow) < 7.):
        print('No GW event within 7 days of now')
        locus_data.set_property('gw_last_proc_status', 'No GW event within 7 days of now')
        return
    
    time_series = locus_data.get_time_series('ztf_fid', 'ztf_magpsf', 'ztf_sigmapsf')
    _, mjd, _, _, _ = time_series
    mjd = np.array(mjd)
    
    # we only care about the newest alert
    mjd_latest = mjd.max()
    
    # indices of GW events that are within 7 days of this alert AND within the last 7 days
    plausible_events = np.abs(tgw_alerts - mjd_latest) < 7.
    gw_event_names = np.array([gw_alert['event_name'] for gw_alert in active_gw_alerts])
    
    plausible_event_names = gw_event_names[plausible_events]
    plausible_event_times = tgw_alerts[plausible_events]
    if len(plausible_event_names) == 0:
        # there are no plausible events that this alert could be a counterpart for
        print('No GW event within 7 days of latest alert at this locus and now')
        locus_data.set_property('gw_last_proc_status', 'No GW event within 7 days of latest alert at this locus and now')
        return 
    
    mjd_earliest = mjd.min()
    history_predates_gw_event = []
    for _, event_timestamp in zip(plausible_event_names, plausible_event_times):
        # the earliest alert at this locus is more than a week before the GW trigger
        if (event_timestamp - mjd_earliest) >= 7:
            history_predates_gw_event.append(True)
        else:
            history_predates_gw_event.append(False)
    history_predates_gw_event = np.array(history_predates_gw_event)
    
    # get the subset of events that do not have any locus prehistory,
    # are within 7 days of this alert AND within the last 7 days
    plausible_event_names = plausible_event_names[~history_predates_gw_event]
    if len(plausible_event_names) == 0:
        # there are no plausible events that this alert could be a counterpart for
        print('Locus has history that predates possible GW events')
        locus_data.set_property('gw_last_proc_status', 'Locus has history that predates possible GW events')
        return 
    
    locus_data.set_property('plausible_gw_events_assoc',' '.join(plausible_event_names))
    print('Considering these GW events:', ' '.join(plausible_event_names))
    
    # we don't need to worry about the other events
    useful_gw_event_inds = [np.where(gw_event_names == event)[0][0] for event in plausible_event_names]
    useful_gw_alerts = [active_gw_alerts[ind] for ind in useful_gw_event_inds]

    # look for any previous variability at the astro object catalog level
    matching_catalog_names = locus_data.get_astro_object_matches().keys()
    
    var_cats = ['veron_agn_qso', 'asassn_variable_catalog']
    if set(matching_catalog_names) & set(var_cats):
        # this locus is associated with a previously variable source, even if we don't have history
        print('Locus associated with source in catalog of known variables (Veron/ASASSN)')
        locus_data.set_property('gw_last_proc_status', 'Locus associated with source in catalog of known variables (Veron/ASASSN)')
        return
    
    # check if this source is nuclear - the BNS/BBH/NS-BH events we care about shouldn't be
    streams = locus_data.get_locus_streams()
    streams = set(streams)
    bad_cats = set(['nuclear_transient', 'high_amplitude_variable_stars','ztf_known_solar_system'])
    if not streams.isdisjoint(bad_cats):
        print('Locus associated with stream of nuclear sources, var stars or solar system objects')
        locus_data.set_property('gw_last_proc_status', 'Locus associated with stream of nuclear sources, var stars or solar system objects')
        return

    
    # if we get here we have to look at the map
    locus_properties = locus_data.get_properties()
    ra = locus_properties['ra']
    dec = locus_properties['dec']
        
    # we want the integral of the probability in some circle around the position
    radius = 2. # degrees
    # coordinate conversions
    theta = 0.5 * np.pi - np.deg2rad(dec)
    phi = np.deg2rad(ra)
    radius = np.deg2rad(radius)
    xyz = hp.ang2vec(theta, phi)
    
    # check the skymap for probability
    for gw_event in useful_gw_alerts:
        
        this_event_name = gw_event['event_name']
        this_stream_name = this_event_name + ' possible GW Counterpart'
        this_stream_name = this_stream_name.lower().replace(' ', '_')

        
        # if the false alarm rate indicates that this event is more common than 1 every 3 years, then divert
        # this isn't the LVC threshold, but it's probably realistic
        # because more common events are really poorly constrained on the sky
        # (event_far is in Hz; 31536000 is the number of seconds per year)
        print(1./(gw_event['event_far']*31536000), this_event_name)
        if 1./(gw_event['event_far']*31536000) < 3.:
            print('GW event FAR too high to consider realistic (more common than 1 per 3 years)')
            locus_data.set_property('gw_last_proc_status', 'GW event FAR too high to consider realistic (more common than 1 per 3 years)')
            continue
        
        gw_event_class = gw_event['event_classification']
        # check if Terrestrial is > 0.5
        if gw_event_class['Terrestrial'] > 0.5:
            print('GW event is probably terrestrial')
            locus_data.set_property('gw_last_proc_status', 'GW event is probably terrestrial')
            continue 
        
        # check if terrestrial is most likely, even if not 0.5
        event_class, event_prob = zip(*(gw_event_class.items()))
        event_class = np.array(event_class)
        event_prob = np.array(event_prob)
        ind_max = event_prob.argmax()
        if event_class[ind_max] == 'Terrestrial':
            print('GW event is most likely terrestrial')
            locus_data.set_property('gw_last_proc_status', 'GW event is most likely terrestrial')
            continue 
            
        # check if the most likely involves a neutron star - not going to have an EM counterpart otherwise
        # could disable this to check all events
        if event_class[ind_max] not in ('BNS', 'NSBH'):
            print('GW event is not BNS or NSBH as max likelihood. Skipping.')
            locus_data.set_property('gw_last_proc_status', 'GW event is not BNS or NSBH as max likelihood. Skipping.')
            continue 
                            
        # get the NSIDE of this map, and the pixel index of the locus position, as well as area per pixel
        nside = gw_event['event_skymap_fits_nside'] # don't assume the skymaps have the same NSIDE
        ipix = hp.ang2pix(nside, theta, phi)
        area_norm = hp.nside2pixarea(nside, degrees=True)
                
        # convert the skymap from bytes string to numpy array
        gw_event_skymap = np.frombuffer(zlib.decompress(gw_event['event_skymap_fits_data_gzip']),\
                                            dtype=[('PROB', '>f8')])['PROB'].copy()
        
        # note that we're only using flat prob density on the sky, not distances
        # this is because the redshift catalogs are incomplete
        # can potentially check for locus galaxy matches and see if they do have a redshift
        # and if yes, if that redshift corresponds to the right distance given concordance cosmology
        # this is straightforward, but lets get a few more secure EM counterparts before doing this
             
        
        # what's the probability at this location relative to maximum
        ipix_good = np.isfinite(gw_event_skymap)
        ipix_max = np.argmax(gw_event_skymap[ipix_good])
        max_prob = gw_event_skymap[ipix_good][ipix_max]
        if np.isnan(max_prob):
            print('GW event skymap max prob is NaN')
            locus_data.set_property('gw_last_proc_status', 'GW event skymap max prob is NaN')
            continue 
                
        this_prob = gw_event_skymap[ipix]
        if not np.isfinite(this_prob):
            locus_data.set_property('gw_last_proc_status', 'GW event skymap prob at this locus is not finite')
            continue 
        
        # what's the total probability in an error region around this position
        ipix_disc = hp.query_disc(nside, xyz, radius)
        total_prob = gw_event_skymap[ipix_disc].sum()
        if not np.isfinite(total_prob):
            print('GW event skymap integrated prob in error radius is not finite')
            locus_data.set_property('gw_last_proc_status', 'GW event skymap integrated prob in error radius is not finite')
            continue
            
          
        # are we within the 90% contour or the 50% contour
        credible_levels = find_greedy_credible_levels(gw_event_skymap)
        within90 = (credible_levels[ipix] <= 0.9)
        within50 = (credible_levels[ipix] <= 0.5)
        # if we're within the 50% region, then by definition we are also within the 90% region
        # this is mostly to set an annotation because these are the two contours LVC usually shows
        
        
        # demand that the probability in the circle exceeds some threshold per sq. degree
        # this value is just based on playing with a few skymaps of credible events
        # this could probably be updated
        threshold = 0.05
        threshold_area = 1e-7
                       
        # note that you can take ratios of probability density because HEALPix is equal area
        # if it were not, you'd have to get probability density per sq degree and take the ratio of that
        
        # if relative probability is high or we have > 5 sigma confidence
        if this_prob/area_norm >= threshold and \
            (this_prob/max_prob) > 0.5 and \
            (within50 or within90) and \
            (total_prob > threshold_area):
            locus_data.send_to_stream(this_stream_name)
        else:
            print('Locus not plausibly associated with this GW event from skymap')
            locus_data.set_property('gw_last_proc_status', 'Locus not plausibly associated with this GW event from skymap')
        
    return

High Amp v2

Detect high-amplitude variability (updated to use the new magnitude correction code).

from statsmodels.stats.weightstats import DescrStatsW


def high_amplitude(ld):
    threshold = 1.0  # in magnitude units, and the same for all filters
    p = ld.get_properties()
    fid = p['ztf_fid']

    _, mjd = ld.get_time_series(filters={'ztf_fid': fid})
    if len(mjd) < 2:
        return

    mag, sigmag, mjd, is_var_star, corrected = ld.get_varstar_corrected_mags(fid=fid)
    if is_var_star and corrected:
        stream_name = 'high_amplitude_variable_stars'
    else:
        stream_name = 'high_amplitude'

    # Weighted standard deviation of the light curve
    W = 1.0 / sigmag**2.0
    des = DescrStatsW(mag, weights=W)
    if des.std > threshold:
        ld.send_to_stream(stream_name)

High Flux
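
Send alerts whose ratio of total science flux to reference flux, measured against the nearest reference-frame source, is unusually high (positive subtractions) or unusually low (negative subtractions) to stream `high_flux_ratio_wrt_nn`.
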
def high_flux_ratio_stream_v2(locus):
    ref_zps = {1: 26.325, 2: 26.275, 3: 25.660}
    # These thresholds are expected to flag ~3% of the total alerts in the
    # respective fid's. No i-band (fid=3) alerts were found.
    T_pos_neg = {'pos': {1: 10.5, 2: 10.3}, 'neg': {1: 0.15, 2: 0.14}}
    alert_base_props = locus.get_properties()
    product_sign = 2.0 * (alert_base_props['ztf_isdiffpos'] == 't') - 1

    if product_sign > 0.0:
        threshold = T_pos_neg['pos']
    if product_sign < 0.0:
        threshold = T_pos_neg['neg']

    if (0.0 <= alert_base_props['ztf_distnr'] <= 1.0) \
            and ('ztf_magzpsci' in alert_base_props) \
            and (alert_base_props['ztf_fid'] in threshold):
        ref_flux = 10 ** (0.4 * (ref_zps[alert_base_props['ztf_fid']] - alert_base_props['ztf_magnr']))
        difference_flux = 10 ** (0.4 * (alert_base_props['ztf_magzpsci'] - alert_base_props['ztf_magpsf']))
        sci_flux = ref_flux + product_sign * difference_flux
        flux_ratio = sci_flux / ref_flux

        if (product_sign < 0.0 and flux_ratio < threshold[alert_base_props['ztf_fid']]) \
                or (product_sign > 0.0 and flux_ratio > threshold[alert_base_props['ztf_fid']]):
            locus.send_to_stream('high_flux_ratio_wrt_nn')

High SNR v2

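Send alerts with an unusually high signal-to-noise ratio to stream `high_snr`.
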
def high_snr(locus):
    # Should flag ~2-3% of alerts. Again, no i-band alerts were found.
    snr_threshold = {1: 50.0, 2: 55.0}

    p = locus.get_properties()
    try:
        sigmapsf = p['ztf_sigmapsf']
        fid = p['ztf_fid']
        threshold = snr_threshold[fid]
    except KeyError:
        return  # Skip        

    alert_snr = 1.0 / sigmapsf
    if alert_snr > threshold:
        locus.send_to_stream('high_snr')

M31

Detect alerts within a square box around M31.

def in_m31(locus):
    ra_max = 11.434793
    ra_min = 9.934793
    dec_max = 42.269065
    dec_min = 40.269065

    alert_props = locus.get_properties()
    ra = alert_props['ra']
    dec = alert_props['dec']

    if ra_max > ra > ra_min \
    and dec_max > dec > dec_min:
        locus.send_to_stream("in_m31")

Nuclear Transient
def nuclear_transient(locus):
    """
    Send alert to stream 'nuclear_transient' if it is within 0.6 arcseconds of
    a source in the ZTF reference frame. A match within 1" of a known
    Pan-STARRS galaxy (ztf_distpsnr1 < 1. and ztf_sgscore1 < 0.3) is also
    required. To further remove small flux fluctuations, we also require
    magpsf (alert PSF photometry) - magnr (PSF photometry of the nearby source
    in the reference image) < 1.5. The selection criteria are from Sjoert van
    Velzen et al. (2018, arXiv:1809.02608), section 2.1.
    """
    try:
        alert_props = locus.get_properties()
        sgscore = alert_props['ztf_sgscore1']
        distpsnr = alert_props['ztf_distpsnr1']
        magpsf = alert_props['ztf_magpsf']
        magnr = alert_props['ztf_magnr']
        distnr = alert_props['ztf_distnr']
    except KeyError:
        return  # Skip
    if None in (distnr, distpsnr, sgscore, magpsf, magnr):
        return

    if distnr < 0.6 and distpsnr < 1. and sgscore < 0.3 and magpsf - magnr < 1.5:
        locus.send_to_stream("nuclear_transient")

Potential TDE v1

Checks for cross-matches against the French and Zabludoff (2018) post-starburst galaxy catalog, excluding variables and AGN.

def post_starburst_xmatch(ld):
    """
    Send alert to stream 'tde_candidate' if it matches French and Zabludoff (2018) catalog.
    - Gautham Narayan (github: gnarayan)
    20190911
    """
   
    # need to make sure this filter is run at stage 3 after we've associated solar system sources
    streams = ld.get_locus_streams()
    streams = set(streams)
    bad_streams = set(['high_amplitude_variable_stars','ztf_known_solar_system'])
    
    # if this locus is associated with a var star or a known solar system catalog
    # then even if it is associated with a potential post-starburst, we should ignore
    # TDE are long-lived enough that a single alert associated with a known solar system object
    # will take care of itself on the next observation
    # and if there is a high-amplitude variable at the same xmatch tolerance
    # then assume we're seeing something related to the common variable star acting up
    # rather than rare TDE behavior
    if not streams.isdisjoint(bad_streams):
        print('Locus associated with stream of var stars or solar system objects')
        return

    # if we get to here, we want to make sure the source is actually in the post-starburst catalog
    ld_cats = ld.get_astro_object_matches().keys()
    ld_cats = set(ld_cats)

    # check that this locus is associated with a post-starburst galaxy
    # and is NOT associated with ANY variable (not just high amp) or known QSO
    good_cats = set(['french_post_starburst_gals'])
    bad_cats  = set(['veron_agn_qso', 'asassn_variable_catalog'])

    if (ld_cats & good_cats) and not (ld_cats & bad_cats):
        # if yes, then flag this is a possible TDE candidate
        ld.send_to_stream('tde_candidate')
    else:
        print('Not in TDE cat, or associated with variable source')
        return

siena_mag_coord_cut2

Filters alerts by magnitude and coordinates for Siena College's 0.7 m telescope (near Albany, NY). Updated to use corrected magnitudes for variable stars.

def siena_mag_coord_cut2(ld):
    """
    Send alerts to stream 'siena_mag_coord_cut2' if the Alert is brighter
    than apparent magnitude 17 and the RA/Dec limits are met.
    """
    mag_max = 17
    dec_min = 0
    dec_max = 90
    ra_min = 75
    ra_max = 180
    p = ld.get_properties()
    ra = p['ra']
    dec = p['dec']

    # Get arrays of corrected mag and sigmag, if possible.
    # This function only corrects variable stars; otherwise it returns
    # ZTF's `magpsf` and `sigmapsf` without modification.
    mag, sigmag, mjd, is_var_star, corrected = ld.get_varstar_corrected_mags()
    alert_mag = mag[-1]

    if alert_mag < mag_max \
        and dec_max > dec > dec_min \
        and ra_max > ra > ra_min:
        ld.send_to_stream("siena_mag_coord_cut2")

refitt_newsources_snrcut v2

New sources with a signal-to-noise ratio greater than five, appropriate for REFITT.

def refitt_newsources_snrcut(locus):
    # check if this source is nuclear, SSO, or known variable star
    streams = set(locus.get_locus_streams())
    bad_cats = {
        'nuclear_transient',
        'high_amplitude_variable_stars',
        #'ztf_sso_candidates',
    }
    if streams & bad_cats:
        return

    snr_threshold = {1: 5.0, 2: 5.0}
    p = locus.get_properties()
    alert_snr = 1.0 / p['ztf_sigmapsf']
    if p['ztf_fid'] in snr_threshold.keys() \
    and alert_snr > snr_threshold[p['ztf_fid']] \
    and p['ztf_distnr'] > 1.0 \
    and p['ztf_distpsnr1'] > 2.0:
        locus.send_to_stream('refitt_newsources_snrcut')

blue extragalactic transient filter v1.3

Searches ZTF alerts for blue, newly detected sources that are not associated with stars and lie away from the Galactic plane.

# Imports
import astropy.coordinates
import numpy as np

ERROR_LOG_SLACK_CHANNEL = 'UMWERJYM8'

def blue_extragalactic_transient_filter_v1p3(locus_data):
    """
    A preliminary filter for blue extragalactic transients
    """
    # Alert is rejected if the absolute value of the galactic latitude (bii) is less than this value (degrees)
    gal_latitude_cut_degrees = 15.
    
    # Alerts are rejected if a previous alert for this object was found more than this number of days previous to the sampled alert.
    previous_alert_cut_days = 14.
    
    # Star criteria based on conversation with Adam Miller on October 2nd, 2019
    #     Must be > ztf_sgscore_cut and proximity <= ztf_distpsnr_cut
    ztf_sgscore_cut = 0.5 # > 0.5 likely star; = 0.5 unknown due to Pan-STARRS data quality issues
    ztf_distpsnr_cut = 1.0 # arcseconds from closest Pan-STARRS source. 
    
    # Screen out g-R comparisons when alerts are from more than this number of days apart
    delta_days_in_corresponding_filter = 4.
    
    # Color selection criteria for correlated alerts.
    #     Rejecting if color >= ztf_g_R_cut 
    ztf_g_R_cut = 0.
    
    
    
    print('`blue_extragalactic_transient_filter_v1p3` is running...')
    
    
    ##################################################
    #
    # Get a dict of all properties on the new alert.
    #
    
    p = locus_data.get_properties()

    #
    ##################################################
    
    
    ##################################################
    #
    # Is this source near the Galactic Plane?
    #
    
    p_coordinate_object = astropy.coordinates.SkyCoord(ra=p['ra'], dec=p['dec'], unit='deg', frame=astropy.coordinates.FK5, equinox='J2000.0')
    
    # Stop if this is a Galactic target
    if abs( p_coordinate_object.galactic.b.deg ) <= gal_latitude_cut_degrees:
        #print( 'ZTF alert within {0:.1f} degrees of the Galactic plane: {1:.1f} degrees'.format( gal_latitude_cut_degrees, p_coordinate_object.galactic.b.deg ) )
        return
    
    #
    ##################################################
    

    ##################################################
    #
    # Ignore if a ZTF alert has been made more than 14 days prior to this alert.
    #
    
    history_check_ld = locus_data.get_time_series('ztf_magpsf')
    
    # There is no other alert related to this ld
    #     Cannot:
    #             Track rise / decay
    #             Create a color based classification
    
    if history_check_ld[0].size == 1:
        #print( 'This is the only ZTF alert for this locus.' )
        return
    
    # Previous listings could be due to non-detections. Remove these from consideration.
    historical_detection_indices = np.arange( history_check_ld[2].size )[~np.isnan( history_check_ld[2].astype(np.float64) ) ]
    if historical_detection_indices.size == 0:
        #print( 'No other detections exist for this locus.' )
        return

    elif ( historical_detection_indices.size > 0 ) and ( p['mjd'] - np.min( history_check_ld[1][ historical_detection_indices ] ) > previous_alert_cut_days ):
        #print( 'Previous ZTF alerts found more than {0:.1f} days prior to this alert: {1:.1f} days'.format( previous_alert_cut_days, p['mjd'] - np.min( history_check_ld[1] ) ) )
        return

    #
    ##################################################
    
    
    ##################################################
    #
    # Use Tachibana & Miller (2018) to filter out stars
    #
    
    if 'ztf_sgscore1' in p.keys() and p['ztf_sgscore1'] > ztf_sgscore_cut and 'ztf_distpsnr1' in p.keys() and p['ztf_distpsnr1'] <= ztf_distpsnr_cut:
        #print('Satisfies star classification criteria from Tachibana & Miller (2018): ztf_sgscore1={0:.2f}, ztf_distpsnr1={1:.2f}'.format( p['ztf_sgscore1'], p['ztf_distpsnr1'] ))
        return
    
    #
    ##################################################
    
    
    ##################################################
    #
    # Are there ZTF alerts in the other filter?
    #
    
    
    # If g check for r
    if p['ztf_fid'] == 1:
        check_fid = 2

    
    # If r check for g
    else:
        check_fid = 1
    
    
    #
    # Query associations in the complementary filter
    #     check_ld is a tuple of length 2 + the number of fields requested
    #     Positional indexing is required on this tuple since it does not have keys, unlike locus_data.get_properties()
    #
    check_ld = locus_data.get_time_series('ra', 'dec', 'ztf_magpsf', 'ztf_sigmapsf', filters={'ztf_fid': check_fid})
    
    
    #
    # Ignore nan entries
    #
    check_ld_good_indices = np.arange( check_ld[4].size )[~np.isnan( check_ld[4].astype(np.float64) ) ]
    
    
    #
    # No corresponding detections in the alternative filter
    #
    if check_ld[0][ check_ld_good_indices ].size == 0:
        #print('No detections in the other filter ({0}).'.format(check_passband))
        return
    

    else:
        

        ##############################################
        #
        # Find index of temporally closest observation in the other filter
        #     This is needed for the following steps
        #
        temporally_closest_previous_alert_index = check_ld_good_indices[ np.argmin( p['mjd'] - check_ld[1][ check_ld_good_indices ] ) ]
        
        
        #
        # Ensure that we are comparing [g]-[R] only when taken in relatively close temporal proximity
        #
        if np.abs( p['mjd'] - check_ld[1][ temporally_closest_previous_alert_index ] ) > delta_days_in_corresponding_filter:
            #print('Closest g and R alerts were taken too far apart: {0:.1f} > {1:.1f}'.format( np.abs( p['mjd'] - check_ld[1][ temporally_closest_previous_alert_index ] ), delta_days_in_corresponding_filter))
            return
        

        #
            # Confirmed detections in this alternative filter previous to this alert and taken within a reasonable time period.
        #
        else:
            
            
            #print( 'Alerts {0} days apart.'.format( p['mjd'] - check_ld[1][ temporally_closest_previous_alert_index ] ) )
            
            
            ##########################################
            #
            # Look for cases where g and R were taken within 4 days of each other
            #
            # Create ZTF g-R:
            #
            #print( 'Check filter mag: {0:f}'.format( check_ld[4][ temporally_closest_previous_alert_index ] ) )
            if p['ztf_fid'] == 1:
                ztf_g_R = p['ztf_magpsf'] - check_ld[4][ temporally_closest_previous_alert_index ]
            else:
                ztf_g_R = check_ld[4][ temporally_closest_previous_alert_index ] - p['ztf_magpsf']
            
            #
            # Always keep track of error propagation
            #
            ztf_g_R_error = np.sqrt(p['ztf_sigmapsf'] ** 2 + check_ld[5][ temporally_closest_previous_alert_index ] ** 2)
            
            
            #
            # Reject if this is not a blue source (g - R >= cut)
            #
            if ztf_g_R >= ztf_g_R_cut:

                return
            

            #
            ##########################################
    
        #
        ##############################################
    
    #
    ##################################################
    
    
    print('Potential match {0}'.format(p['alert_id']))
    
    
    ##################################################
    #
    # Record useful information and send to the stream
    #
        
    #
    # Add color properties
    #
    locus_data.set_property( 'g_minus_r',     ztf_g_R )
    locus_data.set_property( 'g_minus_r_err', ztf_g_R_error )
       
    #
    # Send to stream
    #
    locus_data.send_to_stream('blue_extragalactic_transient_filter')

    #
    ##################################################

wyrzykowski_bright_microlensing_v9

Bright (mag < 17) microlensing candidates from ZTF.

ERROR_LOG_SLACK_CHANNEL = 'UP414JK1D'  # my slack id, instead of name

#wyrzykowski_bright_microlensing_v9
#uses g and r light curves, requires 10 points in each band
#excludes known variable stars from ASASSN and high amplitude variables from other stream
#fits microlensing curve

from scipy.stats import skew
import numpy as np
from scipy.optimize import leastsq

def ulens_fixedbl(t, t0, te, u0, I0, dummy=None):  # default for dummy so the single-band fit can omit it
    fs=1.
    tau=(t-t0)/te
    x=tau
    y=u0
    u=np.sqrt(x**2+y**2)
    ampl= (u**2 + 2)/(u*np.sqrt(u**2+4))
    F = ampl*fs + (1-fs)
    I = I0 - 2.5*np.log10(F)
    return I

def fit_ulensfixedbl(epoch, avmag, err):
    #catching short light curves:
    if (len(epoch)<10):
        return [999,999,999,999], 1999999.
    t0=epoch[np.argmin(avmag)]
    te=50.
    u0=0.1
    I0=np.amax(avmag)
    x=epoch
    y=avmag
    ulensparam=[t0, te, u0, I0]
    fp = lambda v, x: ulens_fixedbl(x, v[0],v[1],v[2],v[3])
    e = lambda v, x, y, err: ((fp(v,x)-y)/err)
    v, success = leastsq(e, ulensparam, args=(x,y,err), maxfev=1000000)
    #no solution:
    if (success == 0) : return ulensparam, 999999.
    chi2 = sum(e(v,x,y,err)**2)
    chi2dof=sum(e(v,x,y,err)**2)/(len(x)-len(v))
    out = []
    for t in v:
        out.append(t)
    out.append(chi2)
    out.append(chi2dof)
    return out, chi2dof

#computes merged chi^2 for two functions
def residtwo(avmag1, err1, y1, avmag2, err2, y2):        
    diff1 = (y1 - avmag1)/err1 
    diff2 = (y2 - avmag2)/err2 
    return np.concatenate((diff1, diff2))

#fits two curves from two bands with one model
def fit_ulensfixedbltwo(epoch1, avmag1, err1, epoch2, avmag2, err2):
    #catching short light curves:
    if (len(epoch1)<10 or len(epoch2)<10):
        return [999,999,999,999,999], 1999999.
    t0=epoch2[np.argmin(avmag2)]
    te=50.
    u0=0.1
    G0=np.amax(avmag1)
    R0=np.amax(avmag2)
    x1=epoch1
    y1=avmag1
    x2=epoch2
    y2=avmag2
    ulensparam=[t0, te, u0, G0, R0]
    fp1 = lambda v, x: ulens_fixedbl(x, v[0], v[1], v[2], v[3], v[4])
    fp2 = lambda v, x: ulens_fixedbl(x, v[0], v[1], v[2], v[4], v[3])
    e = lambda v, x1, x2, y1, y2, err1, err2: residtwo(avmag1, err1, fp1(v, epoch1), avmag2, err2, fp2(v, epoch2))
    v, success = leastsq(e, ulensparam, args=(x1, x2, y1, y2, err1, err2), maxfev=1000000)
    #no solution:
    if (success == 0) : return ulensparam, 999999.
    chi2 = sum(e(v,x1, x2 ,y1, y2, err1, err2)**2)
    chi2dof=sum(e(v,x1,x2,y1,y2,err1,err2)**2)/(len(x1)+len(x2)-len(v))
    out = []
    for t in v:
        out.append(t)
    out.append(chi2)
    out.append(chi2dof)
    return out, chi2dof

def bright_microlensing(ld):
    props = ld.get_properties()
    alert_id = props['alert_id']

    skewThreshold = 0
    etaThreshold = 0.3
    chi2Threshold = 10.
    u0Threshold = 0.3
    teThreshold = 1000.

    #r-band only check
    _, mjd = ld.get_time_series(filters={'ztf_fid': 2})
    #these are all measurements above the baseline from difference imaging, so we can ask only for 4
    if len(mjd) < 4:
        print("less than 4 alerts in r-band")
        return

    # need to make sure this filter is run at stage 3 after we've associated solar system sources
    streams = ld.get_locus_streams()
    streams = set(streams)
    bad_streams = set(['high_amplitude_variable_stars','ztf_known_solar_system'])
    # if this locus is associated with a var star or a known solar system catalog
    # we ignore it
    if not streams.isdisjoint(bad_streams):
        print('Locus associated with stream of var stars or solar system objects')
        return
    # if we get to here, check the catalog cross-matches for this locus
    ld_cats = ld.get_astro_object_matches().keys()
    ld_cats = set(ld_cats)
        
    # check that this locus is NOT associated with ANY variable (not just high amp)
    bad_cats  = set(['asassn_variable_catalog'])
    if (ld_cats & bad_cats):
        print('Not microlensing, associated with variable source')
        return
    
    stream_name = 'bright_microlensing'

    #g-band data
    mags1, errs1, times1, is_var_star1, corrected1 = ld.get_varstar_corrected_mags(fid=1)
    #r-band data
    mags2, errs2, times2, is_var_star2, corrected2 = ld.get_varstar_corrected_mags(fid=2)

    #these measurements will also include baseline so we require at least 10
    if (len(times1)<10 or len(times2)<10):
        print("MICROLENSING FILTER: Requiring at least 10 points with the baseline in g- and r-band")
        return

    #computing stats
    nrtrid1=len(times1)
    eta1=1./(nrtrid1-1.)/np.var(mags1)*np.sum((mags1[1:]-mags1[:-1])*(mags1[1:]-mags1[:-1]))
    #skewness
    skewness1=skew(mags1)
    nrtrid2=len(times2)
    eta2=1./(nrtrid2-1.)/np.var(mags2)*np.sum((mags2[1:]-mags2[:-1])*(mags2[1:]-mags2[:-1]))
    #skewness
    skewness2=skew(mags2)
    rmax=np.amax(mags2)
    gmax=np.amax(mags1)

#    print(times1, mags1)
#    print(times2, mags2)
        
    #brightness cut only in r
    if (rmax>17.0 or skewness1>skewThreshold or eta1>etaThreshold or skewness2>skewThreshold or eta2>etaThreshold) :
        print(alert_id, " MICROLENSING FILTER: alert not bright enough or not skewed enough, ",rmax, skewness1, eta1, gmax, skewness2, eta2)
        return

    #skew+vonN criteria ok, now fitting both bands
    
    fitparams, chi2dof = fit_ulensfixedbltwo(times1, mags1, errs1, times2, mags2, errs2)
    t0=fitparams[0]
    te=np.abs(fitparams[1])
    u0=np.abs(fitparams[2])
    G0=np.abs(fitparams[3])
    R0=np.abs(fitparams[4])
    print(alert_id, " MICROLENSING FILTER: fitting ",fitparams, chi2dof)
    if (np.abs(u0)<u0Threshold and chi2dof<chi2Threshold and te<teThreshold) :
        ld.set_property('t0', t0)
        ld.set_property('te', te)
        ld.set_property('u0', u0)
        ld.set_property('R0', R0)
        ld.set_property('G0', G0)
        ld.set_property('chi2', chi2dof)
        ld.send_to_stream(stream_name)

young_extragalactic_candidate

A filter for young extragalactic transient candidates. Filters out stars and alerts near the Galactic plane. It casts a very wide net with no catalog cross matching.

# Imports
import astropy.coordinates
import numpy as np

ERROR_LOG_SLACK_CHANNEL = 'UMWERJYM8'

def young_extragalactic_candidate( locus_data, verbose = False ):

    """
    A filter for young extragalactic transient candidates.
    Filters out stars and alerts near the Galactic plane. This 
    filter casts a very wide net with NO catalog cross matching 
    (as opposed to the `extragalactic` stream), thus downstream 
    filtering will be required.
    Contact: michael.stroh@northwestern.edu
    """

    # Alert is rejected if the absolute value of the galactic latitude (bii) is less than this value (degrees)
    gal_latitude_cut_degrees = 15.
    
    # Alerts are rejected if a previous alert for this object was found more than this number of days previous to the sampled alert.
    previous_alert_cut_days = 14.
    
    # Star criteria based on conversations with Adam Miller
    #     Must be > ztf_sgscore_cut and proximity <= ztf_distpsnr_cut
    ztf_sgscore_cut = 0.5 # > 0.5 likely star; = 0.5 unknown due to Pan-STARRS data quality issues
    ztf_distpsnr_cut = 1.0 # arcseconds from closest Pan-STARRS source. 
    
    
    if verbose:
        print('`young_extragalactic_candidate` is running...')
    
    
    ##################################################
    #
    # Get a dict of all properties on the new alert.
    #
    
    p = locus_data.get_properties()

    #
    ##################################################
    
    
    ##################################################
    #
    # Is this source near the Galactic Plane?
    #
    
    p_coordinate_object = astropy.coordinates.SkyCoord(ra=p['ra'], dec=p['dec'], unit='deg', frame=astropy.coordinates.FK5, equinox='J2000.0')
    
    
    #
    # Stop if this is near the Galactic plane
    #
    if abs( p_coordinate_object.galactic.b.deg ) <= gal_latitude_cut_degrees:
        if verbose:
            print( 'ZTF alert within {0:.1f} degrees of the Galactic plane: {1:.1f} degrees'.format( gal_latitude_cut_degrees, p_coordinate_object.galactic.b.deg ) )
        return
    
    #
    ##################################################
    

    ##################################################
    #
    # Ignore if a ZTF alert has been made more 
    # than 14 days prior to this alert.
    #
    
    history_check_ld = locus_data.get_time_series('ztf_magpsf')

    
    #
    # Previous listings could be due to non-detections. 
    # Remove these from consideration.
    #

    historical_detection_indices = np.arange( history_check_ld[2].size )[~np.isnan( history_check_ld[2].astype(np.float64) ) ]

    
    #
    # Ensure that this is a relatively new transient
    #
    if ( historical_detection_indices.size > 0 ) and ( p['mjd'] - np.min( history_check_ld[1][ historical_detection_indices ] ) > previous_alert_cut_days ):
        if verbose:
            print( 'Previous ZTF alerts found more than {0:.1f} days prior to this alert: {1:.1f} days'.format( previous_alert_cut_days, p['mjd'] - np.min( history_check_ld[1] ) ) )
        return

    #
    ##################################################
    
    
    ##################################################
    #
    # Use Tachibana & Miller (2018) to filter out stars
    #
    
    #
    # It is possible to have multiple PS DR1 associations 
    # within 1", so loop through all to be on the safe 
    # side. Many of these are screened out in PS DR2 (and 
    # have sgscore=0.5 which isn't particularly useful), so 
    # this may be removed if compared against that release.
    #
    for ps_id in range(1,4):
        if 'ztf_sgscore{0}'.format(ps_id) in p.keys() and p['ztf_sgscore{0}'.format(ps_id)] > ztf_sgscore_cut and 'ztf_distpsnr{0}'.format(ps_id) in p.keys() and p['ztf_distpsnr{0}'.format(ps_id)] <= ztf_distpsnr_cut:    
            if verbose:
                print('Satisfies star classification criteria from Tachibana & Miller (2018): ztf_sgscore{0}={1:.2f}, ztf_distpsnr{0}={2:.2f}'.format( ps_id, p['ztf_sgscore{0}'.format(ps_id)], p['ztf_distpsnr{0}'.format(ps_id)] ))
            return
    
    #
    ##################################################    
    
    if verbose:
        print( 'Candidate found: {0}'.format( p['alert_id'] ) )
    
    
    ##################################################
    #
    # Record useful information and send to the stream
    # - None for now
    #
    
    #
    # Send to stream
    #
    locus_data.send_to_stream('young_extragalactic_candidate')

    #
    ##################################################


Output

When an alert exits the pipeline it has been annotated with catalog matches, any new data properties generated by the filters, and stream associations. At this point we check each alert for association with user-submitted watched objects and send Slack notifications accordingly.

Finally, if the alert was associated with any streams, we publish it to the corresponding Kafka topics. Downstream systems and users connect to the streams in real time using the ANTARES client library.
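
For a rough sense of what stream consumption looks like at the Kafka level, here is a sketch using the generic confluent_kafka consumer API. The broker address, credentials, group id, and handler below are placeholders; in practice the ANTARES client library wraps these details for you.

from confluent_kafka import Consumer

# All connection settings here are placeholders; real values come from your
# ANTARES account and the client library documentation.
consumer = Consumer({
    'bootstrap.servers': 'broker.example.org:9092',
    'group.id': 'my-stream-consumer',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['extragalactic'])  # one of the stream names defined by the filters above

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None:
        continue  # no new alert yet
    if msg.error():
        continue  # skip malformed messages
    handle_alert(msg.value())  # hypothetical handler for the serialized alert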