About the ANTARES Pipeline

ANTARES processes alerts using a multi-stage pipeline. The pipeline consists of both internal ANTARES modules and user-submitted filters. Filters can be thought of as plugins which let researchers run their own computational logic inside ANTARES. This page documents the structure of the pipeline and includes the source code of the filters that are currently running.
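Concretely, a filter is a Python function that receives an object describing the alert and its locus, inspects its properties, and either tags the alert to output streams or halts processing. The following is a minimal runnable sketch of that contract; the `LocusData` class here is a hypothetical stand-in for the real ANTARES object, and `my_l1_filter` is an illustrative filter, not one that runs in production:

```python
class LocusData:
    """Hypothetical stub for the object ANTARES passes to filters."""

    def __init__(self, properties):
        self._properties = properties
        self.streams = []

    def get_properties(self):
        # A dict of alert/locus properties, e.g. 'ztf_magpsf', 'ra', 'dec'.
        return self._properties

    def send_to_stream(self, name):
        # Tag this alert for the named output stream.
        self.streams.append(name)

    def get_halt(self):
        # Filters raise this to discard the alert / stop the pipeline.
        return RuntimeError('halt pipeline for this alert')


def my_l1_filter(locus_data):
    # Discard faint (or missing-magnitude) alerts; tag the bright ones.
    p = locus_data.get_properties()
    if p.get('ztf_magpsf') is None or p['ztf_magpsf'] >= 17.0:
        raise locus_data.get_halt()
    locus_data.send_to_stream('bright_alerts')
```

The real filters shown below follow exactly this shape: `get_properties()` for input, `send_to_stream()` for output, and `raise ld.get_halt()` to stop processing.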

Inputs

Currently, ANTARES receives alerts from the ZTF survey in real time. In the future it will process data from additional sources, such as LIGO. Its ultimate purpose is to serve as a real-time alert broker for LSST.


Level 1 Filters

Every alert that ANTARES receives is sent through Level 1 (L1) filters. L1 filters exist to slim down the immense input data stream by throwing out alerts that are likely bogus (see the Bad Detection filter) or that were collected in poor conditions (see the Bad Seeing filter). If an alert doesn't meet the criteria imposed by an L1 filter, it is discarded and isn't stored in our database.

L1 filters do not have access to object history or catalog associations.

Bad Detection v2

Discard alerts that are likely bogus, have bad pixels, or show a large difference between aperture and PSF magnitudes.

                
def bad_detection(locus_data):
    p = locus_data.get_properties()
    try:
        if p.get('ztf_rb') is not None:
            assert p['ztf_rb'] >= 0.55
        if p.get('ztf_nbad') is not None:
            assert p['ztf_nbad'] == 0
        if p.get('ztf_magdiff') is not None:
            assert -1.0 <= p['ztf_magdiff'] <= 1.0
    except AssertionError:
        raise locus_data.get_halt()
                
              
SSO

Detect alerts which ZTF has flagged as known solar system objects. These objects are crossmatched with JPL databases using astroquery and sent to output streams.

                
import numpy as np
from astroquery.jplhorizons import Horizons
from requests.exceptions import ConnectionError, SSLError


def translate_identifier(ztf_name):
    """Translate a ZTF identifier into a JPL Horizons target name."""
    targetname = ztf_name

    # Asteroid provisional designation, e.g. '2019AB12' -> '2019 AB12'
    if len(ztf_name) > 4 and ztf_name[4].isalpha():
        if float(ztf_name[6:]) == 0:
            targetname = ztf_name[:4] + ' ' + ztf_name[4:6]
        else:
            targetname = (ztf_name[:4] + ' ' + ztf_name[4:6] +
                          str(int(float(ztf_name[6:]))))

    # Comet designation
    if '/' in ztf_name:
        targetname = ztf_name[:6] + ' ' + ztf_name[6:]
    return targetname


def ztf_known_solar_system(ld):
    p = ld.get_properties()

    # Check for required parameters
    required = ('ztf_magzpsci', 'ztf_nmatches', 'ztf_isdiffpos', 'ztf_nbad',
                'ztf_rb', 'ztf_ssdistnr', 'ztf_ssnamenr', 'ztf_jd')
    if any(p.get(key) is None for key in required):
        return

    # Reject unreliable detections
    if (p['ztf_isdiffpos'] == 'f' or
            p['ztf_isdiffpos'] == 0 or
            p['ztf_nbad'] > 0 or
            p['ztf_rb'] < 0.65 or
            p['ztf_ssdistnr'] == -999):
        return  # Skip this Alert

    # Send to ztf_sso_candidates
    ld.send_to_stream('ztf_sso_candidates')

    # Check positional agreement and positional uncertainties.
    # Query Horizons at mid-exposure (ZTF exposures are 30 s long).
    targetname = translate_identifier(p['ztf_ssnamenr'])
    epoch = p['ztf_jd']
    hor = Horizons(id=targetname,
                   location='I41',
                   epochs=[epoch + 30/2/86400])
    try:
        eph = hor.ephemerides()
    except ValueError:
        return  # Unknown target; skip this Alert
    except (ConnectionError, SSLError):
        return  # Failed to connect to JPL
    except Exception:
        return  # Any other Horizons failure

    # Save Horizons properties in Antares
    ld.set_property('horizons_targetname', targetname)
    new_properties = [
        ('horizons_absolutemag', 'H'),
        ('horizons_slopeparameter', 'G'),
        ('horizons_predictedmagnitude', 'V'),
        ('horizons_heliodist_au', 'r'),
        ('horizons_observerdist_au', 'delta'),
        ('horizons_solarphaseangle', 'alpha'),
        ('horizons_solarelongation', 'elong'),
        ('horizons_ra_posunc_3sig_arcsec', 'RA_3sigma'),
        ('horizons_dec_posunc_3sig_arcsec', 'DEC_3sigma'),
        ('horizons_true_anomaly', 'true_anom'),
    ]
    for prop, var in new_properties:
        try:
            ld.set_property(prop, float(eph[var][0]))
        except (IndexError, ValueError, KeyError):
            pass

    # Require the predicted and observed positions to agree to within 1"
    # (note: this estimate does not scale the RA offset by cos(Dec))
    if (np.sqrt((eph['RA'][0] - p['ra']) ** 2 +
                (eph['DEC'][0] - p['dec']) ** 2) * 3600 > 1):
        return  # Skip this Alert

    # Require the 3-sigma positional uncertainty to be under 1"
    if np.sqrt(eph['RA_3sigma'][0] ** 2 + eph['DEC_3sigma'][0] ** 2) > 1:
        return  # Skip this Alert

    # Send to ztf_sso_confirmed
    ld.send_to_stream('ztf_sso_confirmed')

                
              
Bad Seeing

Discard alerts with poor seeing or elongated sources.

                
def bad_seeing(locus_data):
    p = locus_data.get_properties()
    try:
        if p.get('ztf_fwhm') is not None:
            assert p['ztf_fwhm'] <= 5.0
        if p.get('ztf_elong') is not None:
            assert p['ztf_elong'] <= 1.2
    except AssertionError:
        raise locus_data.get_halt()
                
              

Ingestion and Aggregation

Alerts which pass the L1 filters, or are sent to a stream by an L1 filter, are ingested and stored in the ANTARES alert database. We also ingest the history of past measurements, if it is included in the alert.

After ingestion, we associate each measurement with any known locus of past alerts within 1", and with any nearby objects in our catalogs. The catalog search radius depends on the specific catalog.
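The locus association can be pictured as a cone search: a new measurement joins an existing locus if it falls within the 1" radius. A minimal, self-contained sketch of that idea follows; the `associate` helper and the dict-based records are hypothetical illustrations, not ANTARES internals:

```python
import math


def angular_separation_arcsec(ra1, dec1, ra2, dec2):
    """Angular separation between two sky positions (degrees in, arcsec out)."""
    ra1, dec1, ra2, dec2 = map(math.radians, (ra1, dec1, ra2, dec2))
    dra = ra2 - ra1
    # Vincenty formula: numerically stable at small separations
    num = math.hypot(
        math.cos(dec2) * math.sin(dra),
        math.cos(dec1) * math.sin(dec2)
        - math.sin(dec1) * math.cos(dec2) * math.cos(dra),
    )
    den = (math.sin(dec1) * math.sin(dec2)
           + math.cos(dec1) * math.cos(dec2) * math.cos(dra))
    return math.degrees(math.atan2(num, den)) * 3600.0


def associate(measurement, loci, radius_arcsec=1.0):
    """Return the first known locus within radius_arcsec, or None for a new locus."""
    for locus in loci:
        if angular_separation_arcsec(measurement['ra'], measurement['dec'],
                                     locus['ra'], locus['dec']) <= radius_arcsec:
            return locus
    return None
```

Catalog association works the same way, except the search radius varies per catalog (and a production system would use a spatial index rather than a linear scan).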


Level 2 Filters

After alerts are ingested, aggregated, and associated with catalogs, ANTARES runs the L2 filters. The purpose of the L2 filters is to detect interesting science alerts based on custom criteria.

Require 2+ Measurements

Halt the pipeline now if this is the first Alert on this Locus.

              
def require_2_measurements(ld):
    # Require 2+ ztf_magpsf measurements in the lightcurve.
    # This excludes upper limits from the count, which do not have magpsf.
    mags = ld.get_time_series('ztf_magpsf')[-1]
    n = len([m for m in mags if m is not None])
    if n < 2:
        raise ld.get_halt()
              
            
Extragalactic v2

Send to stream `extragalactic` if the Alert is associated with a known galaxy from a catalog.

              
def extragalactic(locus):
    """
    Send alert to stream 'extragalactic' if it matches any extended source catalogs.
    """
    matching_catalog_names = locus.get_astro_object_matches().keys()

    # These are the catalogs (ANTARES-based names) with extended sources
    xsc_cats = ['2mass_xsc', 'ned', 'nyu_valueadded_gals', 'sdss_gals',
                'veron_agn_qso', 'RC3']

    if set(matching_catalog_names) & set(xsc_cats):
        locus.send_to_stream('extragalactic')
              
            
High Amp v2

Detect high-amplitude objects using the weighted standard deviation of the lightcurve. Updated to use the new magnitude correction code.

              
from statsmodels.stats.weightstats import DescrStatsW


def high_amplitude(ld):
    threshold = 1.0  # in magnitude units, the same for all filters
    p = ld.get_properties()
    fid = p['ztf_fid']

    _, mjd = ld.get_time_series(filters={'ztf_fid': fid})
    if len(mjd) < 2:
        return

    mag, sigmag, mjd, is_var_star, corrected = ld.get_varstar_corrected_mags(fid=fid)
    if is_var_star and corrected:
        stream_name = 'high_amplitude_variable_stars'
    else:
        stream_name = 'high_amplitude'

    # Inverse-variance weighted standard deviation of the magnitudes
    W = 1.0 / sigmag**2.0
    des = DescrStatsW(mag, weights=W)
    if des.std > threshold:
        ld.send_to_stream(stream_name)
              
            
High Flux
              
def high_flux_ratio_stream_v2(locus):
    ref_zps = {1: 26.325, 2: 26.275, 3: 25.660}
    # These thresholds are expected to flag ~3% of the total alerts in the
    # respective fids. No i-band filter alerts were found.
    T_pos_neg = {"pos": {1: 10.5, 2: 10.3}, "neg": {1: 0.15, 2: 0.14}}
    p = locus.get_properties()
    # +1 for a positive difference detection ('t'), -1 otherwise
    product_sign = 2.0 * (p['ztf_isdiffpos'] == 't') - 1

    if product_sign > 0.0:
        threshold = T_pos_neg['pos']
    else:
        threshold = T_pos_neg['neg']

    if (0.0 <= p['ztf_distnr'] <= 1.0
            and 'ztf_magzpsci' in p
            and p['ztf_fid'] in threshold):
        ref_flux = 10 ** (0.4 * (ref_zps[p['ztf_fid']] - p['ztf_magnr']))
        difference_flux = 10 ** (0.4 * (p['ztf_magzpsci'] - p['ztf_magpsf']))
        sci_flux = ref_flux + product_sign * difference_flux
        flux_ratio = sci_flux / ref_flux

        if ((product_sign < 0.0 and flux_ratio < threshold[p['ztf_fid']]) or
                (product_sign > 0.0 and flux_ratio > threshold[p['ztf_fid']])):
            locus.send_to_stream("high_flux_ratio_wrt_nn")
              
            
High SNR
              
def high_snr(locus):
    # Should flag ~2-3% of alerts; again, no i-band alerts were found.
    snr_threshold = {1: 50.0, 2: 55.0}

    p = locus.get_properties()
    # Approximate SNR from the PSF-magnitude uncertainty
    alert_snr = 1.0 / p['ztf_sigmapsf']
    if p['ztf_fid'] in snr_threshold \
    and alert_snr > snr_threshold[p['ztf_fid']]:
        locus.send_to_stream('high_snr')
              
            
M31

Detect alerts within a box around M31.

              
def in_m31(locus):
    # Box around M31: RA 9.93 to 11.43 deg, Dec 40.27 to 42.27 deg
    ra_max = 11.434793
    ra_min = 9.934793
    dec_max = 42.269065
    dec_min = 40.269065

    alert_props = locus.get_properties()
    ra = alert_props['ra']
    dec = alert_props['dec']

    if ra_max > ra > ra_min \
    and dec_max > dec > dec_min:
        locus.send_to_stream("in_m31")
              
            
Nuclear Transient
              
def nuclear_transient(locus):
    """
    Send alert to stream 'Nuclear Transient' if it is within 0.6 arcseconds of a
    source in the ZTF reference frame. A match within 1" of a known Pan-STARRS
    galaxy is also required (ztf_distpsnr1 < 1. and ztf_sgscore1 < 0.3). To
    further remove small flux fluctuations, we also require magpsf (alert PSF
    photometry) - magnr (PSF photometry of the nearby source in the reference
    image) < 1.5. The selection criteria are from Sjoert van Velzen et al.
    (2018, arXiv:1809.02608), section 2.1.
    """
    alert_props = locus.get_properties()
    sgscore = alert_props['ztf_sgscore1']
    distpsnr = alert_props['ztf_distpsnr1']
    magpsf = alert_props['ztf_magpsf']
    magnr = alert_props['ztf_magnr']
    distnr = alert_props['ztf_distnr']

    if None in (distnr, distpsnr, sgscore, magpsf, magnr):
        return
    if distnr < 0.6 and distpsnr < 1. and sgscore < 0.3 and magpsf - magnr < 1.5:
        locus.send_to_stream("nuclear_transient")
              
            
Potential TDE v1

Checks for crossmatches against the French and Zabludoff (2018) catalog of post-starburst galaxies, excluding variables and AGN.

              
def post_starburst_xmatch(ld):
    """
    Send alert to stream 'tde_candidate' if it matches the French and
    Zabludoff (2018) catalog.
    - Gautham Narayan (github: gnarayan)
    20190911
    """
    # This filter must run after solar system sources have been associated.
    streams = set(ld.get_locus_streams())
    bad_streams = {'high_amplitude_variable_stars', 'ztf_known_solar_system'}

    # If this locus is associated with a variable star or a known solar system
    # object, ignore it even if it is also associated with a potential
    # post-starburst galaxy. TDEs are long-lived enough that a single alert
    # associated with a known solar system object will take care of itself on
    # the next observation; and if there is a high-amplitude variable at the
    # same crossmatch tolerance, assume we are seeing the common variable star
    # acting up rather than rare TDE behavior.
    if not streams.isdisjoint(bad_streams):
        print('Locus associated with stream of var stars or solar system objects')
        return

    # If we get here, check that the source is actually associated with a
    # post-starburst galaxy and is NOT associated with ANY variable
    # (not just high-amplitude) or known QSO.
    ld_cats = set(ld.get_astro_object_matches().keys())
    good_cats = {'french_post_starburst_gals'}
    bad_cats = {'veron_agn_qso', 'asassn_variable_catalog'}

    if (ld_cats & good_cats) and not (ld_cats & bad_cats):
        # Flag this as a possible TDE candidate
        ld.send_to_stream('tde_candidate')
    else:
        print('Not in TDE cat, or associated with variable source')
        return
    

              
            
siena_mag_coord_cut2

Filters alerts by magnitude and coordinates for Siena College's 0.7 m telescope (near Albany, NY). Updated to use the magnitude correction for variable stars.

              
def siena_mag_coord_cut2(ld):
    """
    Send alerts to stream 'Siena_mag_coord_cut2' if the Alert is brighter
    than apparent magnitude 17 in r, and if the RA/Dec limits are met.
    """
    mag_max = 17
    dec_min = 0
    dec_max = 90
    ra_min = 75
    ra_max = 180
    p = ld.get_properties()
    ra = p['ra']
    dec = p['dec']

    # Get arrays of corrected mag and sigmag, if possible.
    # This function only works on variable stars; otherwise it returns
    # ZTF's `magpsf` and `sigmapsf` without modification.
    mag, sigmag, mjd, is_var_star, corrected = ld.get_varstar_corrected_mags()
    alert_mag = mag[-1]

    if alert_mag < mag_max \
        and dec_max > dec > dec_min \
        and ra_max > ra > ra_min:
        ld.send_to_stream("siena_mag_coord_cut2")
              
            
refitt_newsources_snrcut v2

New sources with a signal-to-noise ratio greater than five, appropriate for REFITT.

              
def refitt_newsources_snrcut(locus):
    # Check if this source is nuclear, an SSO, or a known variable star
    bad_streams = {
        'nuclear_transient',
        'high_amplitude_variable_stars',
        #'ztf_sso_candidates',
    }
    if set(locus.get_locus_streams()) & bad_streams:
        return

    snr_threshold = {1: 5.0, 2: 5.0}
    p = locus.get_properties()
    # Approximate SNR from the PSF-magnitude uncertainty
    alert_snr = 1.0 / p['ztf_sigmapsf']
    if p['ztf_fid'] in snr_threshold \
    and alert_snr > snr_threshold[p['ztf_fid']] \
    and p['ztf_distnr'] > 1.0 \
    and p['ztf_distpsnr1'] > 2.0:
        locus.send_to_stream('refitt_newsources_snrcut')

              
            
blue extragalactic transient filter v1.0

Searches for blue ZTF alerts from new sources that are not associated with stars and lie away from the Galactic plane.

              
# Imports
import astropy.coordinates
import numpy as np

ERROR_LOG_SLACK_CHANNEL = 'UMWERJYM8'


def blue_extragalactic_transient_filter_v1p0(locus_data):
    """
    A preliminary filter for blue extragalactic transients
    """
    # Alert is rejected if the absolute value of the galactic latitude (bii) is less than this value (degrees)
    gal_latitude_cut_degrees = 15.
    
    # Alerts are rejected if a previous alert for this object was found more than this number of days previous to the sampled alert.
    previous_alert_cut_days = 14.
    
    # Star criteria based on conversation with Adam Miller on October 2nd, 2019
    #     Must be > ztf_sgscore_cut and proximity <= ztf_distpsnr_cut
    ztf_sgscore_cut = 0.5 # > 0.5 likely star; = 0.5 unknown due to Pan-STARRS data quality issues
    ztf_distpsnr_cut = 1.0 # arcseconds from closest Pan-STARRS source. 
    
    # Screen out g-R comparisons when alerts are from more than this number of days apart
    delta_days_in_corresponding_filter = 4.
    
    # Color selection criteria for correlated alerts.
    #     Rejecting if color >= ztf_g_R_cut 
    ztf_g_R_cut = 0.
    
    
    
    print('`blue_extragalactic_transient_filter_v1p0` is running...')
    
    
    ##################################################
    #
    # Get a dict of all properties on the new alert.
    #
    
    p = locus_data.get_properties()

    #
    ##################################################
    
    
    ##################################################
    #
    # Is this source near the Galactic Plane?
    #
    
    p_coordinate_object = astropy.coordinates.SkyCoord(ra=p['ra'], dec=p['dec'], unit='deg', frame=astropy.coordinates.FK5, equinox='J2000.0')
    
    # Stop if this is a Galactic target
    if abs( p_coordinate_object.galactic.b.deg ) <= gal_latitude_cut_degrees:
        #print( 'ZTF alert within {0:.1f} degrees of the Galactic plane: {1:.1f} degrees'.format( gal_latitude_cut_degrees, p_coordinate_object.galactic.b.deg ) )
        return
    
    #
    ##################################################
    

    ##################################################
    #
    # Ignore if a ZTF alert has been made more than 14 days prior to this alert.
    #
    
    history_check_ld = locus_data.get_time_series()
    
    # There is no other alert related to this ld
    #     Cannot:
    #             Track rise / decay
    #             Create a color based classification
    
    if history_check_ld[0].size == 1:
        #print( 'This is the only ZTF alert for this locus.' )
        return
    
    if p['mjd'] - np.min( history_check_ld[1] ) > previous_alert_cut_days:
        #print( 'Previous ZTF alerts found more than {0:.1f} days prior to this alert: {1:.1f} days'.format( previous_alert_cut_days, p['mjd'] - np.min( history_check_ld[1] ) ) )
        return

    #
    ##################################################
    
    
    ##################################################
    #
    # Use Tachibana & Miller (2018) to filter out stars
    #
    
    if p['ztf_sgscore1'] > ztf_sgscore_cut and p['ztf_distpsnr1'] <= ztf_distpsnr_cut:
        #print('Satisfies star classification criteria from Tachibana & Miller (2018): ztf_sgscore1={0:.2f}, ztf_distpsnr1={1:.2f}'.format( p['ztf_sgscore1'], p['ztf_distpsnr1'] ))
        return
    
    #
    ##################################################
    
    
    ##################################################
    #
    # Are there ZTF alerts in the other filter?
    #
    
    
    # If g check for r
    if p['ztf_fid'] == 1:
        check_fid = 2
        #check_passband = 'r'

    
    # If r check for g
    else:
        check_fid = 1 
        #check_passband = 'g'
    
    
    #
    # Query associations in the complementary filter
    #     check_ld.size will be a multiple of 2 + the number of fields requested
    #     Indexing is required on this array since it does not have keys, unlike the locus_data.get_properties()
    #
    check_ld = locus_data.get_time_series('ra', 'dec', 'ztf_magpsf', 'ztf_sigmapsf', filters={'ztf_fid': check_fid})
    
    
    #
    # Ignore nan entries
    #
    check_ld_good_indices = np.arange( check_ld[4].size )[~np.isnan( check_ld[4].astype(np.float64) ) ]
    
    
    #
    # No corresponding detections in the alternative filter
    #
    if check_ld[0][ check_ld_good_indices ].size == 0:
        #print('No detections in the other filter ({0}).'.format(check_passband))
        return
    

    else:
        

        ##############################################
        #
        # Search for detections *before* p['mjd']
        #     Don't include after, or else filter will produce doubles of every trigger.
        #     Also filtering out nan entries while we're at it.
        check_before_indices = np.intersect1d( check_ld_good_indices, np.where(check_ld[1] < p['mjd'])[0] )
        
        
        #
        # No *previous detections* in this alternative filter.
        # This should be picked-up later when the latter triggers are iterated over
        #
        if check_before_indices.size == 0:
            #print('No previous detections in the other filter ({0} prior to {1:.1f} MJD). This should be reconsidered when the other filter is loaded.'.format(check_passband, p['mjd']))
            return

        
        #
        # Find index of temporally closest observation in the other filter (needs to be taken before this alert timestamp)
        #     This is needed for the following steps
        #
        # np.argmin gives a position within the check_before_indices subset,
        # so map it back to an index into the full time series.
        temporally_closest_previous_alert_index = check_before_indices[
            np.argmin( p['mjd'] - check_ld[1][ check_before_indices ] ) ]
        
        
        #
        # Ensure that we are comparing [g]-[R] when taken with a relatively close temporal proximity
        #
        if p['mjd'] - check_ld[1][ temporally_closest_previous_alert_index ] > delta_days_in_corresponding_filter:
            #print('Closest g and R alerts were taken too far apart: {0:.1f} > {1:.1f}'.format( p['mjd'] - check_ld[1][ temporally_closest_previous_alert_index ], delta_days_in_corresponding_filter))
            return
        

        #
        # Confirmed detections in this alternative filter previous to this alert and taken within a reasonable time period.
        #
        else:
            
            
            #print( 'Alerts {0} days apart.'.format( p['mjd'] - check_ld[1][ temporally_closest_previous_alert_index ] ) )
            
            
            ##########################################
            #
            # Look for cases where g and R were taken within 4 days of each other
            #
            # Create ZTF g-R:
            #
            #print( 'Check filter mag: {0:f}'.format( check_ld[4][ temporally_closest_previous_alert_index ] ) )
            if p['ztf_fid'] == 1:
                ztf_g_R = p['ztf_magpsf'] - check_ld[4][ temporally_closest_previous_alert_index ]
            else:
                ztf_g_R = check_ld[4][ temporally_closest_previous_alert_index ] - p['ztf_magpsf']
            
            #
            # Always keep track of error propagation
            #
            ztf_g_R_error = np.sqrt(p['ztf_sigmapsf'] ** 2 + check_ld[5][ temporally_closest_previous_alert_index ] ** 2)
            
            
            #
            # Put this in one place for consistency when using in multiple places
            #
            #color_output_string = '[g]-[r] = {0:.2f} +/- {1:.2f}'.format(ztf_g_R, ztf_g_R_error)
            
            
            #
            # Reject 'red' sources (g-R at or above the cut)
            #
            if ztf_g_R >= ztf_g_R_cut:
                #print("This is a 'red' source: {0}".format( color_output_string ))

                return
            
            #else:
                
                #print("This is a 'blue' source: {0}".format( color_output_string ))

            #
            ##########################################
    
        #
        ##############################################
    
    #
    ##################################################
    
    
    #print('Potential match {0}'.format(p['alert_id']))
    
    
    ##################################################
    #
    # Record useful information and send to the stream
    #
    
    
    #
    # Add Galactic coordinates
    #
    locus_data.set_property( 'lii', p_coordinate_object.galactic.l.deg )
    locus_data.set_property( 'bii', p_coordinate_object.galactic.b.deg )

    
    #
    # Add color properties
    #
    locus_data.set_property( 'g_minus_r',     ztf_g_R )
    locus_data.set_property( 'g_minus_r_err', ztf_g_R_error )
       
    #
    # Send to stream
    #
    locus_data.send_to_stream('blue_extragalactic_transient_filter')

    #
    ##################################################

              
            
wyrzykowski_bright_microlensing_v4

Bright microlensing candidates (mag < 17).

              
ERROR_LOG_SLACK_CHANNEL = 'UP414JK1D'  # my Slack id, instead of name

# wyrzykowski_bright_microlensing_v4

import numpy as np
from scipy.stats import skew
from scipy.optimize import leastsq


def ulens_fixedbl(t, t0, te, u0, I0):
    # Point-source point-lens microlensing lightcurve with blending fixed at fs=1
    fs = 1.
    tau = (t - t0) / te
    u = np.sqrt(tau**2 + u0**2)
    ampl = (u**2 + 2) / (u * np.sqrt(u**2 + 4))
    F = ampl * fs + (1 - fs)
    I = I0 - 2.5 * np.log10(F)
    return I


def fit_ulensfixedbl(epoch, avmag, err):
    # Catch short light curves
    if len(epoch) < 10:
        return [999, 999, 999, 999], 1999999.

    # Initial guesses: peak at the brightest point, te=50 d, u0=0.1, baseline I0
    t0 = epoch[np.argmin(avmag)]
    te = 50.
    u0 = 0.1
    I0 = np.amax(avmag)

    ulensparam = [t0, te, u0, I0]
    fp = lambda v, x: ulens_fixedbl(x, v[0], v[1], v[2], v[3])
    e = lambda v, x, y, err: (fp(v, x) - y) / err
    v, success = leastsq(e, ulensparam, args=(epoch, avmag, err), maxfev=1000000)
    if success == 0:  # no solution
        return ulensparam, 999999.
    chi2 = sum(e(v, epoch, avmag, err)**2)
    chi2dof = chi2 / (len(epoch) - len(v))
    out = list(v) + [chi2, chi2dof]
    return out, chi2dof


def bright_microlensing(ld):
    skewThreshold = 0
    etaThreshold = 0.3
    chi2Threshold = 1000.
    u0Threshold = 0.3
    teThreshold = 1000.

    p = ld.get_properties()
    fid = p['ztf_fid']

    _, mjd = ld.get_time_series(filters={'ztf_fid': fid})
    if len(mjd) < 10:
        return

    mags, errs, times, is_var_star, corrected = ld.get_varstar_corrected_mags(fid=fid)
    if is_var_star and corrected:
        stream_name = 'bright_microlensing_variable_star'
    else:
        stream_name = 'bright_microlensing'

    # Von Neumann eta statistic: small values indicate smooth variability
    nrtrid = len(mjd)
    eta = np.sum((mags[1:] - mags[:-1])**2) / (nrtrid - 1.) / np.var(mags)
    skewness = skew(mags)
    rmax = np.amax(mags)

    # Require the whole lightcurve brighter than mag 17, negative skew,
    # and smooth variability before attempting the microlensing fit
    if rmax < 17.0 and skewness < skewThreshold and eta < etaThreshold:
        fitparams, chi2dof = fit_ulensfixedbl(times, mags, errs)
        te = np.abs(fitparams[1])
        u0 = np.abs(fitparams[2])
        if u0 < u0Threshold and chi2dof < chi2Threshold and te < teThreshold:
            ld.send_to_stream(stream_name)
          
              
            

Output

When an alert exits the pipeline it has been flagged with catalog matches, arbitrary new data properties generated by the filters, and stream associations. At this point we check alerts for association with user-submitted watched objects, and send Slack notifications accordingly.

Finally, we output the alert to Kafka streams if it was associated with a stream. Downstream systems and users connect to the streams in real-time using the ANTARES client library.
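On the receiving end, consuming a stream reduces to iterating messages and unpacking each alert's identifier, stream tags, and new properties. The sketch below shows that dispatch pattern with the standard library only; the message shape and field names here are hypothetical illustrations, not the actual schema used by the ANTARES client library:

```python
import json


def unpack_stream_message(raw_bytes):
    """Decode one (hypothetical) stream message into (alert_id, streams, properties)."""
    msg = json.loads(raw_bytes.decode('utf-8'))
    return msg['alert_id'], msg['streams'], msg['properties']


def dispatch(raw_bytes, handlers):
    """Route an alert to a handler for each stream it was sent to."""
    alert_id, streams, properties = unpack_stream_message(raw_bytes)
    for stream in streams:
        handler = handlers.get(stream)
        if handler is not None:
            handler(alert_id, properties)
```

In practice the `raw_bytes` would come from a Kafka consumer loop provided by the client library; the dispatch-by-stream-name structure is the part that carries over.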