使用MapReduce导出HBase数据

本文总阅读量

一、需要的工具类

1、TableMapReduceUtil

作用:设置传入MR的HBase的表名等信息

2、TableMapper

作用:mapper类继承该类,实现对HBase表的操作

二、实现的版本

版本一

  • 输入:表名,输出路径
  • 输出:该表所有行的列族,列, 值到指定目录

版本二

  • 输入:表名,输出路径,一个列族:一个列
  • 输出:该表所有行的该列族,该列, 值到指定目录

版本三

  • 输入:表名,输出路径,N,列族:列 。。。
  • 输出:该表所有行的N个(列族,该列, 值)到指定目录

三、实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/*
* HBaseToMR.java
*
* Created on: 2016年4月12日 下午7:53:47
* Author: Wayne 13186259527@163.com
*/

package HBase;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
* @author Administrator
*
*/

public class HBaseToMR {

public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
String tmp = "";
/**
* args: [0] [1] [2] [3]...[ 3+args[2]-1 ]
* 表名 目的地址 N:(输入的列族:列的个数) 列族:列
* 例如:
* N=4 那么列族:列就是args[3...3+4-1]
*
* 这个for循环就是把所有的列族:列都拼接成一个字符串。以\t分隔
*/

for (int i = 3; i < 3+Integer.parseInt(args[2]); i++) {
if(i == 3+Integer.parseInt(args[2])-1){
//最后一次拼接,后面不需要\t分隔。否则后面解析时会出现空字符
tmp = tmp + args[i];
}else{
tmp = tmp + args[i] + "\t";
}
}
conf.set("clms", tmp);

Job job = Job.getInstance(conf, HBaseToMR.class.getSimpleName());
job.setJarByClass(HBaseToMR.class);

Scan scan = new Scan();

TableMapReduceUtil.initTableMapperJob(args[0], scan, MyMapper.class,
Text.class, Text.class, job);

job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true);

}

private static class MyMapper extends TableMapper<Text, Text> {
Text k2 = new Text();
Text v2 = new Text();

@Override
protected void map(
ImmutableBytesWritable key,
Result value,
Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)

throws IOException, InterruptedException {

k2.set(key.get());
String string = "";

/**
* 版本三
* 输出该行的任意列族,任意列,值
*/

String clms = context.getConfiguration().get("clms");
String[] split = clms.split("\t");
for (String cfAndQ : split) {

String[] split2 = cfAndQ.split(":");
String Family = split2[0];
String Qualifier = split2[1];

Cell cell = value.getColumnLatestCell(Family.getBytes(), Qualifier.getBytes());

string = string + Family+":"+Qualifier + "\t"
+ new String(cell.getValue()) + "\t";

}


/**
* 版本二
* 输出该行的一个列族,一个列,一个值
*//*
String clms = context.getConfiguration().get("clms");
String[] split = clms.split(":");
String Family = split[0];
String Qualifier = split[1];

Cell cell = value.getColumnLatestCell(Family.getBytes(), Qualifier.getBytes());

string = Family + "\t" + Qualifier + "\t"
+ new String(cell.getValue());*/


/**
* 版本一:
* 输出该行的列族,列, 值
*//*
for (KeyValue keyValue : value.raw()) {
string = string + new String(keyValue.getFamily()) + "\t"
+ new String(keyValue.getQualifier()) + "\t"
+ new String(keyValue.getValue());
}*/

v2.set(string);
context.write(k2, v2);
}
}
}

四、提交运行

1、开启HDFS、YARN、HBase。

2、运行下面命令:

1
hadoop jar HBaseToMR.jar 表名 目的地址(HDFS目录) N 列族:列 。。。

eg:

1
hadoop jar HBaseToMR.jar test1 /user/root/HBaseToMR 2 address:home address:school