• + 0 comments

    import json
    import sys
    from collections import OrderedDict


    class MapReduce:
        """Minimal single-process MapReduce driver.

        Attributes:
            intermediate: ``OrderedDict`` mapping each intermediate key to the
                list of values emitted for it, in first-emission order.
            result: list of values emitted by the reducer, in emission order.
            r: integer scratch slot initialised to 0; this class never reads
                it, it is available for user-supplied mappers.
        """

        def __init__(self):
            """Create empty intermediate/result stores and reset ``r`` to 0."""
            self.intermediate = OrderedDict()
            self.result = []
            self.r = 0

        # Earlier revisions spelled the constructor ``init`` (the double
        # underscores were lost), so it never ran automatically.  The alias
        # keeps explicit ``obj.init()`` calls working.
        init = __init__

        def emitIntermediate(self, key, value):
            """Append *value* to the list of values collected under *key*."""
            self.intermediate.setdefault(key, []).append(value)

        def emit(self, value):
            """Append *value* to the final result list."""
            self.result.append(value)

        def execute(self, data, mapper, reducer):
            """Run the map phase, then the reduce phase, then print the results.

            Args:
                data: iterable of JSON-encoded strings, one record each.
                mapper: callable invoked with every decoded record.
                reducer: callable invoked as ``reducer(key, values)`` once per
                    intermediate key, in first-emission order.

            Raises:
                ValueError: if an element of *data* is not valid JSON.
            """
            for line in data:
                mapper(json.loads(line))

            for key in self.intermediate:
                reducer(key, self.intermediate[key])

            # Portable replacement for the Python 2 statement ``print item,``:
            # items are separated by one space, except that no separator is
            # written after an item ending in whitespace other than ' '
            # (e.g. a newline).  Unlike Python 2, which closes the line at
            # interpreter exit, an unfinished line is terminated here.
            needs_space = False
            for item in self.result:
                text = str(item)
                if needs_space:
                    sys.stdout.write(" ")
                sys.stdout.write(text)
                ends_line = text[-1:].isspace() and text[-1:] != " "
                needs_space = not ends_line
            if needs_space:
                sys.stdout.write("\n")
    

    # Shared module-level driver instance; the mapper and reducer defined
    # below emit through it, and the mapper stores its header value in `.r`.
    mapReducer = MapReduce()

    def mapper(record):
        """Tag each record's value as 'r' or 'q' according to its key.

        The record whose key is 1 is treated as a header: the first
        whitespace-separated token of its value is parsed as an integer and
        stored in ``mapReducer.r``; nothing is emitted for it.  Every other
        record emits ``(value, 'r')`` when ``key <= mapReducer.r`` and
        ``(value, 'q')`` otherwise.

        Args:
            record: mapping with a ``"key"`` entry (presumably the 1-based
                input line number; verify against the caller) and a
                ``"value"`` entry (the line text).

        Raises:
            IndexError: if the header value contains no tokens.
            ValueError: if the header's first token is not an integer.
        """
        key = record["key"]
        value = record["value"]

        if key == 1:
            mapReducer.r = int(value.split()[0])
            return

        # NOTE(review): if keys are 1-based line numbers and the header says
        # the first group has r entries, those entries occupy keys 2..r+1,
        # while this test covers only 2..r -- confirm the intended boundary.
        tag = 'r' if key <= mapReducer.r else 'q'
        mapReducer.emitIntermediate(value, tag)

    def reducer(key, list_of_values):
        """Emit *key* when it was tagged with both 'r' and 'q'.

        Args:
            key: intermediate key being reduced.
            list_of_values: the tags collected for *key*; the key is emitted
                via ``mapReducer.emit`` only if both ``'r'`` and ``'q'``
                occur in this list.
        """
        if 'r' in list_of_values and 'q' in list_of_values:
            mapReducer.emit(key)

    if __name__ == '__main__':
        # Wrap every stdin line in a JSON record keyed by its 1-based line
        # number; the raw line (including its trailing newline) is the value.
        inputData = [
            json.dumps({"key": counter, "value": line})
            for counter, line in enumerate(sys.stdin, 1)
        ]
        # Run the job exactly once: a second execute() on the same instance
        # would re-map and re-reduce into the same accumulators and print
        # every result again, duplicated.
        mapReducer.execute(inputData, mapper, reducer)