0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

Tenacity重试模块实践

科技绿洲 来源:Python实用宝典 作者:Python实用宝典 2023-11-02 11:33 次阅读

为了避免由于一些网络或等其他不可控因素,而引起的功能性问题。比如在发送请求时,会因为网络不稳定,往往会有请求超时的问题。

这种情况下,我们通常会在代码中加入重试的代码。重试的代码本身不难实现,但如何写得优雅、易用,是我们要考虑的问题。

这里要给大家介绍的是一个第三方库 - Tenacity ,它实现了几乎我们可以使用到的所有重试场景,比如:

  1. 在什么情况下才进行重试?
  2. 重试几次呢?
  3. 重试多久后结束?
  4. 每次重试的间隔多长呢?
  5. 重试失败后的回调?

在使用它之前 ,先要安装它

$ pip install tenacity

1. 最基本的重试

无条件重试,重试之间无间隔

from tenacity import retry

@retry
def test_retry():
    print("等待重试,重试无间隔执行...")
    raise Exception

test_retry()

无条件重试,但是在重试之前要等待 2 秒

from tenacity import retry, wait_fixed

@retry(wait=wait_fixed(2))
def test_retry():
    print("等待重试...")
    raise Exception

test_retry()

2. 设置停止基本条件

只重试7 次

from tenacity import retry, stop_after_attempt

@retry(stop=stop_after_attempt(7))
def test_retry():
    print("等待重试...")
    raise Exception

test_retry()

重试 10 秒后不再重试

from tenacity import retry, stop_after_delay

@retry(stop=stop_after_delay(10))
def test_retry():
    print("等待重试...")
    raise Exception

test_retry()

或者上面两个条件满足一个就结束重试

from tenacity import retry, stop_after_delay, stop_after_attempt

@retry(stop=(stop_after_delay(10) | stop_after_attempt(7)))
def test_retry():
    print("等待重试...")
    raise Exception

test_retry()

3. 设置何时进行重试

在出现特定错误/异常(比如请求超时)的情况下,再进行重试

from requests import exceptions
from tenacity import retry, retry_if_exception_type

@retry(retry=retry_if_exception_type(exceptions.Timeout))
def test_retry():
    print("等待重试...")
    raise exceptions.Timeout

test_retry()

在满足自定义条件时,再进行重试。

如下示例,当 test_retry 函数返回值为 False 时,再进行重试

from tenacity import retry, stop_after_attempt, retry_if_result

def is_false(value):
    return value is False

@retry(stop=stop_after_attempt(3),
       retry=retry_if_result(is_false))
def test_retry():
    return False

test_retry()

4. 重试后错误重新抛出

当出现异常后,tenacity 会进行重试,若重试后还是失败,默认情况下,往上抛出的异常会变成 RetryError,而不是最根本的原因。

因此可以加一个参数reraise=True),使得当重试失败后,往外抛出的异常还是原来的那个。

from tenacity import retry, stop_after_attempt

@retry(stop=stop_after_attempt(7), reraise=True)
def test_retry():
    print("等待重试...")
    raise Exception

test_retry()

5. 设置回调函数

当最后一次重试失败后,可以执行一个回调函数

from tenacity import *

def return_last_value(retry_state):
    print("执行回调函数")
    return retry_state.outcome.result()  # 表示返回原函数的返回值

def is_false(value):
    return value is False

@retry(stop=stop_after_attempt(3),
       retry_error_callback=return_last_value,
       retry=retry_if_result(is_false))
def test_retry():
    print("等待重试中...")
    return False

print(test_retry())

输出如下

等待重试中...
等待重试中...
等待重试中...
执行回调函数
False
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 模块
    +关注

    关注

    7

    文章

    2494

    浏览量

    46603
  • 参数
    +关注

    关注

    11

    文章

    1398

    浏览量

    31504
  • 代码
    +关注

    关注

    30

    文章

    4558

    浏览量

    66926
收藏 人收藏

    评论

    相关推荐

    wifi (STA) 最大重试次数是多少?

    您好,目前 STA 应用程序一直在尝试连接到接入点。我需要限制重试次数。这个限制是可配置的吗?万一,怎么办?“esp_wifi_connect()”是否不断重试并生成事件“WIFI_EVENT_STA_DISCONNECTED”?万一,我该如何设置最大
    发表于 03-02 08:46

    K32W041自SDK v2.6.8起不再重试是怎么回事?

    我们将 K32W041 与我们自己的 802.15.4 协议一起使用,即不是 Zigbee 或 Thread,并且自 SDK 发布 v2.6.8 以来,我们观察到传输重试的行为发生了变化
    发表于 04-21 07:30

    具有优先权的M/G/1重试可修排队系统

    在服务台忙的情况下,到达服务台的顾客以概率q进入无限位置的优先队列而以概率p进入无限位置的重试轨道(orbit),并且按照先到先服务(FCFS)规则排队,假定只有队首的顾客允许重试,同时考
    发表于 04-01 20:27 20次下载

    HBase客户端实践重试机制

    现在,网易视频云与大家分享HBase客户端实践重试机制。 在运维HBase的这段时间里,发现业务用户一方面比较关注HBase本身服务的读写性能:吞吐量以及读写延迟,另一方面也会比较关注HBase
    发表于 10-10 10:15 0次下载
    HBase客户端<b class='flag-5'>实践</b><b class='flag-5'>重试</b>机制

    Tenacity跨平台的音频编辑器

    ./oschina_soft/tenacity.zip
    发表于 06-01 15:37 4次下载
    <b class='flag-5'>Tenacity</b>跨平台的音频编辑器

    如何在RocketMQ中合理使用重试机制

    RocketMQ 的重试机制包括三部分,分别是生产者重试,服务端内部数据复制遇到非预期问题时重试,消费者消费重试
    的头像 发表于 11-23 10:15 811次阅读

    一个Spring注解轻松搞定循环重试功能!

    重试等待策略,默认使用@Backoff,@Backoff的value默认为1000L,我们设置为2000L;multiplier(指定延迟倍数)默认为0,表示固定暂停1秒后进行重试,如果把multiplier设置为1.5,则第一次重试
    的头像 发表于 04-24 11:43 462次阅读
    一个Spring注解轻松搞定循环<b class='flag-5'>重试</b>功能!

    全自动三轴荷重试验机的整体设计方案及测试原理

    全自动三轴荷重试验机的整体设计方案及测试原理?|深圳磐石测控仪器
    的头像 发表于 08-08 09:34 683次阅读
    全自动三轴荷<b class='flag-5'>重试</b>验机的整体设计方案及测试原理

    三轴荷重试验机原理与结构及领域

    三轴荷重试验机原理与结构及领域?|深圳市磐石测控仪器有限公司
    的头像 发表于 08-15 09:23 613次阅读
    三轴荷<b class='flag-5'>重试</b>验机原理与结构及领域

    PS-1305S小型荷重试验机的优势所在

    PS-1305S小型荷重试验机的优势所在?|深圳磐石测控
    的头像 发表于 09-19 09:20 709次阅读
    PS-1305S小型荷<b class='flag-5'>重试</b>验机的优势所在

    三轴荷重试验机的工作原理是什么

    三轴荷重试验机的工作原理是什么
    的头像 发表于 10-20 09:08 2032次阅读
    三轴荷<b class='flag-5'>重试</b>验机的工作原理是什么

    Python 在什么情况下才进行重试

    ,但如何写得优雅、易用,是我们要考虑的问题。 这里要给大家介绍的是一个第三方库 - Tenacity (标题中的重试机制并并不准确,它不是 Python 的内置模块,因此并不能称之为机制),它实现了几乎我们可以使用到的所有
    的头像 发表于 10-21 11:18 197次阅读

    Python中retrying库的有参数重试

    有参数重试 (1) stop_max_attempt_number 在retry中传入stop_max_attempt_number参数后可以指定失败重试的次数 @retry
    的头像 发表于 11-14 11:08 281次阅读
    Python中retrying库的有参数<b class='flag-5'>重试</b>

    小三轴荷重试验机是什么?有哪些介绍

    小三轴荷重试验机是什么?有哪些介绍
    的头像 发表于 12-07 09:09 247次阅读
    小三轴荷<b class='flag-5'>重试</b>验机是什么?有哪些介绍

    三轴荷重试验机的用途和工作原理

    三轴荷重试验机的用途和工作原理!|深圳市磐石测控仪器有限公司
    的头像 发表于 12-08 09:06 367次阅读
    三轴荷<b class='flag-5'>重试</b>验机的用途和工作原理