import org.apache.spark.sql.{SQLContext, _}
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Row shape written to and read from the HBase table described by the `cat` catalog.
 *
 * The field names are the same as the column keys declared in that catalog, so
 * they must not be renamed independently of it.
 *
 * @param col0          value the catalog binds to the table's row key
 * @param Year          integer column `Year`
 * @param Quarter       integer column `Quarter`
 * @param Month         integer column `Month`
 * @param DayofMonth    integer column `DayofMonth`
 * @param DayOfWeek     integer column `DayOfWeek`
 * @param FlightDate    integer column `FlightDate`
 * @param UniqueCarrier string column `UniqueCarrier`
 * @param AirlineID     string column `AirlineID`
 */
case class HBaseRecordAirline(
  col0: String,
  Year: Int,
  Quarter: Int,
  Month: Int,
  DayofMonth: Int,
  DayOfWeek: Int,
  FlightDate: Int,
  UniqueCarrier: String,
  AirlineID: String
)
/** Factory for synthetic [[HBaseRecordAirline]] rows used to populate the test table. */
object HBaseRecordAirlineTest {

  /**
   * Builds a record derived entirely from `i`.
   *
   * Every string field is "row" followed by `i` zero-padded to three digits
   * (e.g. `row007`); every integer field is `i` itself.
   *
   * @param i seed value for all fields
   * @return the generated record
   */
  def apply(i: Int): HBaseRecordAirline = {
    val label = f"row$i%03d"
    HBaseRecordAirline(
      col0 = label,
      Year = i,
      Quarter = i,
      Month = i,
      DayofMonth = i,
      DayOfWeek = i,
      FlightDate = i,
      UniqueCarrier = label,
      AirlineID = label
    )
  }
}
// Catalog (JSON) handed to the HBase data source through HBaseTableCatalog.tableCatalog.
// It names the target table, declares "key" as the row key, and maps each
// HBaseRecordAirline field to a column family / qualifier / type.
// Each margin line must start with exactly one '|': stripMargin removes only the
// first pipe, so a doubled "| |" prefix would leave a stray '|' in the JSON.
val cat = """{
  |"table":{"namespace":"default", "name":"airdelaydata_scv_Test1"},
  |"rowkey":"key",
  |"columns":{
  |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
  |"Year":{"cf":"Year", "col":"Year", "type":"int"},
  |"Quarter":{"cf":"Quarter", "col":"Quarter", "type":"int"},
  |"Month":{"cf":"Month", "col":"Month", "type":"int"},
  |"DayofMonth":{"cf":"DayofMonth", "col":"DayofMonth", "type":"int"},
  |"DayOfWeek":{"cf":"DayOfWeek", "col":"DayOfWeek", "type":"int"},
  |"FlightDate":{"cf":"FlightDate", "col":"FlightDate", "type":"int"},
  |"UniqueCarrier":{"cf":"UniqueCarrier", "col":"UniqueCarrier", "type":"string"},
  |"AirlineID":{"cf":"AirlineID", "col":"AirlineID", "type":"string"}
  |}
  |}""".stripMargin
import sqlContext.implicits._
// Nine synthetic rows, seeded 0 through 8.
val data = (0 to 8).map(HBaseRecordAirlineTest(_))
// Write the rows to HBase through the connector. The catalog describes the
// target table; the newTable option is passed as "5".
// The chain is wrapped in parentheses so it can be pasted into the REPL line by line.
(sc.parallelize(data).toDF.write
  .options(Map(
    HBaseTableCatalog.tableCatalog -> cat,
    HBaseTableCatalog.newTable -> "5"
  ))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save())
/**
 * Loads the HBase table described by a catalog as a DataFrame.
 *
 * @param cat catalog JSON passed to the data source via `HBaseTableCatalog.tableCatalog`
 * @return the DataFrame produced by the HBase data source for that catalog
 */
def withCatalog(cat: String): DataFrame = {
  sqlContext
    .read
    .options(Map(HBaseTableCatalog.tableCatalog -> cat))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .load()
}
// Read the table back through the same catalog that was used for the write.
val df = withCatalog(cat)
// Make the DataFrame queryable from SQL under the name "table1".
df.registerTempTable("table1")
// Select only the AirlineID column and print the result to the console.
val c = sqlContext.sql("select AirlineID from table1")
c.show()
// No comments:
// Post a Comment
// Note: Only a member of this blog may post a comment.