Skip to content

Sample PEACH task that calculates per-broadcaster metrics about collected events

import ujson
from datetime import datetime

from pipe_algorithms_lib.utils import log, get_influx_client, load_last_events
from pipe_algorithms_lib.connections import get_spark_context 


# codops values treated as production data: prod_data() keeps only events
# whose 'codops' is a member of this tuple.
# NOTE(review): the entries look like redacted placeholders — confirm the
# real values before running.
PROD_DATA = ('<prod_codops1>', '<codops2>', )


def map_filter_none(rdd, func):
    """Apply ``func`` to every element of ``rdd`` and drop the ``None`` results.

    The number of dropped elements is logged, labelled with ``func.__name__``.
    Returns the mapped and filtered collection.
    """
    before = rdd.count()
    kept = rdd.map(func).filter(lambda item: item is not None)
    dropped = before - kept.count()
    log("Items filtered by {}: {}".format(func.__name__, dropped))
    return kept


def event(it):
    """Return the event name of a record.

    The value under ``it['data']['event']`` wins when it is truthy; otherwise
    the top-level ``it['event']`` is returned (``None`` when absent).
    """
    nested = it.get('data', {}).get('event', '')
    if nested:
        return nested
    return it.get('event')


def is_play(it):
    """Tell whether the record's ``'event'`` is ``'play'`` or ``'playing'``."""
    play_events = ('play', 'playing')
    return it['event'] in play_events


def parse_data(it):
    """Parse one raw JSON event string into a flat event dict.

    The decoded document's ``'data'`` object is used as the result and is
    extended in place with:

    * ``'codops'``  - the document's ``'site_key'``
    * ``'user_id'`` - the document's ``'user_id'``, falling back to
      ``'cookie_id'`` when that is missing or falsy
    * ``'event'``   - the name resolved by ``event()``

    Returns the resulting dict, or ``None`` when the input cannot be decoded
    or lacks the expected structure, so callers can filter such records out.
    """
    try:
        js = ujson.loads(it)
        res = js['data']
        res['codops'] = js['site_key']
        res['user_id'] = js.get('user_id') or js.get('cookie_id')
        res['event'] = event(js)
    except Exception:
        # Deliberately best-effort: a malformed record is reported as None
        # rather than failing the whole job. ``Exception`` (instead of a bare
        # ``except``) lets KeyboardInterrupt/SystemExit propagate.
        return None
    return res


def prod_data(it):
    """Keep only records whose ``'codops'`` is listed in ``PROD_DATA``.

    Returns ``it`` unchanged for such records and ``None`` for all others.
    """
    return it if it['codops'] in PROD_DATA else None


def event_types(rdd):
    """Count how often each ``(codops, event)`` pair occurs in ``rdd``.

    Returns the result of ``countByValue()`` over the mapped pairs.
    """
    def codops_and_event(record):
        return (record['codops'], record['event'])

    pairs = rdd.map(codops_and_event)
    return pairs.countByValue()


def metrics(prod):
    """Build the ``event_types`` data points for the given events.

    One point is produced per ``(codops, event)`` pair counted by
    ``event_types``; every point carries the same UTC timestamp.
    """
    timestamp = datetime.utcnow().isoformat()
    counts = event_types(prod)

    return [
        {
            # Only the part of codops before its first '0' is used as the tag.
            'tags': {'codops': codops.split('0')[0], 'metric': name},
            'fields': {'value': total},
            'measurement': 'event_types',
            'time': timestamp,
        }
        for (codops, name), total in counts.items()
    ]


def task(codops='*', debug=False):
    """Compute event metrics and write them to InfluxDB.

    Loads the latest raw events, parses them, records how many failed to
    parse as a ``health_metrics`` point, then appends the ``event_types``
    points computed by ``metrics()`` over the production-only events.

    Args:
        codops: forwarded to ``load_last_events`` as its third argument.
        debug: when true, return the data points instead of writing them.

    Returns:
        The list of data points when ``debug`` is true, otherwise ``None``.
    """
    data_points = []
    # Timestamp for the health_metrics point. The original referenced ``dt``
    # without ever defining it, which raised NameError on every call.
    dt = datetime.utcnow()

    sc = get_spark_context()
    rw = load_last_events(sc, '*', codops, parser=None, strict_today=False)

    raw_cnt = rw.count()

    parsed = map_filter_none(rw, parse_data)
    parsed_cnt = parsed.count()

    data_points.append({'tags': {}, 'fields': {'parse_errors': raw_cnt - parsed_cnt},
                        'measurement': 'health_metrics', 'time': dt.isoformat()})

    prod = map_filter_none(parsed, prod_data)

    data_points.extend(metrics(prod))

    if debug:
        return data_points

    if data_points:
        cli = get_influx_client()
        cli.write_points(data_points)

Example metrics output

# Bind the result to a name other than ``metrics``: rebinding ``metrics`` here
# would overwrite the metrics() function and break any later task() call.
points = task(debug=True)
# Skip the first point: it is the health_metrics entry, whose tags are empty.
for point in points[1:]:
    print(f"[{point['tags']['codops']}] {point['tags']['metric']} {point['fields']['value']}")

```
Items filtered by parse_data: 0
Items filtered by prod_data: 35363
[debr] pause 765
[debr] ended 84
[debr] pageView 216
[debr] play 661
[debr] recommendationLoad 319
[debr] recommendationHit 2
```