[Scala] Operating HBase from Scala

package com.huawei.bigdata.spark.examples

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

import com.huawei.hadoop.security.LoginUtil
/**
 * Run command:
 * nohup spark-submit --class com.huawei.bigdata.spark.examples.scalaHBase --master yarn-client /home/ap/dip/appjob/shelljob/dzyh/jobScala/ebda_mllib.jar scala_HBase > scala_hbase.log &
 */
object scalaHBase {
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      System.err.println("Usage: scalaHBase <appName>")
      System.exit(1)
    }
    /** STEP 1: Initialize variables and log in to the secured cluster via keytab. **/
    val userPrincipal = "ebda"
    val userKeytabPath = "/home/ap/dip/appjob/shelljob/dzyh/jobScala/user.keytab"
    val krb5ConfPath = "/home/ap/dip/appjob/shelljob/dzyh/jobScala/krb5.conf"
    val hadoopConf: Configuration = new Configuration()
    LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf)

    // Configure the Spark application name.
    val conf = new SparkConf().setAppName(args(0))
    val sc = new SparkContext(conf)

    // Load the cluster client configuration files. Configuration.addResource(String)
    // resolves names on the classpath, so absolute file paths must be wrapped in a
    // Hadoop Path. The keytab and krb5.conf are not XML configuration resources;
    // they are passed to LoginUtil below instead of being added here.
    val conf_HBase = HBaseConfiguration.create()
    conf_HBase.addResource(new Path("/home/ap/dip/appjob/shelljob/dzyh/jobScala/hdfs-site.xml"))
    conf_HBase.addResource(new Path("/home/ap/dip/appjob/shelljob/dzyh/jobScala/core-site.xml"))
    conf_HBase.addResource(new Path("/home/ap/dip/appjob/shelljob/dzyh/jobScala/hbase-site.xml"))

    // STEP 2: Verify the login information (JAAS context and ZooKeeper principal for HBase).
    val PRINCIPAL_NAME = "ebda"
    val LOGIN_CONTEXT_NAME = "Client"
    val PATH_TO_KEYTAB = "/home/ap/dip/appjob/shelljob/dzyh/jobScala/user.keytab"
    val PATH_TO_KRB5_CONF = "/home/ap/dip/appjob/shelljob/dzyh/jobScala/krb5.conf"

    val ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal"
    val ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL = "zookeeper/hadoop.hadoop.com"

    LoginUtil.setJaasConf(LOGIN_CONTEXT_NAME, PRINCIPAL_NAME, PATH_TO_KEYTAB)
    LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL)
    LoginUtil.login(PRINCIPAL_NAME, PATH_TO_KEYTAB, PATH_TO_KRB5_CONF, conf_HBase)

    // Create the HBase connection and an Admin handle.
    val conn = ConnectionFactory.createConnection(conf_HBase)
    val admin = conn.getAdmin
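    // Note: an HBase Connection is heavyweight and thread-safe; create one per
    // application and share it, then close it when the job finishes.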

    /** STEP 3: Data operations. **/
    val userTable = TableName.valueOf("tbl_scala_user")

    // STEP 3.1: Create the user table (skipped if it already exists, so reruns do not fail).
    if (!admin.tableExists(userTable)) {
      val tableDescr = new HTableDescriptor(userTable)
      tableDescr.addFamily(new HColumnDescriptor("basic".getBytes))
      admin.createTable(tableDescr)
      println("Creating table `tbl_scala_user`.")
    }


    // STEP 3.2: Insert data.
    val table = conn.getTable(userTable)
    val p = new Put("ID01".getBytes)
    // Specify the column and value for the Put (the old put.add method is deprecated).
    p.addColumn("basic".getBytes, "Name".getBytes, "zacks".getBytes)
    table.put(p)
    val p2 = new Put("ID02".getBytes)
    p2.addColumn("basic".getBytes, "Name".getBytes, "aerois".getBytes)
    table.put(p2)
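    // For many rows, a single batched call saves round trips, e.g. (same Puts as
    // above): table.put(java.util.Arrays.asList(p, p2))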


    // STEP 3.3: Query data.
    val g = new Get("ID01".getBytes)
    val result = table.get(g)
    val name = Bytes.toString(result.getValue("basic".getBytes, "Name".getBytes))
    println("GET ID01: " + name)

    // STEP 3.4: Delete the user table.
    if (admin.tableExists(userTable)) {
      admin.disableTable(userTable)
      admin.deleteTable(userTable)
    }

    // Release resources.
    table.close()
    admin.close()
    conn.close()
    sc.stop()
    println("JOB Done!")
  }
}
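
The example reads back a single row with a Get. To read the whole table, a Scan works the same way. Below is a minimal sketch, assuming the same authenticated connection, the `table` handle, and the `Bytes` import from above, run before the STEP 3.4 cleanup; the variable names are illustrative, not part of the original job:

    // Full-table scan over the basic:Name column (run before the table is deleted).
    import org.apache.hadoop.hbase.client.Scan

    val scan = new Scan()
    scan.addColumn("basic".getBytes, "Name".getBytes)
    val scanner = table.getScanner(scan)
    try {
      var r = scanner.next()
      while (r != null) {
        val rowKey = Bytes.toString(r.getRow)
        val rowName = Bytes.toString(r.getValue("basic".getBytes, "Name".getBytes))
        println(s"SCAN $rowKey: $rowName")
        r = scanner.next()
      }
    } finally {
      scanner.close() // always release the scanner and its server-side resources
    }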
