1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
/**
 * HBaseToMR.java
 *
 * Created on: April 12, 2016, 7:53:47 PM
 * Author: Wayne 13186259527@163.com
 */
package HBase;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

* @author Administrator * */ public class HBaseToMR {
public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); String tmp = ""; * args: [0] [1] [2] [3]...[ 3+args[2]-1 ] * 表名 目的地址 N:(输入的列族:列的个数) 列族:列 * 例如: * N=4 那么列族:列就是args[3...3+4-1] * * 这个for循环就是把所有的列族:列都拼接成一个字符串。以\t分隔 */ for (int i = 3; i < 3+Integer.parseInt(args[2]); i++) { if(i == 3+Integer.parseInt(args[2])-1){ tmp = tmp + args[i]; }else{ tmp = tmp + args[i] + "\t"; } } conf.set("clms", tmp);
Job job = Job.getInstance(conf, HBaseToMR.class.getSimpleName()); job.setJarByClass(HBaseToMR.class);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(args[0], scan, MyMapper.class, Text.class, Text.class, job);
job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setNumReduceTasks(0); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
private static class MyMapper extends TableMapper<Text, Text> { Text k2 = new Text(); Text v2 = new Text();
@Override protected void map( ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context) throws IOException, InterruptedException { k2.set(key.get()); String string = "";
* 版本三 * 输出该行的任意列族,任意列,值 */ String clms = context.getConfiguration().get("clms"); String[] split = clms.split("\t"); for (String cfAndQ : split) { String[] split2 = cfAndQ.split(":"); String Family = split2[0]; String Qualifier = split2[1]; Cell cell = value.getColumnLatestCell(Family.getBytes(), Qualifier.getBytes()); string = string + Family+":"+Qualifier + "\t" + new String(cell.getValue()) + "\t"; } * 版本二 * 输出该行的一个列族,一个列,一个值 */ String clms = context.getConfiguration().get("clms"); String[] split = clms.split(":"); String Family = split[0]; String Qualifier = split[1]; Cell cell = value.getColumnLatestCell(Family.getBytes(), Qualifier.getBytes()); string = Family + "\t" + Qualifier + "\t" + new String(cell.getValue());*/ * 版本一: * 输出该行的列族,列, 值 */ for (KeyValue keyValue : value.raw()) { string = string + new String(keyValue.getFamily()) + "\t" + new String(keyValue.getQualifier()) + "\t" + new String(keyValue.getValue()); }*/ v2.set(string); context.write(k2, v2); } } }
|