// Runs in spark-shell, where the SparkContext is already available as sc
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
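// Vertex buffer plus a map from airport code to vertex id; the overridden
// default makes lookups of unknown codes return 0 instead of throwing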
val vertices = ArrayBuffer[(Long, (String, String, String))]()
val vertice_map = new HashMap[String, Long]() {
  override def default(key: String) = 0
}
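// Read the airport data and create a vertex for every airport that has an IATA code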
val airport_data = scala.io.Source.fromFile("/resources/airports.csv")
var counter = 1
for (line <- airport_data.getLines()) {
  val cols = line.split(",")
  // Skip airports with no IATA code ("\N" is the null marker in the dataset)
  if (cols(4) != "\\N" && cols(4) != "") {
    vertice_map += (cols(4) -> counter)
    vertices += ((counter, (cols(4), cols(1), cols(3))))
    counter += 1
  }
}
for (line <- vertices) {
  println(line)
}
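// Move the vertices into an RDD so GraphX can use them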
val plane_vertexRDD: RDD[(Long, (String, String, String))] = sc.parallelize(vertices)
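// Read the route data and create one edge per route, labelled with the airline code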
val edges = ArrayBuffer[Edge[String]]()
val route_data = scala.io.Source.fromFile("/resources/routes.csv")
for (line <- route_data.getLines()) {
  val cols = line.split(",")
  // Keep a route if at least one endpoint is a known airport; an unknown code
  // maps to vertex id 0, which Graph() fills in with the default attribute below
  if (vertice_map(cols(2)) != 0 || vertice_map(cols(4)) != 0) {
    edges += (Edge(vertice_map(cols(2)), vertice_map(cols(4)), cols(0)))
  }
}
for (line <- edges) {
  println(line)
}
val plane_edgeRDD: RDD[Edge[String]] = sc.parallelize(edges)
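// Assemble the graph; any vertex id referenced by an edge but missing from
// plane_vertexRDD (here, id 0) gets the default attribute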
var default_plane_vertex = ("Location", "Currently", "Unknown")
val plane_graph = Graph(plane_vertexRDD, plane_edgeRDD, default_plane_vertex)
for (triplet <- plane_graph.triplets.collect) {
  print(triplet.srcAttr._1)
  print(" to ")
  print(triplet.dstAttr._1)
  print(" with Airline ")
  println(triplet.attr)
}
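// Drop the placeholder vertices (and the edges that touch them) by keeping
// only vertices whose first field is a real airport code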
val plane_graph_fixed = plane_graph.subgraph(vpred = (id, user_type) => user_type._1 != "Location")
for (triplet <- plane_graph_fixed.triplets.collect) {
  print(triplet.srcAttr._1)
  print(" to ")
  print(triplet.dstAttr._1)
  print(" with Airline ")
  println(triplet.attr)
}
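To confirm that the subgraph really removed the placeholder endpoints, a quick sanity check can compare vertex and edge counts before and after the filter (a minimal sketch; numVertices and numEdges are standard GraphX operations):

// The fixed graph should report fewer vertices and edges than the raw graph,
// since the default "Location" vertex and the routes touching it are gone
println(s"raw graph:   ${plane_graph.numVertices} vertices, ${plane_graph.numEdges} edges")
println(s"fixed graph: ${plane_graph_fixed.numVertices} vertices, ${plane_graph_fixed.numEdges} edges")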