- Connectors
- SHC (Spark HBase Connector)
- https://www.youtube.com/watch?v=kU45zMl7TqA&feature=youtu.be
- https://github.com/hortonworks-spark/shc
- Developed by Hortonworks in collaboration with Bloomberg
- For Spark 2.0
- spark-hbase-connector
- hbase-spark
- Comparison
- Blogs about SHC
- Blog about hbase-spark
- Test of SHC with Spark 1.6
- git clone https://github.com/hortonworks-spark/shc.git
- Remove the following profile from the pom.xml file for the Spark 1.6 shell test
<profile>
  <id>scala-2.11</id>
  <activation>
    <property><name>!scala-2.10</name></property>
  </activation>
  <properties>
    <scala.version>2.11.8</scala.version>
    <scala.macros.version>2.1.0</scala.macros.version>
    <scala.binary.version>2.11</scala.binary.version>
  </properties>
</profile>
- mvn package -DskipTests (or, to also run the tests: mvn clean package test)
- Start the spark-shell
- Option 1
- spark-shell --driver-class-path /usr/hdp/current/hbase-client/lib/hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-client.jar:/usr/hdp/current/hbase-client/lib/hbase-server.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar:/etc/hbase/conf --files /usr/hdp/current/hbase-client/conf/hbase-site.xml --jars shc-core-1.1.0-1.6-s_2.10-SNAPSHOT.jar,/usr/hdp/current/phoenix-client/phoenix-server.jar
- Option 2
- spark-shell --packages com.hortonworks:shc:1.0.0-1.6-s_2.10 --repositories http://repo.hortonworks.com/content/groups/public/ --files /usr/hdp/current/hbase-client/conf/hbase-site.xml
- Option 1
- Run the example
import org.apache.spark.sql.{SQLContext, _}
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.{SparkConf, SparkContext}

case class HBaseRecordAirline(
  col0: String,
  Year: Int,
  Quarter: Int,
  Month: Int,
  DayofMonth: Int,
  DayOfWeek: Int,
  FlightDate: Int,
  UniqueCarrier: String,
  AirlineID: String)

object HBaseRecordAirlineTest {
  def apply(i: Int): HBaseRecordAirline = {
    val s = s"""row${"%03d".format(i)}"""
    HBaseRecordAirline(s, i, i, i, i, i, i, s, s)
  }
}

val cat = s"""{
  |"table":{"namespace":"default", "name":"airdelaydata_scv_Test1"},
  |"rowkey":"key",
  |"columns":{
  |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
  |"Year":{"cf":"Year", "col":"Year", "type":"int"},
  |"Quarter":{"cf":"Quarter", "col":"Quarter", "type":"int"},
  |"Month":{"cf":"Month", "col":"Month", "type":"int"},
  |"DayofMonth":{"cf":"DayofMonth", "col":"DayofMonth", "type":"int"},
  |"DayOfWeek":{"cf":"DayOfWeek", "col":"DayOfWeek", "type":"int"},
  |"FlightDate":{"cf":"FlightDate", "col":"FlightDate", "type":"int"},
  |"UniqueCarrier":{"cf":"UniqueCarrier", "col":"UniqueCarrier", "type":"string"},
  |"AirlineID":{"cf":"AirlineID", "col":"AirlineID", "type":"string"}
  |}
  |}""".stripMargin

import sqlContext.implicits._

val data = (0 to 8).map { i => HBaseRecordAirlineTest(i) }

sc.parallelize(data).toDF.write
  .options(Map(HBaseTableCatalog.tableCatalog -> cat, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()

def withCatalog(cat: String): DataFrame = {
  sqlContext
    .read
    .options(Map(HBaseTableCatalog.tableCatalog -> cat))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .load()
}

val df = withCatalog(cat)
df.registerTempTable("table1")
val c = sqlContext.sql("select AirlineID from table1")
c.show
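As a follow-on sketch in the same spark-shell session (assuming the `df` and catalog from the example above, and a live HBase cluster), SHC can push rowkey-range filters and column pruning down to the HBase scan, so only the needed column families are read:

```scala
// Hypothetical follow-on to the example above; requires the same
// spark-shell session with SHC on the classpath and a running HBase.
// col0 is mapped to the rowkey, so this filter becomes a rowkey range scan;
// the select() prunes the scan to the referenced column families.
val subset = df
  .filter($"col0" <= "row005" && $"Year" > 3)
  .select("col0", "UniqueCarrier", "Year")
subset.show()
```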
- Check HBase
- hbase shell
- list
- get 'airdelaydata_scv_Test1','row000'
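To inspect more than a single row, the same check can be scripted from the command line (a sketch; assumes the table written by the example above and a reachable HBase — the quoted commands are HBase shell (JRuby) syntax, not Bash):

```shell
# Sample a couple of rows from the test table
echo "scan 'airdelaydata_scv_Test1', {LIMIT => 2}" | hbase shell

# Show the column families SHC created (one per catalog "cf" entry)
echo "describe 'airdelaydata_scv_Test1'" | hbase shell
```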
Thursday, June 15, 2017
Spark HBase Connector