使用MapReduce向HBase导入数据

本文总阅读量

一、需要的工具类

1、TableReducer

作用:Reducer类继承该类,实现对HBase表的操作

二、需要导入的数据


三、实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/*
* MRToHBase.java
*
* Created on: 2016年4月18日 下午5:00:19
* Author: Wayne 13186259527@163.com
*/

package HBase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

/**
* @author Administrator
*
*/

public class MRToHBase {

public static void main(String[] args) throws Exception {
/**
* 本次使用另一种创建configuration的方式,
* 不同于HBaseToMR的直接创建HBaseConfiguration。
*/

final Configuration configuration = new Configuration();
// 设置zookeeper
configuration.set("hbase.zookeeper.quorum", "192.168.255.128");
// 设置hbase表名称,从参数中读取
configuration.set(TableOutputFormat.OUTPUT_TABLE, args[0]);
// 将该值改大,防止hbase超时退出
configuration.set("dfs.socket.timeout", "180000");

final Job job = Job.getInstance(configuration,
MRToHBase.class.getSimpleName());
TableMapReduceUtil.addDependencyJars(job);
job.setJarByClass(MRToHBase.class);

job.setMapperClass(BatchImportMapper.class);
job.setReducerClass(BatchImportReducer.class);
// 设置map的输出,不设置reduce的输出类型
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);

// 不再设置输出路径,而是设置输出格式类型
job.setOutputFormatClass(TableOutputFormat.class);
//设置输入的路径
FileInputFormat.setInputPaths(job, args[1]);

job.waitForCompletion(true);
}

static class BatchImportMapper extends
Mapper<LongWritable, Text, LongWritable, Text> {

protected void map(LongWritable key, Text value, Context context)
throws java.io.IOException, InterruptedException {

//Mapper类中,什么也不做,直接将数据写出
context.write(key, value);
};
}

static class BatchImportReducer extends
TableReducer<LongWritable, Text, NullWritable> {

/**
* 数据一行中有列族:列这样的数据
* 所以要用CF数组,将其分开
*/

String[] CF = {};
protected void reduce(LongWritable key,
java.lang.Iterable<Text> values, Context context)

throws java.io.IOException, InterruptedException {

for (Text text : values) {
final String[] splited = text.toString().split("\t");
//因为一行有两个数据所以要遍历
for (int i = 1; i <= 3; i = i + 2) {
//设置rowkey
final Put put = new Put(Bytes.toBytes(splited[0]));
//用“:”分隔开列族和列
/**
* 第一个元素的列族和列的下标为1和2
* 第二个元素的列族和列的下标为3和4
*/

CF = splited[i].split(":");
//动态的设置列族、列和值
put.add(Bytes.toBytes(CF[0]), Bytes.toBytes(CF[1]),
Bytes.toBytes(splited[i + 1]));
//将数据写出
context.write(NullWritable.get(), put);
}
}
};
}

}

四、提交运行

1、开启HDFS、YARN、HBase。

2、运行下面命令:

1
hadoop jar MRToHBase.jar 表名 输入数据地址(HDFS目录)

eg:

1
hadoop jar MRToHBase.jar MRIN /user/root/HBaseToMR34
  • 注意点1:运行命令会启动MR程序,结束时会报一个错误,但是没关系。只要出现了successfully的字眼,就成功了。
  • 注意点2:如果HBase中指定的表没有导入的数据的列族的话,也会报错。所以要保证,被导入数据的HBase表要有对应的列族。

五、结果