Как сделать соединения в Spark Dataset API более типобезопасными.
Обо мне
Я старший внештатный разработчик, работающий со Scala и Apache Spark для компаний, работающих в сфере финансовых технологий.
Проблема, которую я хотел решить
При написании преобразования данных в Spark с использованием его API набора данных мы часто сталкиваемся с дилеммой. Должны ли мы оптимизировать безопасность типов или производительность? В области финансовых технологий обрабатываемые данные могут быть довольно сложными, и поэтому безопасность типов важнее (по крайней мере, в проектах, над которыми я работаю).
Одной из наиболее распространенных операций в Spark является join
операция. Я нахожу довольно странным, что стандартный API набора данных не предоставляет типобезопасный вариант «внешнего соединения». Конечно, есть joinWith
доступен, но безопасен только для варианта «внутреннее соединение». Результат left.joinWith(right, ...)
операция представляет собой набор данных пар Dataset[(L,R)]
который для соединения «left_outer» может создавать строки с null
поля на right
сторона. Чтобы избежать загрязнения нашего кода Scala нулями, мы должны преобразовать R
часть Option[R]
например следующим образом:
leftDS
.joinWith(rightDS, joinCondition, "left_outer")
.as[L, Option[R]]
.map { case (left:L, right:Option[R]) => ... }
Расширение внешних объединений в Spark
Видя этот паттерн снова и снова, я решил провести его рефакторинг в отдельный класс. Собственно, я создал неявный класс содержащий несколько полезных методов, которые расширяют функциональность соединения стандартного Dataset
.
Стек технологий
Spark 3.3.1, Scala 2.13 (поскольку Spark пока официально не поддерживает Scala 3)
Реализация
Неявные классы позволяют нам расширять функциональность существующего API без необходимости доступа к его исходному коду. Это приятная особенность Scala, которая при осторожном использовании может улучшить опыт разработчиков. В нашем случае мы определяем несколько методов расширения для Dataset
тип называется left
:
implicit class DatasetTypesafeJoins[L <: Product : TypeTag](left: Dataset[L]) {
type JoinResult = (L, Option[R])
def joinLeftOuterWith[R <: Product : TypeTag]
(right: Dataset[R], condition: Column): Dataset[JoinResult] = {
left
.joinWith(right, condition, "left_outer")
.as(Encoders.product[JoinResult])
}
}
Кроме того, нам нужно импортировать TypeTag
черта от scala.reflect.runtime.universe
пакет, требуемый Encoders.product
который выводит кодировщик для нашего типа результата.
import scala.reflect.runtime.universe.TypeTag
Другие полезные методы расширения, которые мы можем определить:
joinLeftOuterWith(right: Dataset[R], usingColumn: String)
joinLeftAntiWith
(справа: набор данных[R]условие: Столбец)`joinLeftAntiWith
(справа: набор данных[R]используяКолонка: Строка)`crossJoinWith(right: Dataset[R])
joinLeftInnerWith(right: Dataset[R], usingColumn: String)
Проблемы, с которыми я столкнулся
К сожалению, представленное решение работает только для Scala 2.13. Это связано с TypeTag
недоступен в Scala 3.
Основные выводы
Рефакторинг нашего кода с использованием этого нового API упростил многие из наших существующих преобразований. Мы также смогли избежать некоторых NullPointerExceptions
из прошлого (и, надеюсь, из будущего).
Советы и советы
Хотелось бы, чтобы такая функциональность была доступна в стандартном Spark по умолчанию. До тех пор вы можете по крайней мере использовать мое решение. Если вам нужна помощь с рефакторингом кода или реализацией аналогичных функций, просто напишите мне по электронной почте или свяжитесь с нами на Codementor и запланируйте онлайн-сессию.
Заключительные мысли и следующие шаги
В следующий раз мы можем взглянуть на дополнительные методы расширения для Spark Dataset API и Kafka, которые я собрал из последних проектов.