A Simple and Elegant Hand-Rolled Map-Reduce Implementation (Python)


References:

  • https://blog.csdn.net/weixin_45203607/article/details/127369408 (Python multiprocessing: processes, locks, communication, process pools)
  • https://blog.csdn.net/HaidongDU/article/details/112795797 (usage of join in Python multithreading)

How Map-Reduce works

Map-Reduce is a computing framework proposed by Google for running statistics over large volumes of text input. The overall idea is simple and clever: every input text goes through the same two-stage computation, map and reduce, to obtain the desired statistics (word-frequency counts, sorting and classification, and so on). The map stage maps streaming input data (text streams, access logs, and the like) into key-value pairs according to some rule. For example:

Word-frequency analysis: the map stage

Input: "It was the best of times, it was the worst of times."

Output: <it, 2>, <was, 2>, <the, 2>, <of, 2>, <times, 2> (words that occur only once, such as best and worst, are omitted here for brevity)
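
To make the map stage concrete, here is a minimal sketch; the function name word_count_map and the regex tokenizer are illustrative assumptions, not part of the demo code later in this post:

import re
from collections import Counter


def word_count_map(text: str) -> dict:
    # Map stage sketch: turn one chunk of text into <word, count> key-value pairs.
    words = re.findall(r"[a-z']+", text.lower())
    return dict(Counter(words))


print(word_count_map("It was the best of times, it was the worst of times."))
# {'it': 2, 'was': 2, 'the': 2, 'best': 1, 'of': 2, 'times': 2, 'worst': 1}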

The map stage can be deployed on hundreds or thousands of distributed nodes, each processing a large volume of files independently and producing key-value pairs like the ones above. An intermediate layer and network communication then shuffle the key-value pairs from all nodes together (in the simplest case, each node saves its pairs to local temporary files and those files are then transferred).

The reduce stage merges and groups these intermediate key-value pairs into the final statistics. This stage can also run on multiple compute nodes: for example, 26 nodes could handle the reduce stage, one for each initial letter a~z. Each key (a word) is routed to the reduce node matching its first letter, and the corresponding values (here, word frequencies) are merged. Of course, partitioning the reduce nodes involves several considerations, such as load balancing, because some nodes may receive more work than others (if more words start with b than with u, the partitioning above becomes unbalanced). A small example (a merge sketch follows the listing below):

Output of map node 1: <it, 2>, <was, 2>, <the, 2>, <of, 2>, <times, 2>

Output of map node 2: <he, 1>, <was, 1>, <taller, 1>, <than, 1>, <Mike, 1>

Result of reduce node 1 (handles a~n): <he, 1>, <it, 2>, <Mike, 1>

Result of reduce node 2 (handles o~z): <of, 2>, <taller, 1>, <than, 1>, <the, 2>, <times, 2>, <was, 3>
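
A minimal sketch of this partition-and-merge step, assuming the hypothetical helpers partition_by_initial and reduce_counts (not part of the demo code), reproduces the two reduce-node results above:

from collections import defaultdict


def partition_by_initial(pairs: dict, num_reducers: int = 2) -> list:
    # Route each <word, count> pair to a reducer bucket by first letter (a~n -> 0, o~z -> 1).
    buckets = [defaultdict(int) for _ in range(num_reducers)]
    for word, count in pairs.items():
        idx = 0 if word[0].lower() <= 'n' else 1
        buckets[idx][word] += count
    return buckets


def reduce_counts(shards: list) -> dict:
    # Merge all key-value shards routed to one reducer.
    merged = defaultdict(int)
    for shard in shards:
        for word, count in shard.items():
            merged[word] += count
    return dict(merged)


mapper1 = {"it": 2, "was": 2, "the": 2, "of": 2, "times": 2}
mapper2 = {"he": 1, "was": 1, "taller": 1, "than": 1, "Mike": 1}
parts = [partition_by_initial(m) for m in (mapper1, mapper2)]
print(reduce_counts([p[0] for p in parts]))  # reducer a~n: {'it': 2, 'he': 1, 'Mike': 1}
print(reduce_counts([p[1] for p in parts]))  # reducer o~z: {'was': 3, 'the': 2, 'of': 2, 'times': 2, 'taller': 1, 'than': 1}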

How this demo works

This demo computes each student's average score. The input file format is as follows:

file1.txt:

小红 98.3

小明 85.7

...

小倩 86.4

There are multiple input files in the format above, one per subject, recording the score each student achieved in that subject. The required output is each student's average score (\(\frac{\text{total score}}{\text{number of subjects taken}}\)).
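
To try the demo you need a few input files in this format. The sketch below, with a hypothetical make_input_files helper, writes file1.txt through file3.txt with random scores; the filenames match what the Manager script later expects, but the helper itself is not part of the original demo:

import random

students = ["小红", "小明", "小倩"]


def make_input_files(num_files: int = 3) -> None:
    # Write one "name score" line per student into file1.txt .. fileN.txt.
    for i in range(1, num_files + 1):
        with open(f"file{i}.txt", "w", encoding="utf-8") as f:
            for name in students:
                print(f"{name} {random.uniform(60, 100):.1f}", file=f)


if __name__ == "__main__":
    make_input_files()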

The implementation uses a multi-mapper, one-reducer structure and simulates multiple mapper nodes with multiprocessing. The program consists of three parts: a Mapper class, a Reducer class, and a Manager script. The Manager instantiates several Mappers and one Reducer and starts one process per compute node. For ease of demonstration, this demo uses only a single Reducer; with several Reducers you would additionally need to partition the key space (a sketch follows below). The intermediate "network transfer" is simulated by saving each mapper's dictionary to a local path and passing the list of paths from Mapper -> Manager -> Reducer.
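
If you did want several Reducers, one common choice (an assumption here, not implemented in this demo) is hash partitioning, which also spreads load more evenly than splitting by first letter:

import zlib


def reducer_index(key: str, num_reducers: int) -> int:
    # Sketch only: pick the reducer responsible for a key via a stable hash.
    return zlib.crc32(key.encode("utf-8")) % num_reducers


print(reducer_index("小红", 4))  # always routes 小红 to the same one of 4 hypothetical reducers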

Mapper class

import os
import random
import pickle


class Mapper():
    """Map-Reduce Mapper

    Args:
        filepath (str): the path of the single text file this mapper reads
        mid_save_path (str): the directory to save the intermediate result (don't include a filename)
        worker_id (optional, int): id
    """
    def __init__(self, filepath: str, mid_save_path: str, worker_id: int = -1) -> None:
        self.file_path = filepath
        self.mid_save_path = mid_save_path
        self.score_dict = {}  # e.g. {"Lucy": {"total": 180, "count": 2}}
        if worker_id > 0:
            self.worker_id = worker_id
        else:
            # fall back to a random id when none is given
            self.worker_id = random.randint(100000, 10000000) + random.randint(12, 100)

    def _add_item(self, name: str, score: float):
        # accumulate one <name, score> record into the local key-value dict
        if name not in self.score_dict:
            entry = {"total": score, "count": 1}
        else:
            entry = self.score_dict[name]
            entry["total"] += score
            entry["count"] += 1
        self.score_dict[name] = entry

    def run(self):
        """Main function of the Mapper.
        Load text from the source file and split each line into name and score.

        Returns:
            save_path (str): the path where the key-value result is saved
        """
        with open(self.file_path) as f:
            line = f.readline()
            while line != "":
                fields = line.split()
                name, score = fields[0], float(fields[1])  # parse the score as a number
                self._add_item(name, score)
                line = f.readline()

        filename = str(self.worker_id) + "_intermediate_results.pkl"
        save_path = os.path.join(self.mid_save_path, filename)
        with open(save_path, 'wb') as f:
            pickle.dump(self.score_dict, f, pickle.HIGHEST_PROTOCOL)
        return save_path


if __name__ == "__main__":
    mapper = Mapper(filepath='file{}.txt'.format(1), mid_save_path="./")
    print(f"Mapper {mapper.worker_id} started...")
    filename = mapper.run()
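
To check what a mapper wrote, you can load the intermediate pickle back; the filename below is only an example, the real name contains the mapper's worker_id (use the path returned by mapper.run()):

import pickle

with open("1234567_intermediate_results.pkl", "rb") as f:  # hypothetical filename
    print(pickle.load(f))
# e.g. {'小红': {'total': 98.3, 'count': 1}, '小明': {'total': 85.7, 'count': 1}, ...}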

Reducer class

import os
import random
import pickle


class Reducer():
    """Map-Reduce Reducer

    Args:
        filepath_lists (List[str]): the list of paths from which to load key-value data
        save_path (str): the directory to save the final results (don't include a filename)
        worker_id (optional, int): id
    """

    def __init__(self, filepath_lists, save_path: str, worker_id: int = -1) -> None:
        self.filepath_lists = filepath_lists
        self.save_path = save_path
        self.result_dict = {}
        if worker_id > 0:
            self.worker_id = worker_id
        else:
            # fall back to a random id when none is given
            self.worker_id = random.randint(100000, 10000000) + random.randint(12, 100)

    def _merge(self, kv):
        # merge one mapper's key-value dict into the global result
        for k, v in kv.items():
            if k not in self.result_dict:
                new_entry = {}
                new_entry["total"] = float(v["total"])
                new_entry["count"] = int(v["count"])
                self.result_dict[k] = new_entry
            else:
                entry = self.result_dict[k]
                entry["total"] += float(v["total"])
                entry["count"] += int(v["count"])

    def _average(self):
        # turn {"name": {"total": ..., "count": ...}} into {"name": average}
        dd = {}
        for k, v in self.result_dict.items():
            dd[k] = v["total"] / v["count"]
        self.result_dict = dd

    def run(self):
        """Main function of the Reducer.
        Reduce the key-value results to the final average score of each student.
        """
        kv_lists = []
        for each in self.filepath_lists:
            with open(each, 'rb') as f:
                kv_lists.append(pickle.load(f))

        for kv in kv_lists:
            self._merge(kv)

        self._average()

        save_path = os.path.join(self.save_path, "results.txt")
        with open(save_path, "w") as f:
            for k, v in self.result_dict.items():
                print(f"{k}: {v}分", file=f)


if __name__ == "__main__":
    reducer = Reducer(filepath_lists=['2173983_intermediate_results.pkl'], save_path="./")
    print(f"Reducer {reducer.worker_id} started...")
    reducer.run()

Manager script

import multiprocessing as mp

from multiprocessing import Manager
from MapClass import Mapper
from ReduceClass import Reducer

num_mapper = 3
num_reducer = 1


def mapper_task(filenum: int, d: list, barrier):
    """Mapper starter function
    Args:
        filenum (int): e.g. 1 indicates "file1.txt"
        d (list): shared list that collects the intermediate file paths.
    """
    mapper = Mapper(filepath='file{}.txt'.format(filenum), mid_save_path="./")
    # mapper = Mapper(filepath='file{}.txt'.format(filenum), mid_save_path="./", worker_id=filenum)
    print(f"Mapper {mapper.worker_id} started...")
    filename = mapper.run()
    d.append(filename)
    barrier.wait()  # wait until every mapper has written its intermediate file
    print(f"Worker {mapper.worker_id} job completed!")


def reducer_task(d: list):
    reducer = Reducer(filepath_lists=list(d), save_path="./")  # copy the shared proxy into a plain list
    print(f"Reducer {reducer.worker_id} started...")
    reducer.run()


if __name__ == "__main__":
    # use the Manager context to create a shared list for the intermediate file paths
    with Manager() as manager:
        intermediate_l = manager.list()
        process_list = []

        barrier = mp.Barrier(parties=num_mapper)
        for i in range(num_mapper):
            process_list.append(mp.Process(target=mapper_task, args=(i + 1, intermediate_l, barrier)))
            process_list[i].start()
        # join only after all mappers have been started; otherwise barrier.wait() deadlocks
        for p in process_list:
            p.join()

        process_list = []
        for i in range(num_reducer):
            process_list.append(mp.Process(target=reducer_task, args=(intermediate_l,)))
            process_list[i].start()
            process_list[i].join()
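
Assuming the Mapper and Reducer classes are saved as MapClass.py and ReduceClass.py (matching the imports above) and this script as, say, manager.py (name assumed), running it with file1.txt~file3.txt in the same directory should print one start/finish line per mapper, then start the reducer and write results.txt with each student's average score.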

Notes

  • The barrier originally caused a deadlock: each mapper process was joined immediately after it was started, so the first mapper blocked forever in barrier.wait() while the remaining mappers had not been launched yet. Starting all mappers first and joining them afterwards, as in the script above, avoids the deadlock, and barrier.abort() is no longer needed.
