/*
 * MRToHBase.java
 *
 * Created on: April 18, 2016, 5:00:19 PM
 * Author: Wayne 13186259527@163.com
 */
package HBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
/**
 * MapReduce job that bulk-writes tab-separated text input into an HBase table.
 *
 * @author Administrator
 */
public class MRToHBase {
    public static void main(String[] args) throws Exception {
        /*
         * This time we build the Configuration a different way: unlike
         * HBaseToMR, which creates an HBaseConfiguration directly, here we
         * start from a plain Hadoop Configuration and set the HBase
         * properties by hand.
         */
        final Configuration configuration = new Configuration();
        configuration.set("hbase.zookeeper.quorum", "192.168.255.128");
        configuration.set(TableOutputFormat.OUTPUT_TABLE, args[0]);
        configuration.set("dfs.socket.timeout", "180000");
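        /*
         * For comparison, a minimal sketch of the HBaseToMR approach the
         * comment above refers to. HBaseConfiguration.create() layers
         * hbase-default.xml and hbase-site.xml on top of the Hadoop
         * defaults, so the ZooKeeper quorum can come from the config files
         * instead of being set by hand:
         *
         *     final Configuration configuration = HBaseConfiguration.create();
         *     configuration.set(TableOutputFormat.OUTPUT_TABLE, args[0]);
         */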
        final Job job = Job.getInstance(configuration, MRToHBase.class.getSimpleName());
        // Ship the HBase jars with the job so the cluster can load TableOutputFormat.
        TableMapReduceUtil.addDependencyJars(job);
        job.setJarByClass(MRToHBase.class);
        job.setMapperClass(BatchImportMapper.class);
        job.setReducerClass(BatchImportReducer.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        FileInputFormat.setInputPaths(job, args[1]);
        job.waitForCompletion(true);
    }
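    /*
     * Usage sketch (the jar and table names here are hypothetical). The
     * target table and its column families must exist before the job runs,
     * e.g. from the HBase shell:
     *
     *     create 'mytable', 'cf1'
     *
     * then submit with the table name and HDFS input directory as arguments:
     *
     *     hadoop jar mr-to-hbase.jar HBase.MRToHBase mytable /input/path
     */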
    static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            // Identity mapper: pass every input line through unchanged; all
            // parsing happens in the reducer.
            context.write(key, value);
        }
    }
    static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable> {

        @Override
        protected void reduce(LongWritable key, java.lang.Iterable<Text> values, Context context)
                throws java.io.IOException, InterruptedException {
            for (Text text : values) {
                final String[] splited = text.toString().split("\t");
                /*
                 * Each line carries family:qualifier pairs, so the CF array
                 * splits them apart. The first pair's family:qualifier sits
                 * at index 1 with its value at index 2; the second pair's
                 * sits at index 3 with its value at index 4. Index 0 is the
                 * row key.
                 */
                for (int i = 1; i <= 3; i = i + 2) {
                    final Put put = new Put(Bytes.toBytes(splited[0]));
                    final String[] CF = splited[i].split(":");
                    // Put.add is the pre-1.0 client API; HBase 1.x and later
                    // use Put.addColumn with the same arguments.
                    put.add(Bytes.toBytes(CF[0]), Bytes.toBytes(CF[1]), Bytes.toBytes(splited[i + 1]));
                    context.write(NullWritable.get(), put);
                }
            }
        }
    }
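    /*
     * A hypothetical input line matching the layout the reducer expects
     * (fields separated by tabs):
     *
     *     rowkey1 <TAB> cf1:name <TAB> Alice <TAB> cf1:age <TAB> 30
     *
     * produces two Puts on row "rowkey1": cf1:name=Alice and cf1:age=30.
     */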
}
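/*
 * A minimal read-back sketch to verify the import, assuming the same
 * pre-1.0 client API as the job above and the hypothetical table and row
 * from the usage note. Fully qualified names are used so the class needs
 * no extra imports beyond those already at the top of the file.
 */
class ReadBackCheck {

    public static void main(String[] args) throws Exception {
        // HBaseConfiguration.create() picks up hbase-site.xml if present.
        final Configuration conf = org.apache.hadoop.hbase.HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.255.128");
        final org.apache.hadoop.hbase.client.HTable table =
                new org.apache.hadoop.hbase.client.HTable(conf, "mytable");
        final org.apache.hadoop.hbase.client.Get get =
                new org.apache.hadoop.hbase.client.Get(Bytes.toBytes("rowkey1"));
        final org.apache.hadoop.hbase.client.Result result = table.get(get);
        // Prints the cell written for cf1:name, e.g. "Alice".
        System.out.println(Bytes.toString(
                result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name"))));
        table.close();
    }
}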