0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

Apache Pulsar的特性

科技绿洲 来源:了不起 作者:了不起 2023-09-25 11:45 次阅读

Apache Pulsar

Apache Pulsar是Apache软件基金会顶级项目,是下一代云原生分布式消息流平台。

Pulsar 作为下一代云原生分布式消息流平台,支持 多租户、持久化存储、多机房跨区域数据复制 ,具有强一致性、高吞吐以及低延时的高可扩展流数据存储特性, 内置诸多其他系统商业版本才有的特性,是云原生时代解决实时消息流数据传输、存储和计算的最佳解决方案。

图片

Pulsar简介

  • 系统架构

图片

  • 功能特色
    租户和命名空间(namespace)是 Pulsar 支持多租户的两个核心概念。在租户级别,Pulsar 为特定的租户预留合适的存储空间、应用授权与认证机制。在命名空间级别,Pulsar 有一系列的配置策略(policy),包括存储配额、流控、消息过期策略和命名空间之间的隔离策略。
    Pulsar 做了队列模型和流模型的统一,在 Topic 级别只需保存一份数据,同一份数据可多次消费。以流式、队列等方式计算不同的订阅模型大大提升了灵活度。
    Pulsar 使用计算与存储分离的云原生架构,数据从 Broker 搬离,存在共享存储内部。上层是无状态 Broker,复制消息分发和服务;下层是持久化的存储层 Bookie 集群。Pulsar 存储是分片的,这种构架可以避免扩容时受限制,实现数据的独立扩展和快速恢复。
    Pulsar 原生支持跨地域复制,因此 Pulsar 可以跨不同地理位置的数据中心复制数据。当数据中心中断或网络分区时,在多个数据中心存有消息副本尤为重要,提高可用性。
    Pulsar Functions 是基于 Pulsar 的轻量级流处理方式。Pulsar Functions 直接部署在 broker 节点上(或作为 Kubernetes 集群中的容器)。通过 Pulsar Functions,Pulsar 可以直接解决许多流处理任务,简化操作。
  • 支持客户端
    • Java 客户端
    • C++ 客户端
    • .Net/C# 客户端
    • Go 客户端
    • NodeJS 客户端
    • Ruby 客户端

Pulsar安装与部署

目前Pulsar不支持Window,下面通过Docker进行安装,可以参考官网https://pulsar.apache.org/docs/next/getting-started-docker/

同时可以安装Pulsar Manager,具体操作可以参考官方文档 https://pulsar.apache.org/docs/next/administration-pulsar-manager/

其中Pulsar Manager 是一个网页式可视化管理与监测工具,支持多环境下的动态配置。可用于管理和监测租户、命名空间、topic、订阅、broker、集群等。

  1. window环境使用docker推荐使用Docker Desktop,和linux一样可以通过docker命令管理镜像、部署容器等操作。

打开并启动Docker Desktop后,在终端执行命令执行

_> docker search pulsar

可以查询到pulsar相关的镜像

图片

  1. 镜像下载

这里我们选择分别下载红框的两个镜像,执行命令

_> docker pull apachepulsar/pulsar _> docker pull apachepulsar/pulsar-manager

  1. 启动
  • 启动Pulsar
docker run -it -p 6650:6650 -p 8080:8080 
      --mount source=pulsardata,target=/pulsar/data 
      --mount source=pulsarconf,target=/pulsar/conf 
      apachepulsar/pulsar bin/pulsar standalone
  • 启动Pulsar Manager
docker run --name pulsar-manager -dit 
      -p 9527:9527 -p 7750:7750 
      -e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties 
      apachepulsar/pulsar-manager

添加用户:

for /f "tokens=1" %A in ('curl http://localhost:7750/pulsar-manager/csrf-token') do set CSRF_TOKEN=%A
curl -X PUT "X-XSRF-TOKEN: %CSRF_TOKEN%"   -H "Cookie: XSRF-TOKEN=%CSRF_TOKEN%;" 
  -H "Content-Type: application/json" -d "{"name": "admin", "password": "123456", "description": "super user admin", "email": "admin@test.com"}" 
  "http://localhost:7750/pulsar-manager/users/superuser"

访问:

http://localhost:9527/ 
用户名密码:admin/123456

配置environments:

这里需要保证Pulsar Manager应用服务能够访问到Pulsar应用,由于都是通过Docker部署,配置Service URL需要使用网络IP,不要用localhost

图片

管理界面:

图片

Pulsar与SpringBoot集成

  • springboot version : 2.3.7.RELEASE
  • pulsar client: 2.10.2
  1. 通过Properties简单定义一些Broker相关的属性
@Data
@ConfigurationProperties(prefix = "pulsar")
public class PulsarProperties {
    
    private String cluster;
    
    private String namespace;

    private String serverUrl;

    private String token;
}
  1. 通过配置定义了一些常用的组件,比如生产、消费工厂
@Configuration
@EnableConfigurationProperties({PulsarProperties.class})
public class PulsarBootstrapConfiguration {

    private final PulsarProperties properties;

    public PulsarBootstrapConfiguration(PulsarProperties properties) {
        this.properties = properties;
    }

    @Bean(destroyMethod = "close")
    public PulsarClient pulsarClient() throws PulsarClientException {
        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(properties.getServerUrl());
        return clientBuilder.build();
    }

    @Bean
    public PulsarProducerFactory pulsarProducerFactory() throws PulsarClientException {
        return new PulsarProducerFactory(pulsarClient(), properties);
    }

    @Bean
    public PulsarConsumerFactory pulsarConsumerFactory() throws PulsarClientException {
        return new PulsarConsumerFactory(pulsarClient(), properties);
    }

}
  1. 启动服务,在服务启动后,通过实现SmartInitializingSingleton接口,完成容器基本启动(不包含Lazy的Bean)后,开始对消费者Consumer监听
@Slf4j
@SpringBootApplication
public class PulsarApplication implements SmartInitializingSingleton {

    @Autowired
    private PulsarConsumerFactory consumerFactory;

    public static void main(String[] args) {
        SpringApplication.run(PulsarApplication.class,args);
    }

    @Override
    public void afterSingletonsInstantiated() {
        startConsumerListener();
    }

    private void startConsumerListener(){
        Consumer< String > consumer = createConsumer();
        if( consumer != null ){
            while (!Thread.currentThread().isInterrupted()){
                CompletableFuture< ? extends Message< ? >> completableFuture = consumer.receiveAsync();
                Message< ? > message = null;
                try {
                    message = completableFuture.get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.error("错误",e);
                } catch (ExecutionException e) {
                    log.error("错误",e);
                }

                if( message!=null ){
                    try {
                        log.info(" 接收消息:{} ", message.getValue() );
                        consumer.acknowledge(message);
                    } catch (PulsarClientException e) {
                        consumer.negativeAcknowledge(message);
                        throw new RuntimeException(e);
                    }
                }
            }
        }
    }

    private Consumer< String > createConsumer() {
        try {
            return consumerFactory.getConsumer(Constants.TOPIC_DEMO);
        } catch (PulsarClientException e) {
            log.error("创建consumer出错:{}", e.getMessage(),e);
        }
        return null;
    }
}
  1. 消息发送测试
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class PulsarBootTests {

    @Autowired
    private PulsarProducerFactory producerFactory;

    @Test
    public void sendMessage() throws PulsarClientException {
        Producer producer = producerFactory.getProducer(Constants.TOPIC_DEMO);

        producer.send(" 测试消息: " + new Date());

        producer.close();
    }

}
  1. 检查消息接收情况
2023-02-05 12:05:14.043  INFO 23472 --- [ulsar-timer-6-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : [TOPIC_DEMO] [sub-TOPIC_DEMO] [7c2b2] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.02 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2023-02-05 12:06:16.425  INFO 23472 --- [           main] com.sucl.pulsar.PulsarApplication        :  接收消息: 测试消息: Sun Feb 05 12:06:16 CST 2023

结束语

该篇主要通过官网对Apache Pulsar做了简单的了解与尝试,同时基于SpringBoot,以简单的示例代码实现了消息的发送与接收,其中各个组件仅仅使用了默认的配置,在生产环境需要根据Pulsar的特性以及官方API使其具有扩展性与易用性。

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 数据传输
    +关注

    关注

    9

    文章

    1530

    浏览量

    63567
  • 存储
    +关注

    关注

    12

    文章

    3859

    浏览量

    84670
  • 容器
    +关注

    关注

    0

    文章

    481

    浏览量

    21883
  • Apache
    +关注

    关注

    0

    文章

    64

    浏览量

    12313
收藏 人收藏

    评论

    相关推荐

    Apache Spark 1.6预览版新特性展示

    日前,Databricks公司发布了一个Apache Spark主要版本的可用性。除了可用性、可移植性等几个新的特性外,本次发布还提供了对尚未发布的Apache Spark 1.6预览
    发表于 10-13 11:21 0次下载
    <b class='flag-5'>Apache</b> Spark 1.6预览版新<b class='flag-5'>特性</b>展示

    AD7626:16位、10 MSPS、PulSAR差分ADC

    AD7626:16位、10 MSPS、PulSAR差分ADC
    发表于 03-20 17:34 2次下载
    AD7626:16位、10 MSPS、<b class='flag-5'>PulSAR</b>差分ADC

    AN-931: 了解PulSAR ADC支持电路

    AN-931: 了解PulSAR ADC支持电路
    发表于 03-20 21:48 1次下载
    AN-931: 了解<b class='flag-5'>PulSAR</b> ADC支持电路

    AD7961:16位、5 MSPS PulSAR差分ADC

    AD7961:16位、5 MSPS PulSAR差分ADC
    发表于 03-20 22:32 4次下载
    AD7961:16位、5 MSPS <b class='flag-5'>PulSAR</b>差分ADC

    AD7960:18位、5 MSPS PulSAR

    AD7960:18位、5 MSPS PulSAR
    发表于 03-21 09:23 11次下载
    AD7960:18位、5 MSPS <b class='flag-5'>PulSAR</b>

    AD7625: 16位、6 MSPS、 PulSAR差分ADC

    AD7625: 16位、6 MSPS、 PulSAR差分ADC
    发表于 03-21 10:21 5次下载
    AD7625:  16位、6 MSPS、 <b class='flag-5'>PulSAR</b>差分ADC

    10引脚Pulsar®家庭评估板用户指南

    10引脚Pulsar®家庭评估板用户指南
    发表于 05-24 16:50 11次下载
    10引脚<b class='flag-5'>Pulsar</b>®家庭评估板用户指南

    PULSAR模拟电压可变衰减器特性

    输入功率为+27DBM。PULSAR模拟电压控制衰减器是双向的:任何的SMA端口可适用于输入。 特性 操作电流电压:0至+8V(典型); 工作功率:=0DBM; 功率处理:最大+27DBM; 工作温度
    发表于 11-10 15:38 629次阅读

    Apache与Weblogic的整合

    Apache与Weblogic的整合(电源技术论文录用后可以改作者吗)-Apache与Weblogic的整合                       
    发表于 08-31 11:24 7次下载
    <b class='flag-5'>Apache</b>与Weblogic的整合

    Apache2+tomcat5.5集群及Apache负载均衡配置实例

    Apache2+tomcat5.5集群及Apache负载均衡配置实例(新星电源技术有限公司)-Apache2+tomcat5.5集群及Apache负载均衡配置实例     
    发表于 08-31 12:16 0次下载
    <b class='flag-5'>Apache</b>2+tomcat5.5集群及<b class='flag-5'>Apache</b>负载均衡配置实例

    Linux的apache

    Linux的apache(ups电源技术转让)-Linux的apache,有需要的可以参考!
    发表于 08-31 16:17 1次下载
    Linux的<b class='flag-5'>apache</b>

    Apache Spark 3.2有哪些新特性

    经过七轮投票, Apache Spark 3.2 终于正式发布了。Apache Spark 3.2 已经是 Databricks Runtime 10.0 的一部分,感兴趣的同学可以去试用一下。按照
    的头像 发表于 11-17 14:09 1564次阅读

    Pulsar NodeJS客户端

    ./oschina_soft/pulsar-client-node.zip
    发表于 06-17 11:33 0次下载
    <b class='flag-5'>Pulsar</b> NodeJS客户端

    Apache Doris正式成为 Apache 顶级项目

    全球最大的开源软件基金会 Apache 软件基金会(以下简称 Apache)于美国时间 2022 年 6 月 16 日宣布,Apache Doris 成功从 Apache 孵化器毕业,
    的头像 发表于 06-17 14:08 768次阅读

    什么是Apache日志?Apache日志分析工具介绍

    Apache Web 服务器在企业中广泛用于托管其网站和 Web 应用程序,Apache 服务器生成的原始日志提供有关 Apache 服务器托管的网站如何处理用户请求以及访问您的网站时经常遇到的错误的重要信息。
    的头像 发表于 01-04 10:09 249次阅读