system-design-primer/solutions/system_design/sales_rank/sales_rank_mapreduce.py

78 lines
1.9 KiB
Python
Raw Normal View History

2017-03-05 08:06:17 +03:00
# -*- coding: utf-8 -*-
from mrjob.job import MRJob
2021-03-26 19:50:38 +03:00
class SalesRanker(MRJob):
2017-03-05 08:06:17 +03:00
2021-03-26 19:50:38 +03:00
def within_past_week(self, timestamp):
2017-03-05 08:06:17 +03:00
"""Return True if timestamp is within past week, False otherwise."""
...
2021-03-26 19:50:38 +03:00
def mapper(self, _, line):
2017-03-05 08:06:17 +03:00
"""Parse each log line, extract and transform relevant lines.
Emit key value pairs of the form:
2021-03-26 19:50:38 +03:00
(foo, p1), 2
(bar, p1), 2
(bar, p1), 1
(foo, p2), 3
(bar, p3), 10
(foo, p4), 1
2017-03-05 08:06:17 +03:00
"""
2021-03-26 19:50:38 +03:00
timestamp, product_id, category, quantity = line.split('\t')
if self.within_past_week(timestamp):
yield (category, product_id), quantity
2017-03-05 08:06:17 +03:00
2021-03-26 19:50:38 +03:00
def reducer(self, key, values):
2017-03-05 08:06:17 +03:00
"""Sum values for each key.
2021-03-26 19:50:38 +03:00
(foo, p1), 2
(bar, p1), 3
(foo, p2), 3
(bar, p3), 10
(foo, p4), 1
2017-03-05 08:06:17 +03:00
"""
2021-03-26 19:50:38 +03:00
yield key, sum(values)
2017-03-05 08:06:17 +03:00
2021-03-26 19:50:38 +03:00
def mapper_sort(self, key, value):
2017-03-05 08:06:17 +03:00
"""Construct key to ensure proper sorting.
Transform key and value to the form:
2021-03-26 19:50:38 +03:00
(foo, 2), p1
(bar, 3), p1
(foo, 3), p2
(bar, 10), p3
(foo, 1), p4
2017-03-05 08:06:17 +03:00
The shuffle/sort step of MapReduce will then do a
distributed sort on the keys, resulting in:
2021-03-26 19:50:38 +03:00
(category1, 1), product4
(category1, 2), product1
(category1, 3), product2
(category2, 3), product1
(category2, 7), product3
2017-03-05 08:06:17 +03:00
"""
category, product_id = key
quantity = value
2021-03-26 19:50:38 +03:00
yield (category, quantity), product_id
2017-03-05 08:06:17 +03:00
2021-03-26 19:50:38 +03:00
def reducer_identity(self, key, value):
2017-03-05 08:06:17 +03:00
yield key, value
2021-03-26 19:50:38 +03:00
def steps(self):
2017-03-05 08:06:17 +03:00
"""Run the map and reduce steps."""
return [
self.mr(mapper=self.mapper,
2021-03-26 19:50:38 +03:00
reducer=self.reducer),
2017-03-05 08:06:17 +03:00
self.mr(mapper=self.mapper_sort,
2021-03-26 19:50:38 +03:00
reducer=self.reducer_identity),
2017-03-05 08:06:17 +03:00
]
if __name__ == '__main__':
2021-03-26 19:50:38 +03:00
SalesRanker.run()