0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

基于DPU和HADOS-RACE加速Spark 3.x

中科驭数 来源:中科驭数 作者:中科驭数 2024-03-25 18:12 次阅读

背景简介

Apache Spark(下文简称Spark)是一种开源集群计算引擎,支持批/流计算、SQL分析、机器学习、图计算等计算范式,以其强大的容错能力、可扩展性、函数式API、多语言支持(SQL、PythonJava、Scala、R)等特性在大数据计算领域被广泛使用。其中,Spark SQL 是 Spark 生态系统中的一个重要组件,它允许用户以结构化数据的方式进行数据处理,提供了强大的查询和分析功能。

随着SSD和万兆网卡普及以及IO技术的提升,CPU计算逐渐成为Spark 作业的瓶颈,而IO瓶颈则逐渐消失。 有以下几个原因,首先,因为 JVM 提供的 CPU 指令级的优化如 SIMD要远远少于其他 Native 语言(如C/C++,Rust)导致基于 JVM 进行 CPU 指令的优化比较困难。其次,NVMe SSD缓存技术和AQE带来的自动优化shuffle极大的减轻了IO延迟。最后,Spark的谓词下推优化跳过了不需要的数据,进一步减少了IO开销。

基于此背景,Databricks(Spark背后的商业公司)在2022年SIGMOD会议上发表论文《Photon: A Fast Query Engine for Lakehouse Systems》,其核心思想是使用C++、向量化执行等技术来执行Spark物理计划,在客户工作负载上获得了平均3倍、最大10倍的性能提升,这证明Spark向量化及本地化是后续值得优化的方向。 Spark3.0(2020年6月发布)开始支持了数据的列式处理,英伟达也提出了利用GPU加速Spark的方案,利用GPU的列式计算和并发能力加速Join、Sort、Aggregate等常见的ETL操作。

DPU(Data Processing Unit) 作为未来计算的三大支柱之一,其设计旨在提供强大的计算能力,以加速各种数据处理任务。DPU的硬件加速能力,尤其在数据计算、数据过滤等计算密集型任务上,为处理海量数据提供了新的可能。通过高度定制和优化的架构,DPU能够在处理大规模数据时显著提升性能,为数据中心提供更高效、快速的计算体验,从而满足现代数据处理需求的挑战。但是目前DPU对Spark生态不能兼容,Spark计算框架无法利用DPU的计算优势。

中科驭数HADOS 异构计算加速软件平台(下文简称HADOS)是一款敏捷异构软件平台,能够为网络、存储、安全、大数据计算等场景进行提速。对于大数据计算场景,HADOS可以认为是一个异构执行库,提供了数据类型、向量数据结构、表达式计算、IO和资源管理等功能。 为了发挥Spark与DPU各自的优势,基于HADOS平台,我们开发了RACE算子卸载引擎,既能够发挥Spark优秀的分布式调度能力又可以发挥DPU的向量化执行能力。

我们通过实验发现,将Spark SQL的计算任务通过RACE卸载到DPU上, 预期可以把原生SparkSQL的单表达式的执行效率提升至9.97倍,TPC-DS单Query提升最高4.56倍。本文将介绍如何基于 DPU和RACE来加速 Spark SQL的查询速度,为大规模数据分析和处理提供更可靠的解决方案。

整体架构

整个解决方案可以参考下图:

wKgZomXcZN2AYm7rAAIEZvTT08A684.png

• 最底层硬件资源层是DPU硬件,是面向数据中心的专用处理器,其设计旨在提供强大的计算能力,以加速各种数据处理任务,尤其是优化Spark等大数据框架的执行效率。通过高度定制和优化的架构,DPU能够在处理大规模数据时显著提升性能,为数据中心提供更高效、快速的计算体验。

• DPU加速层底层是HADOS异构计算加速软件平台,是中科驭数推出的专用计算敏捷异构软件开发平台。HADOS数据查询加速库通过提供基于列式数据的查询接口,供数据查询应用。支持Java、Scala、C和C++语言的函数调用,主要包括列数据管理、数据查询运行时函数、任务调度引擎、函数运算代价评估、内存管理、存储管理、硬件管理、DMA引擎、日志引擎等模块,目前对外提供数据管理、查询函数、硬件管理、文件存储相关功能API。

• DPU加速层中的RACE层,其最核心的能力就是修改执行计划树,通过 Spark Plugin 的机制,将Spark 执行计划拦截并下发给 DPU来执行,跳过原生 Spark 不高效的执行路径。整体的执行框架仍沿用 Spark 既有实现,包括消费接口、资源和执行调度、查询计划优化、上下游集成等。

• 最上层是面向用户的原生Spark,用户可以直接使用已有的业务逻辑,无感享受DPU带来的性能提升

目前支持的算子覆盖Spark生产环境常用算子,包括Scan、Filter、Project、Union、Hash Aggregation、Sort、Join、Exchange等。表达式方面,我们开发了目前生产环境常用的布尔函数、Sum/Count/AVG/Max/Min等聚合函数。

其中RACE层的架构如下:

wKgaomXcZN2Ad8CsAADO2y4AKy4604.png

下面我们着重介绍RACE层的核心功能。

核心功能模块

RACE与Spark的集成

RACE作为Spark的一个插件,实现了SparkPlugin接口,与Spark的集成分为Driver端和Executor端。

• 在Driver端, 通过Spark Catalyst扩展点插入自定义的规则,实现对查询语句解析过程、优化过程以及物理计划转换过程的控制。

• 在Executor端, 插件在Executor的初始化过程中完成DPU设备的初始化工作。

wKgZomXcZN6AMq1KAACkG9BHo5E587.png

Plan Conversion

Spark SQL在优化 Physical Plan时,会应用一批规则,RACE通过插入的自定义规则可以拦截到优化后的Physical Plan,如果发现当前算子上的所有表达式可以下推给DPU,那么替换Spark原生算子为相应的可以在DPU上执行的自定义算子,由HADOS将其下推给DPU 来执行并返回结果。

Fallback

Spark支持的Operator和Expression非常多,在RACE研发初期,无法 100% 覆盖 Spark 查询执行计划中的算子和表达式,因此 RACE必须有Fallback机制,支持Spark 查询执行计划中部分算子不运行在DPU上。

对于DPU无法执行的算子,RACE安排 Fallback 回正常的 Spark 执行路径进行计算。例如,下图中展示了插件对原生计划树的修改情况,可以下推给DPU的算子都替换成了对应的"Dpu"开头的算子,不能下推的算子仍然保留。除此之外,会自动插入行转列算子或者列转行算子来适配数据格式的变化。

wKgaomXYe0uAVcwUAAGOSHWvTHU017.png

当然了,不管是行转列算子还是列转行算子,都是开销比较大的算子,随着RACE支持的算子和表达式越来越多,Fallback的情况会逐渐减少。

Strategy

当查询计划中存在未卸载的算子时,因为这样引入了行列转换算子,由于其带来了额外的开销,导致即使对于卸载到DPU上的算子,其性能得到提升,而对于整个查询来说,可能会出现比原生Spark更慢的情况。 针对这种情况,最稳妥的方式就是整个Query全部回退到CPU,这至少不会比原生Spark慢,这是很重要的。

由于Spark3.0加入了AQE的支持,规则通常拦截到的是一个个QueryStage,它是Physical Plan的一部分而非完整的 Physical Plan。 RACE的策略是获取AQE规则介入之前的整个Query的 Physical Plan,然后分析该Physical Plan中的算子是否全部可卸载。如果全部可以卸载,则对QueryStage进行Plan Conversion, 如果不能全部卸载,则跳过Plan Conversion转而直接交给Spark处理。

我们在实际测试过程中发现,一些算子例如Take操作,它需要处理的数据量非常小,那么即使发生Fallback,也不会有很大的行列转换开销,通过白名单机制忽略这种算子,防止全部回退到CPU,达到加速目的。

Metrics

RACE会收集DPU执行过程中的指标统计,然后上报给Spark的Metrics System做展示,以方便Debug和系统调优。

Native Read&Write

SparkSQL的Scan算子支持列式读取,但是Spark的向量与DPU中定义的向量不兼容,需要在JVM中进行一次列转行然后拷贝到DPU中,这会造成巨大的IO开销。我们主要有以下优化:

1. 减少行列转换:对于Parquet格式等列式存储格式的文件读取,SparkSQL采用的是按列读取的方式,即Scan算子是列式算子,但是后续数据过滤等数据处理算子均是基于行的算子,SparkSQL必须把列式数据转换为行式数据,这会导致额外的计算开销。而本方案由于都是列式计算的算子,因此无需这种行列转换。

2. 减少内存拷贝: RACE卸载Scan算子到HADOS平台,HADOS平台的DPUScan算子以Native库的方式加载磁盘数据直接复制到DPU,省去了JVM到DPU的拷贝开销

3. 谓词下推支持:DPUScan也支持ColumnPruning规则,这个规则会确保只有真正使用到的字段才会从这个数据源中提取出来。支持两种Filter:PartitionFilters和PushFilters。PartitionFilters可以过滤掉无用的分区, PushFilters把字段直接下推到Parquet文件中去

4. 同时,文件的写出也进行了类似的优化

注意,这些优化仍然需要对数据进行一次复制,DPU直接读取磁盘是一个后续的优化方向。

加速效果

TPC-DS 单Query加速

单机单线程local模式场景,在1T数据集下,TPC-DS语句中有5条语句E2E时间提升比例超过2倍,最高达到4.56倍:

wKgZomXYe0uAft0dAABd4UuWpbQ563.png

运算符加速效果

运算符的性能提升,DPU运算符相比Spark原生的运算符的加速比最高达到9.97。

wKgaomXcZN6AdzreAABYg2coTWU859.png

算子加速效果

TPC-DS的测试中,向对于原生Spark解决方案,本方案Filter算子性能最高提高到了43倍,哈希聚合算子提升了13倍。这主要是因为我们节省了列式数据转换为行式数据的开销以及DPU运算的加速。

wKgZomXcZN6AfOZRAABWbiuTNf8102.png

wKgZomXcZN-AExwpAABWKy9uNag479.png

CPU资源使用情况

CPU资源从平均60%下降到5%左右

原生Spark方案CPU使用情况:

wKgaomXcZOCAetkFAAJk9GZexsw526.png

基于RACE和DPU加速后,CPU使用情况:

wKgZomXcZOKAWpplAAHre-J6L50116.png

总结与展望

通过把Spark的计算卸载到DPU加速器上,在用户原有代码无需变更的情况下,端到端的性能可以得到2-5倍的提升,某些算子能达到43倍性能提升,同时CPU资源使用率从60%左右下降到5%左右,显著提升了原生SparkSQL的执行效率。DPU展现了强大的计算能力,对于端到端的分析,会有一些除去算子之外的因素影响整体运行时间,包括磁盘IO,网络Shuffle以及调度的Overhead。这些影响因素将来可以逐步去做特定的优化,例如:

1. 算子的Pipeline执行

原生Spark的算子Pipeline执行以及CodeGen都是Spark性能提升的关键技术,当前,我们卸载到DPU中的计算还没有支持Pipeline以及CodeGen。未来这两个技术的加入,是继续提升Spark的执行效率的一个方向。

2. 读数据部分,通过DPU卡直读磁盘数据来做优化

我们还可以通过DPU卡直接读取硬盘数据,省去主机DDR到DPU卡DDR的数据传输时间,以达到性能提升的效果,可以参考英伟达的GPU对磁盘读写的优化,官方数据CSV格式的文件读取可优化20倍左右。

3. RDMA技术继续提升Shuffle性能

对于Shuffle占比很高的作业,可以通过内存Shuffle以及RDMA技术,来提升整个Shuffle的过程,目前已经实现内存Shuffle,未来我们还可以通过RDMA技术直读远端内存数据,从而完成整个Shuffle链路的优化。

审核编辑 黄宇

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • DPU
    DPU
    +关注

    关注

    0

    文章

    294

    浏览量

    23966
  • SPARK
    +关注

    关注

    1

    文章

    99

    浏览量

    19729
  • RACE
    +关注

    关注

    0

    文章

    2

    浏览量

    2301
收藏 人收藏

    评论

    相关推荐

    《数据处理器:DPU编程入门》DPU计算入门书籍测评

    , 数据加速应用实践 。 分别介绍了DPU技术和英伟达DPU技术: DPU的诞生 现代计算架构及问题 数据解耦及大带宽数据应用 英伟达DPU
    发表于 12-24 10:54

    无法下载GRID-for-windows驱动程序(GRID 4.x3.x

    嗨,我无法下载GRID-for-windows驱动程序(4.x3.x)。我有一个GRID k1,想要用k1测试hyper-v remoteFX。我需要下载GRID-for-windows驱动程序
    发表于 10-08 14:22

    是否可以使用SPC5Studio 3.x进行STM32开发?

    使用SPC5Studio 3.x进行STM32开发?以上来自于谷歌翻译以下为原文 We do develment for SPC56xxx & stm32f4. I found description
    发表于 12-04 16:19

    Spark1x和2x的csv文件读取和写入方法

    Spark1x和2x如何读取和写入csv文件
    发表于 03-24 11:09

    米尔MYD-CZU3EG搭载Xilinx DPU,具备强大AI计算能力,你怎么看米尔这款产品?

    ` 9月25日,米尔官方发布,其MYD-CZU3EG开发板在原产品的基础上搭载了Xilinx深度学习处理单元DPU,该部分新功能的增加可以极大的提升产品数据处理与运行效率,为AI应用落地提供完整
    发表于 09-27 12:10

    如何去降低USB 3.x的插入损耗?

    如何去降低USB 3.x的插入损耗?有大神能解答这个问题吗
    发表于 07-15 06:26

    专⽤数据处理器 (DPU) 技术⽩⽪书

    本帖最后由 jf_35791075 于 2022-3-14 17:37 编辑 ⽬录内容提要71.DPU技术发展概况91.1. 什么是DPU 91.2. DPU的发展背景 121.
    发表于 03-14 17:35

    英伟达DPU的过“芯”之处

    核心,从服务器的应用加速扩展到服务器的全部应用场景,从而实现在数据中心服务器领域的更大突破,目标自然是剑指英特尔CPU为代表的X86服务器生态。而在考察DPU挑战CPU霸主地位的可能性之前,我们可以简单
    发表于 03-29 14:42

    【书籍评测活动NO.23】数据处理器:DPU编程入门

    开启自己的开发之旅。 DPU + GPU + CPU “3U”一体的新数据中心架构是计算的未来 基于DPU可以实现软件定义、硬件加速的数据中心基础设施,极具灵活性、扩展性、可编程性,
    发表于 10-24 15:21

    什么是DPU

    DPU则进行数据处理。” 什么是DPU? 数据处理器 基于行业标准,高性能及软件可编程的多核CPU 高性能网络接口 灵活、可编程的加速引擎 DPU有什么与众不同之处?
    发表于 11-03 10:55

    《数据处理器:DPU编程入门》+初步熟悉这本书的结构和主要内容

    ,并提高系统的响应速度和隐私保护。 4. 高性能计算:DPU可以在高性能计算领域中使用,例如科学研究、天气预报、金融模拟和大规模数据处理等。它可以提供更大的并行性和计算能力,加速复杂计算任务的执行。 3
    发表于 12-08 18:03

    在BlueField DPU上通过DOCA加速方案开发

    DOCA 中的驱动、库、服务和应用示例等。您将能够使用 NVIDIA DPU 创建更先进、更高效的解决方案,加速基础设施服务。使用 DOCA 提高数据中心的性能、效率、安全性和可管理性。
    的头像 发表于 04-14 15:51 1590次阅读
    在BlueField <b class='flag-5'>DPU</b>上通过DOCA<b class='flag-5'>加速</b>方案开发

    利用Apache Spark和RAPIDS Apache加速Spark实践

      在第三期文章中,我们详细介绍了如何充分利用 Apache Spark 和 Apache RAPIDS 加速Spark 。 大多数团队都会通过干净地使用 Spark 的数据帧抽象
    的头像 发表于 04-26 17:39 1554次阅读
    利用Apache <b class='flag-5'>Spark</b>和RAPIDS Apache<b class='flag-5'>加速</b><b class='flag-5'>Spark</b>实践

    中科驭数发布软件开发平台HADOS 2.0 释放DPU极致性能

    近日,中科驭数在2022 CNCC 中国计算大会上发布了HADOS敏捷异构软件平台2.0版本。该平台可为驭数DPU系列产品提供通用且编程友好的软件框架,充分释放DPU产品性能,有望成为国内DP
    的头像 发表于 12-28 11:52 1057次阅读

    Spark基于DPU Snappy压缩算法的异构加速方案

    一、总体介绍 1.1 背景介绍 Apache Spark是专为大规模数据计算而设计的快速通用的计算引擎,是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些不同之处
    的头像 发表于 03-26 17:06 210次阅读
    <b class='flag-5'>Spark</b>基于<b class='flag-5'>DPU</b> Snappy压缩算法的异构<b class='flag-5'>加速</b>方案