初探MapReduce

什么是MapReduce

对于一般的数据统计,简单到诸如数组排序,列求和等等,只需要将数据流指向内存,便可通过一系列JDK api操作,对数据进行汇总计算即可。可现实之中往往场景不止于此。先提出两个问题:

  1. 当一个日志文件有4GB大小,让你去对error级别日志进行汇总分析,你会放到内存中去做吗?
  2. 如果各个日志文件分布在不同的服务器,又该怎么办呢?把它们全部拷贝到一台主机再进行统一计算,然后重新分发给每台机器吗?

诸如这样的问题还有很多,这些都是单点计算无法解决的问题,或者说,在目前大数据的时代而言必然会成为一种瓶颈,因为海量数据是要存储在集群服务器上,这时,便需要一个分布式计算平台,将这些任务以一定的规律分散开,各自以统一的规则执行任务,最终再将结果汇总起来。

Hadoop生态体系下,广泛使用HDFS作为分布式数据存储,将数据切块,存放在多台hdfs集群设备中,提高了数据存储的扩展性,并实现冗余备份,解决了大文件存储的问题。再这基础上,我们需要对其中文件进行分析计算,MapReduce便为此诞生。

[这段摘自百度百科,大概了解一下就行]

MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念”Map(映射)”和”Reduce(归约)”,是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。

MapReduce执行流程

作为初探,过于深入研究执行流程似乎有种路都走不好就想跑的感觉。。。所以,这里只是非常简单的了解了以下它的执行过程,大部分内容也是摘自各家的博客,其中每步的含义暂不做深究。

从它的名字来说,简单粗暴的两步:Map,以及Reduce。对于一般开发人员来说,这是最需要关心的两步。

Map的作用是读取原始数据并进行解析,形成键值对。

Reduce则对这些键值对进行汇总计算,输出结果。

img

看似简单,其实二者中间Hadoop框架为我们封装了很多操作。可以看看这张图:

img

这里先不做深入学习,咱先做到会用,然后再在每个实例中去好好理解,或许是为一种高效的方法。

简单的MapReduce使用实例

就用网上教程中使用最多的“字数统计”来初次认识一下这个分布式计算框架吧。

需求:有一个文本文件,我们需要统计这个文件中各个单词的出现次数。

1
2
3
Welcome to Hadoop Class
Hadoop is good
Hadoop is bad

思路:在Map中,逐行读取文本,对单词进行分割,每个单词计1次。在Reduce中,对相同的key进行汇总,求和,输出。

img

实施:

WCMapper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.skywater.study.hadoop.mr.wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
* Mapping步骤实现
* create by skywater at 2019/7/12 16:39
*/
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {


/**
* 每读一行会调用一次该方法
* @param key 每行数据的起始偏移量
* @param value 这行文本内容
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String row = value.toString();
String[] words = row.split("\\s+");
for (String word : words) {
context.write(new Text(word), new LongWritable(1));
}
}
}


其中数据类型要注意,不要用jdk自带的数据类型,而是用hadoop封装的数据类型。

  • BooleanWritable:标准布尔型数值
  • ByteWritable:单字节数值
  • DoubleWritable:双字节数值
  • FloatWritable:浮点数
  • IntWritable:整型数
  • LongWritable:长整型数
  • Text:使用UTF8格式存储的文本
  • NullWritable:当<key, value>中的key或value为空时使用

WCReducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.skywater.study.hadoop.mr.wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.math.BigDecimal;

/**
* create by skywater at 2019/7/12 16:39
*/
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

/**
* 等待hadoop框架对map处理完成,将所有KV缓存起来,进行分组,然后传递一个key,values,调用一次reduce方法
*
* @param key
* @param values
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
Long count = 0L;
for (LongWritable value : values) {
Long currentValue = value.get();
count = BigDecimal.valueOf(count).add(BigDecimal.valueOf(currentValue)).longValue();
}
context.write(key, new LongWritable(count));
}
}

最后需要一个Runner去把Mapper和Reducer组装起来,并定义输入输出路径等。

WCRunner.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.skywater.study.hadoop.mr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* 用来描述一个特定的Job
* 指定改作业用的哪个map/reduce
* 指定作业所需要的数据路径
* 指定结果输出路径
* ...
* create by skywater at 2019/7/15 11:55
*/
public class WCRunner {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

// 设置整个job所用的类在那里
job.setJarByClass(WCRunner.class);

job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
// 如果map和reduce输出类型一直,可同时设置输出key类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

// 设置map相关输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);

// 指定原始数据存放位置
FileInputFormat.setInputPaths(job,new Path("/wc/src"));
// 指定结果数据输出位置
FileOutputFormat.setOutputPath(job,new Path("/wc/output"));

job.waitForCompletion(true);
}
}

这样就完成了一个MR的逻辑编写工作。

准备数据

首先使用jps命令确定ResourceManager是否启动,如未启动需检查yarn是否正常运行。

这里用的文件数据并非上面例子中的数据,数据本身可自行调整。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
[root@master ~]# jps
17761 Jps
7287 NameNode
7641 ResourceManager
7483 SecondaryNameNode
[root@master ~]# cd /root
[root@master ~]# ll
total 9708
-rw-------. 1 root root 1420 Dec 25 2018 anaconda-ks.cfg
drwxr-xr-x. 2 root root 42 Jul 15 01:47 data
-rw-r--r--. 1 root root 1952 Apr 30 13:39 erlang-solutions-1.0-1.noarch.rpm
-rw-r--r--. 1 root root 9929748 Apr 30 13:30 rabbitmq-server-3.7.14-1.el7.noarch.rpm
[root@master ~]# cd data
[root@master data]# ls
ok.sql word-count.jar
[root@master data]# hadoop fs -mkdir /wc
^[[A[root@master data]# hadoop fs -mkdir /wc/src
[root@master data]# hadoop fs -put
ok.sql word-count.jar
[root@master data]# hadoop fs -put ok.sql /wc/src

执行Jar包

此时需要将刚刚写好的逻辑打成Jar包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
[root@master data]# hadoop jar word-count.jar com.skywater.study.hadoop.mr.wordcount.WCRunner
19/07/15 01:51:10 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.192.142:8032
19/07/15 01:51:11 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
19/07/15 01:51:11 INFO input.FileInputFormat: Total input paths to process : 1
19/07/15 01:51:11 INFO mapreduce.JobSubmitter: number of splits:1
19/07/15 01:51:12 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1562908973274_0001
19/07/15 01:51:12 INFO impl.YarnClientImpl: Submitted application application_1562908973274_0001
19/07/15 01:51:12 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1562908973274_0001/
19/07/15 01:51:12 INFO mapreduce.Job: Running job: job_1562908973274_0001
19/07/15 01:51:20 INFO mapreduce.Job: Job job_1562908973274_0001 running in uber mode : false
19/07/15 01:51:20 INFO mapreduce.Job: map 0% reduce 0%
19/07/15 01:51:27 INFO mapreduce.Job: map 100% reduce 0%
19/07/15 01:51:32 INFO mapreduce.Job: map 100% reduce 100%
19/07/15 01:51:32 INFO mapreduce.Job: Job job_1562908973274_0001 completed successfully
19/07/15 01:51:32 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=8434
FILE: Number of bytes written=262349
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=4087
HDFS: Number of bytes written=1221
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=3085
Total time spent by all reduces in occupied slots (ms)=2336
Total time spent by all map tasks (ms)=3085
Total time spent by all reduce tasks (ms)=2336
Total vcore-milliseconds taken by all map tasks=3085
Total vcore-milliseconds taken by all reduce tasks=2336
Total megabyte-milliseconds taken by all map tasks=3159040
Total megabyte-milliseconds taken by all reduce tasks=2392064
Map-Reduce Framework
Map input records=125
Map output records=502
Map output bytes=7424
Map output materialized bytes=8434
Input split bytes=97
Combine input records=0
Combine output records=0
Reduce input groups=98
Reduce shuffle bytes=8434
Reduce input records=502
Reduce output records=98
Spilled Records=1004
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=109
CPU time spent (ms)=1890
Physical memory (bytes) snapshot=446320640
Virtual memory (bytes) snapshot=4262146048
Total committed heap usage (bytes)=278396928
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=3990
File Output Format Counters
Bytes Written=1221

执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
[root@master data]# hadoop fs -ls /wc/output
Found 2 items
-rw-r--r-- 2 root supergroup 0 2019-07-15 01:51 /wc/output/_SUCCESS
-rw-r--r-- 2 root supergroup 1221 2019-07-15 01:51 /wc/output/part-r-00000
[root@master data]# hadoop fs -cat /wc/output/part-r-00000
107
' 38
'%ENTID%') 1
'%ENT_ID%' 2
'''b47b7578-f4d6-4092-a829-6d4a75161adc'')'; 3
'''b47b7578-f4d6-4092-a829-6d4a75161adc'';'); 1
'FROM 1
'LZETS_CHECK'; 4
'SELECT 4
'b47b7578-f4d6-4092-a829-6d4a75161adc'); 2
(COLUMN_NAME 1
(SELECT 3
(execute 1
* 1
-- 4
---------------------------------------------------------------------------------------- 4
0 3
:= 3
= 5
> 3
AND 2
BEGIN 4
CLOSE 3
COLNAME 11
COLNAME; 6
COLUMN_NAME 5
COUNT(*) 4
CURSOR 3
DBA_TAB_COLUMNS 4
DBMS_OUTPUT.PUT_LINE('select 1
DBMS_OUTPUT.PUT_LINE('tableName: 2
DBMS_OUTPUT.PUT_LINE('总数:' 2
DECLARE 4
END 6
END; 3
EXECUTE 3
FETCH 6
FOR 1
FROM 10
IF 3
IF; 3
IMMEDIATE 4
IN 1
INTO 9
IS 3
LIKE 3
LOOP 3
LOOP; 3
NUMBER; 3
OPEN 4
OR 1
OWNER 3
SELECT 4
STR 9
TABLENAME 8
TABLENAME, 6
TABLE_NAME, 3
TEST_CURSOR 9
TEST_CURSOR%FOUND 3
TEST_CURSOR; 6
THEN 3
Table_Name,column_name 1
VARCHAR2(200); 3
VARCHAR2(30); 8
V_COUNT 6
V_COUNT); 2
V_COUNT; 3
WHERE 3
WHILE 3
close 1
colName 3
columnName: 2
cursor 1
dbms_output.put_line(obj); 1
dual; 4
end 2
end; 1
ent_id: 2
fetch 2
from 6
into 2
is 1
like 3
loop 2
loop; 2
obj 1
owner 1
s_sql 4
select 5
tableName 2
tableName) 1
tableName,colName; 2
test_cursor 3
test_cursor%found 1
test_cursor; 2
where 5
while 1
|| 45

总结

到这步也算是完成了初探,这几天会集中精力研究这块,为接下来的BI相关工作做准备