How to make joins in the Spark Dataset API more type-safe
I'm a senior freelance developer specializing in Scala and Apache Spark. I primarily work with companies in the fintech domain and help them tackle their big data challenges.
In this blog post, I want to address a common issue that we encounter when writing data transformations in Spark using the Dataset API. As developers, we often face a trade-off between type-safety and performance. In my experience, this decision is particularly challenging in the fintech domain, where the data can be complex and requires careful handling.
Over the years, I have found that prioritizing type-safety can make a significant difference in the reliability and accuracy of data transformations. In this article, I will share some tips and tricks that I have learned along the way to help you navigate this dilemma and make informed decisions when working with Spark's Dataset API.
Let's explore the limitations of Spark's Dataset API when it comes to type-safe "outer join" operations. As you may already know, the join operation is one of the most common operations in Spark. However, it's surprising that the standard Dataset API does not provide a type-safe variant for outer joins. Although we do have joinWith, it only works well for the "inner join" variant. When we use the left.joinWith(right, ...) operation with a "left_outer" join, the resulting dataset of pairs Dataset[(L, R)] can produce rows with null fields on the right side. To handle these nulls in our Scala code, we must convert the R part to an Option[R].
Here's an example of how we can achieve this conversion:
leftDS
  .joinWith(rightDS, joinCondition, "left_outer")
  .as[(L, Option[R])] // conversion happens here
  .map { case (left: L, right: Option[R]) => ... } // now we can safely map
Extending outer joins in Spark
Let's dive into how we can extend outer joins in Spark and simplify our code. After encountering the same pattern repeatedly, I decided to refactor it into a separate class. Using an implicit class, I created multiple useful methods that extend the joining functionality of a standard Dataset.
For those interested, here's a quick rundown of the tech stack I used: Spark 3.3.1, Scala 2.13 (since Spark does not yet officially support Scala 3 at the time of writing this post).
Now, let's talk about how we implement this solution. Implicit classes are a feature of Scala that allows us to extend the functionality of an existing API without accessing its source code. This feature can significantly improve the developer experience when used carefully.
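As a quick illustration of the mechanism, here is a toy example that has nothing to do with Spark (the toSnakeCase method is invented purely for demonstration): an implicit class lets us bolt a new method onto a type whose source we cannot touch.

// Toy illustration only: add a method to String without modifying String itself.
object StringExtensions {
  implicit class RichString(private val s: String) {
    def toSnakeCase: String =
      s.replaceAll("([a-z0-9])([A-Z])", "$1_$2").toLowerCase
  }
}

// Once the implicit class is imported, the method reads as if it were part of String:
// import StringExtensions._
// "joinLeftOuterWith".toSnakeCase // "join_left_outer_with"

Exactly the same trick works for Dataset, as shown below.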
In our case, we define a few extension methods on the Dataset type; the wrapped left-hand dataset is called left. These methods handle the conversion of the R type to Option[R] for the "left_outer" join variant and enable us to write type-safe and concise code.
import org.apache.spark.sql.{Column, Dataset, Encoders}

implicit class DatasetTypesafeJoins[L <: Product : TypeTag](left: Dataset[L]) {

  type JoinResult[R] = (L, Option[R]) // type alias for better traceability

  def joinLeftOuterWith[R <: Product : TypeTag]
      (right: Dataset[R], condition: Column): Dataset[JoinResult[R]] = {
    left
      .joinWith(right, condition, "left_outer")
      .as(Encoders.product[JoinResult[R]]) // re-type the pair so the right side becomes Option[R]
  }

  // ... more extension methods
}
Moreover, we need to import the TypeTag trait from the scala.reflect.runtime.universe package; it is required by Encoders.product, which derives the encoder for our result type.
import scala.reflect.runtime.universe.TypeTag
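For illustration, here is how the extension could be used end to end. The case classes Account and Transaction, the column names, and the local SparkSession are my own assumptions for this sketch and are not part of the original code; the implicit class above is assumed to be defined in an enclosing object and imported.

import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical domain types, used only for this example (in a real project they live at the top level):
case class Account(accountId: Long, owner: String)
case class Transaction(txId: Long, accountId: Long, amount: Double)

val spark = SparkSession.builder().appName("typesafe-joins").master("local[*]").getOrCreate()
import spark.implicits._

val accounts: Dataset[Account] =
  Seq(Account(1L, "alice"), Account(2L, "bob")).toDS()
val transactions: Dataset[Transaction] =
  Seq(Transaction(10L, 1L, 99.90)).toDS()

// With DatasetTypesafeJoins in scope, the outer join stays fully typed:
val joined: Dataset[(Account, Option[Transaction])] =
  accounts.joinLeftOuterWith(transactions, accounts("accountId") === transactions("accountId"))

joined.map { case (account, maybeTx) =>
  (account.owner, maybeTx.map(_.amount).getOrElse(0.0)) // "bob" has no transaction, so we get 0.0
}.show()

The compiler now forces us to deal with the missing right side explicitly instead of letting a null slip through at runtime.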
Other useful extension methods that we can define include the following (one of them is sketched right after the list):
joinLeftOuterWith(right: Dataset[R], usingColumn: String)
joinLeftAntiWith(right: Dataset[R], condition: Column)
joinLeftAntiWith(right: Dataset[R], usingColumn: String)
crossJoinWith(right: Dataset[R])
joinLeftInnerWith(right: Dataset[R], usingColumn: String)
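To give an idea of how one of these could be implemented, here is a sketch of joinLeftAntiWith (my own version under the same assumptions as the class above, not necessarily the author's exact code). A left anti join returns only the left-side rows that have no match on the right, so the result stays a Dataset[L] and no Option wrapping is needed:

// Sketch, to be placed inside DatasetTypesafeJoins where TypeTag[L] is in scope.
def joinLeftAntiWith[R](right: Dataset[R], condition: Column): Dataset[L] =
  left
    .join(right, condition, "left_anti") // untyped join; the output schema is exactly that of L
    .as(Encoders.product[L]) // re-attach the typed encoder for L

Because the anti join never exposes right-side columns, there is also no null handling to worry about here.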
Challenges I faced
Unfortunately, the presented solution only works with Scala 2.13. This is because TypeTag is not available in Scala 3.
Key learnings
Refactoring our code using this new API simplified many of our existing transformations. In summary, making joins type-safe in Spark's Dataset API ensures that you catch potential errors at compile time, saving debugging time and avoiding costly runtime errors. By adopting this approach, you can write more robust data pipelines with confidence that your joins are both correct and efficient.
Tips and advice
I wish such functionality were available in standard Spark by default. Until then, you can at least use my solution. If you need help with refactoring code or implementing similar functionality, just drop me an email or reach out on Codementor and schedule an online session.
Final thoughts and next steps
Next time, we can take a look at more extension methods for the Spark Dataset API and Kafka that I gathered from recent projects.
I was trying to solve almost the same problem a couple of years ago. It's not only about making it 'type-safe'; ideally we could also replace chained join placeholders like
(_._1._1._1._1.value) with something nicer.
This ended up as a small macro-based project, https://github.com/Salamahin/joinwiz, which can not only bring you type safety but also speed up some of your Spark tests.