system-design-primer/solutions/system_design/mint/mint_mapreduce.py

58 lines
1.5 KiB
Python
Raw Normal View History

2017-03-05 08:05:31 +03:00
# -*- coding: utf-8 -*-
from mrjob.job import MRJob
2021-03-14 13:08:05 +03:00
class SpendingByCategory(MRJob) :
2017-03-05 08:05:31 +03:00
2021-03-14 13:08:05 +03:00
def __init__(self, categorizer) :
2017-03-05 08:05:31 +03:00
self.categorizer = categorizer
...
2021-03-14 13:08:05 +03:00
def current_year_month(self) :
2017-03-05 08:05:31 +03:00
"""Return the current year and month."""
...
2021-03-14 13:08:05 +03:00
def extract_year_month(self, timestamp) :
2017-03-05 08:05:31 +03:00
"""Return the year and month portions of the timestamp."""
...
2021-03-14 13:08:05 +03:00
def handle_budget_notifications(self, key, total) :
2017-03-05 08:05:31 +03:00
"""Call notification API if nearing or exceeded budget."""
...
2021-03-14 13:08:05 +03:00
def mapper(self, _, line) :
2017-03-05 08:05:31 +03:00
"""Parse each log line, extract and transform relevant lines.
Emit key value pairs of the form:
2021-03-14 13:08:05 +03:00
(2016-01, shopping) , 25
(2016-01, shopping) , 100
(2016-01, gas) , 50
2017-03-05 08:05:31 +03:00
"""
2021-03-14 13:08:05 +03:00
timestamp, category, amount = line.split('\t')
period = self. extract_year_month(timestamp)
if period == self.current_year_month() :
yield (period, category) , amount
2017-03-05 08:05:31 +03:00
2021-03-14 13:08:05 +03:00
def reducer(self, key, values) :
2017-03-05 08:05:31 +03:00
"""Sum values for each key.
2021-03-14 13:08:05 +03:00
(2016-01, shopping) , 125
(2016-01, gas) , 50
2017-03-05 08:05:31 +03:00
"""
2021-03-14 13:08:05 +03:00
total = sum(values)
self.handle_budget_notifications(key, total)
yield key, sum(values)
2017-03-05 08:05:31 +03:00
2021-03-14 13:08:05 +03:00
def steps(self) :
2017-03-05 08:05:31 +03:00
"""Run the map and reduce steps."""
return [
self.mr(mapper=self.mapper,
2021-03-14 13:08:05 +03:00
reducer=self.reducer)
2017-03-05 08:05:31 +03:00
]
if __name__ == '__main__':
2021-03-14 13:08:05 +03:00
SpendingByCategory.run()