Map Reduce Advanced - Count number of friends

21 Discussions

  •

    MapReduce makes it efficient to count each user's friends by parallelizing the work across multiple nodes: the Map phase emits a key-value pair for each user in every friendship record, and the Reduce phase aggregates the values for each user into a total count. Because the computation is distributed, the approach scales to large datasets while keeping it fast. Lion567
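To make the two phases concrete, here is a minimal single-process sketch in Python 3. It assumes each input line holds two space-separated names that are friends. The input is split into chunks to mimic how a framework hands input splits to separate nodes, but here each chunk is simply mapped in turn. The helper names (`map_chunk`, `shuffle`, `reduce_counts`, `count_friends`) and the sample names are hypothetical.

```python
from collections import defaultdict


def map_chunk(lines):
    """Map phase: emit (person, 1) for both people on each friendship line."""
    pairs = []
    for line in lines:
        a, b = line.split()
        pairs.append((a, 1))
        pairs.append((b, 1))
    return pairs


def shuffle(mapped_chunks):
    """Shuffle phase: group every emitted value under its key."""
    groups = defaultdict(list)
    for pairs in mapped_chunks:
        for key, value in pairs:
            groups[key].append(value)
    return groups


def reduce_counts(groups):
    """Reduce phase: sum the 1s for each person to get their friend count."""
    return {key: sum(values) for key, values in groups.items()}


def count_friends(lines, chunk_size=2):
    # Split the input into chunks; a real framework would map these on different nodes.
    chunks = [lines[i:i + chunk_size] for i in range(0, len(lines), chunk_size)]
    mapped = [map_chunk(chunk) for chunk in chunks]
    return reduce_counts(shuffle(mapped))


if __name__ == "__main__":
    friendships = ["Alice Bob", "Alice Carol", "Bob Dave"]
    for person, count in sorted(count_friends(friendships).items()):
        print(person, count)
```

Changing `chunk_size` does not change the result, which is the property that lets the map work be spread across nodes.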

  •

    I think the mapper function extracts the friend pair from each input record and emits each person in the pair with a count of 1. It also logs the emitted values to the error stream so you can follow what it is doing. The main driver code reads the input data, applies the mapper, and then sorts and groups the emitted values by key. The reducer is called once per group, counts the occurrences for that friend, and emits the result in the required format. Note: this code assumes that input is read from stdin and output is printed to stdout, and it uses sys.stderr to log intermediate steps. Adjust the input/output handling to match the requirements of your MapReduce framework.
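A sketch of the pipeline this comment describes, in Python 3. It assumes each record is two space-separated names, that the mapper emits each name with a count of 1, and that the required output is one JSON object per line in the `{"key":...,"value":...}` shape printed by another solution in this thread. The `run` helper is a hypothetical name; the sort plus `itertools.groupby` step plays the role of the shuffle.

```python
import itertools
import json
import sys


def mapper(record):
    """Emit (person, 1) for each of the two names in a friendship record."""
    for person in record.split():
        yield person, 1


def reducer(key, values):
    """Count the occurrences of one person and format the result as a JSON line."""
    count = sum(values)
    return json.dumps({"key": key, "value": str(count)}, separators=(",", ":"))


def run(lines, log=sys.stderr):
    emitted = []
    for record in lines:
        for key, value in mapper(record):
            # Log each intermediate pair to stderr so the map step can be inspected.
            print("map:", key, value, file=log)
            emitted.append((key, value))

    # Sort, then group consecutive pairs that share a key (the "shuffle").
    emitted.sort()
    output = []
    for key, group in itertools.groupby(emitted, key=lambda pair: pair[0]):
        output.append(reducer(key, (value for _, value in group)))
    return output


if __name__ == "__main__":
    # Results go to stdout; the map log goes to stderr.
    for line in run(sys.stdin):
        print(line)
```

Keeping the log on stderr means it never mixes with the answer a grader reads from stdout.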

  •

    #!/usr/bin/env python2.7

    from collections import OrderedDict
    import sys

    class MapReduce:
        def __init__(self):
            # Maps each key to the list of values emitted for it, in first-seen order
            self.intermediate = OrderedDict()
            self.result = []

        def emitIntermediate(self, key, value):
            self.intermediate.setdefault(key, [])
            self.intermediate[key].append(value)

        def emit(self, value):
            self.result.append(value)

        def execute(self, data, mapper, reducer):
            # Map phase: run the mapper on every input record
            for record in data:
                mapper(record)

            # Reduce phase: call the reducer once per key with all of its values
            for key in self.intermediate:
                reducer(key, self.intermediate[key])

            self.result.sort()
            for item in self.result:
                print(item)

    mapReducer = MapReduce()

    def mapper(record):
        # Start writing the Map code here
        fields = record.replace('\n', '').split(',')

        if fields[0] == 'Employee':
            person_name = fields[1]
            ssn = fields[2]
            mapReducer.emitIntermediate(ssn, (fields[0], person_name))
        else:
            ssn = fields[1]
            department_name = fields[2]
            mapReducer.emitIntermediate(ssn, (fields[0], department_name))

    def reducer(key, list_of_values):
        # Start writing the Reduce code here
        person_names = []
        department_names = []
        for label, value in list_of_values:
            if label == 'Employee':
                person_names.append(value)
            else:
                department_names.append(value)

        # One output row per (employee, department) pair that shares this SSN
        for person_name in person_names:
            for department_name in department_names:
                mapReducer.emit((key, person_name, department_name))

    if __name__ == '__main__':
        inputData = []
        for line in sys.stdin:
            inputData.append(line)
        mapReducer.execute(inputData, mapper, reducer)
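The code in this comment does not count friends: its mapper keys Employee and Department records by SSN and its reducer pairs them up, which is a reduce-side join. A self-contained Python 3 sketch of the same logic, run on made-up sample records (the `join_on_ssn` name and the record values are hypothetical), shows what it produces.

```python
from collections import OrderedDict


def join_on_ssn(records):
    """Reduce-side join: group Employee and Department records by SSN, then pair them."""
    groups = OrderedDict()
    # Map phase: tag each value with its source so the reducer can tell them apart.
    for record in records:
        fields = record.strip().split(",")
        if fields[0] == "Employee":
            key, tagged = fields[2], ("Employee", fields[1])
        else:
            key, tagged = fields[1], (fields[0], fields[2])
        groups.setdefault(key, []).append(tagged)

    # Reduce phase: emit every (ssn, employee, department) combination for each key.
    result = []
    for ssn, values in groups.items():
        people = [v for label, v in values if label == "Employee"]
        departments = [v for label, v in values if label != "Employee"]
        for person in people:
            for department in departments:
                result.append((ssn, person, department))
    return sorted(result)


if __name__ == "__main__":
    sample = [
        "Employee,Sue,111-11-1111",
        "Department,111-11-1111,Accounting",
        "Department,111-11-1111,Sales",
        "Employee,Tom,222-22-2222",
    ]
    for row in join_on_ssn(sample):
        print(row)
```

An employee with no matching Department record (Tom in the sample) produces no output, so this behaves like an inner join.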

  •

    import sys
    from collections import OrderedDict

    class MapReduce:
        def __init__(self):
            # Maps each key to the list of values emitted for it, in first-seen order
            self.intermediate = OrderedDict()
            self.result = []

        def emitIntermediate(self, key, value):
            self.intermediate.setdefault(key, [])
            self.intermediate[key].append(value)

        def emit(self, value):
            self.result.append(value)

        def execute(self, data, mapper, reducer):
            # Map phase: run the mapper on every input line
            for record in data:
                mapper(record)

            # Reduce phase: one reducer call per person with all of their 1s
            for key in self.intermediate:
                reducer(key, self.intermediate[key])

            # Print one JSON object per person, sorted by name
            self.result.sort()
            for item in self.result:
                print("{\"key\":\"" + item[0] + "\",\"value\":\"" + str(item[1]) + "\"}")

    mapReducer = MapReduce()

    def mapper(record):
        # Each line holds two friends; credit one friendship to each of them
        people = record.rstrip().split(" ")
        mapReducer.emitIntermediate(people[0], 1)
        mapReducer.emitIntermediate(people[1], 1)

    def reducer(key, list_of_values):
        # The friend count is the number of 1s emitted for this person
        total = sum(list_of_values)
        mapReducer.emit((key, total))

    if __name__ == '__main__':
        inputData = []
        for line in sys.stdin:
            inputData.append(line)
        mapReducer.execute(inputData, mapper, reducer)
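The print in this comment builds each JSON line by string concatenation, which is fine for plain names but would produce invalid JSON if a name ever contained a quote or backslash. A small Python 3 sketch comparing that approach with `json.dumps` using compact separators (the function names `concat_line` and `json_line` are hypothetical):

```python
import json


def concat_line(key, value):
    # The string-concatenation approach from the comment above (no escaping).
    return "{\"key\":\"" + key + "\",\"value\":\"" + str(value) + "\"}"


def json_line(key, value):
    # Same shape, but json.dumps escapes quotes, backslashes and control characters.
    return json.dumps({"key": key, "value": str(value)}, separators=(",", ":"))


if __name__ == "__main__":
    print(concat_line("Alice", 2))
    print(json_line("Alice", 2))
    print(json_line('Al"ice', 2))
```

For ordinary names both functions print the identical line, so switching to `json.dumps` would not change the expected output for plain names.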
    
  •

    Simple way:

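    def mapper(record):
        # Emit the friendship in both directions so each person's key collects their friends
        # (mapReducer is the MapReduce instance from the template code)
        f1, f2 = record.split()
        mapReducer.emitIntermediate(f1, f2)
        mapReducer.emitIntermediate(f2, f1)

    def reducer(key, list_of_values):
        # The number of friends collected for a key is that person's friend count
        mapReducer.emit([key, len(list_of_values)])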
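Because this mapper emits the friend's name instead of a 1, each reducer call receives that person's full friend list, and `len(list_of_values)` gives the same count as summing 1s. A self-contained Python 3 sketch, with a plain `OrderedDict` standing in for the `mapReducer` template object and hypothetical function names and sample data, shows both the grouped lists and the counts.

```python
from collections import OrderedDict


def build_friend_lists(lines):
    """Map phase of the 'simple way': emit each friendship in both directions."""
    intermediate = OrderedDict()
    for line in lines:
        f1, f2 = line.split()
        intermediate.setdefault(f1, []).append(f2)
        intermediate.setdefault(f2, []).append(f1)
    return intermediate


def count_from_lists(intermediate):
    """Reduce phase: the length of each friend list is the friend count."""
    return sorted([key, len(friends)] for key, friends in intermediate.items())


if __name__ == "__main__":
    lists = build_friend_lists(["Alice Bob", "Alice Carol", "Bob Dave"])
    print(dict(lists))
    print(count_from_lists(lists))
```

A side benefit of this design is that the intermediate data already holds who each person's friends are, not just how many.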