"""
Sample pipe task to calculate metrics about collected events per broadcaster

Metrics included:
- Event Types collected per day
"""
import os
import ujson
import operator
from datetime import datetime
from pipe_algorithms_lib.utils import log, get_influx_client
from ebupipe.decorators import with_spark_context, pipe_task
HDFS_MASTER = '<hdfs_url>'
@with_spark_context
def data(spark_context, event='*', dt=None):
    """Return an RDD of raw JSONL event lines for the given event and date.

    :param spark_context: injected by the ``with_spark_context`` decorator.
    :param event: event-name glob; ``'*'`` matches every event type.
    :param dt: a ``datetime`` selecting the day to read, the string ``'*'``
        to read all days, or ``None`` (default) for the current UTC day.
    """
    # BUG FIX: the original default ``dt=datetime.utcnow()`` was evaluated
    # once at import time, so a long-running process would forever read the
    # day the module was imported. Resolve "today" at call time instead.
    if dt is None:
        dt = datetime.utcnow()
    path = 'hdfs://{}/recsys/*/realtime/%s/*_%s_%s*.jsonl'.format(HDFS_MASTER)
    if dt == '*':
        return spark_context.textFile(path % ('*', event, '*'))
    return spark_context.textFile(path % (dt.year, event, dt.strftime('%Y-%m-%d')))
def map_filter_none(rdd, func):
    """Map ``func`` over ``rdd`` and drop elements that mapped to ``None``.

    Logs how many elements were discarded so parse/filter losses stay visible.
    """
    before = rdd.count()
    kept = rdd.map(func).filter(lambda item: item is not None)
    dropped = before - kept.count()
    log("Items filtered by {}: {}".format(func.__name__, dropped))
    return kept
def event(it):
    """Return the event name, preferring the nested ``data.event`` field."""
    nested = it.get('data', {}).get('event', '')
    if nested:
        return nested
    return it.get('event')
def is_play(it):
    """Return True when the item is a playback event ('play' or 'playing')."""
    name = it['event']
    return name == 'play' or name == 'playing'
def parse_data(it):
    """Parse one raw JSONL line into a flat event dict.

    Returns the ``data`` payload enriched with ``codops``, ``user_id`` and
    ``event``, or ``None`` when the line is malformed so the caller
    (``map_filter_none``) can drop it.
    """
    try:
        js = ujson.loads(it)
        res = js['data']
        res['codops'] = js['site_key']
        res['user_id'] = js.get('user_id') or js.get('cookie_id')
        res['event'] = event(js)
        return res
    except (ValueError, KeyError, TypeError, AttributeError):
        # BUG FIX: the original bare ``except:`` swallowed *every* exception
        # (including KeyboardInterrupt and programming errors). Only the
        # malformed-input failures are expected here: ValueError from bad
        # JSON, KeyError for missing fields, TypeError/AttributeError when
        # the decoded value is not the expected dict shape.
        return None
def prod_data(it):
    """Pass through events from production broadcasters; ``None`` otherwise."""
    return it if it['codops'] in ('<prod_codops1>', '<codops2>', ) else None
def event_types(rdd):
    """Count occurrences of each ``(codops, event)`` pair in ``rdd``."""
    pairs = rdd.map(lambda item: (item['codops'], item['event']))
    return pairs.countByValue()
def metrics(prod, dt):
    """Build InfluxDB points counting collected events per broadcaster/type.

    :param prod: RDD of parsed production events.
    :param dt: datetime used as the point timestamp.
    :return: list of InfluxDB point dicts for the ``event_types`` measurement.
    """
    counts = event_types(prod)
    return [
        {
            # NOTE(review): split('0')[0] appears to strip a zero-delimited
            # suffix from the codops tag — confirm against the codops format.
            'tags': {'codops': codops.split('0')[0], 'metric': etype},
            'fields': {'value': value},
            'measurement': 'event_types',
            'time': dt.isoformat(),
        }
        for (codops, etype), value in counts.items()
    ]
@pipe_task
def task(dt):
    """Compute daily event metrics for ``dt`` and write them to InfluxDB.

    Collects a ``health_metrics`` point with the number of raw lines that
    failed to parse, plus one ``event_types`` point per (codops, event)
    pair seen in production traffic.
    """
    data_points = []
    rw = data(event='*', dt=dt)
    raw_cnt = rw.count()
    parsed = map_filter_none(rw, parse_data)
    parsed_cnt = parsed.count()
    # Lines lost between raw and parsed are parse failures.
    data_points.append({'tags': {}, 'fields': {'parse_errors': raw_cnt - parsed_cnt},
                        'measurement': 'health_metrics', 'time': dt.isoformat()})
    prod = map_filter_none(parsed, prod_data)
    data_points.extend(metrics(prod, dt))
    # BUG FIX: the original tested ``if data:``, which referenced the module's
    # ``data`` *function* (always truthy) instead of the collected points.
    if data_points:
        cli = get_influx_client()
        cli.write_points(data_points)
# Example metrics output:
#
#   rw = data(event='*').coalesce(1)
#   jsn = map_filter_none(map_filter_none(rw, parse_data), prod_data)
#   mtcs = metrics(jsn, datetime.utcnow())
#   for metric in mtcs:
#       print('{} {} {}'.format(metric['tags']['codops'], metric['tags']['metric'], metric['fields']['value']))
#
#   Items filtered by parse_data: 0
#   Items filtered by prod_data: 35363
#   chrts playing 35700
#   debr media_consumption 1
#   sesr play 188170
#   chrts rec_audio_displayed 1809
#   chrts ended 6140
#   zzebu pageview 19
#   zzebu pageview 1
#   chrts paused 20245
#   chrts rec_video_displayed 2203
#   chrts pageview 50659
#   chrts rec_audio_loaded 12232
#   ptrtp pageview 19696
#   chrts rec_video_loaded 12253