pom.xml dependencies (the servlet-api exclusions avoid conflicting servlet jars pulled in by both the HBase and Spark artifacts):
<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>0.98.6-cdh5.2.0</version>
        <exclusions>
            <exclusion>
                <artifactId>javax.servlet-api</artifactId>
                <groupId>javax.servlet</groupId>
            </exclusion>
            <exclusion>
                <artifactId>javax.servlet</artifactId>
                <groupId>org.eclipse.jetty.orbit</groupId>
            </exclusion>
            <exclusion>
                <artifactId>servlet-api-2.5</artifactId>
                <groupId>org.mortbay.jetty</groupId>
            </exclusion>
            <exclusion>
                <artifactId>servlet-api</artifactId>
                <groupId>javax.servlet</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.5.2-hdh3.1.0</version>
        <exclusions>
            <exclusion>
                <artifactId>hadoop-client</artifactId>
                <groupId>org.apache.hadoop</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.5.2-hdh3.1.0</version>
        <exclusions>
            <exclusion>
                <artifactId>javax.servlet-api</artifactId>
                <groupId>javax.servlet</groupId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
1. Fetching HBase table data with the HBase client API
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * Fetch data through the HBase client API.
 */
public class DataAchieveFromHbaseApi {
    public static void main(String[] args) throws IOException {
        // HBase configuration
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181"); // ZooKeeper client port
        conf.set("hbase.zookeeper.quorum", "hdh1,hdh2,hdh3");    // HBase ZooKeeper quorum

        // Scan configuration
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("cf")); // column family; more than one can be added

        // HBase table
        HTable hTable = new HTable(conf, Bytes.toBytes("test")); // table name

        // Run the scan
        ResultScanner rs = hTable.getScanner(scan);

        // Print the table's column family names
        HColumnDescriptor[] hColDes = hTable.getTableDescriptor().getColumnFamilies();
        for (HColumnDescriptor hColDe : hColDes) {
            System.out.println(Bytes.toString(hColDe.getName()));
        }

        // Print the cf:SSID column of every row (this table has only that one column)
        for (Result r : rs) {
            byte[] bytes = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("SSID")); // column family and qualifier
            if (bytes == null) { // row has no cf:SSID cell
                continue;
            }
            String str = new String(bytes, "UTF-8");
            if (str.trim().length() > 0) {
                System.out.println(str.trim());
            }
        }
        System.out.println("end<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");

        rs.close();
        hTable.close();
    }
}
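The scan above assumes the test table already holds rows with a populated cf:SSID column. As a minimal sketch of seeding such data with the same client API (the class name, row keys, and values below are made-up placeholders, not part of the original post):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * Sketch: insert a few sample rows into the "test" table so the scan
 * examples have data to return. Row keys and values are hypothetical.
 */
public class SampleDataWriter {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "hdh1,hdh2,hdh3");

        HTable hTable = new HTable(conf, Bytes.toBytes("test"));
        for (int i = 0; i < 3; i++) {
            Put put = new Put(Bytes.toBytes("row" + i)); // hypothetical row key
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("SSID"), Bytes.toBytes("ssid-" + i)); // cf:SSID value
            hTable.put(put);
        }
        hTable.close(); // flushes any buffered puts
    }
}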
2. Fetching HBase table data through Spark's interface:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.io.IOException;

/**
 * Fetch data by reading the table through TableInputFormat (HFile-backed input).
 */
public class DataAchieveFromHfile {
    private static JavaPairRDD<ImmutableBytesWritable, Result> rdd;

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "hdh1,hdh2,hdh3");
        conf.set(TableInputFormat.INPUT_TABLE, "test"); // table to read

        SparkConf conf1 = new SparkConf().setAppName("test").setMaster("local"); // Spark app name and run mode (local here)
        JavaSparkContext sc = new JavaSparkContext(conf1);

        // Load the table as an RDD of (row key, Result) pairs
        rdd = sc.newAPIHadoopRDD(conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
        System.out.println("Rows read: " + rdd.count());

        rdd.foreach(new VoidFunction<Tuple2<ImmutableBytesWritable, Result>>() {
            @Override
            public void call(Tuple2<ImmutableBytesWritable, Result> result) throws Exception {
                byte[] bytes = result._2().getValue(Bytes.toBytes("cf"), Bytes.toBytes("SSID")); // column family and qualifier
                if (bytes == null) { // row has no cf:SSID cell
                    return;
                }
                String str = new String(bytes, "UTF-8");
                if (str.trim().length() > 0) {
                    System.out.println(str.trim());
                }
            }
        });

        sc.close();
    }
}
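When only one column is needed, the Scan can also be pushed down into TableInputFormat so that the region servers return just cf:SSID instead of whole rows. The following is a sketch of that variant; the class name and the final count output are assumptions layered on top of the original example:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

import java.io.IOException;

/**
 * Sketch: push the column selection down into TableInputFormat via a serialized Scan,
 * then map the (row key, Result) pairs to plain SSID strings.
 */
public class DataAchieveFromHfileWithScan {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "hdh1,hdh2,hdh3");
        conf.set(TableInputFormat.INPUT_TABLE, "test");

        // Only read cf:SSID; the Scan is serialized into the job configuration.
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("SSID"));
        conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan));

        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("test-scan").setMaster("local"));
        JavaPairRDD<ImmutableBytesWritable, Result> pairs =
                sc.newAPIHadoopRDD(conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

        // Turn each Result into the string value of cf:SSID (null if the column is missing).
        JavaRDD<String> ssids = pairs.map(new Function<Tuple2<ImmutableBytesWritable, Result>, String>() {
            @Override
            public String call(Tuple2<ImmutableBytesWritable, Result> t) throws Exception {
                byte[] v = t._2().getValue(Bytes.toBytes("cf"), Bytes.toBytes("SSID"));
                return v == null ? null : new String(v, "UTF-8");
            }
        });

        // Count only rows that actually carry a non-empty cf:SSID value.
        long nonEmpty = ssids.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) { return s != null && s.trim().length() > 0; }
        }).count();
        System.out.println("Rows with cf:SSID read: " + nonEmpty);

        sc.close();
    }
}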