
What Is Apache Storm?

Automotive Electronics Technology | Source: 码农与软件时代 | Author: 码农与软件时代 | 2023-02-20 15:34

I. Concepts

Apache Storm is a framework for real-time processing of streaming data. Its official website presents the following model:

[Figure: the Storm topology model]

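In the figure, the "faucet" is a spout [spaʊt], the "lightning bolt" is a bolt [bəʊlt], and the "arrows" represent data flowing between them. The directed acyclic graph formed by the faucets, lightning bolts, and arrows is called a Topology.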

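To process streaming data in real time with Storm, you write the processing logic of the "faucets" and "lightning bolts" and connect them into a Topology; together they implement the real-time business logic.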
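Because a Topology must be a directed acyclic graph, its wiring can be pictured as an adjacency list of component IDs. The sketch below is plain Java, not Storm's API: it applies Kahn's algorithm to list components so that every upstream component precedes its subscribers, and rejects wiring that contains a cycle. The component IDs are borrowed from the example later in this article; the class and method names are hypothetical.

```java
import java.util.*;

// Plain-Java model of a topology's wiring (not Storm's API): each key is a
// component ID and its list holds the components that subscribe to it.
class TopologyDag {

    // Returns the components in topological order (Kahn's algorithm), or
    // throws IllegalArgumentException if the wiring contains a cycle.
    static List<String> topologicalOrder(Map<String, List<String>> edges) {
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : edges.entrySet()) {
            inDegree.putIfAbsent(e.getKey(), 0);
            for (String target : e.getValue()) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) ready.add(e.getKey());
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String node = ready.poll();
            order.add(node);
            for (String target : edges.getOrDefault(node, List.of())) {
                // a subscriber becomes ready once all of its inputs are placed
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        if (order.size() != inDegree.size()) {
            throw new IllegalArgumentException("wiring contains a cycle");
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, List<String>> edges = new LinkedHashMap<>();
        edges.put("sentences", List.of("split")); // spout -> first bolt
        edges.put("split", List.of("count"));     // first bolt -> second bolt
        edges.put("count", List.of());            // sink
        System.out.println(topologicalOrder(edges)); // [sentences, split, count]
    }
}
```

Storm performs its own validation when a topology is submitted; this sketch only illustrates what "acyclic" means for the spout/bolt graph.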

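The concrete steps are:

(1) A real-time data source, such as Kafka, is connected to the "faucet", i.e., a Spout;

(2) The Spout reads the source data and continuously emits it to downstream Bolts; each unit of data emitted is called a Tuple;

(3) Each Bolt processes the Tuples sent to it, transforming the data stream.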

If this still seems confusing, let's illustrate it with the classic word-frequency example, using the file heart.txt:

Take me to your heart
Take me to your soul
Give me your hand and hold me
Show me what love is
Be my guiding star
It's easy take me to your heart
Standing on a mountain high
Looking at the moon through a clear blue sky

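We can design the following topology:

WordSourceSpout: reads heart.txt and emits the data stream line by line; each line is one Tuple;
WordSplitBolt: splits each line Tuple into words and emits every word as a Tuple to the next Bolt;
WordCountBolt: accumulates the frequency of each word.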
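Before bringing in Storm itself, the data flow of this three-component design can be simulated in a single JVM. The sketch below is plain Java and only mimics the roles of WordSourceSpout, WordSplitBolt and WordCountBolt with ordinary methods; splitting on whitespace and lower-casing words are assumptions of the sketch, since the article does not specify how words are normalized.

```java
import java.util.*;

// Single-process simulation of the word-count topology (no Storm classes).
class WordCountPipeline {

    // Contents of heart.txt, one line per element.
    static final List<String> HEART = List.of(
        "Take me to your heart",
        "Take me to your soul",
        "Give me your hand and hold me",
        "Show me what love is",
        "Be my guiding star",
        "It's easy take me to your heart",
        "Standing on a mountain high",
        "Looking at the moon through a clear blue sky");

    // Role of WordSplitBolt: turn one line tuple into word tuples.
    // Assumption: split on whitespace and lower-case each word.
    static List<String> split(String line) {
        List<String> words = new ArrayList<>();
        for (String w : line.trim().split("\\s+")) {
            if (!w.isEmpty()) words.add(w.toLowerCase(Locale.ROOT));
        }
        return words;
    }

    // Each line plays the role of a tuple emitted by WordSourceSpout;
    // the map plays the role of WordCountBolt's running counts.
    static Map<String, Integer> run(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : split(line)) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = run(HEART);
        System.out.println("me=" + counts.get("me") + ", your=" + counts.get("your")); // me=6, your=4
    }
}
```

In a real topology each role runs as separate tasks, possibly on different machines, and the counts live inside WordCountBolt instances; this sketch only shows what the data looks like at each step.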

II. Programming

1. How is a Topology built?

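A Topology is built with TopologyBuilder, which provides the setSpout and setBolt methods for configuring Spouts and Bolts. Both methods take three similar parameters. Taking setSpout as an example: the first parameter is the component ID, the name under which other components subscribe to this component's output stream; the second is the component object that processes the stream; the third is the parallelism hint, i.e., how many executors (threads) initially run this component in parallel. Here is a piece of code: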

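import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

TopologyBuilder topologyBuilder = new TopologyBuilder();
WordSourceSpout spout = new WordSourceSpout();
WordSplitBolt splitBolt = new WordSplitBolt();
WordCountBolt countBolt = new WordCountBolt();
// Spout with component ID "sentences" and a parallelism hint of 2
topologyBuilder.setSpout("sentences", spout, 2);
// "split" receives tuples from "sentences", distributed randomly and evenly
topologyBuilder.setBolt("split", splitBolt, 8).shuffleGrouping("sentences");
// "count" receives tuples from "split"; equal "word" values go to the same task
topologyBuilder.setBolt("count", countBolt, 8).fieldsGrouping("split", new Fields("word"));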

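The code above defines one Spout and two Bolts and wires their streams together. The first Bolt is registered under the component ID "split" and subscribes to the "sentences" Spout with shuffleGrouping, which distributes sentence tuples randomly and evenly across its tasks. The second Bolt subscribes to "split" with fieldsGrouping on the word field, so all tuples carrying the same word are sent to the same task of countBolt, which is what allows that task to keep a correct count for each word.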
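Why fieldsGrouping keeps the counts correct can be seen with a small routing model. The sketch assumes a simplified rule, hashing the grouping field modulo the number of target tasks; Storm's internal implementation differs in detail, and the class and method names here are hypothetical.

```java
import java.util.*;

// Simplified model of fields grouping (not Storm's actual code): a tuple is
// routed by hashing its grouping field, so equal words always reach the same task.
class FieldsGroupingModel {

    static int targetTask(String word, int numTasks) {
        // floorMod keeps the index non-negative even when hashCode() is negative
        return Math.floorMod(word.hashCode(), numTasks);
    }

    // Routes every word to a task and counts it there; each task keeps its
    // own local map, just as each WordCountBolt task keeps its own state.
    static List<Map<String, Integer>> route(List<String> words, int numTasks) {
        List<Map<String, Integer>> perTask = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) perTask.add(new HashMap<>());
        for (String w : words) {
            perTask.get(targetTask(w, numTasks)).merge(w, 1, Integer::sum);
        }
        return perTask;
    }

    public static void main(String[] args) {
        List<String> words = List.of("me", "your", "me", "heart", "me");
        List<Map<String, Integer>> perTask = route(words, 8);
        // Exactly one task holds a count for "me", and that count is 3.
        for (int i = 0; i < perTask.size(); i++) {
            if (!perTask.get(i).isEmpty()) System.out.println("task " + i + ": " + perTask.get(i));
        }
    }
}
```

With shuffleGrouping in place of fieldsGrouping, the three "me" tuples could land on different tasks, and each task would then report only a partial count.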

2. How are Spouts and Bolts defined?

In the programming model, both Spouts and Bolts are called components (Component).

WordSourceSpout needs to extend BaseRichSpout, whose class hierarchy is:

BaseRichSpout--extends--BaseComponent--implements--IComponent
BaseRichSpout--implements--IRichSpout--extends--ISpout

The ISpout interface is defined as:

package org.apache.storm.spout;

import java.io.Serializable;
import java.util.Map;
import org.apache.storm.task.TopologyContext;

public interface ISpout extends Serializable {
    /**
     * Called when a task for this component is initialized within a worker on the cluster. It provides the spout with the environment in
     * which the spout executes.
     *
     * This includes the:
     *
     * @param conf The Storm configuration for this spout. This is the configuration provided to the topology merged in with cluster
     *     configuration on this machine.
     * @param context This object can be used to get information about this task's place within the topology, including the task id and
     *     component id of this task, input and output information, etc.
     * @param collector The collector is used to emit tuples from this spout. Tuples can be emitted at any time, including the open and
     *     close methods. The collector is thread-safe and should be saved as an instance variable of this spout object.
     */
    void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector);

    /**
     * Called when an ISpout is going to be shutdown. There is no guarantee that close will be called, because the supervisor kill -9's
     * worker processes on the cluster.
     *
     * The one context where close is guaranteed to be called is a topology is killed when running Storm in local mode.
     */
    void close();

    /**
     * Called when a spout has been activated out of a deactivated mode. nextTuple will be called on this spout soon. A spout can become
     * activated after having been deactivated when the topology is manipulated using the `storm` client.
     */
    void activate();

    /**
     * Called when a spout has been deactivated. nextTuple will not be called while a spout is deactivated. The spout may or may not be
     * reactivated in the future.
     */
    void deactivate();

    /**
     * When this method is called, Storm is requesting that the Spout emit tuples to the output collector. This method should be
     * non-blocking, so if the Spout has no tuples to emit, this method should return. nextTuple, ack, and fail are all called in a tight
     * loop in a single thread in the spout task. When there are no tuples to emit, it is courteous to have nextTuple sleep for a short
     * amount of time (like a single millisecond) so as not to waste too much CPU.
     */
    void nextTuple();

    /**
     * Storm has determined that the tuple emitted by this spout with the msgId identifier has been fully processed. Typically, an
     * implementation of this method will take that message off the queue and prevent it from being replayed.
     */
    void ack(Object msgId);

    /**
     * The tuple emitted by this spout with the msgId identifier has failed to be fully processed. Typically, an implementation of this
     * method will put that message back on the queue to be replayed at a later time.
     */
    void fail(Object msgId);
}
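The Javadoc for nextTuple, ack and fail describes a common reliability pattern: remember each emitted message by its msgId, drop it on ack, and put it back on the queue on fail. The sketch below models that pattern in plain Java. It does not implement Storm's ISpout; the class name ReplayingSource is hypothetical, the method names only mirror ISpout, and a BiConsumer stands in for the collector's emit call.

```java
import java.util.*;
import java.util.function.BiConsumer;

// Plain-Java sketch of the ack/fail replay pattern described in ISpout's Javadoc.
// Not Storm's ISpout: the method names only mirror it.
class ReplayingSource {
    private final Deque<String> queue = new ArrayDeque<>();
    private final Map<Long, String> pending = new HashMap<>();
    private final BiConsumer<Long, String> emitter; // stands in for emitting a tuple with a msgId
    private long nextId = 0;

    ReplayingSource(List<String> messages, BiConsumer<Long, String> emitter) {
        queue.addAll(messages);
        this.emitter = emitter;
    }

    // Non-blocking: returns immediately when there is nothing to emit.
    void nextTuple() {
        String msg = queue.poll();
        if (msg == null) return;
        long msgId = nextId++;
        pending.put(msgId, msg);   // remember it until it is acked or failed
        emitter.accept(msgId, msg);
    }

    // Fully processed: forget the message so it is never replayed.
    void ack(Object msgId) {
        pending.remove(msgId);
    }

    // Failed: put the message back on the queue to be replayed later.
    void fail(Object msgId) {
        String msg = pending.remove(msgId);
        if (msg != null) queue.addLast(msg);
    }

    int pendingCount() { return pending.size(); }
    int queuedCount() { return queue.size(); }

    public static void main(String[] args) {
        List<Long> emitted = new ArrayList<>();
        ReplayingSource src = new ReplayingSource(List.of("a", "b"), (id, msg) -> emitted.add(id));
        src.nextTuple();          // emits "a" with id 0
        src.nextTuple();          // emits "b" with id 1
        src.ack(emitted.get(0));  // "a" is done
        src.fail(emitted.get(1)); // "b" goes back on the queue
        System.out.println(src.pendingCount() + " pending, " + src.queuedCount() + " queued"); // 0 pending, 1 queued
    }
}
```

A real spout would typically hand the message back to its upstream system (for example, leaving a Kafka offset uncommitted) rather than keep an in-memory queue, but the bookkeeping by msgId is the same idea.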

WordCountBolt needs to extend BaseBasicBolt, whose class hierarchy is:

BaseBasicBolt--extends--BaseComponent--implements--IComponent
BaseBasicBolt--implements--IBasicBolt--extends--IComponent

The IBasicBolt interface is defined as:

package org.apache.storm.topology;

import java.util.Map;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;

public interface IBasicBolt extends IComponent {
    void prepare(Map topoConf, TopologyContext context);

    /**
     * Process the input tuple and optionally emit new tuples based on the input tuple.
     *
     * All acking is managed for you. Throw a FailedException if you want to fail the tuple.
     */
    void execute(Tuple input, BasicOutputCollector collector);

    void cleanup();
}
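"All acking is managed for you" means the code calling execute acknowledges the tuple when execute returns normally and fails it when execute throws FailedException. The sketch below models that contract in plain Java; AutoAckingBolt and its nested FailedException are hypothetical stand-ins, not Storm's classes.

```java
import java.util.*;
import java.util.function.Consumer;

// Simplified model of IBasicBolt's automatic acking (not Storm's code):
// the wrapper acks a tuple when execute returns normally and fails it
// when execute throws FailedException.
class AutoAckingBolt {

    // Hypothetical stand-in for Storm's FailedException.
    static class FailedException extends RuntimeException {
        FailedException(String message) { super(message); }
    }

    private final Consumer<String> execute; // the user's execute logic
    final List<String> acked = new ArrayList<>();
    final List<String> failed = new ArrayList<>();

    AutoAckingBolt(Consumer<String> execute) {
        this.execute = execute;
    }

    void process(String tuple) {
        try {
            execute.accept(tuple);
            acked.add(tuple);  // success: ack on the user's behalf
        } catch (FailedException e) {
            failed.add(tuple); // explicit failure: the spout may replay it
        }
    }

    public static void main(String[] args) {
        AutoAckingBolt bolt = new AutoAckingBolt(word -> {
            if (word.isEmpty()) throw new FailedException("empty word");
        });
        for (String w : List.of("take", "", "me")) bolt.process(w);
        System.out.println("acked=" + bolt.acked + ", failed=" + bolt.failed.size()); // acked=[take, me], failed=1
    }
}
```

This is why a BaseBasicBolt subclass such as WordCountBolt never calls ack itself; a bolt that needs to ack at a later time (for example, after batching) would implement IRichBolt instead and manage acking explicitly.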

The IComponent interface is defined as:

package org.apache.storm.topology;

import java.io.Serializable;
import java.util.Map;

/**
 * Common methods for all possible components in a topology. This interface is used when defining topologies using the Java API.
 */
public interface IComponent extends Serializable {

    /**
     * Declare the output schema for all the streams of this topology.
     *
     * @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
     */
    void declareOutputFields(OutputFieldsDeclarer declarer);

    /**
     * Declare configuration specific to this component. Only a subset of the "topology.*" configs can be overridden. The component
     * configuration can be further overridden when constructing the topology using {@link TopologyBuilder}
     */
    Map<String, Object> getComponentConfiguration();
}
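declareOutputFields is where a component names the positions of the values it emits; downstream code then reads values by those names, and fieldsGrouping groups on them. The sketch below models that idea with a hypothetical NamedTuple class in plain Java. In Storm itself the corresponding pieces are Fields, OutputFieldsDeclarer.declare and Tuple accessors such as getValueByField; the two-field schema for a counting bolt is an assumption for illustration only.

```java
import java.util.*;

// Hypothetical model of a declared output schema: field names are declared
// once, and each emitted tuple is a positional list of values.
class NamedTuple {
    private final List<String> fields;
    private final List<Object> values;

    NamedTuple(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("expected " + fields.size() + " values, got " + values.size());
        }
        this.fields = fields;
        this.values = values;
    }

    // Looks up a value by its declared field name.
    Object getValueByField(String field) {
        int index = fields.indexOf(field);
        if (index < 0) throw new IllegalArgumentException("unknown field: " + field);
        return values.get(index);
    }

    public static void main(String[] args) {
        // WordSplitBolt would declare a single field "word"; a counting bolt
        // might declare "word" and "count" (assumed schema).
        List<String> countSchema = List.of("word", "count");
        NamedTuple t = new NamedTuple(countSchema, List.<Object>of("heart", 2));
        System.out.println(t.getValueByField("word") + " -> " + t.getValueByField("count")); // heart -> 2
    }
}
```

The arity check mirrors the idea that every emitted tuple should match the fields its component declared.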

The basic logic of the Storm framework is as follows:

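A Spout component initializes its SpoutOutputCollector (the Spout's output collector) in its open method. Whenever Storm calls the Spout's nextTuple method to request tuples, the Spout sends a tuple through the SpoutOutputCollector's emit method. A Bolt component receives each tuple in its execute method and processes it there.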
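The call sequence just described (open saves the collector, nextTuple emits through it, the Bolt's execute receives the tuple) can be modeled in a few lines without Storm. In the sketch below MiniSpout, MiniBolt, LineSpout and run are hypothetical names, and a Consumer stands in for the collector. Unlike Storm, which keeps calling nextTuple indefinitely, this local driver stops once the source reports it is empty.

```java
import java.util.*;
import java.util.function.Consumer;

// Hypothetical, Storm-free model of the call sequence:
// open(collector) -> repeated nextTuple() -> collector "emit" -> bolt.execute(...).
class LifecycleDemo {

    interface MiniSpout {
        void open(Consumer<String> collector); // the collector is saved for later emits
        boolean nextTuple();                   // false once the source is exhausted (a simplification)
    }

    interface MiniBolt {
        void execute(String tuple);
    }

    static class LineSpout implements MiniSpout {
        private final Iterator<String> lines;
        private Consumer<String> collector;

        LineSpout(List<String> lines) { this.lines = lines.iterator(); }

        public void open(Consumer<String> collector) { this.collector = collector; }

        public boolean nextTuple() {
            if (!lines.hasNext()) return false;
            collector.accept(lines.next()); // "emit" one line as one tuple
            return true;
        }
    }

    // Local driver: wires the spout's collector straight to the bolt.
    static void run(MiniSpout spout, MiniBolt bolt) {
        spout.open(bolt::execute);
        while (spout.nextTuple()) {
            // keep requesting tuples until the source is empty
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        run(new LineSpout(List.of("Take me to your heart", "Take me to your soul")), received::add);
        System.out.println(received.size() + " tuples executed"); // 2 tuples executed
    }
}
```

In Storm the collector does not call the Bolt directly; emitted tuples are serialized, routed according to the stream grouping, and delivered to Bolt tasks that may run in other workers.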
