    def run(self):
        """Main function of the Mapper.

        Load text from the source file and split each line into a name and a score.

        Returns:
            save_path (str): path where the key-value result is saved
        """
        with open(self.file_path) as f:
            line = f.readline()
            while line != "":
                fields = line.split()
                name, score = fields[0], fields[1]
                self._add_item(name, score)
                line = f.readline()
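    # A worked example (not from the original post) of the data this produces,
    # assuming each line of the source file looks like "<name> <score>":
    #
    #     Alice 90
    #     Bob 78
    #     Alice 80
    #
    # _add_item (defined elsewhere) is expected to accumulate a running total and
    # count per name, so the pickled key-value result later consumed by the
    # Reducer would look like:
    #
    #     {"Alice": {"total": 170.0, "count": 2}, "Bob": {"total": 78.0, "count": 1}}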
class Reducer():
    """Map-Reduce Reducer

    Args:
        filepath_lists (List[str]): list of paths to load key-value data from
        save_path (str): directory in which to save the final results (does not include the filename)
        worker_id (optional, int): worker id
    """
    def _merge(self, kv):
        # merge one mapper's partial {name: {"total", "count"}} dict into result_dict
        for k, v in kv.items():
            if k not in self.result_dict:
                new_entry = {}
                new_entry["total"] = float(v["total"])
                new_entry["count"] = int(v["count"])
                self.result_dict[k] = new_entry
            else:
                entry = self.result_dict[k]
                entry["total"] += float(v["total"])
                entry["count"] += int(v["count"])
    def _average(self):
        # replace each {"total", "count"} entry with the final average score
        dd = {}
        for k, v in self.result_dict.items():
            avg = v["total"] / v["count"]
            dd[k] = avg
        self.result_dict = dd
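    # Example with illustrative values (not from the original post): if two mappers
    # report {"Alice": {"total": 170.0, "count": 2}} and {"Alice": {"total": 60.0, "count": 1}},
    # then after _merge() the entry is {"Alice": {"total": 230.0, "count": 3}},
    # and _average() turns result_dict into {"Alice": 76.66666666666667}.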
    def run(self):
        """Main function of the Reducer.

        Reduce the key-value results into the final average score for each student.
        """
        # load the intermediate key-value files produced by the mappers
        kv_lists = []
        for each in self.filepath_lists:
            with open(each, 'rb') as f:
                kv = pickle.load(f)
            kv_lists.append(kv)

        # combine all partial results, then compute the averages
        for kv in kv_lists:
            self._merge(kv)

        self._average()

        # write one "name: average score" line per student to the output file
        save_path = os.path.join(self.save_path, "results.txt")
        with open(save_path, "w") as f:
            for k, v in self.result_dict.items():
                print(f"{k}: {v}分", file=f)
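# A minimal single-process usage sketch; the argument names follow the class
# docstring above, and the constructor itself is defined elsewhere in the post.
# File names and the output directory are illustrative placeholders:
#
#     reducer = Reducer(filepath_lists=["kv_worker_1.pkl", "kv_worker_2.pkl"],
#                       save_path="./output", worker_id=1)
#     reducer.run()   # writes ./output/results.txt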
if __name__ == "__main__":
    # use a Manager context to create shared memory between processes
    with Manager() as manager:
        intermediate_l = manager.list()
        process_list = []
        # one synchronization barrier shared by all mapper processes
        barrier = mp.Barrier(parties=num_mapper)
        for i in range(num_mapper):
            process_list.append(mp.Process(target=mapper_task,
                                           args=(i + 1, intermediate_l, barrier)))
            process_list[i].start()
        # start every mapper before joining, so they can reach the barrier together
        for p in process_list:
            p.join()
        # tear down the synchronization barrier
        # barrier.wait()
        barrier.abort()
        process_list = []
        for i in range(num_reducer):
            process_list.append(mp.Process(target=reducer_task, args=(intermediate_l,)))
            process_list[i].start()
        for p in process_list:
            p.join()
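
# mapper_task and reducer_task are defined elsewhere in the post; the sketch below
# only illustrates the shape implied by the arguments passed above. The Mapper and
# Reducer constructor arguments and file names are assumptions, not the original code.
#
# def mapper_task(worker_id, intermediate_l, barrier):
#     # each mapper processes its own source file and publishes the path of its
#     # pickled key-value result through the shared Manager list
#     mapper = Mapper(file_path=f"scores_{worker_id}.txt", worker_id=worker_id)  # hypothetical args
#     save_path = mapper.run()
#     intermediate_l.append(save_path)
#     barrier.wait()   # block until every mapper has published its result
#
# def reducer_task(intermediate_l):
#     # the reducer consumes every intermediate file produced by the mappers
#     reducer = Reducer(filepath_lists=list(intermediate_l), save_path="./output")  # hypothetical args
#     reducer.run()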