ADT support for Flink with Shapeless
This project is maintained by joroKr21
Flink-Shapeless replaces the default macro-based implicit provider for TypeInformation[T] in Apache Flink’s Scala API with automatic type class derivation based on Shapeless.
The primary use case of Flink-Shapeless is to enable custom implicit TypeInformation instances in scope to override the default.
// Import Flink's Scala API as usual
import org.apache.flink.api.scala._
// Replace the macro-based TypeInformation provider
import derived.auto._
// Override TypeInformation[String]
implicit val strTypeInfo = MyASCIIStringTypeInfo
// Strings below are serialized with ASCII encoding,
// even when nested in tuples, data structures, etc.
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile("/path/to/file")
val counts = text
  .flatMap(_.toLowerCase.split("\\W+"))
  .filter(_.nonEmpty).map(_ -> 1)
  .groupBy(0).sum(1)
There are a couple of advantages to automatic type class derivation over the default macro-based approach.
Automatic derivation uses a modified version of the Scala implicit resolution mechanism and has the lowest priority. Thus it can be overridden for specific types by providing an implicit instance anywhere in scope, including in a companion object, as is idiomatic in Scala.
case class Foo(x: Int)
object Foo {
  implicit val info: TypeInformation[Foo] =
    MyOptimizedFooTypeInfo
}
case class Bar(foo: Foo, y: Double)
// All instances below use the optimized version.
implicitly[TypeInformation[Foo]]
implicitly[TypeInformation[List[Foo]]]
implicitly[TypeInformation[(Foo, Long)]]
implicitly[TypeInformation[Bar]]
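The override behaviour described above can be approximated in plain Scala with the familiar low-priority-trait pattern. The following is only a simplified sketch, not how Flink-Shapeless is implemented (the library uses its own modified resolution mechanism). The names PrioritySketch, Info, LowPriorityInfo and fallback are hypothetical stand-ins, with Info playing the role of TypeInformation.

```scala
object PrioritySketch {
  // Hypothetical stand-in for TypeInformation[T]; not part of Flink.
  trait Info[T] { def describe: String }

  // Implicits inherited from a parent trait rank below those defined
  // directly in the companion object, so this acts as the fallback.
  trait LowPriorityInfo {
    implicit def fallback[T]: Info[T] =
      new Info[T] { def describe: String = "fallback" }
  }

  object Info extends LowPriorityInfo {
    // Summoner: Info[T] returns the implicit instance in scope.
    def apply[T](implicit info: Info[T]): Info[T] = info

    // Instance for lists, built from the element instance.
    implicit def list[T](implicit elem: Info[T]): Info[List[T]] =
      new Info[List[T]] { def describe: String = s"List[${elem.describe}]" }
  }

  case class Foo(x: Int)
  object Foo {
    // Custom instance in the companion object, as in the example above.
    implicit val info: Info[Foo] =
      new Info[Foo] { def describe: String = "optimized" }
  }

  def main(args: Array[String]): Unit = {
    println(Info[Foo].describe)
    println(Info[List[Foo]].describe)
    println(Info[Int].describe)
  }
}
```

Under these assumptions, the instance in Foo's companion should be preferred over the fallback, including when Foo is nested in a List, while types without a custom instance fall through to the fallback.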
Creating custom serializers from scratch is usually not what you want to do.
Most often, you want to map your custom data type to one with an existing serializer.
This is where the Inject type class comes in (so named because it is essentially an injective, i.e. invertible, function between the two data types).
For example, the following definition is enough to provide a TypeInformation instance for a Breeze Vector.
import breeze.linalg.Vector
implicit def injectVector[A]: Inject[Vector[A], Array[A]] =
  Inject(_.toArray, Vector(_))
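To illustrate the idea behind Inject without depending on Flink or Breeze, here is a minimal sketch in plain Scala. It models Inject as a pair of functions and reuses an existing codec for the target type. Everything here is an assumption made for illustration: Codec is a hypothetical stand-in for a serializer, Codec.via is a made-up helper, and this Inject case class only mimics the shape of the library's type class.

```scala
object InjectSketch {
  // Hypothetical stand-in for a serializer; not Flink's TypeSerializer.
  trait Codec[T] {
    def encode(value: T): String
    def decode(text: String): T
  }

  // Simplified model of Inject: a mapping from A to B and its inverse.
  final case class Inject[A, B](to: A => B, from: B => A)

  object Codec {
    // An "existing" codec that we want to reuse.
    implicit val intList: Codec[List[Int]] = new Codec[List[Int]] {
      def encode(value: List[Int]): String = value.mkString(",")
      def decode(text: String): List[Int] =
        if (text.isEmpty) Nil else text.split(",").toList.map(_.toInt)
    }

    // Derive a codec for A by round-tripping through B.
    def via[A, B](inj: Inject[A, B])(implicit base: Codec[B]): Codec[A] =
      new Codec[A] {
        def encode(value: A): String = base.encode(inj.to(value))
        def decode(text: String): A = inj.from(base.decode(text))
      }
  }

  // Custom data type mapped onto List[Int].
  final case class Point(x: Int, y: Int)

  val pointInject: Inject[Point, List[Int]] =
    Inject[Point, List[Int]](p => List(p.x, p.y), xs => Point(xs(0), xs(1)))

  val pointCodec: Codec[Point] = Codec.via(pointInject)
}
```

In this sketch the derivation is requested explicitly through Codec.via; in Flink-Shapeless, providing the implicit Inject instance is enough, as stated above.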
The default macro-based implementation cannot handle recursive data types or coproducts without the use of reflection-based serializers like Kryo. Only product types (tuples and case classes) are handled natively.
Flink-Shapeless extends the native Flink support to arbitrary algebraic data types (ADTs) and will fail at compile time rather than default to runtime reflection. In Scala, ADTs are encoded as sealed traits and case classes.
// Example: Recursive product
case class NTree[+A](v: A, children: List[NTree[A]])
// Example: Recursive coproduct
sealed trait BTree[+A]
case object BLeaf extends BTree[Nothing]
case class BNode[+A](l: BTree[A], v: A, r: BTree[A]) extends BTree[A]
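To give a rough intuition for what serializing such an ADT involves, the sketch below hand-writes a binary encoding for BTree[Int]: a one-byte tag selects the coproduct variant, and the fields of a product are written in declaration order, recursing into subtrees. This is an assumption about the general technique, written by hand for illustration. It is not the code Flink-Shapeless derives, and the names BTreeSketch, write, read and roundTrip are hypothetical.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object BTreeSketch {
  sealed trait BTree[+A]
  case object BLeaf extends BTree[Nothing]
  case class BNode[+A](l: BTree[A], v: A, r: BTree[A]) extends BTree[A]

  // Coproduct: a tag byte identifies the variant.
  // Product: fields are written in order, recursing into subtrees.
  def write(tree: BTree[Int], output: DataOutputStream): Unit = tree match {
    case BLeaf =>
      output.writeByte(0)
    case BNode(l, v, r) =>
      output.writeByte(1)
      write(l, output)
      output.writeInt(v)
      write(r, output)
  }

  // Mirror image of write: read the tag, then the fields of that variant.
  def read(input: DataInputStream): BTree[Int] = input.readByte().toInt match {
    case 0 => BLeaf
    case 1 =>
      val l = read(input)
      val v = input.readInt()
      val r = read(input)
      BNode(l, v, r)
    case tag => throw new IllegalStateException(s"Unknown tag: $tag")
  }

  // Serialize to a byte array and read the tree back.
  def roundTrip(tree: BTree[Int]): BTree[Int] = {
    val bytes = new ByteArrayOutputStream()
    val output = new DataOutputStream(bytes)
    write(tree, output)
    output.flush()
    read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray)))
  }
}
```

Because the structure of the type is known statically, no runtime reflection is needed, which is the property the derived serializers rely on.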
Check out the TypeSerializer microbenchmarks comparing the default (Kryo) serializer with the derived (via Shapeless) serializer on the NTree and BTree examples above. Flink-Shapeless achieves up to a 10x speedup for NTree and up to a 3x speedup for BTree.
More details about the setup:
There are a few well-known limitations of automatic type class derivation with Shapeless.
TypeInformation for coproducts.