Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import csv
from collections import Counter

from mrjob.job import MRJob
from mrjob.step import MRStep
class MR_Ex3(MRJob):
    """Count, per brand, how many users have it as their top repeat brand.

    The job runs three steps:

    1. For every (user, brand) pair, count purchases in October and in
       November and keep only pairs with at least one purchase in each
       of the two months.
    2. For every user, pick the brand with the highest combined count.
    3. For every brand, count the users for whom it was picked.

    Input lines are expected to be comma-separated with the nine columns
    event_time, event_type, product_id, category_id, category_code,
    brand, price, user_id, user_session.
    """

    # Months are compared as the two-character strings sliced out of
    # event_time; the mapper never converts them to integers.
    OCTOBER = "10"
    NOVEMBER = "11"

    # Number of columns the mapper unpacks from every input line.
    FIELD_COUNT = 9

    def mapper_get_words(self, _, line):
        """Emit ``((user_id, brand), month)`` for every branded purchase.

        Lines that cannot be parsed into exactly ``FIELD_COUNT`` columns
        (blank lines, truncated records) are skipped and counted instead
        of aborting the whole job.
        """
        try:
            fields = next(csv.reader([line]), [])
        except csv.Error:
            fields = []
        if len(fields) != self.FIELD_COUNT:
            self.increment_counter("MR_Ex3", "skipped_malformed_lines", 1)
            return

        (event_time, event_type, _product_id, _category_id, _category_code,
         brand, _price, user_id, _user_session) = fields

        # Assumes event_time starts with "YYYY-MM", so characters 5-6 are
        # the month -- TODO confirm against the input data.
        month = event_time[5:7]

        # Only purchases of a product with a known brand are relevant.
        if brand and event_type == "purchase":
            yield (user_id, brand), month

    def reducer_sum(self, key, values):
        """Emit ``user_id, (brand, total)`` for brands bought in both months.

        ``key`` is the (user_id, brand) pair and ``values`` the month
        strings emitted by the mapper. ``total`` is the number of October
        plus November purchases; values for any other month are ignored.
        """
        october_purchases = 0
        november_purchases = 0
        for month in values:
            # The month arrives as a string, so it must be compared with
            # strings; comparing with the ints 10/11 would never match.
            if month == self.OCTOBER:
                october_purchases += 1
            elif month == self.NOVEMBER:
                november_purchases += 1

        # Require at least one purchase in each of the two months.
        if october_purchases > 0 and november_purchases > 0:
            yield key[0], (key[1], october_purchases + november_purchases)

    def mapper_noAction(self, key, value):
        """Pass the pair through unchanged so the next reducer can regroup."""
        yield key, value

    def reducer_maxBrand(self, key, brandAndSum):
        """Emit ``brand, 1`` for the user's brand with the highest total.

        ``brandAndSum`` yields (brand, total) pairs for one user. On a tie
        the pair that arrives first wins, because ``max`` returns the
        first maximal element.
        """
        top_brand, _top_total = max(brandAndSum, key=lambda pair: pair[1])
        yield top_brand, 1

    def reducer_finalSum(self, brand, values):
        """Emit ``brand, count`` where count sums the 1s emitted per user."""
        yield brand, sum(values)

    def steps(self):
        """Return the three MapReduce steps in execution order."""
        return [
            MRStep(mapper=self.mapper_get_words, reducer=self.reducer_sum),
            MRStep(mapper=self.mapper_noAction, reducer=self.reducer_maxBrand),
            MRStep(mapper=self.mapper_noAction, reducer=self.reducer_finalSum),
        ]
# Start the job only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    MR_Ex3.run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement