def putLogsToHTable(p: Iterator[(HBaseKey,String)],bp:Broadcast[Properties]):Unit = {
val (ugi,hBaseConf): (UserGroupInformation,Configuration) = getUGIAndConf()
UserGroupInformation.setLoginUser(ugi)
val hbaseTableName:TableName = TableName.valueOf(bp.value.getProperty("spark.netlog_hbase_namespace"))
ugi.doAs(new PrivilegedExceptionAction[Unit]{
override def run() = {
val conn = ConnectionFactory.createConnection(hBaseConf)
val hbaseTable = conn.getTable(hbaseTableName)
val puts = new util.ArrayList[Put]()
p.foreach(line => {
val (hRow,hFamily,hQualifier) = line._1
val put = new Put(Bytes.toBytes(hRow)).add(hFamily.getBytes,hQualifier.getBytes,Bytes.toBytes(line._2))
puts.add(put)
})
}
hbaseTable.put(puts)
hbaseTable.close()
conn.close()
}
)
def getUGIAndConf(bp:Broadcast[Properties]):(UserGroupInformation,Configuration) = {
val hBaseConf = getHBaseConf(bp)
val ugi = kinit(hBaseConf,bp)
(ugi,hBaseConf)
}
def getHBaseConf(bp:Broadcast[Properties]):Configuration = {
val hBaseConf = HBaseConfiguration.create()
val hBaseSiteXML = bp.value.getProperty("HBASE_SITE_XML")
val hdfsSiteXML = bp.value.getProperty("HDFS_SITE_XML")
val coreSiteXML = bp.value.getProperty("CORE_SITE_XML")
hBaseConf.addResource(hdfsSite.XML)
hBaseConf.addResource(hBaseSite.XML)
hBaseConf.addResource(coreSite.XML)
hBaseConf.set("hadoop.security.authentication","kerberos")
hBaseConf.set("hbase.security.authentication","kerberos")
}
def kinit(hadoopConfig: Configuration, bp: Broadcast[Properties]): UserGroupInformation = {
UserGroupInformation.setConfiguration(hadoopConfig)
val userName = bp.value.getProperty("HBASE_KEYTAB_USER")
//这里使用的是通过获取当前线程的文件,来加载path路径,如果直接给文件名,可能加载不到,导致连接拒绝
val ketTableFile = Thread.currentThread().getContextClassLoader.getResource(bp.value.getProperty("HBASE_KEYTAB_FILE")).getPath
UserGroupInformation.loginUserFromKeytableAndReturnUGI(userName,keyTableFile)
}