Sample pipe task to calculate metrics about collected events per broadcaster

"""
Sample pipe task to calculate metrics about collected events per broadcaster

Metrics included:
 - Event Types collected per day
"""
import ujson
from datetime import datetime

from pipe_algorithms_lib.utils import log, get_influx_client
from ebupipe.decorators import with_spark_context, pipe_task


HDFS_MASTER = '<hdfs_url>'


@with_spark_context
def data(spark_context, event='*', dt=None):
    """Return an RDD of raw event lines for the given event type and date."""
    if dt is None:  # don't use a default evaluated once at import time
        dt = datetime.utcnow()
    path = 'hdfs://{}/recsys/*/realtime/{}/*_{}_{}*.jsonl'
    if dt == '*':
        return spark_context.textFile(path.format(HDFS_MASTER, '*', event, '*'))

    return spark_context.textFile(
        path.format(HDFS_MASTER, dt.year, event, dt.strftime('%Y-%m-%d')))


def map_filter_none(rdd, func):
    """Map the given RDD with func, filter out None results and log the drop count."""
    initial = rdd.count()
    res = rdd.map(func).filter(lambda it: it is not None)
    filtered = res.count()
    log("Items filtered by {}: {}".format(func.__name__, initial - filtered))
    return res
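
# Note: every action on the returned RDD recomputes the whole map/filter chain
# from the source. A hypothetical tweak (not part of the original task) would
# cache the result so later counts and transforms reuse it:
#
#     res = rdd.map(func).filter(lambda it: it is not None).cache()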


def event(it):
    """Return the event name, preferring the nested data payload."""
    return it.get('data', {}).get('event', '') or it.get('event')


def is_play(it):
    """Return True for play or playing events."""
    return it['event'] in ('play', 'playing')


def parse_data(it):
    """Parse a raw JSON line into a flat event dict, or None on bad input."""
    try:
        js = ujson.loads(it)
        res = js['data']
        res['codops'] = js['site_key']
        res['user_id'] = js.get('user_id') or js.get('cookie_id')
        res['event'] = event(js)
        return res
    except Exception:
        return None
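
# For illustration, a raw line of roughly this (hypothetical) shape:
#
#     {"site_key": "chrts", "cookie_id": "abc", "data": {"event": "play"}}
#
# is flattened by parse_data into:
#
#     {"event": "play", "codops": "chrts", "user_id": "abc"}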


def prod_data(it):
    """Keep only events from production broadcasters, returning None otherwise."""
    if it['codops'] in ('<prod_codops1>', '<codops2>'):
        return it
    return None


def event_types(rdd):
    """Count occurrences of each (codops, event) pair across the RDD."""
    return rdd.map(lambda it: (it['codops'], it['event'])).countByValue()


def metrics(prod, dt):
    """Build InfluxDB points with per-broadcaster event type counts."""
    points = []

    etypes = event_types(prod)
    for (codops, etype), count in etypes.items():
        points.append({
            'measurement': 'event_types',
            'tags': {'codops': codops.split('0')[0], 'metric': etype},
            'fields': {'value': count},
            'time': dt.isoformat(),
        })

    return points
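
# Each point follows the influxdb-python write_points format; using the sample
# output below, one point would look like this (timestamp illustrative):
#
#     {'measurement': 'event_types',
#      'tags': {'codops': 'chrts', 'metric': 'playing'},
#      'fields': {'value': 35700},
#      'time': '2017-06-01T00:00:00'}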


@pipe_task
def task(dt):
    """Collect per-broadcaster event metrics for the given date and push them to InfluxDB."""
    data_points = []
    rw = data(event='*', dt=dt)
    raw_cnt = rw.count()

    parsed = map_filter_none(rw, parse_data)
    parsed_cnt = parsed.count()

    data_points.append({'tags': {}, 'fields': {'parse_errors': raw_cnt - parsed_cnt},
                        'measurement': 'health_metrics', 'time': dt.isoformat()})

    prod = map_filter_none(parsed, prod_data)
    data_points.extend(metrics(prod, dt))

    if data_points:
        cli = get_influx_client()
        cli.write_points(data_points)
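
The task can also be exercised by hand for a single day (a minimal sketch, assuming the pipe_task decorator leaves the wrapped function directly callable):

from datetime import timedelta

task(datetime.utcnow() - timedelta(days=1))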

Example metrics output

rw = data(event='*').coalesce(1)
jsn = map_filter_none(map_filter_none(rw, parse_data), prod_data)
mtcs = metrics(jsn, datetime.utcnow())
for metric in mtcs:
    print('{} {} {}'.format(metric['tags']['codops'], metric['tags']['metric'], metric['fields']['value']))
Items filtered by parse_data: 0
Items filtered by prod_data: 35363
chrts playing 35700
debr media_consumption 1
sesr play 188170
chrts rec_audio_displayed 1809
chrts ended 6140
zzebu pageview 19
zzebu pageview 1
chrts paused 20245
chrts rec_video_displayed 2203
chrts pageview 50659
chrts rec_audio_loaded 12232
ptrtp pageview 19696
chrts rec_video_loaded 12253