Thursday, June 15, 2017

Apache Spark GraphX: Handling Relationships That Use Strings Instead of Numbers as Vertex IDs

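  • Example: GraphX vertex IDs must be Long values, so the code below assigns each airport code a numeric ID and uses that mapping to connect the routes.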
  • import org.apache.spark._
    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD
    import scala.collection.mutable.ArrayBuffer
    import collection.mutable.HashMap
      
    // Build the vertex set from the airports file. Every row with a usable code in
    // column 4 gets a sequential, strictly positive vertex id, stored as
    // (vertexId, (code, cols(1), cols(3))). Id 0 is never assigned: it is the value
    // vertice_map returns for an unknown code, so callers can test "id != 0" for a hit.
    val vertices = ArrayBuffer[(Long, (String, String, String))]()
    // Unknown keys map to 0L. The result type is declared explicitly so the override
    // agrees with the map's Long value type (a bare `0` would be an Int).
    val vertice_map = new HashMap[String, Long]() { override def default(key: String): Long = 0L }
    var counter = 1L
    val airport_data = scala.io.Source.fromFile("/resources/airports.csv")
    try {
      for (line <- airport_data.getLines()) {
        val cols = line.split(",")
        // Skip rows too short to carry a code (avoids ArrayIndexOutOfBoundsException),
        // and rows whose code is "\N" (presumably the export's null marker) or empty.
        if (cols.length > 4 && cols(4) != "\\N" && cols(4) != "") {
          vertice_map += (cols(4) -> counter)
          vertices += ((counter, (cols(4), cols(1), cols(3))))
          counter += 1
        }
      }
    } finally {
      // Release the file handle even if parsing throws.
      airport_data.close()
    }
    for (line <- vertices) {
      println(line)
    }
    // `sc` is the SparkContext provided by the Spark shell.
    val plane_vertexRDD: RDD[(Long, (String, String, String))] = sc.parallelize(vertices)
      
    // Build the edge set from the routes file. Column 2 is looked up as the source code,
    // column 4 as the destination code, and column 0 becomes the edge attribute
    // (presumably the airline identifier — TODO confirm against the CSV layout).
    val edges = ArrayBuffer[Edge[String]]()
    val route_data = scala.io.Source.fromFile("/resources/routes.csv")
    try {
      for (line <- route_data.getLines()) {
        val cols = line.split(",")
        // Skip rows too short to hold both endpoint codes.
        if (cols.length > 4) {
          val srcId = vertice_map(cols(2))
          val dstId = vertice_map(cols(4))
          // Keep the route if at least one endpoint is a known airport. An unknown endpoint
          // keeps id 0, which has no entry in the vertex RDD. GraphX then gives that vertex
          // the default attribute, and a later subgraph step filters it out.
          if (srcId != 0 || dstId != 0) {
            edges += Edge(srcId, dstId, cols(0))
          }
        }
      }
    } finally {
      // Release the file handle even if parsing throws.
      route_data.close()
    }
    for (line <- edges) {
      println(line)
    }
    val plane_edgeRDD: RDD[Edge[String]] = sc.parallelize(edges)
      
    // Attribute GraphX assigns to any vertex id that an edge references but that is absent
    // from plane_vertexRDD (here, id 0 for unknown airports). The leading "Location" value
    // is the marker the subgraph filter later uses to drop these placeholder vertices.
    val default_plane_vertex = ("Location", "Currently", "Unknown")
    val plane_graph = Graph(plane_vertexRDD, plane_edgeRDD, default_plane_vertex)
    // Print each route as "<src> to <dst> with Airline <attr>". collect() pulls every
    // triplet to the driver, so this is only suitable for small datasets.
    plane_graph.triplets.collect.foreach { triplet =>
      println(s"${triplet.srcAttr._1} to ${triplet.dstAttr._1} with Airline ${triplet.attr}")
    }
      
    // Drop every vertex whose first attribute is the "Location" placeholder. GraphX's
    // subgraph also drops any edge whose endpoint fails the vertex predicate, so only
    // routes between known airports remain.
    val plane_graph_fixed = plane_graph.subgraph(vpred = (_, airport) => airport._1 != "Location")
    // Print each remaining route. collect() pulls every triplet to the driver.
    plane_graph_fixed.triplets.collect.foreach { triplet =>
      println(s"${triplet.srcAttr._1} to ${triplet.dstAttr._1} with Airline ${triplet.attr}")
    }

No comments:

Post a Comment

Note: Only a member of this blog may post a comment.