What is it?

Graph algorithms running on Spark, via GraphX. Distributed. Fault-tolerant. Efficient.

Steps

  1. git checkout ex5
  2. Add the dependencies to build.sbt as usual:
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.0",
  "org.apache.spark" %% "spark-sql" % "1.4.0",
  "org.apache.spark" %% "spark-mllib" % "1.4.0",
  "org.apache.spark" %% "spark-graphx" % "1.4.0"
)
  3. Add imports:
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD  // needed for the RDD[(VertexId, MyEntry)] annotation later on
  4. Define structures, parsers, etc. [Copy-paste this bit, even if you’ve been coding each step]
case class MyEntry(id: Long, label: String, nodeType: String, x: Float, y: Float)

object MyEntry {
  // Expected header:
  // Id,Label,type,xcoord,ycoord,category,Modularity Class,Eigenvector Centrality,In Degree,Out Degree,Degree,Weighted Degree
  // The second group also matches quoted labels containing commas.
  val pattern = "([^,]+),(\"[^\"]+\"|[^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+)*".r

  def apply(line: String): MyEntry =
    pattern.findFirstMatchIn(line) map { m =>
      val parts = m.subgroups
      try {
        // We only keep the first five columns; the rest are ignored.
        MyEntry(parts(0).toLong, parts(1), parts(2), parts(3).toFloat, parts(4).toFloat)
      } catch {
        // Any malformed field falls back to the default entry.
        case e: Exception => defaultEntry
      }
    } getOrElse defaultEntry

  val defaultEntry = MyEntry(0, "", "", 0, 0)
}

object MyEdge {
  // Source and target ids sit in the first two columns; the weight is taken from column 5.
  def apply(line: String): Edge[Float] = {
    val parts = line.split(",").map(_.trim)
    Edge(parts(0).toLong, parts(1).toLong, parts(5).toFloat)
  }
}

Yes…parsing can be done a lot better. No, it’s not important here.
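(If you do care: one common trick is to split on commas that fall outside double quotes. The `splitCsv` helper below is a hypothetical sketch, not part of the exercise — it copes with quoted labels like `"Doe, Jane"`, but still not with escaped quotes inside a field, which is where a real CSV library earns its keep.)

    // A sketch only: split on commas that sit outside double quotes.
    // Good enough for quoted labels; escaped quotes still break it.
    def splitCsv(line: String): Array[String] =
      line.split(""",(?=(?:[^"]*"[^"]*")*[^"]*$)""")

    // e.g. splitCsv("42,\"Doe, Jane\",person,0.5,1.5")
    //      == Array("42", "\"Doe, Jane\"", "person", "0.5", "1.5")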

  5. Start the context and load the RDDs:
    val sc = new SparkContext("local[*]", "hello-spark")

    val nodesRdd = sc.textFile("./data/graph/dh11_nodes.csv").filter(!_.startsWith("Id")).cache()

    println(nodesRdd.count)
    
    nodesRdd.take(5)
      .map(MyEntry(_))
      .foreach(println)


    val edgesRdd = sc.textFile("./data/graph/dh11_edges.csv").filter(!_.startsWith("Source"))
    edgesRdd.take(5)
      .map(MyEdge(_))
      .foreach(println)
  6. Get nodes and edges from our RDDs, and create a graph:
    val nodes : RDD[(VertexId, MyEntry)] =
      nodesRdd.map(MyEntry(_)).map(x => (x.id, x))

    val edges = edgesRdd.map(MyEdge(_)).cache()

    val graph = Graph(nodes, edges, MyEntry.defaultEntry)
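It can’t hurt to sanity-check what the graph actually holds before querying it. The triplets view below pairs each edge with the attributes of its two endpoint vertices — this is stock GraphX, nothing specific to our data:

    // Optional sanity check: each triplet joins an edge with its source and
    // destination vertex attributes.
    graph.triplets.take(5).foreach { t =>
      println(s"${t.srcAttr.label} -[${t.attr}]-> ${t.dstAttr.label}")
    }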
  7. Let’s find the labels of all vertices with an in-degree greater than 4:
    val filtered =
      graph.inDegrees.filter { case (_, inDegree) => inDegree > 4 }
        .map { case (id, _) => id }

    println("The following have an in-degree greater than 4:")
    nodes.join(filtered.map((_, 1))).map(_._2._1.label).collect().foreach(println)
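As an aside, the same lookup can go through the graph itself rather than the original nodes RDD — a minimal sketch, equivalent to the join above:

    // Equivalent formulation (a sketch): join the in-degree view back onto
    // graph.vertices instead of the raw nodes RDD.
    graph.vertices
      .join(graph.inDegrees.filter { case (_, d) => d > 4 })
      .map { case (_, (entry, _)) => entry.label }
      .collect()
      .foreach(println)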
  8. Clean up:
    sc.stop()