# system-design-primer/solutions/system_design/sales_rank/sales_rank_mapreduce.py
#
# 78 lines
# 1.9 KiB
# Python

# -*- coding: utf-8 -*-
from mrjob.job import MRJob
from mrjob.step import MRStep
class SalesRanker(MRJob):
    """Two-step MapReduce job that orders products by quantity sold per category.

    Step 1 (mapper / reducer) totals the quantity sold for every
    (category, product_id) pair whose log entry passes within_past_week.
    Step 2 (mapper_sort / reducer_identity) re-keys each total as
    (category, quantity) so that the shuffle/sort phase between the map and
    reduce stages orders the products of each category by quantity.
    """

    def within_past_week(self, timestamp):
        """Return True if timestamp is within past week, False otherwise.

        NOTE: the body is a placeholder. As written it returns None, which
        is falsy, so mapper() emits nothing until this is implemented. The
        timestamp format is not defined in this file.
        """
        ...

    def mapper(self, _, line):
        """Parse each log line, extract and transform relevant lines.

        Each line is expected to hold four tab-separated fields:
        timestamp, product_id, category, quantity. A line with a different
        number of fields raises ValueError from the tuple unpacking.

        Emit key value pairs of the form:

        (foo, p1), 2
        (bar, p1), 2
        (bar, p1), 1
        (foo, p2), 3
        (bar, p3), 10
        (foo, p4), 1
        """
        timestamp, product_id, category, quantity = line.split('\t')
        if self.within_past_week(timestamp):
            # split() returns strings; convert to a number so the reducer's
            # sum() adds quantities instead of raising TypeError on int + str.
            # Assumes whole-number quantities, as in the examples above.
            yield (category, product_id), int(quantity)

    def reducer(self, key, values):
        """Sum values for each key.

        (foo, p1), 2
        (bar, p1), 3
        (foo, p2), 3
        (bar, p3), 10
        (foo, p4), 1
        """
        yield key, sum(values)

    def mapper_sort(self, key, value):
        """Construct key to ensure proper sorting.

        Transform key and value to the form:

        (foo, 2), p1
        (bar, 3), p1
        (foo, 3), p2
        (bar, 10), p3
        (foo, 1), p4

        The shuffle/sort step of MapReduce will then do a
        distributed sort on the keys, resulting in:

        (bar, 3), p1
        (bar, 10), p3
        (foo, 1), p4
        (foo, 2), p1
        (foo, 3), p2

        NOTE(review): the order shown assumes quantities compare
        numerically during the sort; depending on how the runner serializes
        and compares keys this may need extra sort configuration - confirm.
        """
        category, product_id = key
        quantity = value
        yield (category, quantity), product_id

    def reducer_identity(self, key, value):
        """Pass every (key, value) pair through unchanged.

        mrjob hands a reducer an iterator over all values that share a key,
        so ``value`` is iterated and one pair is emitted per element rather
        than emitting the iterator object itself.
        """
        for product_id in value:
            yield key, product_id

    def steps(self):
        """Run the map and reduce steps."""
        # MRStep replaces the legacy self.mr() helper, which newer mrjob
        # releases no longer provide.
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer),
            MRStep(mapper=self.mapper_sort,
                   reducer=self.reducer_identity),
        ]
if __name__ == '__main__':
    # Launch the job only when this file is executed as a script,
    # not when it is imported as a module.
    SalesRanker.run()