0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

关于PriorityBlockingQueue中队列操作

openEuler 来源:openEuler 作者:openEuler 2022-05-07 16:43 次阅读
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

编者按:笔者在使用PriorityBlockingQueue实现按照优先级处理任务时遇到一类NPE问题,经过分析发现根本原因是在任务出队列时调用比较器异常,进而导致后续任务出队列抛出NullPointerException。本文通过完整的案例复现来演示在什么情况会触发该问题,同时给出了处理建议。希望读者在编程时加以借鉴,避免再次遇到此类问题。

背景知识

PriorityBlockingQueue是一个无界的基于数组的优先级阻塞队列,使用一个全局ReentrantLock来控制某一时刻只有一个线程可以进行元素出队和入队操作,并且每次出队都返回优先级别最高的或者最低的元素。PriorityBlockingQueue通过以下两种方式实现元素优先级排序:

  1. 入队元素实现Comparable接口来比较元素优先级;
  2. PriorityBlockingQueue构造函数指定Comparator来比较元素优先级;

关于PriorityBlockingQueue中队列操作的部分,基本和PriorityQueue逻辑一致,只不过在操作时加锁了。在本文中我们主要关注PriorityBlockingQueue出队的take方法,该方法通过调用dequeue方法将元素出队列。当没有元素可以出队的时候,线程就会阻塞等待。

publicEtake()throwsInterruptedException{
finalReentrantLocklock=this.lock;
lock.lockInterruptibly();
Eresult;
try{
//尝试获取最小元素,即小顶堆第一个元素,然后重新排序,如果不存在表示队列暂无元素,进行阻塞等待。
while((result=dequeue())==null)
notEmpty.await();
}finally{
lock.unlock();
}
returnresult;
}

现象

在某个业务服务中使用PriorityBlockingQueue实现按照优先级处理任务,某一天环境中的服务突然间不处理任务了,查看后台日志,发现一直抛出NullPointerException。将进程堆dump出来,使用MAT发现某个PriorityBlockingQueue中的size值比实际元素个数多1个(入队时已经对任务进行非空校验)。

异常堆栈如下:

java.lang.NullPointerException
atjava.util.concurrent.PriorityBlockingQueue.siftDownComparable(PriorityBlockingQueue.java:404)
atjava.util.concurrent.PriorityBlockingQueue.dequeue(PriorityBlockingQueue.java:333)
atjava.util.concurrent.PriorityBlockingQueue.take(PriorityBlockingQueue.java:548)
...

MAT结果:

4f7ab378-cd26-11ec-bce3-dac502259ad0.png

原因分析

在此我们分析下PriorityBlockingQueue是如何出队列的,PriorityBlockingQueue最终通过调用dequeue方法出队列,dequeue方法处理逻辑如下:

  1. 将根节点(array[0])赋值给result;
  2. array[n] 赋值给 arrary[0];
  3. 将 array[n] 设置为 null;
  4. 调用siftDownComparable或siftDownUsingComparator对队列元素重新排序;
  5. size大小减1;
  6. 返回result;

如果在第4步中出现异常,就会出现队列中的元素个数比实际的元素个数多1个的现象。此时size未发生改变,arry[n]已经被置为null,再进行siftDown操作时就会抛出NullPointerException。继续分析第4步中在什么情况下会出现异常,通过代码走读我们可以发现只有在调用Comparable#compareTo或者Comparator#compare方法进行元素比较的时候才可能出现异常。这块代码的处理逻辑和业务相关,如果业务代码处理不当抛出异常,就会导致上述现象。

/**
*Mechanicsforpoll().Callonlywhileholdinglock.
*/
privateEdequeue(){
intn=size-1;
if(n< 0)
returnnull;
else{
Object[]array=queue;
Eresult=(E)array[0];//step1
Ex=(E)array[n];//step2
array[n]=null;//step3
ComparatorsuperE>cmp=comparator;
if(cmp==null)//step4 如果指定了comparator,就按照指定的comparator来比较。否则就按照默认的
siftDownComparable(0,x,array,n);
else
siftDownUsingComparator(0,x,array,n,cmp);
size=n;//step5
returnresult;//step6
}
}

privatestaticvoidsiftDownComparable(intk,Tx,Object[]array,intn){
if(n>0){
ComparablesuperT>key=(ComparablesuperT>)x;
inthalf=n>>>1;
while(k< half) {
            intchild=(k<< 1)+1;
Objectc=array[child];
intright=child+1;
if(right< n && ((ComparablesuperT>)c).compareTo((T)array[right])>0)
c=array[child=right];
if(key.compareTo((T)c)<= 0)
break;
array[k]=c;
k=child;
}
array[k]=key;
}
}
privatestaticvoidsiftDownUsingComparator(intk,Tx,Object[]array,intn,
ComparatorsuperT>cmp){
if(n>0){
inthalf=n>>>1;
while(k< half) {
            intchild=(k<< 1)+1;
Objectc=array[child];
intright=child+1;
if(right< n && cmp.compare((T) c, (T) array[right]) >0)
c=array[child=right];
if(cmp.compare(x,(T)c)<= 0)
break;
array[k]=c;
k=child;
}
array[k]=x;
}
}

复现代码

importjava.util.ArrayList;
importjava.util.List;
importjava.util.concurrent.PriorityBlockingQueue;

publicclassPriorityBlockingQueueTest{
staticclassEntityimplementsComparable<Entity>{
privateintid;
privateStringname;
privatebooleanflag;

publicvoidsetFlag(booleanflag){
this.flag=flag;
}

publicEntity(intid,Stringname){
this.id=id;
this.name=name;
}

@Override
publicintcompareTo(Entityentity){
if(flag){
thrownewRuntimeException("TestException");
}
if(entity==null||this.id>entity.id){
return1;
}
returnthis.id==entity.id?0:-1;
}
}

publicstaticvoidmain(String[]args){
intnum=5;
PriorityBlockingQueuepriorityBlockingQueue=newPriorityBlockingQueue<>();
Listentities=newArrayList<>();
for(inti=0;i< num; i++) {
            Entity entity = newEntity(i,"entity"+i);
entities.add(entity);
priorityBlockingQueue.offer(entity);
}

entities.get(num-1).setFlag(true);
intsize=entities.size();
for(inti=0;i< size; i++) {
            try{
priorityBlockingQueue.take();
}catch(Exceptione){
e.printStackTrace();
}
}
}

执行结果如下:

java.lang.RuntimeException:TestException
atPriorityBlockingQueueTest$Entity.compareTo(PriorityBlockingQueueTest.java:31)
atPriorityBlockingQueueTest$Entity.compareTo(PriorityBlockingQueueTest.java:8)
atjava.util.concurrent.PriorityBlockingQueue.siftDownComparable(PriorityBlockingQueue.java:404)
atjava.util.concurrent.PriorityBlockingQueue.dequeue(PriorityBlockingQueue.java:333)
atjava.util.concurrent.PriorityBlockingQueue.take(PriorityBlockingQueue.java:548)
atPriorityBlockingQueueTest.main(PriorityBlockingQueueTest.java:71)
java.lang.NullPointerException
atjava.util.concurrent.PriorityBlockingQueue.siftDownComparable(PriorityBlockingQueue.java:404)
atjava.util.concurrent.PriorityBlockingQueue.dequeue(PriorityBlockingQueue.java:333)
atjava.util.concurrent.PriorityBlockingQueue.take(PriorityBlockingQueue.java:548)
atPriorityBlockingQueueTest.main(PriorityBlockingQueueTest.java:71)

规避方案

可以通过以下两种方法规避:

  • 在take方法出现NPE时,清除队列元素,将未处理的元素重新进入队列;
  • 在 Comparable#compareTo 或 Comparator#compare 方法中做好异常处理,对异常情况进行默认操作;

建议使用后者。

案例引申

使用PriorityBlockingQueue作为缓存队列来创建线程池时,使用submit提交任务会出现 java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to 异常,而使用execute没有问题。

观察submit源码可以发现在submit内部代码会将Runable封装成RunnableFuture对象,然后调用execute提交任务。

publicFuturesubmit(Runnabletask){
if(task==null)thrownewNullPointerException();
RunnableFutureftask=newTaskFor(task,null);
execute(ftask);
returnftask;
}

以Comparable为例,任务入队列时,最终会调用siftUpComparable方法。该方法第一步将RunnableFuture强转为Comparable类型,而RunnableFuture类未实现Comparable接口,进而抛出ClassCastException异常。

publicbooleanoffer(Ee){
if(e==null)
thrownewNullPointerException();
finalReentrantLocklock=this.lock;
lock.lock();
intn,cap;
Object[]array;
while((n=size)>=(cap=(array=queue).length))
tryGrow(array,cap);
try{
ComparatorsuperE>cmp=comparator;
if(cmp==null)
siftUpComparable(n,e,array);
else
siftUpUsingComparator(n,e,array,cmp);
size=n+1;
notEmpty.signal();
}finally{
lock.unlock();
}
returntrue;
}

privatestaticvoidsiftUpComparable(intk,Tx,Object[]array){
ComparablesuperT>key=(ComparablesuperT>)x;
while(k>0){
intparent=(k-1)>>>1;
Objecte=array[parent];
if(key.compareTo((T)e)>=0)
break;
array[k]=e;
k=parent;
}
array[k]=key;
}

这也是常见的比较器调用异常案例,本文不再赘述,可自行参考其他文章。

总结

在使用PriorityBlockingQueue时,注意在比较器中做好异常处理,避免出现类似问题。

审核编辑 :李倩


声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 比较器
    +关注

    关注

    14

    文章

    1936

    浏览量

    112077
  • 数组
    +关注

    关注

    1

    文章

    420

    浏览量

    27463

原文标题:毕昇 JDK | PriorityBlockingQueue比较器异常导致的NPE问题分析

文章出处:【微信号:openEulercommunity,微信公众号:openEuler】欢迎添加关注!文章转载请注明出处。

收藏 人收藏
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

    评论

    相关推荐
    热点推荐

    RDMA设计43:队列删除及连接断开功能测试

    在接收到正确的删除队列请求后,首先进入连接断开流程,即四次挥手断链,如图中红框部分。随后进行队列删除操作。返回队列操作状态 0x2b,符合设
    的头像 发表于 02-24 07:50 655次阅读
    RDMA设计43:<b class='flag-5'>队列</b>删除及连接断开功能测试

    RDMA设计41:队列管理及连接建立功能验证与分析2

    写入队列控制寄存器后,队列管理模块根据创建信息判断是否能够创建队列并返回队列操作状态(qp_modify_status)。如果能创建
    发表于 02-21 08:27

    RDMA设计25:队列管理模块之发送模块详细设计分析

    发送队列存储为所有发送队列共用的存储空间,根据用户环境和开发板环境不同可由 BRAM、URAM 或 LUTRAM 实现。发送队列管理单元则负责管理这个存储空间,并处理用户指令和发送队列
    的头像 发表于 01-25 16:27 5041次阅读
    RDMA设计25:<b class='flag-5'>队列</b>管理模块之发送模块详细设计分析

    RDMA设计26:队列管理模块设计之接收队列模块详细分析

    本文主要交流设计思路,在本博客已给出相关博文100多篇,希望对初学者有用。注意这里只是抛砖引玉,切莫认为参考这就可以完成商用IP设计。 (2)接收队列 接收队列由一个接收队列管理单元组成。与发送
    发表于 01-22 09:03

    RDMA设计24:队列管理模块设计

    队列管理模块采用管理与存储分离的结构进行设计,由发送队列存储、发送队列管理、接收队列管理、完成条目解析、异常完成条目处理和 Round-Robin 仲裁组成。
    的头像 发表于 01-20 11:45 1548次阅读
    RDMA设计24:<b class='flag-5'>队列</b>管理模块设计

    RDMA设计18:队列管理模块设计3

    标识。故而只使用一个虚拟完成队列足以满足接收队列管理单元和发送队列管理单元的信息需求。这样的设计在一定程度上缩减存储资源开销的同时,也会进一步减小用户操作难度,使模块不再需要为每一个发
    发表于 01-05 09:04

    RDMA设计17:队列管理模块设计2

    。 (2)接收队列 接收队列由一个接收队列管理单元组成。与发送队列类似的是,接收队列管理单元也由若干表单构成,其中包括 RQ1 表单和用户
    发表于 01-04 14:54

    NVMe高速传输之摆脱XDMA设计54:如何测试队列管理功能2

    。 如图1 所示, 删除所有提交队列和完成队列, 删除成功。 然后分别创建一个I/O 完成和提交队列, 连续多次删除, 打印信息返回错误值为 2, 表示操作数量错误,所有
    发表于 12-10 08:33

    优先级队列介绍

    队列(Queue)的知识点:「概念」:队列是一种先进先出(FIFO)的数据结构,类似于排队的概念。「基本操作」:enqueue(item): 将元素添加到队列的末尾。dequeue()
    发表于 11-26 07:56

    基于环形队列的UART收发回显实验

    在实际项目开发中,由于有些串口不具备FIFO(如SCI1和SCI2)或FIFO的buffer比较小,这可能会在数据处理速度小于数据接收速度的时候,导致数据的丢失。因此我们可以设计一个队列来避免这一
    的头像 发表于 10-27 13:51 2138次阅读
    基于环形<b class='flag-5'>队列</b>的UART收发回显实验

    NVMe高速传输之摆脱XDMA设计41:队列管理功能验证与分析5

    0, 表示队列不使能, 后续两次删除操作返回 cr_status值为 2, 表示操作数量错误。 同样对于 I/O 完成队列, 连续删除 3 次, 测试结果表示第一次成功删除, 后两次
    发表于 10-23 16:24

    NVMe高速传输之摆脱XDMA设计40:队列管理功能验证与分析4

    测试结果 NVMe删除队列是指通过NVMe协议提供的命令删除指定的I/O完成队列(CQ)或提交队列(SQ)。该操作一般用于释放存储设备中的资源,确保
    发表于 10-22 10:14

    RabbitMQ消息队列解决方案

    在现代分布式系统架构中,消息队列作为核心组件,承担着系统解耦、异步处理、流量削峰等重要职责。RabbitMQ作为一款成熟的消息队列中间件,以其高可用性、高可靠性和丰富的特性,成为众多企业的首选方案。本文将从运维工程师的角度,详细阐述RabbitMQ从单机部署到集群搭建的完
    的头像 发表于 07-08 15:55 763次阅读

    RDMA简介5之RoCE V2队列分析

    操作中,只有RECEIVE操作会被添加到接收队列。SEND/RECEIVE操作的完整流程,如图1所示,首先由应用程序创建一个工作请求(WR),并将其提交到相应的工作
    发表于 06-05 17:28

    NVME控制器之队列管理模块

    队列管理模块是整个NVMe Host控制器的核心模块,该模块实现了提交队列与完成队列的管理,多队列请求的仲裁判决等功能。队列管理模块中含有数
    的头像 发表于 05-03 15:32 761次阅读
    NVME控制器之<b class='flag-5'>队列</b>管理模块