0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

利用平稳和离散小波变换方式从心电图数据获取心率

安费诺传感器学堂 来源:安费诺传感器学堂 2026-04-09 14:55 次阅读
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

导语

在上一篇关于 CWT 的文章里,我们已经展示了连续小波变换(CWT)如何“放大”心电图(ECG)里那一瞬间的 R 波,并获取心率。这一次,我们把平移不变的小波(SWT)和离散小波(DWT)也请来比较一下,三种小波变换,同组数据,最后我们将比较检测结果的差异。

为了避免陈述一些枯燥的公式,先用几句话来概括这三种小波变换各自的特点。

CWT:像拿着一把可变倍数的放大镜,对着信号一寸寸地扫——你可以把放大率(尺度)调得很细很密,也可以把放大镜慢慢移动到任何时刻去比对。但因为你对每个放大率和每个位置都拍张‘快照’,最后会积攒一堆照片(系数很多、很冗余),所以既能看得细致入微,也要付出计算和存储的代价。

3008fc52-2ff1-11f1-90a1-92fbcf53809c.png

3066597e-2ff1-11f1-90a1-92fbcf53809c.png

如上面2个图所示CWT示意在2个尺度的小波函数下,每个尺度对应的2个时间点(T1,T2)的平移,小波函数和原信号进行逐点乘积得到曲线,以及对应局部区域乘积和的数值(红点)。

SWT:像把同一幅画用不同倍率的镜头都拍成同样像素大小的照片,这样每张照片的像素都能一一对应地比较——你不会因为放大或缩小而丢掉某个像素的位置,但照片会成堆(冗余),好处是无论把原画轻轻平移多少,照片的亮点都会一起平移,不会乱跳(平移稳定)。每个倍率的镜头包含两个滤镜,一个抓细节(高频),一个抓轮廓(低频)。如下图所示,不过只对应一种滤镜。

30c2f8e6-2ff1-11f1-90a1-92fbcf53809c.png

信号和小波最初形态(无插零膨胀)

311f25c6-2ff1-11f1-90a1-92fbcf53809c.png

3176ea68-2ff1-11f1-90a1-92fbcf53809c.png

SWT在分解信号第一层的T1和T2处局部小波系数与信号的逐点相乘并求和(红点)

31cdaf56-2ff1-11f1-90a1-92fbcf53809c.png

信号和连续插零膨胀4次的小波

32280a28-2ff1-11f1-90a1-92fbcf53809c.png

和膨胀后小波系数卷积的不是原信号

32833ef2-2ff1-11f1-90a1-92fbcf53809c.png

SWT在分解信号第四层(红点是对应时间的乘积和)

DWT:SWT通过上采样滤波器(或等价地不下采样)来实现尺度变化,从而获得平移不变性和冗余系数;DWT通过下采样实现尺度变化,但是对输入信号的平移敏感;你可以把 SWT 看作是“去下采样化并对滤波器进行按层扩展”的 DWT 变体。想象你拿着一台固定变焦的小相机(其实是一对滤波器:一个是写意让画面模糊的 low‑pass 和一个抓细节的 high‑pass)。第一轮你把整张画“拍”成两张:一张是模糊的大块(近似),一张是细碎的纹理(细节)。接着,把那张“模糊的照片”按比例缩小(下采样,只保留每隔一行/列的像素)作为下一轮的新画面,再用同样的相机继续拍——层层下去就成了一个金字塔。注意:细节那一份在每层也会被下采样并保存为该层的细节系数,所以整个过程既紧凑又可重构。但正因为每层都在“丢掉一半像素”,分解结果对原始信号的微小平移会很敏感;要想避免这一点可以考虑不下采样的 SWT(平移不变小波)或冗余变换。

好了,不多说了,这次代码较多。我们就转到心率检测这个话题上吧。

SWT检测心率

32dd6db4-2ff1-11f1-90a1-92fbcf53809c.png

SWT通过db4小波检测到的前20s峰值和心率

大家可以比较一下CWT的心率检测输出(小波变换(3):连续小波变换(CWT)方式从心电图数据获取心率)。SWT检测心率的程序中,除了要找到峰值,还要避免峰值的变化,峰值处的多个相同采样值,以及大峰后的小伪峰。程序的基本步骤如下:

读取/截取 ECG

预处理(bandpass_filter)

SWT 分解(pywt.swt)

level(分解层数):越高能覆盖越低频成分,但计算和存储开销增加

wavelet(母小波):db4、sym4、coif1 常用于 ECG;db4 与 QRS 形状相似通常表现好

经验(针对 fs=360 Hz):细节层 j 对应的近似频带(D_j 约为 fs/(2^(j+1)) 到 fs/(2^j)):

D1: 90–180 Hz, D2: 45–90 Hz, D3: 22.5–45 Hz, D4: 11.25–22.5 Hz, D5: 5.625–11.25 Hz

QRS 能量通常集中在 ~5–40 Hz → 可选 detail_levels = [3,4,5](与你代码一致)

聚合感兴趣尺度(agg)

归一化与平滑(agg -> agg_smooth)

归一化:agg = (agg - min) / ptp 便于阈值基于百分位或绝对值一致性

平滑:moving_average(agg, win_samples),win_samples = smooth_ms/1000 * fs(示例 smooth_ms=30 ms)

初始候选峰检测(较宽松):找出可能的 R 峰位置(候选),但保持一定宽松以便后续二次筛选

候选峰特征计算(height, prominence)

二次筛选(基于幅值 / 突出度 / 相对高度)

计算 BPM(瞬时 + 平滑)

Plot输出

SWT小波变换检测心率程序源码:

"""
基于 SWT 的 ECG R 峰检测与心率 (BPM) 计算示例
适合:离线分析或对检测稳定性有较高要求的场景(例如设备后台或验证)
依赖: pip install numpy scipy matplotlib pywavelets
说明要点:
- 使用 pywt.swt 做平移不变小波分解(每级返回 cA, cD, 长度与输入相同)
- 选取与 QRS 频带对应的 detail 级别(例如 fs=360Hz 时, detail level 3-5 涵盖 ~5-45Hz)
- 聚合所选 detail 的绝对值作为能量包络, 平滑后做峰检测
- 峰在原始带通信号附近做局部最大值校准以确定 R 峰准确位置
"""
importnumpyasnp
importscipy.signalassignal
importmatplotlib.pyplotasplt
importpywt
fromscipy.signal.windowsimportgaussian
importpandasaspd
# ---------- 参数(根据采样率调整) ----------
fs =360.0        # 采样率(Hz), 题目中为 360Hz
wavelet ='db4'     # 常用小波(Daubechies 4 对 ECG 效果不错)
swt_level =5      # SWT 分解层数(越高捕获越低频成分, 但计算增大)
# 推荐 detail levels(基于经验/频带映射), 对 360Hz 可用 3-5 覆盖 QRS 频段 5-40Hz
swt_detail_levels = [3,4,5]
pre_band = (0.5,45.0)  # 可选预带通滤波, 去基线与高频噪声
min_rr_sec =0.20    # 最小 RR(秒), 防止伪峰;200 ms 常见
smooth_ms =30      # 聚合能量的平滑窗口 (毫秒)
threshold_percentile =70# 自适应阈值:聚合能量的百分位
debug_plot =True
# ---------- 辅助函数 ----------
defbandpass_filter(x, fs, lowcut, highcut, order=3):
  nyq =0.5* fs
  b, a = signal.butter(order, [lowcut/nyq, highcut/nyq], btype='band')
 returnsignal.filtfilt(b, a, x)


defmoving_average(x, win_samples):
 ifwin_samples <= 1:
        return x
    kernel = np.ones(win_samples) / win_samples
    return np.convolve(x, kernel, mode='same')


def _compute_bpm_from_peaks(peaks_idx, fs, smooth_N=3):
    """
    从峰索引计算 BPM 序列。
    返回 (bpm_times, bpm_values):
      bpm_times: 对应于每次计算出的 BPM 的时间点(秒), 使用当前峰时间
      bpm_values: 平滑后的 BPM(以最近 smooth_N 个 RR 平均)
    """
    if len(peaks_idx) < 2:
        return np.array([]), np.array([])
    bpm_times = []
    bpm_values = []
    for i in range(1, len(peaks_idx)):
        rr = (peaks_idx[i] - peaks_idx[i-1]) / fs
        if rr <= 0:
            continue
        inst_bpm = 60.0 / rr
        # 平滑:最多用最近 smooth_N 个 RR
        j0 = max(1, i - smooth_N + 1)
        rr_list = [(peaks_idx[k] - peaks_idx[k-1]) / fs for k in range(j0, i+1)]
        if len(rr_list) >0:
      smooth_bpm =60.0/ (np.mean(rr_list))
   else:
      smooth_bpm = inst_bpm
    bpm_times.append(peaks_idx[i] / fs)
    bpm_values.append(smooth_bpm)
 returnnp.array(bpm_times), np.array(bpm_values)


defdetect_beats_swt(ecg, fs,
          wavelet='db4',
          level=6,
          detail_levels=(3,4,5),
          pre_band=(0.5,45.0),
          smooth_ms=30,
          initial_percentile=50,
          amp_factor=0.6,
          prom_factor=0.5,
          prom_min=0.05,
          rel_factor=0.45,
          min_rr_sec=0.20,
          debug_plot=False):
 """
  SWT-based R peak detection.
  返回: peaks_idx, bpm_times, bpm_values
  主要改进:
   - 先用初始阈值找候选峰, 再基于候选峰的中位高度/中位 prominence 做二次筛选
   - 加入相对前峰高度检查以减少大峰后的小伪峰
  """
 # 1) 预处理
 ifpre_bandisnotNone:
    ecg_f = bandpass_filter(ecg, fs, pre_band[0], pre_band[1])
 else:
    ecg_f = ecg.copy()
 
 # 2) SWT 分解
  swt_coeffs = pywt.swt(ecg_f, wavelet, level=level, start_level=0)
 
 # 3) 聚合 select detail levels 的绝对值
  L =len(ecg_f)
  agg = np.zeros(L)
 forlvlindetail_levels:
   if1<= lvl <= level:
            cA, cD = swt_coeffs[lvl-1]
            agg += np.abs(cD)
    # 归一化与平滑
    agg = (agg - np.min(agg)) / (np.ptp(agg) + 1e-12)
    win_samples = max(1, int((smooth_ms/1000.0) * fs))
    agg_smooth = moving_average(agg, win_samples)
    
    # 4) 初始候选峰(较宽松)
    initial_threshold = np.percentile(agg_smooth, initial_percentile)
    min_distance = int(min_rr_sec * fs)
    # 设置一个绝对高度阈值。所有检测到的峰值, 其实际幅值(在 agg_smooth 数组中的值)都必须大于或等于这个 initial_threshold
    peaks0, props0 = signal.find_peaks(agg_smooth, height=initial_threshold, distance=min_distance)
    if peaks0.size == 0:
        # 无候选峰 -> 返回空
   returnnp.array([], dtype=int), np.array([]), np.array([])
  heights = agg_smooth[peaks0]
  prominences = signal.peak_prominences(agg_smooth, peaks0)[0]
  median_h = np.median(heights)
  median_prom = np.median(prominences)
 
 # 5) 二次筛选:基于 height / prominence / relative-to-prev
  accepted = []
  prev_height =None
 fori, pinenumerate(peaks0):
    h = heights[i]
    prom = prominences[i]ifi < len(prominences) else 0.0
        cond_amp = (h >=max(initial_threshold, median_h * amp_factor))
    cond_prom = (prom >=max(median_prom * prom_factor, prom_min))
   ifnot(cond_amporcond_prom):
     continue
   ifprev_heightisnotNone:
     ifh < prev_height * rel_factor:
                # 允许 prominence 很大时仍保留
                if prom < max(median_prom * 0.5, prom_min):
                    continue
        accepted.append(p)
        prev_height = h
    # 若被过滤空, 则退回用初始阈值的峰作为安全兜底
    if len(accepted) == 0:
        accepted = [int(p) for p,h in zip(peaks0, heights) if h >= initial_threshold]
 
 # 6) 在带通信号上校正到局部最大(提高定位精度)
  corrected = []
  search_radius =int(0.05* fs)
 forpinaccepted:
    lo =max(0, p - search_radius)
    hi =min(L, p + search_radius +1)
    seg = ecg_f[lo:hi]
   ifseg.size ==0:
     continue
    local_idx = np.argmax(np.abs(seg))
    corr = lo + local_idx
   iflen(corrected) ==0or(corr - corrected[-1]) > (min_distance //2):
      corrected.append(int(corr))
  peaks_idx = np.array(corrected, dtype=int)
 
 # 7) 计算 BPM
  bpm_times, bpm_values = _compute_bpm_from_peaks(peaks_idx, fs, smooth_N=3)
 
 # 8) debug 绘图
 ifdebug_plot:
    t = np.arange(L) / fs
    plt.figure(figsize=(12,9))
    ax1 = plt.subplot(4,1,1)
    plt.plot(t, ecg, label='raw ECG', alpha=0.5)
    plt.plot(t, ecg_f, label='filtered ECG', linewidth=1.0)
    plt.scatter(peaks_idx/fs, ecg_f[peaks_idx], color='r', label='final R peaks')
    plt.legend(loc='upper right'); plt.title('ECG and detected peaks')
    ax2 = plt.subplot(4,1,2, sharex=ax1)
   # 展示被选 detail 的系数(堆叠)
    selected_coefs = np.vstack([np.abs(swt_coeffs[l-1][1])forlindetail_levelsif1<=l<=level])
        if selected_coefs.size >0:
      extent = [0, L/fs,min(detail_levels)-0.5,max(detail_levels)+0.5]
      plt.imshow(selected_coefs, aspect='auto', origin='lower', extent=extent)
      plt.colorbar(label='|SWT detail|')
    plt.ylabel('detail level'); plt.title('Selected SWT detail coefficients')
    ax3 = plt.subplot(4,1,3, sharex=ax1)
    plt.plot(t, agg, label='agg (norm)', alpha=0.4)
    plt.plot(t, agg_smooth, label='agg smoothed', linewidth=1.2)
    plt.hlines(initial_threshold, t[0], t[-1], colors='k', linestyles='--', label=f'init thr={initial_threshold:.3f}')
    plt.scatter(peaks0/fs, agg_smooth[peaks0], color='g', s=20, label='candidates')
    plt.scatter(np.array(accepted)/fs, agg_smooth[np.array(accepted)], facecolors='none', edgecolors='r', s=60, label='accepted (pre-correct)')
    plt.legend(loc='upper right'); plt.title('Aggregated SWT energy and candidates')
    ax4 = plt.subplot(4,1,4, sharex=ax1)
   iflen(bpm_times) >0:
      plt.plot(bpm_times, bpm_values,'-o')
      plt.xlabel('Time (s)'); plt.ylabel('BPM'); plt.title('Estimated BPM (smoothed)')
      plt.grid(True)
   else:
      plt.text(0.1,0.5,'No BPM computed (need >=2 peaks)', transform=ax4.transAxes)
    plt.tight_layout()
    plt.show()
 returnpeaks_idx, bpm_times, bpm_values


defread_ecg_from_csv(csv_path, fs=360, duration_sec=20):
 """
  从 CSV 读取 ECG 数据, 并截取前 duration_sec 秒
  参数:
    csv_path: CSV文件路径
    fs: 采样率 (Hz), 默认360
    duration_sec: 需要读取的时长(秒), 默认20
  返回:
    t: 时间轴 (秒)
    ecg_segment: 截取后的 ECG 信号
  """
 # ---------- 读取 CSV 并提取 ECG 列 ----------
  df = pd.read_csv(csv_path)
 
 if'MLII'indf.columns:
    ecg_full = df['MLII'].values.astype(float)
 elifdf.shape[1] >=2:
    ecg_full = df.iloc[:,1].values.astype(float)
 else:
   raiseValueError("未找到 ECG 列, 请确认 CSV 列名或格式。")
 
 # 计算需要截取的样本数
  n_samples_needed =int(duration_sec * fs)
 
 # 截取前 n_samples_needed 个样本
 iflen(ecg_full) >= n_samples_needed:
    ecg_segment = ecg_full[:n_samples_needed]
 else:
   print(f"警告: 原始数据只有{len(ecg_full)}个样本 (< {n_samples_needed}), 将使用全部数据")
        ecg_segment = ecg_full
        n_samples_needed = len(ecg_full)
    
    # 生成时间轴
    if 'time_s' in df.columns:
        # 如果CSV有时间列, 也相应截取
        times_full = df['time_s'].values
        t = times_full[:n_samples_needed]
    else:
        t = np.arange(n_samples_needed) / fs
    
    return t, ecg_segment


if __name__ == "__main__":
    csv_path = r"Your_path_to_ECG_data122_60s.csv"
    t, ecg = read_ecg_from_csv(csv_path=csv_path, fs=360, duration_sec= 20)
    
    peaks_swt, bpm_times_swt, bpm_values_swt = detect_beats_swt(
        ecg, fs,
        wavelet='db4',
        level=5,
        detail_levels=(3,4,5),
        pre_band=(0.5,45.0),
        smooth_ms=30,
        initial_percentile=85,
        amp_factor=0.6,
        prom_factor=0.5,
        prom_min=0.05,
        rel_factor=0.45,
        min_rr_sec=0.20,
        debug_plot=True
    )
    print("SWT 检测:", len(peaks_swt), "peaks; 最后 BPM:", (bpm_values_swt[-1] if len(bpm_values_swt)>0elseNone))

DWT检测心率

DWT检测新的程序流程和SWT类似,也是把原信号通过小波变换(这里采用的仍然是db4)分解信号,然后在信号中把需要的频率范围内的系数相加,重建信号(相对于抓住信号中需要的特征的滤波后的信号),然后在基于这个重建信号对峰值进行检测,平滑处理,再校正,然后计算BPM后,输出处理后的数据到图。

333af77c-2ff1-11f1-90a1-92fbcf53809c.png

3395410a-2ff1-11f1-90a1-92fbcf53809c.png

DWT小波变换检测心率

DWT小波变换检测心率程序源码:

"""
基于 DWT 的 ECG R 峰检测(通过重构选定 detail 级别)
适合: 计算资源受限或实时嵌入式场景(优点是计算量低)
缺点: DWT 非平移不变,检测位置对相位/起点敏感;可以通过 cycle spinning(多次平移重构求平均)缓解。
依赖: pip install numpy scipy matplotlib pywavelets
"""
importnumpyasnp
importscipy.signalassignal
importmatplotlib.pyplotasplt
importpywt
fromscipy.signal.windowsimportgaussian
importpandasaspd
# ---------- 参数 ----------
fs =360.0
wavelet ='db4'
dwt_level =4
# 对于 360Hz,QRS 主要在 D3-D5 范围(经验),可选 detail_levels_dwt = [3,4,5]
detail_levels_dwt = [3,4,5]
pre_band = (0.5,45.0)
min_rr_sec =0.20
smooth_ms =30
threshold_percentile =75
debug_plot =True
defbandpass_filter(x, fs, lowcut, highcut, order=3):
  nyq =0.5* fs
  b, a = signal.butter(order, [lowcut/nyq, highcut/nyq], btype='band')
 returnsignal.filtfilt(b, a, x)


defmoving_average(x, win_samples):
 ifwin_samples <= 1:
        return x
    kernel = np.ones(win_samples)/win_samples
    return np.convolve(x, kernel, mode='same')


def detect_beats_dwt(ecg, fs,
                     wavelet='db4',
                     level=6,
                     detail_levels=(3,4,5),
                     pre_band=(0.5,45.0),
                     smooth_ms=30,
                     threshold_percentile=70,
                     min_rr_sec=0.20,
                     debug_plot=False):
    """
    DWT 方法: 对 signal 做 wavedec 分解,然后把不需要的系数置零,再用 waverec 重构出只含 QRS 频带的信号,
    在重构信号上做能量聚合和平滑检测。
    返回: peaks_idx, bpm_times, bpm_values
    """
    # 1) 预处理
    if pre_band is not None:
        ecg_f = bandpass_filter(ecg, fs, pre_band[0], pre_band[1])
    else:
        ecg_f = ecg.copy()
    
    # 2) DWT 分解
    coeffs = pywt.wavedec(ecg_f, wavelet, level=level)
    # coeffs: [cA_n, cD_n, cD_{n-1}, ..., cD1] 长度 level+1
    
    # 3) 将不选的 detail 置零,只保留目标 detail 级别
    coeffs_sel = [None] * len(coeffs)
    # cA_n 保留为零(若想也保留可调整),这里置零以强调细节
    coeffs_sel[0] = np.zeros_like(coeffs[0])
    # detail indices in coeffs: coeffs[1] is cD_n, coeffs[-1] is cD1
    # mapping: detail level j (1..n) corresponds to coeffs index = len(coeffs)-j
    for j in range(1, level+1):
        idx = len(coeffs) - j
        if j in detail_levels:
            coeffs_sel[idx] = coeffs[idx]  # 保留
        else:
            coeffs_sel[idx] = np.zeros_like(coeffs[idx])
    
    # 4) 重构信号(只含所选 detail)
    recon = pywt.waverec(coeffs_sel, wavelet)
    # waverec 可能导致重构长度与原始略有差异,截断或填充
    recon = recon[:len(ecg_f)]
    
    # 5) 能量聚合、平滑
    agg = np.abs(recon)
    agg = (agg - np.min(agg)) / (np.ptp(agg) + 1e-12)
    win_samples = max(1, int((smooth_ms/1000.0) * fs))
    agg_smooth = moving_average(agg, win_samples)
    
    # 6) 峰检测
    threshold = np.percentile(agg_smooth, threshold_percentile)
    min_distance = int(min_rr_sec * fs)
    peaks0, props = signal.find_peaks(agg_smooth, height=threshold, distance=min_distance)
    
    # 7) 在滤波后的原始信号上校正峰位置
    corrected = []
    L = len(ecg_f)
    search_radius = int(0.05 * fs)
    for p in peaks0:
        lo = max(0, p - search_radius)
        hi = min(L, p + search_radius + 1)
        seg = ecg_f[lo:hi]
        if seg.size == 0: continue
        local_idx = np.argmax(np.abs(seg))
        corr = lo + local_idx
        if len(corrected) == 0 or (corr - corrected[-1]) > (min_distance //2):
      corrected.append(int(corr))
  peaks_idx = np.array(corrected, dtype=int)
 
 # 8) 计算 BPM
  bpm_times = []
  bpm_values = []
 foriinrange(1,len(peaks_idx)):
    rr = (peaks_idx[i] - peaks_idx[i-1]) / fs
   ifrr <= 0: continue
        inst_bpm = 60.0 / rr
        lookback = 3
        j0 = max(1, i - lookback + 1)
        rr_list = [(peaks_idx[k] - peaks_idx[k-1]) / fs for k in range(j0, i+1)]
        smooth_bpm = 60.0 / (np.mean(rr_list)) if len(rr_list)>0elseinst_bpm
    bpm_times.append(peaks_idx[i] / fs)
    bpm_values.append(smooth_bpm)
  bpm_times = np.array(bpm_times)
  bpm_values = np.array(bpm_values)
 
 # 9) debug 绘图
 ifdebug_plot:
    t = np.arange(len(ecg)) / fs
    plt.figure(figsize=(12,8))
    ax1 = plt.subplot(3,1,1)
    plt.plot(t, ecg, label='raw ECG', alpha=0.6)
    plt.plot(t, ecg_f, label='filtered', alpha=0.9)
    plt.scatter(peaks_idx/fs, ecg_f[peaks_idx], color='r', label='detected R peaks')
    plt.legend(); plt.title('ECG and peaks')
    ax2 = plt.subplot(3,1,2, sharex=ax1)
    plt.plot(t, recon, label='reconstructed (selected D levels)')
    plt.legend(); plt.title('Reconstructed detail-band signal')
    ax3 = plt.subplot(3,1,3, sharex=ax1)
    plt.plot(t, agg_smooth, label='agg_smooth')
    plt.hlines(threshold, t[0], t[-1], linestyles='--', label=f'thr={threshold:.2f}')
    plt.scatter(peaks0/fs, agg_smooth[peaks0], color='g', label='peaks before correction')
    plt.legend(); plt.title('Aggregated envelope')
    plt.tight_layout(); plt.show()
   
   # 新增: 单独的心率随时间图(独立 figure)
    plt.figure(figsize=(10,4))
   ifbpm_times.size >0:
      plt.plot(bpm_times, bpm_values, marker='o', linestyle='-', color='tab:orange')
      plt.xlabel('Time (s)')
      plt.ylabel('Heart Rate (BPM)')
      plt.title('Heart Rate over Time (smoothed)')
      plt.grid(True)
   else:
     # 若未检测到心搏,给出提示性图形
      plt.text(0.5,0.5,'No heartbeats detected to plot', ha='center', va='center', fontsize=12)
      plt.title('Heart Rate over Time (no detections)')
      plt.axis('off')
    plt.show()
 returnpeaks_idx, bpm_times, bpm_values


defread_ecg_from_csv(csv_path, fs=360, duration_sec=20):
 """
  从 CSV 读取 ECG 数据,并截取前 duration_sec 秒
  参数:
    csv_path: CSV文件路径
    fs: 采样率 (Hz),默认360
    duration_sec: 需要读取的时长(秒),默认20
  返回:
    t: 时间轴 (秒)
    ecg_segment: 截取后的 ECG 信号
  """
 # ---------- 读取 CSV 并提取 ECG 列 ----------
  df = pd.read_csv(csv_path)
 
 if'MLII'indf.columns:
    ecg_full = df['MLII'].values.astype(float)
 elifdf.shape[1] >=2:
    ecg_full = df.iloc[:,1].values.astype(float)
 else:
   raiseValueError("未找到 ECG 列,请确认 CSV 列名或格式。")
 
 # 计算需要截取的样本数
  n_samples_needed =int(duration_sec * fs)
 
 # 截取前 n_samples_needed 个样本
 iflen(ecg_full) >= n_samples_needed:
    ecg_segment = ecg_full[:n_samples_needed]
 else:
   print(f"警告: 原始数据只有{len(ecg_full)}个样本 (< {n_samples_needed}),将使用全部数据")
        ecg_segment = ecg_full
        n_samples_needed = len(ecg_full)
    
    # 生成时间轴
    if 'time_s' in df.columns:
        # 如果CSV有时间列,也相应截取
        times_full = df['time_s'].values
        t = times_full[:n_samples_needed]
    else:
        t = np.arange(n_samples_needed) / fs
    
    return t, ecg_segment


if __name__ == "__main__":
    csv_path = r"C:py_TestCodePy_codeswaveletsignal_change_detect122_60s.csv"
    t, ecg = read_ecg_from_csv(csv_path=csv_path, fs=360, duration_sec= 20)
    
    peaks_idx, bpm_times, bpm_values = detect_beats_dwt(
        ecg, fs,
        wavelet=wavelet,
        level=dwt_level,
        detail_levels=detail_levels_dwt,
        pre_band=pre_band,
        smooth_ms=smooth_ms,
        threshold_percentile=threshold_percentile,
        min_rr_sec=min_rr_sec,
        debug_plot=debug_plot
    )
    print(f"检测到 {len(peaks_idx)} 个 R 峰")
    if len(bpm_values)>0:
   print(f"最后 BPM ={bpm_values[-1]:.1f}bpm (time{bpm_times[-1]:.1f}s)")

比较CWT,SWT和DWT三种小波变换检测心率的结果

小编把三个演示程序的最后输出额外添加了心率BMP的输出,可见检测结果在心电图的前20个心率值完全相同:

CWT (bpm) SWT (bpm) DWT (bpm)
92.7 92.7 92.7
92.1 92.1 92.1
91.8 91.8 91.8
93.6 93.6 93.6
93.6 93.6 93.6
94.0 94.0 94.0
91.0 91.0 91.0
90.9 90.9 90.9
89.9 89.9 89.9
90.3 90.3 90.3
89.4 89.4 89.4
89.1 89.1 89.1
87.9 87.9 87.9
87.1 87.1 87.1
86.4 86.4 86.4
86.3 86.3 86.3
86.7 86.7 86.7
87.3 87.3 87.3
86.9 86.9 86.9
86.3 86.3 86.3

结语

代码有点冗长,但是也只触及到小波变换的冰山一角。所谓抛砖引玉,如果可以有助于各位理解小波变换和信号处理,那确实是心满意足了。

这里仍然要引用一下ECG的数据来源。[1][2]

[参考]

[1] G. B. Moody and R. G. Mark, “The impact of the MIT-BIH Arrhythmia Database,”IEEE Eng. Med. Biol. Mag., vol. 20, no. 3, pp. 45–50, May 2001, doi: 10.1109/51.932724.
[2] A. L. Goldberger et al., “PhysioBank, PhysioToolkit, and PhysioNet: Components of a new research resource for complex physiologic signals,”Circulation, vol. 101, no. 23, pp. e215–e220, Jun. 2000, doi: 10.1161/01.CIR.101.23.e215.

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 小波变换
    +关注

    关注

    2

    文章

    184

    浏览量

    30534
  • 心电图
    +关注

    关注

    1

    文章

    82

    浏览量

    26028

原文标题:小波变换(4):平稳小波变换变换(SWT)和离散小波变换(DWT)方式从心电图数据获取心率

文章出处:【微信号:安费诺传感器学堂,微信公众号:安费诺传感器学堂】欢迎添加关注!文章转载请注明出处。

收藏 人收藏
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

    评论

    相关推荐
    热点推荐

    #参考设计#可穿戴心电图设计方案

    可穿戴心电图参考设计可测量心率数据和运动,并实现物联网连接以实现健康管理。 *附件:可穿戴心电图参考设计.pdf 心电图 (ECG) 心脏
    的头像 发表于 06-28 18:19 1w次阅读
    #参考设计#可穿戴<b class='flag-5'>心电图</b>设计方案

    利用深度循环神经网络对心电图降噪

    物理现象。此外,它假设了一个非常简单的 噪声模型。然而,它比真实数据更容易学习。 因此,经过合成数据训练的网络能够更快地了 解心电图信号的\"本质\",并利用这些知识
    发表于 05-15 14:42

    可通过iPhone监控心率心电图仪参考设计

    导读】心电图仪可以采集和监控心率数据,并通过智能手机上的应用程序显示心电图心率数据,并且可以选
    发表于 11-11 15:57

    心电图数据处理和波形显示包括心率,RR值等的参数显示..

    心电图数据处理和波形显示包括心率,RR值等的参数显示....还有波形的存储与回放,算是一个小系统,给大家参考一下
    发表于 11-29 11:51

    AMEYA360设计方案丨基于 NXP 的心电图

    记录器:热敏式记录器是利用加热烧结在陶瓷基片上的半导体加热点,在遇热显色的热敏纸上烫出图形及字符的。(三)按供电方式分类按供电方式来分,可分为直流式、交流式和交、直两用式心电图机。其中
    发表于 03-07 15:00

    ad8232串口显示心率心电图

    我想用arduino实现心电图以及心率的显示,不太会用ad8232芯片
    发表于 04-24 11:11

    如何对心电图(ECG)信号进行简单的分析和心率计算

    这个例子演示了如何对心电图(ECG)信号进行简单的分析和心率计算。This example shows how to do a simple analysis
    发表于 12-30 08:38

    【快速上手手册】疯壳·四合一健康监测模组(心率血压血氧心电

    关闭模组,如下图所示。12在模式 1 和 2 中如果手没能紧贴光源,会返回 ERROR:-30 为正常现象,意味着手已经和光源脱离,重新紧贴光源即可。2.5 获取心电图原始数据在发送
    发表于 07-12 19:13

    【快速上手手册】疯壳·四合一健康监测模组(心率血压血氧心电

    关闭模组,如下图所示。12在模式 1 和 2 中如果手没能紧贴光源,会返回 ERROR:-30 为正常现象,意味着手已经和光源脱离,重新紧贴光源即可。2.5 获取心电图原始数据在发送
    发表于 08-18 10:13

    心电图,什么是心电图

    什么是心电图 心电图 心脏是循环系统中重要的器官。由于心脏不断地进行有节奏的收缩和舒张活动,
    发表于 09-04 08:27 2916次阅读
    <b class='flag-5'>心电图</b>,什么是<b class='flag-5'>心电图</b>

    怎样用Arduino微控制器和AD8232制作心电图并测量心率

    分析和监测心率的有效方法是通过心电图(ECG)心脏监测系统。
    的头像 发表于 07-30 11:09 2.1w次阅读

    苹果设计新的 Apple Watch 测量心电图(ECG)的算法

    根据 iOS 14.3 和 watchOS 7.2 betas 的开发者文档,苹果设计了一种新的 Apple Watch 测量心电图(ECG)的算法。 在官方文档中,增加了一个新的 “第 2
    的头像 发表于 12-10 09:31 3627次阅读

    在OLED屏幕上获取实时心电图

    电子发烧友网站提供《在OLED屏幕上获取实时心电图.zip》资料免费下载
    发表于 12-23 15:13 1次下载
    在OLED屏幕上<b class='flag-5'>获取</b>实时<b class='flag-5'>心电图</b>

    心电图仪简介

    心电图(ECG或EKG)是与心肌相关的电信号相对于时间的测量和图形表示。心电图的应用范围监测心率到诊断特定的心脏病。ECG测量的基础知识对于所有应用都是相同的,但电气元件的细节和要求
    的头像 发表于 02-27 16:17 5720次阅读
    <b class='flag-5'>心电图</b>仪简介

    使用MSP430FG439的心率心电图监测器

    电子发烧友网站提供《使用MSP430FG439的心率心电图监测器.pdf》资料免费下载
    发表于 10-22 09:30 0次下载
    使用MSP430FG439的<b class='flag-5'>心率</b>和<b class='flag-5'>心电图</b>监测器