Sample PEACH task to calculate metrics about collected events per broadcaster
import ujson
from datetime import datetime
from pipe_algorithms_lib.utils import log, get_influx_client, load_last_events
from pipe_algorithms_lib.connections import get_spark_context
# Site keys (codops) whose events prod_data() keeps; records with any other
# codops are dropped before metrics are computed.
PROD_DATA = (
    '<prod_codops1>',
    '<codops2>',
)
def map_filter_none(rdd, func):
    """Apply ``func`` to every element of ``rdd`` and drop the ``None`` results.

    The number of dropped elements is logged, labelled with ``func.__name__``.
    Both the input and the result are counted, so each call performs two
    ``count()`` passes.

    :param rdd: collection exposing ``count()`` and ``map()``
        (presumably a Spark RDD -- confirm against callers)
    :param func: callable applied to each element; a ``None`` return value
        marks the element for removal
    :return: the mapped collection without ``None`` elements
    """
    total_before = rdd.count()
    mapped = rdd.map(func)
    kept = mapped.filter(lambda item: item is not None)
    dropped = total_before - kept.count()
    log("Items filtered by {}: {}".format(func.__name__, dropped))
    return kept
def event(it):
    """Return the event name of a raw record.

    The name nested under ``it['data']['event']`` wins when it is truthy;
    otherwise the top-level ``it['event']`` is returned (``None`` if absent).
    """
    nested_name = it.get('data', {}).get('event', '')
    if nested_name:
        return nested_name
    return it.get('event')
def is_play(it):
    """Tell whether the record's ``'event'`` is ``'play'`` or ``'playing'``."""
    playback_events = ('play', 'playing')
    return it['event'] in playback_events
def parse_data(it):
    """Parse one raw JSON event record into a flat event dict.

    The record's ``data`` object is used as the result and is enriched with:

    * ``codops``  -- the record's ``site_key``
    * ``user_id`` -- the record's ``user_id``, falling back to ``cookie_id``
    * ``event``   -- the event name as resolved by ``event()``

    :param it: JSON document (as accepted by ``ujson.loads``)
    :return: the enriched ``data`` dict, or ``None`` when the record is
        malformed (invalid JSON, missing ``data``/``site_key``, or an
        unexpected structure)
    """
    try:
        js = ujson.loads(it)
        res = js['data']
        res['codops'] = js['site_key']
        res['user_id'] = js.get('user_id') or js.get('cookie_id')
        res['event'] = event(js)
    except (ValueError, TypeError, KeyError, AttributeError):
        # Best-effort parsing: a malformed record is dropped (and counted by
        # the caller) instead of failing the whole job. Only data-shaped
        # errors are swallowed, so genuine bugs and interrupts still surface.
        return None
    return res
def prod_data(it):
    """Return ``it`` when its ``'codops'`` is listed in ``PROD_DATA``, else ``None``."""
    return it if it['codops'] in PROD_DATA else None
def event_types(rdd):
    """Count the records in ``rdd`` per ``(codops, event)`` pair.

    :param rdd: collection of event dicts exposing ``map()``; the mapped
        result must support ``countByValue()``
    :return: whatever ``countByValue()`` yields for the ``(codops, event)``
        tuples (a mapping from pair to count)
    """
    def codops_event_pair(record):
        return record['codops'], record['event']

    return rdd.map(codops_event_pair).countByValue()
def metrics(prod):
    """Build one ``event_types`` data point per ``(codops, event)`` pair in ``prod``.

    Every point carries the same timestamp (current UTC time, ISO formatted),
    the event name as the ``metric`` tag and the pair's count as ``value``.
    The ``codops`` tag is the part of the site key before its first ``'0'``
    character.

    :param prod: collection of event dicts, passed on to ``event_types()``
    :return: list of data point dicts
    """
    timestamp = datetime.utcnow().isoformat()
    counts = event_types(prod)
    return [
        {
            'tags': {'codops': codops.split('0')[0], 'metric': event_name},
            'fields': {'value': count},
            'measurement': 'event_types',
            'time': timestamp,
        }
        for (codops, event_name), count in counts.items()
    ]
def task(codops='*', debug=False):
    """Compute event metrics for the last collected events and publish them.

    Loads the raw events, parses them, records how many failed to parse as a
    ``health_metrics`` point, keeps only the records accepted by
    ``prod_data()`` and appends the per-event-type points from ``metrics()``.

    :param codops: codops selector handed to ``load_last_events``
        (``'*'`` by default)
    :param debug: when true, return the data points instead of writing them
    :return: the list of data points when ``debug`` is true, otherwise ``None``
        (the points are written through the influx client)
    """
    data_points = []
    sc = get_spark_context()
    rw = load_last_events(sc, '*', codops, parser=None, strict_today=False)
    raw_cnt = rw.count()
    parsed = map_filter_none(rw, parse_data)
    parsed_cnt = parsed.count()
    # Timestamp for the health point, taken the same way metrics() stamps its
    # points. Previously ``dt`` was never defined here, raising NameError.
    dt = datetime.utcnow()
    data_points.append({'tags': {}, 'fields': {'parse_errors': raw_cnt - parsed_cnt},
                        'measurement': 'health_metrics', 'time': dt.isoformat()})
    prod = map_filter_none(parsed, prod_data)
    data_points.extend(metrics(prod))
    if debug:
        return data_points
    if data_points:
        cli = get_influx_client()
        cli.write_points(data_points)
Example metrics output
# Run the task in debug mode so the data points are returned instead of being
# written out. The result gets its own name: binding it to ``metrics`` would
# replace the metrics() function that task() calls and break any later run.
data_points = task(debug=True)
# The first point is the health_metrics entry, which has no codops/metric
# tags and no 'value' field, so it is skipped.
for point in data_points[1:]:
    print(f"[{point['tags']['codops']}] {point['tags']['metric']} {point['fields']['value']}")
```
Items filtered by parse_data: 0
Items filtered by prod_data: 35363
[debr] pause 765
[debr] ended 84
[debr] pageView 216
[debr] play 661
[debr] recommendationLoad 319
[debr] recommendationHit 2
```