本文最后更新于:星期二, 八月 2日 2022, 9:32 晚上
给出一个Map和Reduce的具体实现,去除了所有分布式的特性【可能今后会添加】
MapReduce文章最后的附录A有一段C++代码,描述了Map和Reduce函数的编写和使用方法。但是仅靠看代码总是不能深入理解MapReduce的实现细节,阅读其他人的MapReduce学习曲线又太过陡峭。因此我决定自己实现Map函数和Reduce函数,并尽可能使用替代方法将那些没有给出的api实现。
促使我写这篇文章的另一个原因是,网络上的大部分实现Word Count的文章都是依赖于某某框架的,比如依赖Hadoop。但我觉得过早依赖于某个平台不利于深入理解背后的原理,因此我决定自己实现各种api,体会程序设计者可能遇到的问题。今后在学习分布式系统、分布式框架时,便能够对症下药、有的放矢。
不过还请各位原谅,这里的Map和Reduce只能运行在单机环境啦。
下面给出原文中的C++代码:
#include "mapreduce/mapreduce.h"
//用户map函数
class WordCounter : public Mapper {
public:
virtual void Map(const MapInput& input) {
const string& text = input.value();
const int n = text.size();
for (int i = 0; i < n; ) {
//跳过前导空格
while ((i < n) && isspace(text[i]))
i++;
// 查找单词的结束位置
int start = i;
while ((i < n) && !isspace(text[i]))
i++;
if (start < i)
Emit(text.substr(start,i-start),"1");
}
}
};
REGISTER_MAPPER(WordCounter);
//用户的reduce函数
class Adder : public Reducer {
virtual void Reduce(ReduceInput* input) {
//迭代具有相同key的所有条目,并且累加它们的value
int64 value = 0;
while (!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}
//提交这个输入key的综合
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);
int main(int argc, char** argv) {
ParseCommandLineFlags(argc, argv);
MapReduceSpecification spec;
// 把输入文件列表存入"spec"
for (int i = 1; i < argc; i++) {
MapReduceInput* input = spec.add_input();
input->set_format("text");
input->set_filepattern(argv[i]);
input->set_mapper_class("WordCounter");
}
//指定输出文件:
// /gfs/test/freq-00000-of-00100
// /gfs/test/freq-00001-of-00100
// ...
MapReduceOutput* out = spec.output();
out->set_filebase("/gfs/test/freq");
out->set_num_tasks(100);
out->set_format("text");
out->set_reducer_class("Adder");
// 可选操作:在map任务中做部分累加工作,以便节省带宽
out->set_combiner_class("Adder");
// 调整参数: 使用2000台机器,每个任务100MB内存
spec.set_machines(2000);
spec.set_map_megabytes(100);
spec.set_reduce_megabytes(100);
// 运行
MapReduceResult result;
if (!MapReduce(spec, &result)) abort();
// 完成: 'result'结构包含计数,花费时间,和使用机器的信息
return 0;
}
抽象基类 Mapper、Reducer
首先我打算实现WordCounter
的父类Mapper
和Adder
的父类Reducer
。
class Mapper {
public:
virtual void Map(const MapInput& input) = 0;
};
class Reducer {
public:
virtual void Reduce(ReduceInput* input) = 0;
};
简单实现Map和Reduce接口,并把它们设置成纯虚方法。
WordCounter类
class WordCounter : public Mapper {
public:
void Map(const MapInput& input) override {
const string& text = input.value(); // 读取一行文本
const int n = text.size();
for (int i = 0; i < n; ) {
// 跳过行首空白
while ((i < n) && isspace(text[i])) i++;
// 确定单词的开头和结尾
int start = i;
while ((i < n) && !isspace(text[i])) i++;
if (start < i)
Emit(text.substr(start, i-start), "1");
}
}
};
这一部分相较于原文没什么变化。其主要作用在于分词,然后每个单词组建成一个键值对,以(word, 1)的结构发射出去。发射到哪里呢?我就偷懒直接持久化到本地的消息存储装置了。
static void Emit(const string& key, const string& value) {
mw.put(key, value);
}
mw 是 MiddleWare 的实例,是一个用于保存Emit输出的键值对的全局变量。后续会继续讲解。
用于保存 Mapper 得到的键值对的 MiddleWare 类
class MiddleWare {
private:
vector<pair<string, string>> kv_pairs;
static bool compare_pair(const pair<string, string>& lhs, const pair<string, string>& rhs) {
return lhs.first < rhs.first;
}
public:
MiddleWare() = default;
void put(const string& key, const string& value) {
kv_pairs.emplace_back(key, value);
}
vector<pair<string, string>> get() {
std::sort(kv_pairs.begin(), kv_pairs.end(), compare_pair); // 将其按照key相同的一组来排序
return kv_pairs;
}
};
MiddleWare mw; // 全局变量:消息队列
这是我为本地运行顺利而凭空构建出来的类,作用是储存(key, value)对。为了方便使用,内部有get和put方法。其中如果Reducer需要get数据了,那么首先会按照key对这些pair进行排序。这也是与MapReduce的流程相吻合的。
既然是排序,那么就要定义比较器 compare_pair。我的比较器直接使用string的比较,确保相同key值的pair在相邻位置。
Adder类
// 用户自定义 Reduce 函数
class Adder : public Reducer {
public:
void Reduce(ReduceInput* input) override {
// 迭代所有拥有相同key的键值对,把它们的values加起来
int64_t value = 0;
string currentKey = input->key();
while (!input->end() && currentKey == input->key()) { // 直到下一个键值对的key与当前键值对的key不同为止
value += std::stoi(input->value());
input->NextValue(); // 找到下一个拥有相同key的键值对
}
// Emit sum for input->key()
Emit(to_string(value));
}
};
与论文中的Adder有逻辑出入,主要变化在把同样key分成不同组的逻辑上,我直接保存了当前组的key。原来论文里是没有这种操作的。
Map 的输入 MapInput
观察一下Map的参数里有一个MapInput类型的对象,那么第二步就是新建一个MapInput类。要想跑通Map函数的代码,这个类必须实现value()方法。
猜测一下,MapInput是Map的输入,而MapReduce框架的输入输出都应该是键值对的形式。因此每个MapInput都应该包含一个key和一个value成员。
class MapInput {
private:
string map_value;
string map_key;
public:
explicit MapInput(string filename, string text) : map_key(std::move(filename)), map_value(std::move(text)) { }
[[nodiscard]] const string& value() const {
return map_value;
}
};
MapInput的构造函数接收两个参数,第一个参数是文本文件名,第二个参数是文件的内容。其实第一个参数在我们的程序中没啥作用,但是为了格式的统一,就写上吧。
explicit修饰构造函数,代表该类的对象禁止发生隐式类型转换,要想转换必须以明确的(explicit)方式进行显式类型转换。
冒号后面的初始化列表中,使用了move特性,避免了函数传参导致的变量复制。
[[nodiscard]] 含义是该函数的返回值必须被使用,不能丢弃。C++ 17版本新增了几个中括号标识的提示符,当代码不符合要求的时候,编译器也会真的警告。相当于把以前的注释加强了。除[[nodiscard]]之外,还有表示switch语句中不必加break的[[fallthrough]]、变量定义之后没有使用也没关系的标识符[[maybe_unused]]。
Reduce 的输入 ReduceInput
ReduceInput的设计就比较麻烦了。首先Reduce函数的输入是ReduceInput的指针,使用到的接口有done()/value()/NextValue()/key(),然后根据Reduce函数的使用方法,感觉ReduceInput像是一个迭代器。
class ReduceInput {
private:
vector<pair<string, string>> data;
int currentKey = 0;
public:
explicit ReduceInput(vector<pair<string, string>> _data) : data(std::move(_data)) { }
// bool done() {
// // 直到下一个键值对的key与当前键值对的key不同为止
// // 如果到了末尾,或者下一个key不一样,都是done
// if (currentKey == 0) return false;
// if (end() || data[currentKey].first != data[currentKey-1].first) return true;
// return false;
// }
const string& value() {
return data[currentKey].second;
}
const string& key() {
return data[currentKey].first;
}
void NextValue() {
currentKey++;
}
bool end() {
return currentKey >= data.size();
}
};
上面是我实现的ReduceInput,偷个懒把所有数据存放到ReduceInput中方便遍历,在真实场景的设计中不会像我这样的。
此外,Done函数的逻辑是有问题的。关键在于Reduce函数中的这句话:
while (!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}
表面上看起来是希望input作为一个迭代器,当它迭代到key与下一个key不同时,终止迭代(即done返回true表明迭代完成),然而下次迭代的开始还是从这个位置,其结果从程序逻辑上来讲,却又希望返回false。同一个位置,我们希望返回两个不同的值,这显然是说不通的。因此我在Reduce最终实现的主代码部分做了适当的逻辑修改。
输出结果的单参数Emit
为了输出方便,最终我定义了单参数的重载Emit,不保存Reducer的计算结果,直接输出:
static void Emit(const string& key) {
cout << "Sum of values:" << key << endl;
}
最后的main
int main(int argc, char* argv[]) {
ifstream in(R"(C:\Users\zyt\CLionProjects\leetcode_2021\lyrics.txt)");
string content((istreambuf_iterator<char>(in)), istreambuf_iterator<char>());
MapInput minput("lyrics.txt", content);
cout << "minput:\n" << minput.value() << endl;
WordCounter wc;
wc.Map(minput);
auto *rinput = new ReduceInput(mw.get());
while (!rinput->end()) {
cout << "Key: " << rinput->key() << "\t";
Adder adder; // 模拟很多 adder
adder.Reduce(rinput);
rinput->NextValue();
}
return 0;
}
ifstream读入本地文本文档,注意ifstream的参数得是绝对路径(相对路径不知道为什么读取不出来东西)。
然后WordCount把分词结果的键值对保存在全局变量mw中,使用mw构建ReduceInput,再把ReduceInput输入进Adder里面。
注意一个Reducer处理一个Group(我把key相同的一组键值对称之为Group),那么我就以While循环来代替啦。
这就是代码的所有内容了!
#include "stdafx.h"
using namespace std;
class MiddleWare {
private:
vector<pair<string, string>> kv_pairs;
static bool compare_pair(const pair<string, string>& lhs, const pair<string, string>& rhs) {
return lhs.first < rhs.first;
}
public:
MiddleWare() = default;
void put(const string& key, const string& value) {
kv_pairs.emplace_back(key, value);
}
vector<pair<string, string>> get() {
std::sort(kv_pairs.begin(), kv_pairs.end(), compare_pair); // 将其按照key相同的一组来排序
return kv_pairs;
}
};
MiddleWare mw; // 全局变量:消息队列
class MapInput {
private:
string map_value;
string map_key;
public:
explicit MapInput(string filename, string text) : map_key(std::move(filename)), map_value(std::move(text)) { }
[[nodiscard]] const string& value() const {
return map_value;
}
};
class ReduceInput {
private:
vector<pair<string, string>> data;
int currentKey = 0;
public:
explicit ReduceInput(vector<pair<string, string>> _data) : data(std::move(_data)) { }
// bool done() {
// // 直到下一个键值对的key与当前键值对的key不同为止
// // 如果到了末尾,或者下一个key不一样,都是done
// if (currentKey == 0) return false;
// if (end() || data[currentKey].first != data[currentKey-1].first) return true;
// return false;
// }
const string& value() {
return data[currentKey].second;
}
const string& key() {
return data[currentKey].first;
}
void NextValue() {
currentKey++;
}
bool end() {
return currentKey >= data.size();
}
};
static void Emit(const string& key, const string& value) {
mw.put(key, value);
}
static void Emit(const string& key) {
cout << "Sum of values:" << key << endl;
}
class Mapper {
public:
virtual void Map(const MapInput& input) = 0;
};
class Reducer {
public:
virtual void Reduce(ReduceInput* input) = 0;
};
class WordCounter : public Mapper {
public:
void Map(const MapInput& input) override {
const string& text = input.value(); // 读取一行文本
const int n = text.size();
for (int i = 0; i < n; ) {
// 跳过行首空白
while ((i < n) && isspace(text[i])) i++;
// 确定单词的开头和结尾
int start = i;
while ((i < n) && !isspace(text[i])) i++;
if (start < i)
Emit(text.substr(start, i-start), "1");
}
}
};
// 用户自定义 Reduce 函数
class Adder : public Reducer {
public:
void Reduce(ReduceInput* input) override {
// 迭代所有拥有相同key的键值对,把它们的values加起来
int64_t value = 0;
string currentKey = input->key();
while (!input->end() && currentKey == input->key()) { // 直到下一个键值对的key与当前键值对的key不同为止
value += std::stoi(input->value());
input->NextValue(); // 找到下一个拥有相同key的键值对
}
// Emit sum for input->key()
Emit(to_string(value));
}
};
int main(int argc, char* argv[]) {
ifstream in(R"(C:\Users\zyt\CLionProjects\leetcode_2021\lyrics.txt)");
string content((istreambuf_iterator<char>(in)), istreambuf_iterator<char>());
MapInput minput("lyrics.txt", content);
cout << "minput:\n" << minput.value() << endl;
WordCounter wc;
wc.Map(minput);
auto *rinput = new ReduceInput(mw.get());
while (!rinput->end()) {
cout << "Key: " << rinput->key() << "\t";
Adder adder; // 模拟很多 adder
adder.Reduce(rinput);
rinput->NextValue();
}
return 0;
}
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!