Advertisement
kieni17

Untitled

Apr 18th, 2020
420
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.95 KB | None | 0 0
  1. from mrjob.job import MRJob
  2. from mrjob.step import MRStep
  3. from collections import Counter
  4.  
  class MR_Ex3(MRJob):
      """Count, per brand, the users for whom it is the top repeat-purchase brand.

      The job runs three steps:

      1. For every (user, brand) pair, count the purchase events that fall in
         each of ``REQUIRED_MONTHS`` and keep only the pairs that have at least
         one purchase in every one of those months.
      2. For every user, keep the single brand with the highest total number
         of purchases across those months.
      3. For every brand, count the users that selected it in step 2.
      """

      # Number of comma-separated columns a well-formed input record has.
      EXPECTED_FIELDS = 9

      # Month codes (characters 5-6 of ``event_time``) a user must have bought
      # a brand in for the (user, brand) pair to be counted.
      REQUIRED_MONTHS = ("10", "11")

      def mapper_get_words(self, _, line):
          """Emit ``((user_id, brand), month)`` for each relevant purchase.

          The record is expected to hold the columns event_time, event_type,
          product_id, category_id, category_code, brand, price, user_id and
          user_session, separated by commas.

          Records that do not split into exactly ``EXPECTED_FIELDS`` columns
          (blank lines, truncated rows, ...) are skipped instead of aborting
          the whole job with a ``ValueError``.
          """
          fields = line.split(',')
          if len(fields) != self.EXPECTED_FIELDS:
              return

          event_time, event_type = fields[0], fields[1]
          brand, user_id = fields[5], fields[7]

          # Only purchases of a known brand are of interest.
          if event_type != "purchase" or not brand:
              return

          # NOTE(review): assumes event_time starts with "YYYY-MM", so that
          # characters 5-6 are the month -- confirm against the data set.
          month = event_time[5:7]

          # Months the first reducer never counts are dropped here already, so
          # they are not shuffled needlessly; the job output is unaffected.
          if month in self.REQUIRED_MONTHS:
              yield (user_id, brand), month

      def reducer_sum(self, key, values):
          """Emit ``user_id, (brand, total)`` for pairs bought in every month.

          ``key`` is the ``(user_id, brand)`` pair and ``values`` the month
          codes emitted by the mapper. ``total`` is the number of purchases
          summed over all ``REQUIRED_MONTHS``. Nothing is emitted unless each
          required month has at least one purchase.
          """
          user_id, brand = key
          required = self.REQUIRED_MONTHS

          # Purchases per required month; any other month code is ignored.
          purchases = Counter(month for month in values if month in required)

          # Emit only if every required month has at least one purchase.
          if all(purchases[month] > 0 for month in required):
              yield user_id, (brand, sum(purchases.values()))

      def mapper_noAction(self, key, value):
          """Pass the pair through unchanged (identity mapper).

          ``steps`` no longer wires this in, because a reducer-only step gets
          an identity mapper from mrjob itself; the method is kept so existing
          references to it keep working.
          """
          yield key, value

      def reducer_maxBrand(self, key, brandAndSum):
          """Emit ``brand, 1`` for the brand this user bought most often.

          ``brandAndSum`` iterates over ``(brand, total)`` pairs of one user.
          Ties on ``total`` go to the alphabetically first brand, so the
          result does not depend on the order in which values arrive. Nothing
          is emitted for an empty group.
          """
          # Highest total first, then brand name ascending as tie-breaker.
          best = min(
              brandAndSum,
              key=lambda pair: (-pair[1], pair[0]),
              default=None,
          )
          if best is not None:
              yield best[0], 1

      def reducer_finalSum(self, brand, values):
          """Emit ``brand, count`` where count sums the per-user 1s."""
          yield brand, sum(values)

      def steps(self):
          """Return the three MapReduce steps in execution order."""
          return [
              MRStep(mapper=self.mapper_get_words, reducer=self.reducer_sum),
              # Reducer-only steps: mrjob supplies the identity mapper.
              MRStep(reducer=self.reducer_maxBrand),
              MRStep(reducer=self.reducer_finalSum),
          ]
  62.  
  63.  
  if __name__ == "__main__":
      # Executed as a script: hand control to the job's run() entry point.
      MR_Ex3.run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement