Turning Scala code into Spark

by Anthony Cros, September 19 2016

The other day I found myself explaining to a bioinformatician in our team how we could process some data at scale. She is mostly used to processing data in-memory due to the relative small size of the data she typically deals with, but this time we were going to have to leverage a cluster in order to make it happen. The size of the data wasn't all that big, somewhere in the vicinity of 1TB, but just too big to fit in the most powerful machine we had handy. An HPC-based solution could also have been considered here, but would have required more low-level manipulations[1]. So I thought it'd be a good occasion to introduce her to Apache Spark. But because she was mostly used to processing data in an imperative manner (as opposed to declarative here), I figured I would first show her what the processing flow would look like in Scala[2]. That way I would not introduce both a new computational paradigm and an entirely new framework at once.


Scala approach

The original task at hand, part of a Stand Up 2 Cancer effort, consisted of turning a bunch of CSV files[3] into a highly nested structure and annotating nested documents from various web services. The most important part was to do two nested GROUP BYs, so I only kept that part. I made up two dummy input files that vaguely resembled the original ones:

f1.tsv:

gene  sample  other1
g1    s1      a1
g1    s2      b1
g1    s3a     c1
g2    s4      d1

And

f2.tsv:

gene  sample  other21  other22
g1    s1      a21      a22
g1    s2      b21      b22
g1    s3b     c21      c22
g2    s4      d21      d22
g3    s5      f21      f22

And came up with the following Scala code:

package demo.scala

// intentionally verbose so it reads more easily
// shouldn't actually be all in one file either
// ===========================================================================
object Inputs {

    trait HasGene   { val gene:   String } 
    trait HasSample { val sample: String }

    sealed trait F
      extends HasGene
         with HasSample

    // ---------------------------------------------------------------------------
    case class F1(
            gene:   String,
            sample: String,
            other1: String)
        extends F

        object F1 {

            def apply(line: String): F1 = { // factory of F1s
                val it = line.split("\t").iterator

                F1(
                    gene   = it.next, // TODO: should add checks
                    sample = it.next,
                    other1 = it.next)
            }

        }

    // ---------------------------------------------------------------------------
    case class F2(
            gene:    String,
            sample:  String,
            other21: String,
            other22: String)
        extends F

        object F2 {

            def apply(line: String): F2 = { // factory of F2s
                val it = line.split("\t").iterator

                F2(
                    gene    = it.next,
                    sample  = it.next,
                    other21 = it.next,
                    other22 = it.next)
            }

        }

}

// ===========================================================================
object Outputs {

  import Inputs._

    // ---------------------------------------------------------------------------
    case class GeneCentric(
        gene: String,
        samples: Iterable[SampleCentric])

    // ---------------------------------------------------------------------------
    case class SampleCentric(
        sample: String,
        extras: Iterable[Extra])

    // ---------------------------------------------------------------------------
    sealed trait Extra

        object Extra {

            case class Extra1(
                other1: String)
                extends Extra

            case class Extra2(
                other21: String,
                other22: String)
                extends Extra

            // factory of "extras" (either as Extra1 or Extra2,
            // based on the type of f we get)
            def apply(f: F): Extra =
                // this pattern matching is safe because F is sealed
                // (compiler will warn if we forget a case)
                // pattern matching is one of the most powerful scala constructs, see
                // alvinalexander.com/scala/using-match-expression-like-switch-statement
                f match {               
                    case F1(_, _, other1) =>           Extra1(other1)
                    case F2(_, _, other21, other22) => Extra2(other21, other22)
                }

        }

}

// ===========================================================================
object Demo extends App { // i.e. main ("App" trait)

    import Inputs._
    import Outputs._
    import Outputs.Extra._

    // ---------------------------------------------------------------------------
    // These are simply type aliases used for illustration purposes here
    type GenericSeq[A]      = Seq[A]
    type GenericIterable[A] = Iterable[A]

    // ---------------------------------------------------------------------------
    // read lines and transform each into a F1/F2
    // "apply()" is an implicit factory that case classes all have
    val f1s: GenericSeq[F] = readIn(args(0)).map(F1.apply)
    val f2s: GenericSeq[F] = readIn(args(1)).map(F2.apply)

    // ---------------------------------------------------------------------------
    val genes: GenericSeq[(String /* genes */, Map[String /* sample */, Iterable[F]])] =
        f1s

            // combine both file contents
            .union(f2s)

            // group by "gene" since they both are guaranteed
            // to have this property (they transitively extend HasGene via F)
            .groupBy(f => f.gene)

            // ignore key for now and focus on values (the groupings)
            .mapValues(geneGroup =>
                geneGroup

                    // within each such grouping,
                    // do the same thing but with "sample" 
                    .groupBy(f => f.sample))
            .toSeq

    //---------------------------------------------------------------------------
    val geneCentrics: GenericIterable[GeneCentric] =
        genes
            .map { case (genes, samples) => genes ->
                samples
                    .mapValues(f =>

                        // lastly extract last bits of interest
                        f.map(Extra.apply))

                    // we can now build the sample-centric object
                    // (does not capture the parent gene, though it could)
                    // note that this uses a scala trick to be able to pass
                    // a tuple to the primary constructor
                    .map((SampleCentric.apply _).tupled) }

            // we can now build the gene-centric object
            .map((GeneCentric.apply _).tupled)

    // ---------------------------------------------------------------------------
    // write all as giant json array
    val fw = new java.io.FileWriter("/tmp/demo.json")
        fw.write( // TODO: a scalable solution should stream instead
                geneCentrics
                    .map(geneCentric =>
                            net.liftweb.json.Serialization.writePretty
                                (geneCentric)
                                (net.liftweb.json.DefaultFormats))
                    .mkString("[", ",", "]"))
            fw.close()

    println("done.")

  // ===========================================================================
  def readIn(filePath: String): GenericSeq[String] =
    scala.io.Source
          .fromFile(filePath) // TODO: should actually be closed
            .getLines()
            .drop(1) // drop header (TODO: ideally would read schema from it)
            .toSeq // TODO: a scalable solution should stream instead

}

The code produces the following JSON ouput (slightly reformatted, order not guaranteed):

[{
  "gene":"g3",
  "samples":[
    {
      "sample":"s5",
      "extras":[
        { "other21":"f21", "other22":"f22" } ]
    }
  ]
},{
  "gene":"g2",
  "samples":[
    {
      "sample":"s4",
      "extras":[
        { "other1":"d1" },
        { "other21":"d21", "other22":"d22" }
      ]
    }
  ]
},{
  "gene":"g1",
  "samples":[
    {
      "sample":"s2",
      "extras":[
        { "other1":"b1" },
        { "other21":"b21", "other22":"b22" }
      ]
    },
    {
      "sample":"s1",
      "extras":[
        { "other1":"a1" },
        { "other21":"a21", "other22":"a22" }
      ]
    },
    {
      "sample":"s3b",
      "extras":[
        { "other21":"c21", "other22":"c22" } ]
    },
    {
      "sample":"s3a",
      "extras":[
        { "other1":"c1" } ]
    }
  ]
}]

Now obviously the above code would be way overkill for such a trivial task, but I also thought it had value in showing the strength of the Scala type system[4]. If one pays close attention to the code, the readIn method and its call-sites are really the only places where a runtime error can actually happen. The rest is guaranteed safe by the compiler. Now it doesn't guarantee that the program is semantically correct, but it guarantees that we've wired the various entities in a compatible manner. So the idea is to imagine how this could be part of a much more complex pipeline, where having such compile-time support would really pay dividends (otherwise I still like a good old python or groovy script for such simpler tasks).

So after walking her through that Scala code, it seems she was able to understand roughly how the different pieces fit together. She was still a bit unsettled by this new way of thinking about computation, but seemed to see the value in a more declarative approach to data processing.

The next logical step was to show her the Spark equivalent, albeit with its Scala API since I'm not as a familiar with the python one. All that would be left for her to do was to "translate" it to the python API, since that's the language she was most comfortable with. So I set out to rewrite the code with Spark. But slowly, I realized I was almost exactly replicating what I had just written in Scala... So upon completion, I tried reorganizing my Spark code in order to make it resemble the Scala one. And that's when it really struck me, how similar Scala collections and Spark really are... I was aware of the similarities, but it's not until I finished this little exercise that I realized how much so![5]

So for the rest of this post, I propose to walk you through the few steps required to turn the above Scala code into Spark code that can run locally (not all that useful for big computation but that's beside the point here).


Spark approach

Here is the Resulting file diffed with the original file:

Let's walk through the differences.

At the top, we need to import the relevant Spark entities:

import org.apache.spark
import org.apache.spark.{SparkContext,SparkConf}
import org.apache.spark.rdd.RDD

We then need to change the collection type of the - admittedly contrieved - collection aliases I created for illustration purposes:

- type GenericSeq[A]      = Seq[A]
+ type GenericSeq[A]      = RDD[A]
- type GenericIterable[A] = Iterable[A]
+ type GenericIterable[A] = RDD[A]

Then we need to add a SparkContext like so:

val sc =
  new spark.SparkContext(
    new spark.SparkConf()
      .setAppName("demo")
      .setMaster("local"))

A little bit of cheating to compensate for the fact that Spark uses sequences of tuples instead of maps:

- .toSeq

A way to collect the result, i.e. actually triggering the workflow (which up until then is only a "plan of action"):

+ .collect()

Lastly we need to change how the files are being read in:

    def readIn(filePath: String): GenericSeq[String] =
-       io.Source
-           .fromFile(filePath) // TODO: close
-           .getLines()
-           .drop(1) // drop header
-           .toSeq
+       sc
+           .textFile(filePath)
+           // not quite as clean as the Scala version
+           // but I didn't want to complicate things too much here
+           .filter(!_.startsWith("gene\t")) 

Et voilà ![6]


Conclusion

As you can see, 90% of the lines are the same, verbatim. And obviously, it produces the same result (order aside)!

In an upcoming post, I will show a significantly more compact way to express the same computation in Scala, though obviously at the expense of readability and scalability. I will then compare it to a python equivalent and comment on the relative sizes and merits of each[7].

Now I recognize that Scala is far from the easiest language to deal with, and that many things could be done to bolster its adoption in bioinformatics[8], but one has to admit that turning the above code into a scalable equivalent in just a few line changes is rather impressive!

Anyway, I thought it would be a good way to kickstart this blog :)

Feel free to share your comments/suggest corrections and improvements in the github issue I created for that purpose[9].


Anthony Cros
  @ The Bioinformatics Group
    @ The Department of Biomedical and Health Informatics
      @ The Children's Hospital of Philadelphia

A big thank you to the people who were nice enough to review the post, in alphabetical order: Emilie Lalonde, Pichai Raman, Deanne Taylor and Bob Tiernay.



Footnotes: