0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

关于Spark的从0实现30s内实时监控指标计算

佳佳 来源:jf_36786605 作者:jf_36786605 2024-06-14 15:52 次阅读
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

前言

说起Spark,大家就会自然而然地想到Flink,而且会不自觉地将这两种主流的大数据实时处理技术进行比较。然后最终得出结论:Flink实时性大于Spark。

的确,Flink中的数据计算是以事件为驱动的,所以来一条数据就会触发一次计算,而Spark基于数据集RDD计算,RDD最小生成间隔就是50毫秒,所以Spark就被定义为亚实时计算。

窗口Window

这里的RDD就是“天然的窗口”,将RDD生成的时间间隔设置成1min,那么这个RDD就可以理解为“1min窗口”。所以如果想要窗口计算,首选Spark。

但当需要对即临近时间窗口进行计算时,必须借助滑动窗口的算子来实现。

临近时间如何理解

例如“3分钟内”这种时间范围描述。这种时间范围的计算,需要计算历史的数据。例如1 ~ 3是3min,2 ~ 4也是3min,这里就重复使用了2和3的数据,依次类推,3 ~ 5也是3min,同样也重复使用了3和4。

如果使用普通窗口,就无法满足“最近3分钟内”这种时间概念。

很多窗口都丢失了临近时间,例如第3个RDD的临近时间其实是第二个RDD,但是他们就没法在一起计算,这就是为什么不用普通窗口的原因。

滑动窗口

滑动窗口三要素:RDD的生成时间、窗口的长度、滑动的步长。

我在本次实践中,将RDD的时间间隔设置为10s,窗口长度为30s、滑动步长为10s。也就是说每10s就会生成一个窗口,计算最近30s内的数据,每个窗口由3个RDD组成。

数据源构建

1. 数据规范

假设我们采集了设备的指标信息,这里我们只关注吞吐量和响应时间,在采集之前定义数据字段和规范[throughput, response_time],这里都定义成int类型,响应时间单位这里定义成毫秒ms。

实际情况中,我们不可能只采集一台设备,如果我们想要得出每台或者每个种类设备的指标监控,就要在采集数据的时候对每个设备加上唯一ID或者TypeID。

我这里的想法是对每台设备的指标进行分析,所以我给每个设备都增加了一个唯一ID,最终字段[id, throughput, response_time],所以我们就按照这个数据格式,在SparkStreaming中构建数据源读取部分。

2. 读取kafka

代码语言:scala

复制

val conf = new SparkConf().setAppName("aqi").setMaster("local[1]")
val ssc = new StreamingContext(conf, Seconds(10))
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "121.91.168.193:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "aqi",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (true: java.lang.Boolean)
)

val topics = Array("evt_monitor")
val stream: DStream[String] = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
).map(_.value)

这里我们将一个RDD时间间隔设置为10S,因为使用的是笔记本跑,所以这里要将Master设置为local,表示本地运行模式,1代表使用1个线程。

我们使用Kafka作为数据源,在读取时就要构建Consumer的config,像bootstrap.servers这些基本配置没有什么好说的,关键的是auto.offset.reset和enable.auto.commit,

这两个参数分表控制读取topic消费策略和是否提交offset。这里的earliest会从topic中现存最早的数据开始消费,latest是最新的位置开始消费。

当重启程序时,这两种消费模式又被enable.auto.commit控制,设置true提交offset时,earliest和latest不再生效,都是从消费组记录的offset进行消费。设置为false不提交offset,offset不被提交记录earliest还是从topic中现存最早的数据开始消费,latest还是从最新的数据消费。

最后就是设置要读取的topic和创建Kafka的DStream数据流。至此,整个数据源的读取就已经完成了,下面就是对数据处理逻辑的开发。

3. 指标聚合计算

代码语言:scala

复制

stream.map(x => {
      val s = x.split(",")
      (s(0), (s(2).toInt, 1))
    }).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .reduceByKeyAndWindow((x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2), Seconds(30), Seconds(10))
    .foreachRDD(rdd => {
      rdd.foreach(x => {
        val id = x._1
        val responseTimes = x._2._1
        val num = x._2._2
        val responseTime_avg = responseTimes / num
        println(id, responseTime_avg)
      })
    })

我们从自身需求出发,来构思程序逻辑的开发。从需求看,关键字无非是最近一段时间内、平均值。想要取一段时间内的数据,就要使用滑动窗口,以当前时间为基准,向前圈定时间范围。

而平均值,无非就是将时间范围内,即窗口所有的响应时间加起来,然后除以数据条数即可。想要把所有的响应时间加起来,这里使用reduceByKey() 将窗口内相同ID的设备时间相加,将数据条数进行相加。

所以我在第一步切分数据的时候,就将数据切分成KV的元组形式,V有两个字段,第一个是响应时间,第二个1表示一条数据。reduceByKey一共分为两步,第一是RDD内的reduceByKey,这也算是数据的预处理,RDD的数据只会计算一次,当这个RDD被多个窗口使用,就不会重复计算了。第二步是基于窗口的reduceByKey,将窗口所有RDD的数据再一次聚合,最后在foreachRDD中获取输出

4. 验证结果

我们向kafka的evt_monitor这个topic中写入数据。

备注:(最后11那个id是终端显示问题,其实是1),然后可以输出平均值。

验证结果是没有问题的,换个角度,我们也可以从DAG来看。

这个窗口一共计算了3个RDD,其中左侧的两个是灰色的,上面是skipped标识,代表着这两个RDD在上一个窗口已经计算完成了,在这个窗口只需要计算当前的RDD,然后再一起对RDD的结果数据进行窗口计算。

结语

本篇文章主要是利用Spark的滑动窗口,做了一个计算平均响应时长的应用场景,以Kafka作为数据源、通过滑动窗口和reduceByKey算子得以实现。同时,开发Spark还是强烈推荐scala,整个程序看起来没有任何多余的部分。

最后对于Spark和Flink的选型看法,Spark的确是在实时性上比Flink差一些,但是Spark对于窗口计算还是有优势的。所以对于每种技术,也不用人云亦云,适合自己的才是最好的。

审核编辑 黄宇

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • RDD
    RDD
    +关注

    关注

    0

    文章

    7

    浏览量

    8159
  • 实时监控
    +关注

    关注

    1

    文章

    124

    浏览量

    14222
  • SPARK
    +关注

    关注

    1

    文章

    108

    浏览量

    21113
收藏 人收藏
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

    评论

    相关推荐
    热点推荐

    esp32s3多连接机BLE设备出现超时断开连接的现象

    基于V5.5.1版本的gattc_gatts_cox例程修改;同时也修改了menuconfig里的配置;在都连接上四个机后;在数据传输过程一段时间(几分钟或者更久)会出现超时断开连接的现象;有什么解决办法;其中机在30s后连
    发表于 12-02 09:18

    【CPKCOR-RA8D1】打造一个迷你系统监控中心:ADC电压与温度实时显示

    本文将带领大家在CPKCOR-RA8D1开发板上,实现一个兼具实用与观赏性的“迷你系统监控中心”。项目基于MIPI显示屏,实时可视化地展示ADC采集的电压值以及MCU内部温度,让您对系统状态一目了然
    发表于 10-30 15:14

    怎样确定实时校验机制的验证指标

    确定实时校验机制的验证指标,需遵循 “ 风险导向 + 场景适配 + 标准量化 ” 原则,围绕 “ 准确性(防漏判 / 误判)、抗干扰性(应对复杂环境)、安全性(防篡改)、稳定性(长期可靠) ” 四大
    的头像 发表于 10-11 17:03 693次阅读

    NVIDIA DGX Spark桌面AI计算机开启预订

    DGX Spark 现已开启预订!丽台科技作为 NVIDIA 授权分销商,提供产品到服务的一站式解决方案,助力轻松部署桌面 AI 计算机。
    的头像 发表于 09-23 17:20 919次阅读
    NVIDIA DGX <b class='flag-5'>Spark</b>桌面AI<b class='flag-5'>计算</b>机开启预订

    DTU 30s后没有指令,会自动断开连接?

    DTU 30s后没有指令,会自动断开连接? 30s后没有指令,DTU就会自动断开连接,如果需要DTU不自动断开,在30s发送查询命令作为心跳包。
    发表于 08-07 07:38

    如何评估协议分析仪的性能指标

    评估协议分析仪的性能指标硬件处理能力、协议解析精度、实时响应效率、扩展性与兼容性、用户体验五大维度综合考量。以下是具体指标及评估方法,结合实际场景说明其重要性:一、硬件处理能力:决
    发表于 07-18 14:44

    网络化多电机伺服系统监控终端设计

    较少,只能实现基本的系统状态监控和报警等功能。同时,需要现场对每个电机驱动器参数逐一进行设定,不便于系统的使用和调试\"1。因此,针对基于CAN总线的多电机伺服系统,设计一种实时性高、运行
    发表于 06-23 07:15

    边缘计算网关在水产养殖尾水处理中的实时监控应用

    ,某大型水产养殖企业决定引入先进的 YC-GR90-S工业智能网关 技术,对尾水处理过程进行远程监控和管理。 二、项目需求 设备远程监控: 需要实时
    的头像 发表于 06-06 14:36 438次阅读
    边缘<b class='flag-5'>计算</b>网关在水产养殖尾水处理中的<b class='flag-5'>实时</b><b class='flag-5'>监控</b>应用

    自媒体推广实时监控服务器带宽到用户行为解决方法

    自媒体推广的实时监控需要从底层基础设施到前端用户行为进行全链路覆盖,确保推广活动的稳定性和效果可追踪。以下是系统性解决方案,主机推荐小编为您整理发布自媒体推广实时监控
    的头像 发表于 04-09 10:47 461次阅读

    边缘计算网关的实时监控与预测性维护都有哪些方面?适合哪些行业使用?

    边缘计算网关的实时监控与预测性维护都有哪些方面?适合哪些行业使用? 有实施过得案例的介绍吗? 深控技术的不需要点表的边缘计算网关如何?
    发表于 04-01 09:44

    NVIDIA 宣布推出 DGX Spark 个人 AI 计算

    的 DGX™ 个人 AI 超级计算机。   DGX Spark(前身为 Project DIGITS)支持 AI 开发者、研究人员、数据科学家和学生,在台式电脑上对大模型进行原型设计、微调和推理。用
    发表于 03-19 09:59 504次阅读
       NVIDIA 宣布推出 DGX <b class='flag-5'>Spark</b> 个人 AI <b class='flag-5'>计算</b>机

    HarmonyOS NEXT 原生应用/元服务-DevEco Profiler性能问题定界实时监控

    不同的图像形式(直方图、柱状图、折线图等)来更加清晰的展示某一项资源在一段时间范围的变化趋势,以帮助您快速判断性能热点区域。 整个实时监控页面从上到下,依次展示了系统事件、异常事件、前台应用、CPU
    发表于 02-21 14:35

    HarmonyOS NEXT 原生应用/元服务-DevEco Profiler性能问题定界实时监控

    不同的图像形式(直方图、柱状图、折线图等)来更加清晰的展示某一项资源在一段时间范围的变化趋势,以帮助您快速判断性能热点区域。 整个实时监控页面从上到下,依次展示了系统事件、异常事件、前台应用、CPU
    发表于 02-20 10:14

    输电线路防外破防异物实时监控预警装置|场景模型真实还原|测距误差在0.25米

    输电线路防外破防异物实时监控预警装置|场景模型真实还原|测距误差在0.25米 输电线路防外破防异物实时监控预警装置针对输电线路通道走廊
    的头像 发表于 01-21 09:48 683次阅读

    可与MES系统集成的数据采集监控平台

    有用的指标和报表。 提供实时监控、故障预警、产能优化等数据分析功能,为决策提供支持。 数据分发与集成: 将处理后的数据按需分发给各个业务系统或用户。 实现与MES、ERP等系统的无缝集
    发表于 12-16 15:08