import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.collection.mutable.{ArrayBuffer, HashMap}
// Map each airport's IATA code to a numeric vertex ID. IDs start at 1 so the
// map's default value of 0 can later signal "airport not found". Rows with a
// missing IATA code ("\N" or empty) are skipped. Note that a naive split(",")
// will mis-parse airport names that contain embedded commas.
val vertices = ArrayBuffer[(Long, (String, String, String))]()
val vertex_map = new HashMap[String, Long]() { override def default(key: String) = 0 }
val airport_data = scala.io.Source.fromFile("/resources/airports.csv")
var counter = 1
for (line <- airport_data.getLines()) {
  val cols = line.split(",")
  if (cols(4) != "\\N" && cols(4) != "") {
    vertex_map += (cols(4) -> counter)
    vertices += ((counter, (cols(4), cols(1), cols(3)))) // (IATA, name, country)
    counter += 1
  }
}
airport_data.close()
for (vertex <- vertices) {
  println(vertex)
}
val plane_vertexRDD: RDD[(Long, (String, String, String))] = sc.parallelize(vertices) // sc: the spark-shell SparkContext
// One edge per route: cols(2) and cols(4) are the source and destination IATA
// codes, and the edge attribute cols(0) is the airline code. The || keeps any
// route with at least one known endpoint; an unknown endpoint maps to the
// default vertex ID 0, which Graph() back-fills below.
val edges = ArrayBuffer[Edge[String]]()
val route_data = scala.io.Source.fromFile("/resources/routes.csv")
for (line <- route_data.getLines()) {
  val cols = line.split(",")
  if (vertex_map(cols(2)) != 0 || vertex_map(cols(4)) != 0) {
    edges += Edge(vertex_map(cols(2)), vertex_map(cols(4)), cols(0))
  }
}
route_data.close()
for (edge <- edges) {
  println(edge)
}
val plane_edgeRDD: RDD[Edge[String]] = sc.parallelize(edges)
// Any vertex ID referenced by an edge but missing from plane_vertexRDD
// (here, the "unknown airport" ID 0) receives this default attribute.
val default_plane_vertex = ("Location", "Currently", "Unknown")
val plane_graph = Graph(plane_vertexRDD, plane_edgeRDD, default_plane_vertex)
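A quick sanity check, assuming the same spark-shell session as above: every unknown endpoint collapsed onto vertex ID 0, so counting vertices that carry the placeholder attribute shows whether any routes referenced a skipped airport.

// Count back-filled placeholder vertices; expect 1 if at least one route
// referenced an airport that was skipped while loading, 0 otherwise.
val placeholders = plane_graph.vertices.filter { case (_, attr) => attr._1 == "Location" }
println(placeholders.count())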
for (triplet <- plane_graph.triplets.collect) {
  println(s"${triplet.srcAttr._1} to ${triplet.dstAttr._1} with Airline ${triplet.attr}")
}
// Drop the placeholder vertices; subgraph also discards every edge that
// touches a dropped vertex, which removes the bogus triplets above.
val plane_graph_fixed = plane_graph.subgraph(vpred = (id, user_type) => user_type._1 != "Location")
for (triplet <- plane_graph_fixed.triplets.collect) {
  println(s"${triplet.srcAttr._1} to ${triplet.dstAttr._1} with Airline ${triplet.attr}")
}
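To confirm the fix, compare the two graphs' sizes; numVertices and numEdges come from GraphX's GraphOps. A minimal sketch, again assuming the spark-shell session above:

// The vertex count should drop by the number of placeholders (at most 1 here)
// and the edge count by the number of routes touching an unknown airport.
println(s"vertices: ${plane_graph.numVertices} -> ${plane_graph_fixed.numVertices}")
println(s"edges: ${plane_graph.numEdges} -> ${plane_graph_fixed.numEdges}")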