# -*- coding: utf-8 -*-

from mrjob.job import MRJob
from mrjob.step import MRStep


class HitCounts(MRJob):
    """MapReduce job that counts hits per (year-month, url) pair.

    Each input log line is mapped to a ``((period, url), 1)`` record and
    the reducer sums the ones for every distinct key.
    """

    def extract_url(self, line):
        """Extract the generated url from the log line.

        NOTE: placeholder -- no parsing is implemented yet, so this
        currently returns ``None`` for every line.
        """
        pass

    def extract_year_month(self, line):
        """Return the year and month portions of the timestamp.

        NOTE: placeholder -- no parsing is implemented yet, so this
        currently returns ``None`` for every line.
        """
        pass

    def mapper(self, _, line):
        """Parse each log line, extract and transform relevant lines.

        The input key is ignored. Emit key value pairs of the form:

        (2016-01, url0), 1
        (2016-01, url0), 1
        (2016-01, url1), 1
        """
        url = self.extract_url(line)
        period = self.extract_year_month(line)
        yield (period, url), 1

    def reducer(self, key, values):
        """Sum values for each key.

        (2016-01, url0), 2
        (2016-01, url1), 1
        """
        yield key, sum(values)

    def steps(self):
        """Run the map and reduce steps.

        Returns a single-step pipeline wiring ``mapper`` to ``reducer``.
        """
        # MRStep replaces the deprecated MRJob.mr() helper, which newer
        # mrjob releases no longer provide.
        return [
            MRStep(mapper=self.mapper, reducer=self.reducer),
        ]


if __name__ == '__main__':
    HitCounts.run()