import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.PairRDDFunctions
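
// Reads rows from the HBase table "test", writes one row back, and prints cluster status.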
object BigData02 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "Chapter 8")
    println(s"Running Spark Version ${sc.version}")
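
    // Point the HBase input format at the "test" table and check that it is reachable.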
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, "test")

    val admin = new HBaseAdmin(conf)
    println(admin.isTableAvailable("test"))
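
    // Load the table as an RDD of (ImmutableBytesWritable, Result) pairs via the new Hadoop API.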
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    println(hBaseRDD.count())
    hBaseRDD.foreach(println)
    hBaseRDD.foreach(e =>
      println("%s | %s |".format(Bytes.toString(e._1.get()), e._2)))
    println("** Read Done **")
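
    // Prepare a single (row key, value) pair to write back. testMap mirrors the
    // same data as a Map but is not used by the write below.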
    val testMap = Map("row4" -> "value4")
    val pairs = sc.parallelize(List(("row4", "value4")))
    pairs.foreach(println)
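
    // Turn a (row key, value) tuple into an HBase Put against column family "cf", qualifier "d".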
    def convert(triple: (String, String)) = {
      val p = new Put(Bytes.toBytes(triple._1))
      p.add(Bytes.toBytes("cf"), Bytes.toBytes("d"), Bytes.toBytes(triple._2))
      (new org.apache.hadoop.hbase.io.ImmutableBytesWritable, p)
    }
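
    // Wire up an old-API (mapred) job so saveAsHadoopDataset can write into "test".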
    val jobConfig: JobConf = new JobConf(conf, this.getClass)
    jobConfig.setOutputFormat(classOf[TableOutputFormat])
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE, "test")
    new PairRDDFunctions(pairs.map(convert)).saveAsHadoopDataset(jobConfig)
    println("** Write Done **")
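
    // Finally, dump some cluster-level metadata through the admin handle.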
    val status = admin.getClusterStatus()
    println("HBase Version : " + status.getHBaseVersion())
    println("Average Load : " + status.getAverageLoad())
    println("Backup Master Size : " + status.getBackupMastersSize())
    println("Balancer On : " + status.getBalancerOn())
    println("Cluster ID : " + status.getClusterId())
    println("Server Info : " + status.getServerInfo())
  }
}
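
Both the read and the write assume the "test" table already exists with a "cf" column family. As a minimal sketch, assuming the same HBase 0.98-era client API as the listing above, the table could be created programmatically like this (CreateTestTable is an illustrative name, not part of the original program):

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.HBaseAdmin

// Illustrative helper (an assumption, not from the original post): creates the
// "test" table with a single "cf" column family if it does not exist yet.
object CreateTestTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    val admin = new HBaseAdmin(conf)
    if (!admin.tableExists("test")) {
      val desc = new HTableDescriptor(TableName.valueOf("test"))
      desc.addFamily(new HColumnDescriptor("cf"))
      admin.createTable(desc)
    }
    admin.close()
  }
}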