"""Sample pipe task to calculate metrics about collected events per broadcaster

Metrics included:
 - Event Types collected per day
"""
import os
import ujson
import operator
from datetime import datetime

from pipe_algorithms_lib.utils import log
from ebupipe.decorators import with_spark_context, pipe_task

HDFS_MASTER = '<hdfs_url>'

def data(spark_context, event='*', dt=None):
    """Return an RDD of raw JSONL event lines for the given event name and date.

    :param spark_context: active Spark context used to read the HDFS files.
        NOTE(review): callers in this file invoke ``data(event=..., dt=...)``
        without passing a context — presumably a ``@with_spark_context``
        decorator (imported above) was lost from this definition; confirm.
    :param event: event name to match in the file pattern, ``'*'`` for all.
    :param dt: ``datetime`` of the day to read, ``'*'`` for every day,
        or None for "now" (UTC).
    """
    # Bug fix: the original default was ``dt=datetime.utcnow()``, which is
    # evaluated once at import time and then frozen for every later call.
    if dt is None:
        dt = datetime.utcnow()
    path = 'hdfs://{}/recsys/*/realtime/%s/*_%s_%s*.jsonl'.format(HDFS_MASTER)
    if dt == '*':
        return spark_context.textFile(path % ('*', event, '*'))

    return spark_context.textFile(path % (dt.year, event, dt.strftime('%Y-%m-%d')))

def map_filter_none(rdd, func):
    """Map ``rdd`` through ``func`` and drop None results.

    Logs how many elements ``func`` turned into None (reconstructed: the
    mapping/filter expression was corrupted in this file; rebuilt from the
    docstring and the before/after ``count()`` calls).

    :param rdd: input RDD.
    :param func: mapper; returns None for elements that must be discarded.
    :returns: the mapped RDD with None elements removed.
    """
    initial = rdd.count()
    res = rdd.map(func).filter(lambda it: it is not None)
    filtered = res.count()
    # count() forces evaluation twice; kept as-is to preserve the logged metric
    log("Items filtered by {}: {}".format(func.__name__, initial - filtered))
    return res

def event(it):
    """Return the event name of a raw event dict.

    Prefers ``it['data']['event']``; falls back to the top-level
    ``it['event']`` when the nested value is missing or empty.
    """
    return it.get('data', {}).get('event', '') or it.get('event')

def is_play(it):
    """Return True when the item is a playback event ('play' or 'playing')."""
    play_names = ('play', 'playing')
    return it['event'] in play_names

def parse_data(it):
    """Parse one raw JSONL line into a flat event dict.

    Returns None for lines that cannot be parsed; ``map_filter_none`` counts
    those as parse errors. (Reconstructed: the ``try:`` line was lost in this
    file — the stranded ``return None`` and the ``parse_errors`` health metric
    in ``task`` show this was a best-effort parse.)

    :param it: one raw JSON line (str).
    :returns: the line's ``data`` dict enriched with ``codops``, ``user_id``
        and ``event``, or None on any parse/shape failure.
    """
    try:
        js = ujson.loads(it)
        res = js['data']
        res['codops'] = js['site_key']
        # user_id may be absent for anonymous sessions; fall back to cookie_id
        res['user_id'] = js.get('user_id') or js.get('cookie_id')
        res['event'] = event(js)
        return res
    except Exception:
        # deliberately broad: any malformed line is dropped, not raised
        return None

def prod_data(it):
    """Return the event when it belongs to a production broadcaster, else None.

    Used with ``map_filter_none`` to keep only production traffic.
    Bug fix: the original had ``return None`` mis-indented inside the ``if``
    body, directly after ``return it`` — unreachable dead code.
    """
    if it['codops'] in ('<prod_codops1>', '<codops2>'):
        return it
    return None

def event_types(rdd):
    """Count collected events per (codops, event) pair.

    (Reconstructed: the map expression was corrupted in this file; rebuilt
    from the keys consumed in ``metrics``.)

    :param rdd: RDD of parsed event dicts with ``codops`` and ``event`` keys.
    :returns: dict mapping ``(codops, event)`` tuples to occurrence counts.
    """
    return rdd.map(lambda it: (it['codops'], it['event'])).countByValue()

def metrics(prod, dt):
    """Build InfluxDB points: one 'event_types' measurement per (codops, event).

    Bug fix: the original built ``point`` inside the loop but never appended
    it, so this function always returned an empty list.

    :param prod: RDD of parsed production events.
    :param dt: datetime used as the timestamp for every point.
    :returns: list of InfluxDB point dicts.
    """
    points = []

    etypes = event_types(prod)
    for key in etypes:
        # key[0].split('0')[0] drops everything from the first '0' in the
        # codops — presumably an environment/test suffix; TODO confirm
        point = {'tags': {'codops': key[0].split('0')[0], 'metric': key[1]},
                 'fields': {'value': etypes[key]},
                 'measurement': 'event_types', 'time': dt.isoformat()}
        points.append(point)

    return points

def task(dt):
    """Collect per-broadcaster event metrics for one day and write them to InfluxDB.

    Any failure is reported to Sentry (when SENTRY_DSN is set) and logged.
    (Reconstructed: the ``try:`` line was lost in this file — the orphaned
    ``except`` clause shows the body was wrapped.)

    :param dt: datetime of the day to process.
    """
    data_points = []
    try:
        # NOTE(review): ``data`` is called without a spark_context here —
        # presumably injected by a decorator lost from its definition; confirm.
        rw = data(event='*', dt=dt)
        raw_cnt = rw.count()

        parsed = map_filter_none(rw, parse_data)
        parsed_cnt = parsed.count()

        # lines dropped by parse_data are surfaced as a health metric
        data_points.append({'tags': {}, 'fields': {'parse_errors': raw_cnt - parsed_cnt},
                            'measurement': 'health_metrics', 'time': dt.isoformat()})

        prod = map_filter_none(parsed, prod_data)

        data_points.extend(metrics(prod, dt))

        # Bug fix: the original tested ``if data:`` — the *function* ``data``,
        # which is always truthy — instead of the collected points.
        if data_points:
            from influxdb import InfluxDBClient
            cli = InfluxDBClient(host='<influx_host>', database='<influx_database>')
            # NOTE(review): the original created the client and then did
            # nothing with it; writing the points is the evident intent.
            cli.write_points(data_points)
    except Exception as ex:
        if os.environ.get('SENTRY_DSN'):
            import raven
            sentry_client = raven.Client(os.environ['SENTRY_DSN'])
            # NOTE(review): original built the client without reporting;
            # capturing the current exception is the evident intent.
            sentry_client.captureException()
        log('Exception - %s' % ex)  # typo fix: was 'Excepiton'

"""
Example metrics output

rw = data(event='*').coalesce(1)
jsn = map_filter_none(map_filter_none(rw, parse_data), prod_data)
mtcs = metrics(jsn, datetime.utcnow())
for metric in mtcs:
    print('{} {} {}'.format(metric['tags']['codops'], metric['tags']['metric'], metric['fields']['value']))
Items filtered by parse_data: 0
Items filtered by prod_data: 35363
chrts playing 35700
debr media_consumption 1
sesr play 188170
chrts rec_audio_displayed 1809
chrts ended 6140
zzebu pageview 19
zzebu pageview 1
chrts paused 20245
chrts rec_video_displayed 2203
chrts pageview 50659
chrts rec_audio_loaded 12232
ptrtp pageview 19696
chrts rec_video_loaded 12253
"""