0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

利用KoP如何将Pulsar数据快速且无缝接入Apache Doris

电子工程师 来源:OSC开源社区 作者:OSC开源社区 2022-08-08 15:13 次阅读
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

KoP 架构介绍

KoP 是 Kafka on Pulsar 的简写,顾名思义就是如何在 Pulsar 上实现对 Kafka 数据的读写。KoP 将 Kafka 协议处理插件引入 Pulsar Broker 来实现 Apache Pulsar 对 Apache Kafka 协议的支持。将 KoP 协议处理插件添加到现有 Pulsar 集群后,用户不用修改代码就可以将现有的 Kafka 应用程序和服务迁移到 Pulsar。

Apache Pulsar 主要特点如下:

利用企业级多租户特性简化运营。

避免数据搬迁,简化操作。

利用 Apache BookKeeper 和分层存储持久保留事件流。

利用 Pulsar Functions 进行无服务器化事件处理。

KoP 架构如下图,通过图可以看到 KoP 引入一个新的协议处理插件,该协议处理插件利用 Pulsar 的现有组件(例如 Topic 发现、分布式日志库-ManagedLedger、cursor 等)来实现 Kafka 传输协议。

Routine Load 订阅 Pulsar 数据思路

Apache Doris Routine Load 支持了将 Kafka 数据接入 Apache Doris,并保障了数据接入过程中的事务性操作。Apache Pulsar 定位为一个云原生时代企业级的消息发布和订阅系统,已经在很多线上服务使用。那么 Apache Pulsar 用户如何将数据接入 Apache Doris 呢,答案是通过 KoP 实现。

由于 KoP 直接在 Pulsar 侧提供了对 Kafka 的兼容,那么对于 Apache Doris 来说可以像使用 Kafka 一样使用 Plusar。整个过程对于 Apache Doris 来说无需任务改变,就能将 Pulsar 数据接入 Apache Doris,并且可以获得 Routine Load 的事务性保障。

--------------------------

| Apache Doris |

| --------------- |

| | Routine Load | |

| --------------- |

--------------------------

|Kafka Protocol(librdkafka)

------------v--------------

| --------------- |

| | KoP | |

| --------------- |

| Apache Pulsar |

--------------------------

操作实践

Pulsar Standalone 安装环境准备:

JDK 安装:略

下载 Pulsar 二进制包,并解压:

#下载

wget https://archive.apache.org/dist/pulsar/pulsar-2.10.0/apache-pulsar-2.10.0-bin.tar.gz

#解压并进入安装目录

tar xvfz apache-pulsar-2.10.0-bin.tar.gz

cd apache-pulsar-2.10.0

组件编译和安装

1. 下载 KoP 源码

git clone https://github.com/streamnative/kop.git

cd kop

2. 编译 KoP 项目

mvn clean install -DskipTests

3. protocols 配置:在解压后的 apache-pulsar 目录下创建 protocols文 件夹,并把编译好的 nar 包复制到 protocols 文件夹中。

mkdir apache-pulsar-2.10.0/protocols

# mv kop/kafka-impl/target/pulsar-protocol-handler-kafka-{{protocol:version}}.nar apache-pulsar-2.10.0/protocols

cp kop/kafka-impl/target/pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar apache-pulsar-2.10.0/protocols

4. 添加后的结果查看

[root@17a5da45700b apache-pulsar-2.10.0]# ls protocols/

pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar

KoP 配置添加

1. 在 standalone.conf 或者 broker.conf 添加如下配置

#kop适配的协议

messagingProtocols=kafka

#kop 的NAR文件路径

protocolHandlerDirectory=。/protocols

#是否允许自动创建topic

allowAutoTopicCreationType=partitioned

2. 添加如下服务监听配置

# Use `kafkaListeners` here for KoP 2.8.0 because `listeners` is marked as deprecated from KoP 2.8.0

kafkaListeners=PLAINTEXT://127.0.0.1:9092# This config is not required unless you want to expose another address to the Kafka client.

# If it’s not configured, it will be the same with `kafkaListeners` config by default

kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092

brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor

brokerDeleteInactiveTopicsEnabled=false

当出现如下错误:

java.lang.IllegalArgumentException: Broker has disabled transaction coordinator, please enable it before using transaction.

添加如下配置,开启 transactionCoordinatorEnabled

kafkaTransactionCoordinatorEnabled=true

transactionCoordinatorEnabled=true

Pulsar 启动

#前台启动

#bin/pulsar standalone

#后台启动

pulsar-daemon start standalone

创建 Doris 数据库和建表

#进入Doris

mysql -u root -h 127.0.0.1 -P 9030

# 创建数据库

create database pulsar_doris;

#切换数据库

use pulsar_doris;

#创建clicklog表

CREATE TABLE IF NOT EXISTS pulsar_doris.clicklog

`clickTime` DATETIME NOT NULL COMMENT “点击时间”,

`type` String NOT NULL COMMENT “点击类型”,

`id` VARCHAR(100) COMMENT “唯一id”,

`user` VARCHAR(100) COMMENT “用户名称”,

`city` VARCHAR(50) COMMENT “所在城市”

DUPLICATE KEY(`clickTime`, `type`)

DISTRIBUTED BY HASH(`type`) BUCKETS 1

PROPERTIES (

“replication_allocation” = “tag.location.default: 1”

);

创建 Routine Load 任务

CREATE ROUTINE LOAD pulsar_doris.load_from_pulsar_test ON clicklog

COLUMNS(clickTime,id,type,user)

PROPERTIES

“desired_concurrent_number”=“3”,

“max_batch_interval” = “20”,

“max_batch_rows” = “300000”,

“max_batch_size” = “209715200”,

“strict_mode” = “false”,

“format” = “json”

FROM KAFKA

“kafka_broker_list” = “127.0.0.1:9092”,

“kafka_topic” = “test”,

“property.group.id” = “doris”

);

上述命令中的参数解释如下:

pulsar_doris :Routine Load 任务所在的数据库

load_from_pulsar_test:Routine Load 任务名称

clicklog:Routine Load 任务的目标表,也就是配置 Routine Load 任务将数据导入到 Doris 哪个表中。

strict_mode:导入是否为严格模式,这里设置为 False。

format:导入数据的类型,这里配置为 Json。

kafka_broker_list:Kafka Broker 服务的地址

kafka_broker_list:Kafka Topic 名称,也就是同步哪个 Topic 上的数据。

property.group.id:消费组 ID

数据导入和测试

1. 数据导入 构造一个 ClickLog 的数据结构,并调用 Kafka 的 Producer 发送 5000 万条数据到 Pulsar。 ClickLog 数据结构如下:

public class ClickLog {

private String id;

private String user;

private String city;

private String clickTime;

private String type;

。.. //省略getter和setter

}

消息构造和发送的核心代码逻辑如下:

String strDateFormat = “yyyy-MM-dd HHss”;

@Autowired

private Producer producer;

try {

for(int j =0 ; j《50000;j++){

int batchSize = 1000;

for(int i = 0 ; i《batchSize ;i++){

ClickLog clickLog = new ClickLog();

clickLog.setId(UUID.randomUUID().toString());

SimpleDateFormat simpleDateFormat = new SimpleDateFormat(strDateFormat);

clickLog.setClickTime(simpleDateFormat.format(new Date()));

clickLog.setType(“webset”);

clickLog.setUser(“user”+ new Random().nextInt(1000) +i);

producer.sendMessage(Constant.topicName, JSONObject.toJSONString(clickLog));

}

}

} catch (Exception e) {

e.printStackTrace();

}

2. ROUTINE LOAD 任务查看执行 SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test G;命令,查看导入任务的状态。

mysql》 SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test G;

*************************** 1. row ***************************

Id: 87873

Name: load_from_pulsar_test

CreateTime: 2022-05-31 1234

PauseTime: NULL

EndTime: NULL

DbName: default_cluster:pulsar_doris

TableName: clicklog1

State: RUNNING

DataSourceType: KAFKA

CurrentTaskNum: 1

JobProperties: {“partitions”:“*”,“columnToColumnExpr”:“clickTime,id,type,user”,“maxBatchIntervalS”:“20”,“whereExpr”:“*”,“dataFormat”:“json”,“timezone”:“Europe/London”,“send_batch_parallelism”:“1”,“precedingFilter”:“*”,“mergeType”:“APPEND”,“format”:“json”,“json_root”:“”,“maxBatchSizeBytes”:“209715200”,“exec_mem_limit”:“2147483648”,“strict_mode”:“false”,“jsonpaths”:“”,“deleteCondition”:“*”,“desireTaskConcurrentNum”:“3”,“maxErrorNum”:“0”,“strip_outer_array”:“false”,“currentTaskConcurrentNum”:“1”,“execMemLimit”:“2147483648”,“num_as_string”:“false”,“fuzzy_parse”:“false”,“maxBatchRows”:“300000”}

DataSourceProperties: {“topic”:“test”,“currentKafkaPartitions”:“0”,“brokerList”:“127.0.0.1:9092”}

CustomProperties: {“group.id”:“doris”,“kafka_default_offsets”:“OFFSET_END”,“client.id”:“doris.client”}

Statistic: {“receivedBytes”:5739001913,“runningTxns”:[],“errorRows”:0,“committedTaskNum”:168,“loadedRows”:50000000,“loadRowsRate”:23000,“abortedTaskNum”:1,“errorRowsAfterResumed”:0,“totalRows”:50000000,“unselectedRows”:0,“receivedBytesRate”:2675000,“taskExecuteTimeMs”:2144799}

Progress: {“0”:“51139566”}

Lag: {“0”:0}

ReasonOfStateChanged:

ErrorLogUrls:

OtherMsg:

1 row in set (0.00 sec)

ERROR:

No query specified

从上面结果可以看到 totalRows 为 50000000,errorRows 为 0。说明数据不丢不重的导入 Apache Doris 了。

3. 数据统计验证执行如下命令统计表中的数据,发现统计的结果也是 50000000,符合预期。

mysql》 select count(*) from clicklog;

+----------+

| count(*) |

+----------+

| 50000000 |

+----------+

1 row in set (3.73 sec)

mysql》

通过 KoP 我们实现了将 Apache Pulsar 数据无缝接入 Apache Doris ,无需对 Routine Load 任务进行任何修改,并保障了数据导入过程中的事务性。与此同时,Apache Doris 社区已经启动了 Apache Pulsar 原生导入支持的设计,相信在不久后就可以直接订阅 Pulsar 中的消息数据,并保证数据导入过程中的 Exactly-Once 语义。

审核编辑:郭婷

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 服务器
    +关注

    关注

    14

    文章

    10467

    浏览量

    91890
  • 代码
    +关注

    关注

    30

    文章

    4985

    浏览量

    74590

原文标题:如何将Pulsar数据快速且无缝接入Apache Doris

文章出处:【微信号:OSC开源社区,微信公众号:OSC开源社区】欢迎添加关注!文章转载请注明出处。

收藏 人收藏
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

    评论

    相关推荐
    热点推荐

    如何将光强度转换为电量

    如何将光强度转换为电量 在电子工程领域,光强度转换为电量是一项常见重要的任务。它在诸多场景中都有广泛应用,比如室内照明设计、摄影准备,以及当下热门的智能农业领域。本文详细探讨如何
    的头像 发表于 05-29 12:25 179次阅读

    罗德与施瓦茨推出支持Xona新一代卫星导航服务Pulsar的信号模拟功能

    罗德与施瓦茨(以下简称“R&S”)近期宣布推出支持Xona新一代卫星导航服务"Pulsar" 的信号模拟功能。该功能使设备制造商能够利用R&S信号发生器在生产环节
    的头像 发表于 05-07 14:33 389次阅读

    AD7623:16位、1.33 MSPS PulSAR® ADC的深度解析

    的16位、1.33 MSPS PulSAR® ADC——AD7623,它在医疗仪器、高速数据采集、数字信号处理等众多领域都有着广泛的应用。 文件下载: AD7623.pdf 一、AD7623的特性亮点
    的头像 发表于 04-03 09:35 520次阅读

    16位6MSPS PulSAR差分ADC AD7625:高性能数据转换的理想之选

    16位6MSPS PulSAR差分ADC AD7625:高性能数据转换的理想之选 一、引言 在电子设计领域,模拟到数字的转换是一项至关重要的技术。ADC(模拟 - 数字转换器)作为连接模拟世界和数
    的头像 发表于 04-01 15:35 275次阅读

    电子工程师必看:AD7903 双差分 16 位 1 MSPS PulSAR ADC 深度解析

    、AD7903 的特性亮点 1. 高精度与高速度 AD7903 具有 16 位分辨率无失码,这意味着它能够提供非常精确的数字输出。其吞吐量高达 1 MSPS,能够快速模拟信
    的头像 发表于 03-30 12:10 293次阅读

    EM储能网关 ZWS智慧储能云应用(25) — 如何将电站高效接入省站平台?

    随着储能电站的增长,各省市正在逐步出台储能监管政策,要求业主将电站接入省站平台。ZLG智慧储能云平台,支持省站平台一键接入,助力储能终端业主将电站快速接入省站平台。简介随着新能源储能电
    的头像 发表于 03-11 11:38 340次阅读
    EM储能网关 ZWS智慧储能云应用(25) — <b class='flag-5'>如何将</b>电站高效<b class='flag-5'>接入</b>省站平台?

    工业数据中台支持接入MySQL数据库吗

    工业数据中台完全支持接入MySQL数据库 ,通过数据同步、集成与治理等技术手段,能够充分发挥MySQL在
    的头像 发表于 12-04 11:23 578次阅读
    工业<b class='flag-5'>数据</b>中台支持<b class='flag-5'>接入</b>MySQL<b class='flag-5'>数据</b>库吗

    如何将GCC项目导入NuEclipse?

    如何将GCC项目导入NuEclipse?
    发表于 09-01 07:04

    请问编译程序时如何将数据放入Flash固定地址?

    编译程序时如何将数据放入Flash固定地址?
    发表于 08-29 06:40

    编译程序时如何将数据放入Flash固定地址?

    编译程序时如何将数据放入Flash固定地址?
    发表于 08-27 13:16

    请问如何将ISP写入Nuvoton 8051 MCU系列?

    如何将ISP写入Nuvoton 8051 MCU系列?
    发表于 08-18 07:34

    台湾伟斯扫码枪通过RS232转Profinet网关接入西门子1200 PLC的完整指南

    在工业自动化领域,设备间的无缝通信是提升生产效率的关键。本文详细解析如何将台湾伟斯扫码枪通过RS232转Profinet网关接入西门子1200系列PLC,实现
    的头像 发表于 08-05 14:29 1130次阅读
    台湾伟斯扫码枪通过RS232转Profinet网关<b class='flag-5'>接入</b>西门子1200 PLC的完整指南

    工业自动化通信方案:台湾伟斯扫码枪通过RS232转Profinet网关接入西门子S7-1200 PLC系统详解

    在工业自动化领域,设备间的通信如同神经网络的信号传递,需要精准高效的连接方式。本文聚焦如何将台湾伟斯扫码枪通过RS232转Profinet网关接入西门子S7-1200 PLC系统,
    的头像 发表于 08-04 18:25 1252次阅读
    工业自动化通信方案:台湾伟斯扫码枪通过RS232转Profinet网关<b class='flag-5'>接入</b>西门子S7-1200 PLC系统详解

    使用NVIDIA GPU加速Apache Spark中Parquet数据扫描

    随着各行各业的企业数据规模不断增长,Apache Parquet 已经成为了一种主流数据存储格式。Apache Parquet 是一种列式存储格式,专为高效的大规模
    的头像 发表于 07-23 10:52 1318次阅读
    使用NVIDIA GPU加速<b class='flag-5'>Apache</b> Spark中Parquet<b class='flag-5'>数据</b>扫描

    Modbus TCP转Profibus网关如何快速把流量计接入到DCS?

    在工业自动化领域,设备间的协议互通往往如同语言不通的对话者,需要一位“翻译官”才能实现高效协作。本文围绕Modbus TCP转Profibus网关的应用,解析如何通过这一技术桥梁,流量计数据
    的头像 发表于 07-07 16:50 764次阅读
    Modbus TCP转Profibus网关如何<b class='flag-5'>快速</b>把流量计<b class='flag-5'>接入</b>到DCS?