mirror of
				https://github.com/donnemartin/system-design-primer.git
				synced 2025-11-04 10:12:32 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			78 lines
		
	
	
		
			1.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			78 lines
		
	
	
		
			1.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- coding: utf-8 -*-
 | 
						|
 | 
						|
from mrjob.job import MRJob
from mrjob.step import MRStep
 | 
						|
 | 
						|
 | 
						|
class SalesRanker(MRJob):
    """Two-step MapReduce job over tab-separated sales log lines.

    Step 1 (mapper/reducer) totals the quantity sold for every
    (category, product_id) pair whose timestamp passes within_past_week().
    Step 2 (mapper_sort/reducer_identity) re-keys each total as
    (category, quantity) so the shuffle/sort phase can order the products
    of a category by quantity.
    """

    def within_past_week(self, timestamp):
        """Return True if timestamp is within past week, False otherwise.

        NOTE: placeholder with no implementation. As written it returns
        None (falsy), so mapper() emits nothing until this is filled in.
        The timestamp format is not defined in this file.
        """
        ...

    def mapper(self, _, line):
        """Parse each log line, extract and transform relevant lines.

        Each line must hold exactly four tab-separated fields, in order:
        timestamp, product_id, category, quantity. A line with a different
        field count raises ValueError from the unpacking, and a quantity
        that is not an integer raises ValueError from int().

        Emit key value pairs of the form:

        (foo, p1), 2
        (bar, p1), 2
        (bar, p1), 1
        (foo, p2), 3
        (bar, p3), 10
        (foo, p4), 1
        """
        timestamp, product_id, category, quantity = line.split('\t')
        if self.within_past_week(timestamp):
            # The field arrives as text; reducer() adds the values with
            # sum(), which raises TypeError on strings, so emit a number.
            yield (category, product_id), int(quantity)

    def reducer(self, key, values):
        """Sum values for each key.

        (foo, p1), 2
        (bar, p1), 3
        (foo, p2), 3
        (bar, p3), 10
        (foo, p4), 1
        """
        yield key, sum(values)

    def mapper_sort(self, key, value):
        """Construct key to ensure proper sorting.

        Transform key and value to the form:

        (foo, 2), p1
        (bar, 3), p1
        (foo, 3), p2
        (bar, 10), p3
        (foo, 1), p4

        The shuffle/sort step of MapReduce will then do a
        distributed sort on the keys, resulting in:

        (bar, 3), p1
        (bar, 10), p3
        (foo, 1), p4
        (foo, 2), p1
        (foo, 3), p2

        NOTE(review): the order shown assumes quantities are compared
        numerically. If the shuffle compares keys in their serialized text
        form, 10 would sort before 3 -- confirm against the runner's sort
        configuration.
        """
        category, product_id = key
        quantity = value
        yield (category, quantity), product_id

    def reducer_identity(self, key, value):
        """Emit every (key, value) pair of the sorted step unchanged.

        Despite its singular name (kept so the signature is unchanged),
        `value` is the iterable of all values grouped under `key`, exactly
        like `values` in reducer(). Each element is yielded separately so
        that products sharing a (category, quantity) key all appear in the
        output instead of the iterable object itself being emitted.
        """
        for product_id in value:
            yield key, product_id

    def steps(self):
        """Run the map and reduce steps.

        Returns the two steps in execution order: the summing step, then
        the re-keying/sorting step.
        """
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer),
            MRStep(mapper=self.mapper_sort,
                   reducer=self.reducer_identity),
        ]
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
    # Launch the job only when this file is executed directly, not on import.
    SalesRanker.run()
 |