/**
 * MRToHBase.java
 *
 * Created on: 2016-04-18 17:00:19
 * Author: Wayne 13186259527@163.com
 */
package HBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 
/**
 * @author Administrator
 */
public class MRToHBase {
	public static void main(String[] args) throws Exception {
		/*
		 * This time the Configuration is created in a different way:
		 * unlike HBaseToMR, it is not built directly as an HBaseConfiguration.
		 */
		final Configuration configuration = new Configuration();

		// ZooKeeper quorum the HBase client connects to
		configuration.set("hbase.zookeeper.quorum", "192.168.255.128");

		// Target table for TableOutputFormat, passed as the first argument
		configuration.set(TableOutputFormat.OUTPUT_TABLE, args[0]);

		configuration.set("dfs.socket.timeout", "180000");
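		/*
		 * A minimal sketch of the alternative mentioned above (assuming the
		 * HBase client configuration files are on the classpath, as in
		 * HBaseToMR): HBaseConfiguration.create() returns a Configuration
		 * with hbase-default.xml and hbase-site.xml already loaded.
		 *
		 *     Configuration configuration = HBaseConfiguration.create();
		 *     configuration.set(TableOutputFormat.OUTPUT_TABLE, args[0]);
		 */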
		final Job job = Job.getInstance(configuration,
				MRToHBase.class.getSimpleName());
		// Ship the HBase client jars the job depends on along with it
		TableMapReduceUtil.addDependencyJars(job);
		job.setJarByClass(MRToHBase.class);
		job.setMapperClass(BatchImportMapper.class);
		job.setReducerClass(BatchImportReducer.class);

		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(Text.class);
		// Read plain text lines and write the resulting Puts into the HBase table
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TableOutputFormat.class);

		FileInputFormat.setInputPaths(job, args[1]);
		job.waitForCompletion(true);
	}
	static class BatchImportMapper extends
			Mapper<LongWritable, Text, LongWritable, Text> {
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws java.io.IOException, InterruptedException {
			// Identity mapper: pass the file offset and the raw line through unchanged
			context.write(key, value);
		}
	}
	static class BatchImportReducer extends
			TableReducer<LongWritable, Text, NullWritable> {
		/*
		 * Each input line carries "columnFamily:qualifier" tokens,
		 * so the CF array is used to split them apart.
		 */
		String[] CF = {};

		@Override
		protected void reduce(LongWritable key,
				java.lang.Iterable<Text> values, Context context)
				throws java.io.IOException, InterruptedException {
			for (Text text : values) {
				final String[] splited = text.toString().split("\t");
				// Field 0 is the row key; fields 1/2 and 3/4 are two (family:qualifier, value) pairs
				for (int i = 1; i <= 3; i = i + 2) {
					final Put put = new Put(Bytes.toBytes(splited[0]));
					/*
					 * The first pair's family:qualifier sits at index 1 and its value at index 2;
					 * the second pair's family:qualifier sits at index 3 and its value at index 4.
					 */
					CF = splited[i].split(":");
					// put.add(...) is the older client API; HBase 1.0+ renames it to addColumn(...)
					put.add(Bytes.toBytes(CF[0]), Bytes.toBytes(CF[1]),
							Bytes.toBytes(splited[i + 1]));
					context.write(NullWritable.get(), put);
				}
			}
		}
	}
  }
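
A minimal sketch of how this job is wired up (the table name, column family,
input path, and sample values below are all hypothetical). The reducer expects
every input line to be tab-separated as: rowKey, family:qualifier, value,
family:qualifier, value, and the table passed as args[0] must already exist in
HBase with the column families used in the data, e.g.:

	# create the target table with one column family in the HBase shell
	create 'stu', 'cf1'

	# one line of the input file under args[1] (<TAB> marks a tab character)
	rk0001<TAB>cf1:name<TAB>Wayne<TAB>cf1:age<TAB>25

	# submit the job: args[0] = HBase table, args[1] = HDFS input path
	hadoop jar mrtohbase.jar HBase.MRToHBase stu /input/stu.txt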
 