# -*- coding: utf-8 -*-

from mrjob.job import MRJob
from mrjob.step import MRStep


class SalesRanker(MRJob):
    """Two-step MapReduce job that totals sales per (category, product).

    Step 1 sums the quantity sold for every (category, product_id) pair
    found in the input log.  Step 2 re-keys each total as
    (category, quantity) so that the shuffle/sort phase groups the products
    of a category together, ordered by quantity.

    Input lines are tab-separated with four fields:
    timestamp, product_id, category, quantity.
    """

    def within_past_week(self, timestamp):
        """Return True if timestamp is within past week, False otherwise."""
        # Placeholder left unimplemented: as written it returns None (falsy),
        # so mapper() emits nothing until a real check is supplied.
        # NOTE(review): the timestamp format is not visible here -- confirm
        # it against the log producer before implementing.
        ...

    def mapper(self, _, line):
        """Parse each log line, extract and transform relevant lines.

        Emit key value pairs of the form:

        (foo, p1), 2
        (bar, p1), 2
        (bar, p1), 1
        (foo, p2), 3
        (bar, p3), 10
        (foo, p4), 1

        Raises:
            ValueError: if the line does not have exactly four tab-separated
                fields, or if the quantity field is not an integer.
        """
        timestamp, product_id, category, quantity = line.split('\t')
        if self.within_past_week(timestamp):
            # split() returns strings; convert so the reducer's sum() adds
            # numbers instead of raising TypeError on int + str.
            yield (category, product_id), int(quantity)

    def reducer(self, key, values):
        """Sum values for each key.

        (foo, p1), 2
        (bar, p1), 3
        (foo, p2), 3
        (bar, p3), 10
        (foo, p4), 1
        """
        yield key, sum(values)

    def mapper_sort(self, key, value):
        """Construct key to ensure proper sorting.

        Transform key and value to the form:

        (foo, 2), p1
        (bar, 3), p1
        (foo, 3), p2
        (bar, 10), p3
        (foo, 1), p4

        The shuffle/sort step of MapReduce will then do a distributed
        sort on the keys, resulting in:

        (bar, 3), p1
        (bar, 10), p3
        (foo, 1), p4
        (foo, 2), p1
        (foo, 3), p2

        NOTE(review): numeric ordering of the quantity component relies on
        the runner comparing keys by value rather than by their serialized
        text -- confirm for the target runner.
        """
        category, product_id = key
        quantity = value
        yield (category, quantity), product_id

    def reducer_identity(self, key, value):
        """Pass every (key, value) pair through unchanged.

        ``value`` is the iterable of all values grouped under ``key``; each
        one is emitted as its own pair so that products sharing the same
        (category, quantity) key are all preserved.
        """
        for product_id in value:
            yield key, product_id

    def steps(self):
        """Run the map and reduce steps."""
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer),
            MRStep(mapper=self.mapper_sort,
                   reducer=self.reducer_identity),
        ]


if __name__ == '__main__':
    SalesRanker.run()