0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

两亿多用户,六大业务场景,知乎AI用户模型服务性能如何优化?

电子工程师 来源:工程师李察 2019-01-05 11:06 次阅读
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

用户模型简介

知乎 AI 用户模型服务于知乎两亿多用户,主要为首页、推荐、广告、知识服务、想法、关注页等业务场景提供数据和服务,例如首页个性化 Feed 的召回和排序、相关回答等用到的用户长期兴趣特征,问题路由、回答排序中用到的 TPR「作者创作权威度」,广告定向投放用到的基础属性等。

主要功能

提供的数据和功能主要有:

用户兴趣:长期兴趣、实时兴趣、分类兴趣、话题兴趣、keyword 兴趣、作者创作权威度等,

用户 Embedding 表示:最近邻用户、人群划分、特定用户圈定等,

用户社交属性:用户亲密度、二度好友、共同好友、相似优秀回答者等,

用户实时属性: LastN 行为、LastLogin 等,

用户基础属性:用户性别预测、年龄段计算、职业预估等。

服务架构

整体主要分为 Streaming / 离线计算、在线服务和 HBase 多集群同步三部分组成,下面将依次进行介绍。

用户模型服务架构图

Streaming / 离线计算

Streaming 计算主要涉及功能 LastRead、LastSearch、LastDisplay,实时话题/ Keyword 兴趣、最后登录时间、最后活跃的省市等。

用户模型实时兴趣计算逻辑图

实时兴趣的计算流程

相应日志获取。从 CardshowLog、PageshowLog、QueryLog 中抽取<用户,contentToken,actionType >等内容。

映射到对应的内容维度。对于问题、回答、文章、搜索分别获取对应的 Topic 和 Keyword,搜索内容对应的 Topic。在 Redis 中用 contentToken 置换 contentId 后,请求 ContentProfile 获取其对应话题和关键词;对于 Query,调用 TopicMatch 服务,传递搜索内容给服务,服务返回其对应的 Topic;调用 Znlp 的 KeywordExtractorJar 包,传递搜索内容并获得其对应的 Keyword 。

用户-内容维度汇总。根据用户的行为,在<用户,topic,actionType>和<用户,keyword,actionType>层面进行 groupBy 聚合汇总后,并以 hashmap 的格式存储到 Redis,作为计算用户实时兴趣的基础数据,按时间衰减系数 timeDecay 进行新旧兴趣的 merge 后存储。

计算兴趣。在用户的历史基础数据上,按一定的 decay 速度进行衰减,按威尔逊置信区间计算用户兴趣 score,并以 Sortedset 的格式存储到 Redis。

关于兴趣计算,已经优化的地方主要是:如何快速的计算平滑参数 alpha 和 beta,如何 daily_update 平滑参数,以及用卡方计算置信度时,是否加入平滑参数等都会对最终的兴趣分值有很大的影响,当 display 为 1 曝光数量不足的情况下,兴趣 score 和 confidence 计算出现 的 bias 问题等。

在线服务

随之知乎日益增加的用户量,以及不断丰富的业务场景和与之相对应出现的调用量上升等,对线上服务的稳定性和请求时延要求也越来越高。 旧服务本身也存在一些问题,比如:

在线服务直连 HBase,当数据热点的时候,造成某些 Region Server 的负载很高,P95 上升,轻者造成服务抖动,监控图偶发有「毛刺」现象,重者造成服务几分钟的不可用,需要平台技术人员将 Region 从负载较高的 RegionServer 上移走。

离线任务每次计算完成后一次大批量同时写入离线和在线集群,会加重 HBase 在线集群Region Server 的负载,增大 HBase get 请求的时延,从而影响线上服务稳定性和 P95。

针对问题一,我们在原来的服务架构中增加缓存机制,以此来增强服务的稳定型、减小 Region Server 的负载。

针对问题二,修改了离线计算和多集群数据同步的方式,详见「HBase多集群存储机制」部分。

Cache机制具体实现

没有 Cache 机制时,所有的 get 和 batchGet 方法直接请求到 HBase,具体如下图:

用户模型服务请求序列图

UserProfileServiceApp 启动服务,将收到的请求交由 UserProfileServiceImpl 具体处理

UserProfileServiceImp 根据请求参数,调用 GetTranslator 将 UserProfileRequest.GetRequest 转化成 HBase 中的 Get Object(在 Map 中维护每个 requestField 对应 HBase 中的 tablename,cf,column,prefix 等信息),以格式Map[String, util.List[(AvailField, Get)]]返回。

UserProfileServiceImp 用 Future 异步向 HBase 发送 get 请求,获取到结果返回。

增加 Cache 机制的具体方法,在上面的第二步中,增加一个 CacheMap,用来维护 get 中 AvailField 对应 Cache 中的 key,key 的组成格式为:「 tablename 缩写| columnfamily 缩写| columnname 缩写| rowkey 全写」。这里使用的 Redis 数据结构主要有两种,SortedSet 和 Key-Value对。服务端收到请求后先去转化 requestField 为 Cache 中的 key,从 Cache 中获取数据。对于没有获取到 requestField 的转化成 GetObject,请求 HBase 获取,将结果保存到 Cache 中并返回。

最终效果

用户模型的访问量大概为 100K QPS,每个请求转化为多个 get 请求。 增加 Cache 前 get 请求的 P95 为30ms,增加 Cache 后降低到小于 15ms,Cache 命中率 90% 以上。

HBase 多集群存储机制

离线任务和 Streaming 计算主要采用 Spark 计算实现, 结果保存到 HBase 的几种方式:

方法一:每次一条

1. 每次写进一条,调用 API 进行存储的代码如下:

valhbaseConn=ConnectionFactory.createConnection(hbaseConf)valtable=hbaseConn.getTable(TableName.valueOf("word"))x.foreach(value=>{varput=newPut(Bytes.toBytes(value.toString))put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("c1"),Bytes.toBytes(value.toString))table.put(put)})

方法二:批量写入

2. 批量写入 HBase,使用的 API:

/***{@inheritDoc}*@throwsIOException*/@Overridepublicvoidput(finalListputs)throwsIOException{getBufferedMutator().mutate(puts);if(autoFlush){flushCommits();}}

方法三:MapReduce 的 saveAsNewAPIHadoopDataset 方式写入

3. saveAsNewAPIHadoopDataset 是通用的保存到 Hadoop 存储系统的方法,调用 org.apache.hadoop.mapreduce.RecordWriter 实现。org.apache.hadoop.hbase.mapreduce.TableOutputFormat.TableRecordWriter 是其在 HBase 中的实现类。底层通过调用 hbase.client.BufferedMutator.mutate() 方式保存。

valrdd=sc.makeRDD(Array(1)).flatMap(_=>0to1000000)rdd.map(x=>{varput=newPut(Bytes.toBytes(x.toString))put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("c1"),Bytes.toBytes(x.toString))(newImmutableBytesWritable,put)}).saveAsHadoopDataset(jobConf)/***Writesakey/valuepairintothetable.*@throwsIOExceptionWhenwritingfails.*/@Overridepublicvoidwrite(KEYkey,Mutationvalue)throwsIOException{if(!(valueinstanceofPut)&&!(valueinstanceofDelete)){thrownewIOException("PassaDeleteoraPut");}mutator.mutate(value);}

方法四:BulkLoad 方式

4. BulkLoad 方式,创建 HFiles,调用 LoadIncrementalHFiles 作业将它们移到 HBase 表中。

首先需要根据表名 getRegionLocator 得到 RegionLocator,根据 RegionLocator 得到 partition,因为在 HFile 中是有序的所以,需要调用 rdd.repartitionAndSortWithinPartitions(partitioner) 将 rdd 重新排序。

HFileOutputFormat2.configureIncrementalLoad(job,table, regionLocator) 进行任务增量Load 到具体表的配置 实现并执行映射( 并减少) 作业,使用 HFileOutputFormat2 输出格式将有序的放置或者 KeyValue 对象写入HFile文件。Reduce阶段通过调用 HFileOutputFormat2.configureIncrementalLoad 配置在场景后面。执行LoadIncrementalHFiles 作业将 HFile 文件移动到系统文件。

staticvoidconfigureIncrementalLoad(Jobjob,Tabletable,RegionLocatorregionLocator,Class>cls)throwsIOException{Configurationconf=job.getConfiguration();job.setOutputKeyClass(ImmutableBytesWritable.class);job.setOutputValueClass(KeyValue.class);job.setOutputFormatClass(cls);//Basedontheconfiguredmapoutputclass,setthecorrectreducertoproperly//sorttheincomingvalues.if(KeyValue.class.equals(job.getMapOutputValueClass())){job.setReducerClass(KeyValueSortReducer.class);}elseif(Put.class.equals(job.getMapOutputValueClass())){job.setReducerClass(PutSortReducer.class);}elseif(Text.class.equals(job.getMapOutputValueClass())){job.setReducerClass(TextSortReducer.class);}else{LOG.warn("Unknownmapoutputvaluetype:"+job.getMapOutputValueClass());}conf.setStrings("io.serializations",conf.get("io.serializations"),MutationSerialization.class.getName(),ResultSerialization.class.getName(),KeyValueSerialization.class.getName());configurePartitioner(job,startKeys);//SetcompressionalgorithmsbasedoncolumnfamiliesconfigureCompression(table,conf);configureBloomType(table,conf);configureBlockSize(table,conf);configureDataBlockEncoding(table,conf);TableMapReduceUtil.addDependencyJars(job);TableMapReduceUtil.initCredentials(job);LOG.info("Incrementaltable"+table.getName()+"outputconfigured.");}publicstaticvoidconfigureIncrementalLoad(Jobjob,Tabletable,RegionLocatorregionLocator)throwsIOException{configureIncrementalLoad(job,table,regionLocator,HFileOutputFormat2.class);}valhFileLoader=newLoadIncrementalHFiles(conf)hFileLoader.doBulkLoad(hFilePath,newHTable(conf,table.getName))

将 HFile 文件 Bulk Load 到已存在的表中。 由于 HBase 的 BulkLoad 方式是绕过了 Write to WAL,Write to MemStore 及 Flush to disk 的过程,所以并不能通过 WAL 来进行一些复制数据的操作。 由于 Bulkload 方式还是对集群 RegionServer 造成很高的负载,最终采用方案三,下面是两个集群进行数据同步。

存储同步机制

技术选型 HBase 常见的 Replication 方法有 SnapShot、CopyTable/Export、BulkLoad、Replication、应用层并发读写等。 应用层并发读写 优点:应用层可以自由灵活控制对 HBase写入速度,打开或关闭两个集群间的同步,打开或关闭两个集群间具体到表或者具体到列簇的同步,对 HBase 集群性能的影响最小,缺点是增加了应用层的维护成本。 初期没有更好的集群数据同步方式的时候,用户模型和内容模型自己负责两集群间的数据同步工作。

用户模型存储多机房同步架构图

具体实现细节

第一步:定义用于在 Kafka 的 Producer 和 Consumer 中流转的统一数据 Protobuf 格式

messageColumnValue{requiredbytesqualifier=1;......}messagePutMessage{requiredstringtablename=1;......}

第二步:发送需要同步的数据到 Kafka,(如果有必要,需要对数据做相应的格式处理),这里对数据的处理,有两种方式。 第一种:如果程序中有统一的存储到 HBase 的工具(另一个项目是使用自定义的 HBaseHandler,业务层面只生成 tableName,rowKey,columnFamily,column 等值,由 HBaseHandler 统一构建成 Put 对象,并保存 HBase 中),这种方式在业务层面改动较小,理论上可以直接用原来的格式发给 Kafka,但是如果 HBaseHandler 处理的格式和 PutMessage 格式有不符的地方,做下适配即可。

/***tableName:hbasetablename*rdd:RDD[(rowkey,family,column,value)]*/defconvert(tableName:String,rdd:RDD):RDD={rdd.map{case(rowKey:String,family:String,column:String,value:Array[Byte])=>valmessage=KafkaMessages.newBuilder()valcolumnValue=ColumnValue.newBuilder()columnValue.set......(rowKey,message.build().toByteArray)}}

第二种:程序在 RDD 中直接构建 HBase 的 Put 对象,调用 PairRDD 的 saveAsNewAPIHadoopDataset 方法保存到 HBase 中。此种情况,为了兼容已有的代码,做到代码和业务逻辑的改动最小,发送到 Kafka 时,需要将 Put 对象转换为上面定义的 PutMessage Protobuf 格式,然后发送给 Kafka。

/***tableName:hbasetablenamne*rdd:RDD[(rowKey,put)]*/defconvert(tableName:String,familyNames:Array[String],rdd:RDD):RDD={rdd.map{case(_,put:Put)=>valmessage=PutMessage.newBuilder()for(familyName<- familyNames){      if(put.getFamilyMap().get(Bytes.toBytes(familyName))!=null){      val keyValueList = put.getFamilyMap()        .asInstanceOf[java.util.ArrayList[KeyValue]].asScala        for( keyvalue <- keyValueList){          message.setRowkey(ByteString.copyFrom(keyvalue.getRow))        ......        }        message.setTablename(tableName)      }    }    (null, message.build().toByteArray) }}

第三步:发送到 Kafka,不同的表发送到不同的 Topic,对每个 Topic 的消费做监控。

/***发送rdd中的内容到brokers的指定topic中*tableName:hbasetablenamne*rdd:RDD[(rowKey,put)]*/defsend[T](brokers:String,rdd:RDD[(String,T)],topic:String)(implicitcTag:ClassTag[T]):Unit={rdd.foreachPartition(partitionOfRecords=>{valproducer=getProducer[T](brokers)partitionOfRecords.map(r=>newProducerRecord[String,T](topic,r._1,r._2)).foreach(m=>producer.send(m))producer.close()})}

第四步:另启动 Streaming Consumer 或者服务消费 Kafka 中内容,将 putMessage 的 Protobuf 格式转成 HBase 的 put 对象,同时写入到在线 HBase 集群中。 Streaming 消费Kafka ,不同的表发送到不同的 Topic,对每个 Topic 的消费做监控。

valtoHBaseTagsTopic=validKafkaStreamTagsTopic.map{record=>valtableName_r=record.getTablename()valput=newPut(record.getRowkey.toByteArray)for(cv<- record.getColumnsList) {          put.addColumn(record.getFamily.toByteArray)          ......        }        if(put.isEmpty){          (new ImmutableBytesWritable(), null)        }else{          (new ImmutableBytesWritable(), put)        }    }.filter(_._2!=null)    if(!isClean) {      toHbaseTagsTopic.foreachRDD { rdd =>rdd.saveAsNewAPIHadoopDataset(AccessUtils.createOutputTableConfiguration(constants.Constants.NAMESPACE+":"+constants.Constants.TAGS_TOPIC_TABLE_NAME))}}

如下为另一种启动服务消费 Kafka 的方式。

valconsumer=newKafkaConsumer[String,Array[Byte]](probs)consumer.subscribe(topics)valrecords=consumer.poll(100)for(p<- records.partitions) {   val recordsOfPartition = records.records(p)   recordsOfPartition.foreach { r =>Try(KafkaMessages.parseFrom(r.value()))match{caseSuccess(record)=>valtableName=record.getTableNameif(validateTables.contains(tableName)){valmessageType=record.getType......try{valcolumns=record.getColumnsList.map(c=>(c.getColumn,c.getValue.toByteArray)).toArrayHBaseHandler.write(tableName)......}catch{caseex:Throwable=>LOG.error("writehbasefail")HaloClient.increment(s"content_write_hbase_fail")}}else{LOG.error(s"table$tableNameisvalid")}}}//updateoffsetvallastOffset=recordsOfPartition.get(recordsOfPartition.size-1).offset()consumer.commitSync(java.util.Collections.singletonMap(p,newOffsetAndMetadata(lastOffset+1)))}

结语

最后,目前采用的由应用控制和管理在线离线集群的同步机制,在随着平台多机房项目的推动下,平台将推出 HBase 的统一同步机制 HRP (HBase Replication Proxy),届时业务部门可以将更多的时间和精力集中在模型优化层面。

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • API
    API
    +关注

    关注

    2

    文章

    2506

    浏览量

    67135
  • AI
    AI
    +关注

    关注

    91

    文章

    41976

    浏览量

    303077
  • 模型
    +关注

    关注

    1

    文章

    3873

    浏览量

    52341

原文标题:两亿多用户,六大业务场景,知乎AI用户模型服务性能如何优化?

文章出处:【微信号:rgznai100,微信公众号:rgznai100】欢迎添加关注!文章转载请注明出处。

收藏 人收藏
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

    评论

    相关推荐
    热点推荐

    【节能学院】适配688新政,园区多用户绿电直连合规落地指南

    在"双碳”目标深化推进的关键阶段,绿电直连政策体系持续完善。继2025年650号文(单用户绿电直连)明确单一用户绿电直连的实施框架后,2026年688号文(多用户绿电直连)进一步将场景
    的头像 发表于 05-22 08:04 201次阅读
    【节能学院】适配688新政,园区<b class='flag-5'>多用户</b>绿电直连合规落地指南

    多用户绿电直连新政出台,CET智能微网系统如何破解三大难题?

    2026年5月20日,国家发改委、国家能源局联合发布《关于有序推动多用户绿电直连发展有关事项的通知》(发改能源〔2026〕688号)。从单用户多用户,绿电直连政策全面升级,为工业园区、零碳园区
    的头像 发表于 05-21 18:02 382次阅读
    <b class='flag-5'>多用户</b>绿电直连新政出台,CET智能微网系统如何破解三大难题?

    行业洞察篇__数字孪生项目演进中的“双引擎”模式:场景构建与业务运维的协同路径

    单纯卖场景建模服务的模式将越来越难。 场景构建与业务运维的协同样本:种能力的对接实验 目前市场上最常见的落地路径是分
    发表于 05-14 09:56

    AI模型小龙虾-OpenClaw-0基础从入门到实战

    “长尾、轻量、多变”的部门级业务场景时,具有无可比拟的适用效率和性价比。 总结 “AI模型小龙虾 OpenClaw”之所以能成为零基础人群的首选,根本原因在于它精准卡位了非技术岗
    发表于 05-06 16:04

    AI Ceph 分布式存储教程资料大模型学习资料2026

    。在面向 C 端用户的推理场景中,向量数据库与模型权重的加载对存储系统提出了新的挑战。 实战中的 AI 存储需要支持高并发的向量检索请求,这要求存储底座具备极高的随机读写能力。同时,为
    发表于 05-01 17:35

    HM博学谷狂野AI模型第四期

    如何减少 GPU 显存碎片,提高吞吐量。通过对推理引擎源码的调试与优化,开发者将学会如何在不牺牲模型效果的前提下,榨干硬件性能,实现毫秒级的低延迟响应,这是将 AI 技术转化为高并发商
    发表于 05-01 17:30

    黑马-Java+AI新版V16零基础就业班百度云网盘下载+Java+AI全栈开发工程师

    AI 能力有条典型路径。初级做法是独立部署 AI 模型服务(Python 推理端),Java 业务
    发表于 05-01 11:29

    [完结15章]Java转 AI高薪领域必备-从0到1打通生产级AI Agent开发

    拒绝原地踏步:Java工程师AI转型的底层技术破局之道 在软件工程范式被大语言模型(LLM)彻底颠覆的今天,传统的“Java CRUD boy”正面临着前所未有的生存危机。当业务逻辑的生成可以被
    发表于 04-30 13:46

    后摩智能与亿次网联发布家庭场景专属AI超级助理龙虾密盒

    4月22日,后摩智能与亿次网联联合发布了家庭场景专属 AI 超级助理 ——龙虾密盒。它通过本地模型+本地智能体+Harness工程,让用户
    的头像 发表于 04-24 16:08 486次阅读

    工业 AI Agent 为什么能真正落地工厂?研华六大场景实战经验分享

    从经验传承到决策优化,从设备维护到供应链协同,研华iFactory.AI Agent的六大落地案例充分证明,工业AI Agent并非悬浮的技术概念,而是能深度适配制造业生产经营全链路、
    的头像 发表于 03-12 16:58 500次阅读

    如何评估工业扫码器的“好用”程度?六大指标解析

    在智能制造、物流分拣、产线追溯等场景中,工业扫码器是数据采集的核心设备。很多用户选购时只看价格与外观,却忽略了决定长期稳定性与效率的关键指标。真正好用的工业扫描器,必须兼顾速度、精度、耐用性与兼容性
    的头像 发表于 03-04 13:43 244次阅读
    如何评估工业扫码器的“好用”程度?<b class='flag-5'>六大</b>指标解析

    模型 ai coding 比较

    bug场景(抽样9题) 覆盖Bug类型:语法错误、逻辑错误、性能优化三类 核心指标:Bug修复通过率 评分逻辑:成功修复得10分,修复失败/引入新问题得0分 实测结果:DeepSeek 9/9(100
    发表于 02-19 13:43

    利用拼多多用户API进行粉丝数据分析,有效提升用户粘性

    这些API进行粉丝数据分析,并基于分析结果制定增强用户粘性的策略。 一、 拼多多用户API概览 拼多多开放平台提供了丰富的API接口,涵盖商品、交易、用户、物流等多个维度。对于粉丝数据分析,以下几个API尤为关键: 粉丝明细查询
    的头像 发表于 12-30 10:38 444次阅读
    利用拼多<b class='flag-5'>多用户</b>API进行粉丝数据分析,有效提升<b class='flag-5'>用户</b>粘性

    多用户全双工通信:为什么MIMO系统蓬勃发展?

    收音机如何同时发送和接收?我们将研究这一复杂的现象,您将掌握其要点。我们将探讨多用户全双工通信。我们还将重点介绍主要挑战并演示MIMO解决方案。最后,我们将介绍其应用和优势。
    的头像 发表于 11-13 12:01 2210次阅读
    <b class='flag-5'>多用户</b>全双工通信:为什么MIMO系统蓬勃发展?

    开源“智能预渲染框架” 几行代码实现鸿蒙应用页面“秒开”

    近日,在Gitee平台开源了其自研的鸿蒙“智能预渲染框架”,并将该框架的Har包上架到OpenHarmony三方库中心仓。该框架在鸿蒙平台首创“智能预渲染”技术,旨在破解应用复杂页面加载缓慢
    的头像 发表于 08-29 14:32 773次阅读
    <b class='flag-5'>知</b><b class='flag-5'>乎</b>开源“智能预渲染框架” 几行代码实现鸿蒙应用页面“秒开”