Turning Scala code into Spark
by Anthony Cros, September 19 2016
The other day I found myself explaining to a bioinformatician in our team how we could process some data at scale. She is mostly used to processing data in memory, due to the relatively small size of the data she typically deals with, but this time we were going to have to leverage a cluster in order to make it happen. The data wasn't all that big, somewhere in the vicinity of 1TB, but it was just too big to fit on the most powerful machine we had handy. An HPC-based solution could also have been considered here, but it would have required more low-level manipulations[1]. So I thought it would be a good occasion to introduce her to Apache Spark. But because she was mostly used to processing data in an imperative manner (as opposed to the declarative style used here), I figured I would first show her what the processing flow would look like in plain Scala[2]. That way I would not be introducing both a new computational paradigm and an entirely new framework at once.
Scala approach
The original task at hand, part of a Stand Up 2 Cancer effort, consisted of turning a bunch of CSV files[3] into a highly nested structure and annotating the nested documents using various web services. The most important part was performing two nested GROUP BYs, so that is the only part I kept here. I made up two dummy input files that vaguely resemble the original ones:
f1.tsv:
gene sample other1
g1 s1 a1
g1 s2 b1
g1 s3a c1
g2 s4 d1
And
f2.tsv:
gene sample other21 other22
g1 s1 a21 a22
g1 s2 b21 b22
g1 s3b c21 c22
g2 s4 d21 d22
g3 s5 f21 f22
I then came up with the following Scala code:
package demo.scala
// intentionally verbose so it reads more easily
// shouldn't actually be all in one file either
// ===========================================================================
object Inputs {
trait HasGene { val gene: String }
trait HasSample { val sample: String }
sealed trait F
extends HasGene
with HasSample
// ---------------------------------------------------------------------------
case class F1(
gene: String,
sample: String,
other1: String)
extends F
object F1 {
def apply(line: String): F1 = { // factory of F1s
val it = line.split("\t").iterator
F1(
gene = it.next, // TODO: should add checks
sample = it.next,
other1 = it.next)
}
}
// ---------------------------------------------------------------------------
case class F2(
gene: String,
sample: String,
other21: String,
other22: String)
extends F
object F2 {
def apply(line: String): F2 = { // factory of F2s
val it = line.split("\t").iterator
F2(
gene = it.next,
sample = it.next,
other21 = it.next,
other22 = it.next)
}
}
}
// ===========================================================================
object Outputs {
import Inputs._
// ---------------------------------------------------------------------------
case class GeneCentric(
gene: String,
samples: Iterable[SampleCentric])
// ---------------------------------------------------------------------------
case class SampleCentric(
sample: String,
extras: Iterable[Extra])
// ---------------------------------------------------------------------------
sealed trait Extra
object Extra {
case class Extra1(
other1: String)
extends Extra
case class Extra2(
other21: String,
other22: String)
extends Extra
// factory of "extras" (either as Extra1 or Extra2,
// based on the type of f we get)
def apply(f: F): Extra =
// this pattern matching is safe because F is sealed
// (compiler will warn if we forget a case)
// pattern matching is one of the most powerful scala constructs, see
// alvinalexander.com/scala/using-match-expression-like-switch-statement
f match {
case F1(_, _, other1) => Extra1(other1)
case F2(_, _, other21, other22) => Extra2(other21, other22)
}
}
}
// ===========================================================================
object Demo extends App { // i.e. main ("App" trait)
import Inputs._
import Outputs._
import Outputs.Extra._
// ---------------------------------------------------------------------------
// These are simply type aliases used for illustration purposes here
type GenericSeq[A] = Seq[A]
type GenericIterable[A] = Iterable[A]
// ---------------------------------------------------------------------------
// read lines and transform each into a F1/F2
// "apply()" is an implicit factory that case classes all have
val f1s: GenericSeq[F] = readIn(args(0)).map(F1.apply)
val f2s: GenericSeq[F] = readIn(args(1)).map(F2.apply)
// ---------------------------------------------------------------------------
val genes: GenericSeq[(String /* gene */, Map[String /* sample */, Iterable[F]])] =
f1s
// combine both file contents
.union(f2s)
// group by "gene" since they both are guaranteed
// to have this property (they transitively extend HasGene via F)
.groupBy(f => f.gene)
// ignore key for now and focus on values (the groupings)
.mapValues(geneGroup =>
geneGroup
// within each such grouping,
// do the same thing but with "sample"
.groupBy(f => f.sample))
.toSeq
//---------------------------------------------------------------------------
val geneCentrics: GenericIterable[GeneCentric] =
genes
.map { case (gene, samples) => gene ->
samples
.mapValues(f =>
// lastly extract last bits of interest
f.map(Extra.apply))
// we can now build the sample-centric object
// (does not capture the parent gene, though it could)
// note that this uses a scala trick to be able to pass
// a tuple to the primary constructor
.map((SampleCentric.apply _).tupled) }
// we can now build the gene-centric object
.map((GeneCentric.apply _).tupled)
// ---------------------------------------------------------------------------
// write all as giant json array
val fw = new java.io.FileWriter("/tmp/demo.json")
fw.write( // TODO: a scalable solution should stream instead
geneCentrics
.map(geneCentric =>
net.liftweb.json.Serialization.writePretty
(geneCentric)
(net.liftweb.json.DefaultFormats))
.mkString("[", ",", "]"))
fw.close()
println("done.")
// ===========================================================================
def readIn(filePath: String): GenericSeq[String] =
scala.io.Source
.fromFile(filePath) // TODO: should actually be closed
.getLines()
.drop(1) // drop header (TODO: ideally would read schema from it)
.toSeq // TODO: a scalable solution should stream instead
}
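For reference, running it only requires lift-json on the classpath (for the JSON serialization at the end); a minimal sbt setup along these lines should do, keeping in mind that the exact versions here are just assumptions on my part:

// build.sbt (minimal sketch; the version numbers are assumptions, use whatever recent ones you have)
scalaVersion := "2.11.8"
libraryDependencies += "net.liftweb" %% "lift-json" % "2.6.3"

// then, from the project root:
//   sbt "runMain demo.scala.Demo f1.tsv f2.tsv"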
The code produces the following JSON output (slightly reformatted, order not guaranteed):
[{
"gene":"g3",
"samples":[
{
"sample":"s5",
"extras":[
{ "other21":"f21", "other22":"f22" } ]
}
]
},{
"gene":"g2",
"samples":[
{
"sample":"s4",
"extras":[
{ "other1":"d1" },
{ "other21":"d21", "other22":"d22" }
]
}
]
},{
"gene":"g1",
"samples":[
{
"sample":"s2",
"extras":[
{ "other1":"b1" },
{ "other21":"b21", "other22":"b22" }
]
},
{
"sample":"s1",
"extras":[
{ "other1":"a1" },
{ "other21":"a21", "other22":"a22" }
]
},
{
"sample":"s3b",
"extras":[
{ "other21":"c21", "other22":"c22" } ]
},
{
"sample":"s3a",
"extras":[
{ "other1":"c1" } ]
}
]
}]
Now obviously the above code would be way overkill for such a trivial task, but I also thought it had value in showing the strength of the Scala type system[4]. If one pays close attention to the code, the readIn method and its call sites are really the only places where a runtime error can actually happen. The rest is guaranteed safe by the compiler. That doesn't guarantee that the program is semantically correct, but it does guarantee that we've wired the various entities together in a compatible manner. So the idea is to imagine how this could be part of a much more complex pipeline, where having such compile-time support would really pay dividends (otherwise I still like a good old Python or Groovy script for simpler tasks like this one).
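To make that compile-time support a little more concrete, here is a sketch of what happens if a hypothetical third input type, F3, gets added later on (it would have to live in the same file as F, since F is sealed):

// hypothetical addition to the Inputs object (F3 is made up for illustration)
case class F3(
  gene:   String,
  sample: String,
  other3: String)
  extends F

// the existing match in Extra.apply would then no longer compile cleanly:
//
//   f match {
//     case F1(_, _, other1)           => Extra1(other1)
//     case F2(_, _, other21, other22) => Extra2(other21, other22)
//   }
//
// warning: match may not be exhaustive.
// It would fail on the following input: F3(_, _, _)
//
// i.e. the compiler points at the missing case before the program ever runs.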
So after walking her through that Scala code, it seemed she was able to understand roughly how the different pieces fit together. She was still a bit unsettled by this new way of thinking about computation, but she seemed to see the value in a more declarative approach to data processing.
The next logical step was to show her the Spark equivalent, albeit with its Scala API, since I'm not as familiar with the Python one. All that would be left for her to do was to "translate" it to the Python API, since that's the language she was most comfortable with. So I set out to rewrite the code with Spark. But slowly, I realized I was almost exactly replicating what I had just written in Scala... So upon completion, I tried reorganizing my Spark code to make it resemble the Scala version. And that's when it really struck me how similar Scala collections and Spark really are... I was aware of the similarities, but it wasn't until I finished this little exercise that I realized just how much so![5]
So for the rest of this post, I propose to walk you through the few steps required to turn the above Scala code into Spark code that can run locally (not all that useful for big computation but that's beside the point here).
Spark approach
Here is the resulting file diffed against the original file:
Let's walk through the differences.
At the top, we need to import the relevant Spark entities:
import org.apache.spark
import org.apache.spark.{SparkContext,SparkConf}
import org.apache.spark.rdd.RDD
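(The only new dependency this pulls in is Spark core; something like the following line in build.sbt should do, where again the exact version is an assumption on my part:)

// build.sbt addition (sketch; the version is an assumption)
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.2"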
We then need to change the collection type of the (admittedly contrived) collection aliases I created for illustration purposes:
- type GenericSeq[A] = Seq[A]
+ type GenericSeq[A] = RDD[A]
- type GenericIterable[A] = Iterable[A]
+ type GenericIterable[A] = RDD[A]
Then we need to add a SparkContext, like so:
val sc =
new spark.SparkContext(
new spark.SparkConf()
.setAppName("demo")
.setMaster("local"))
A little bit of cheating to compensate for the fact that Spark deals in collections of (key, value) pairs rather than Maps:
- .toSeq
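The reason is visible in the types: on the Scala side, groupBy returns a Map, hence the .toSeq to get back to a sequence of pairs, whereas on the Spark side groupBy already returns an RDD of (key, group) pairs, so there is nothing to convert. Roughly (fsSeq and fsRdd below are hypothetical stand-ins for the combined f1s/f2s collection in the Scala and Spark versions respectively):

// sketch: why .toSeq is only needed on the Scala side
val asScala: Map[String, Seq[F]]        = fsSeq.groupBy(_.gene) // a Map: .toSeq turns it into pairs
val asSpark: RDD[(String, Iterable[F])] = fsRdd.groupBy(_.gene) // already an RDD of pairs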
A way to collect the result, i.e. actually triggering the workflow (which up until then is only a "plan of action"):
+ .collect()
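This reflects Spark's usual split between lazy transformations (map, filter, groupBy, union, ...), which only build up the execution plan, and actions (collect, count, saveAsTextFile, ...), which actually trigger it. A quick sketch:

// nothing is read or computed here, this only describes the work:
val plan = sc.textFile("f1.tsv").map(_.toUpperCase)
// the file is only read (and the mapping applied) once an action is called:
val lines = plan.collect() // brings the results back to the driver
val total = plan.count()   // another action; triggers the computation again

Note that collect() brings the whole result back to the driver, which is fine for this demo but echoes the "should stream instead" TODO above; for genuinely large outputs one would typically reach for something like saveAsTextFile instead.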
Lastly we need to change how the files are being read in:
def readIn(filePath: String): GenericSeq[String] =
- scala.io.Source
- .fromFile(filePath) // TODO: should actually be closed
- .getLines()
- .drop(1) // drop header (TODO: ideally would read schema from it)
- .toSeq // TODO: a scalable solution should stream instead
+ sc
+ .textFile(filePath)
+ // not quite as clean as the Scala version
+ // but I didn't want to complicate things too much here
+ .filter(!_.startsWith("gene\t"))
Et voilà![6]
Conclusion
As you can see, 90% of the lines are the same, verbatim. And obviously, it produces the same result (order aside)!
In an upcoming post, I will show a significantly more compact way to express the same computation in Scala, though obviously at the expense of readability and scalability. I will then compare it to a python equivalent and comment on the relative sizes and merits of each[7].
Now I recognize that Scala is far from the easiest language to deal with, and that many things could be done to bolster its adoption in bioinformatics[8], but one has to admit that turning the above code into a scalable equivalent in just a few line changes is rather impressive!
Anyway, I thought it would be a good way to kickstart this blog :)
Feel free to share comments and suggest corrections or improvements in the GitHub issue I created for that purpose[9].
Anthony Cros
A big thank you to the people who were nice enough to review the post, in alphabetical order: Emilie Lalonde, Pichai Raman, Deanne Taylor and Bob Tiernay.
Footnotes:
- [1] The kind I would like bioinformaticians in our team not to have to deal with: they have precious knowledge I'd rather see applied to solving new problems than to reinventing the wheel every time.
- [2] See also the excellent Twitter Scala School and Daniel Westheide's Neophyte's Guide to Scala as learning resources.
- [3] Bioinformaticians sure like their CSV files!
- [4] Not the most diplomatic case for it but still valuable.
- [5] I had googled around first and found relatively little guidance on the matter.
- [6] I'm actually French, so it's legit.
- [7] It is possible to write very compact Scala code, though I tend to find that it defeats its purpose if overused.
- [8] See upcoming post (in the works, I'll link from here once it's ready).
- [9] I'm hoping to turn this into a proper blog soon, but this will have to do at first.