在旋转机械故障诊断领域,如何从强噪声干扰的振动信号中提取敏感特征始终是核心难题。经典的深度学习模型如卷积神经网络(Convolutional Neural Networks, CNN)在实验室干净数据集上表现优异,但在面对复杂的工业实测数据时,冗余的噪声特征可能会导致模型准确率有所下降。为了解决这一问题,论文“Deep Residual Shrinkage Networks for Fault Diagnosis”提出了一种创新的结构——深度残差收缩网络(Deep Residual Shrinkage Network, DRSN)。
DRSN的核心思想在于将“软阈值化 (Soft Thresholding)”这一经典的信号处理降噪技术集成到残差网络中。通过引入注意力机制,模型能够自适应地学习每一组特征图的收缩阈值。在特征传递的过程中,那些接近于零的、被视为噪声的特征会被自动置为零,而强特征则得以保留。这种结构有助于提高模型在强噪声环境下的鲁棒性(即抗干扰能力),还实现了端到端的自适应特征提取,无需依赖复杂的专家先验知识。
1.从残差块到自适应收缩
DRSN的核心组件是“带有通道级阈值的残差收缩构建块 (Residual Shrinkage Building Unit with Channel-wise thresholds, RSBU-CW)”。该模块在传统残差学习的基础上,并行了一个用于计算阈值的子网络。
在RSBU-CW模块中,输入特征经过两次卷积和批归一化(Batch Normalization, BN)处理后,会进入一个注意力分支。首先,通过取绝对值和全局平均池化(Global Average Pooling, GAP)将空间维度的特征压缩,计算出每个通道的绝对值平均值。接着,利用两个全连接层和Sigmoid激活函数学习出一个缩放因子α,取值范围在0到1之间。收缩阈值τ的计算公式为:τ = α * average(abs(x))。
得到阈值后,模型应用软阈值化算子处理特征图,公式为:y = sign(x) * max(abs(x) - τ, 0)。这种设计允许模型为每个特征通道独立设置阈值。
图1. 深度残差收缩网络
2. 实验设置
为了验证DRSN-CW的性能,选择了轴承诊断领域的标准基准——西储大学(CWRU)轴承数据集。实验涵盖了正常状态以及内圈故障、外圈故障和滚珠故障10类标签。每个样本采用1024个采样点的滑动窗口进行切分。
图2. 类别划分
在数据工程模块,除了常规的标准化处理,还设计了一套“在线实时增强”流水线,以模拟极端的工业场景。这包括:
(1)环移位(Rolling Shift):模拟传感器采样起始时刻的不确定性。
(2)瞬态冲击注入:模拟机器偶尔出现的磕碰干扰。
(3)加性高斯白噪声(Additive White Gaussian Noise, AWGN):在训练过程中动态混合不同信噪比(Signal-to-Noise Ratio, SNR)的噪声,尽量让模型在各种环境下保持特征一致性。特别地,构建了一个SNR为-8dB的测试环境,这在工业诊断中属于较强的背景噪声干扰。
具体的TensorFlow代码如下:
""" 项目名称:深度残差收缩网络 (DRSN-CW) - 旋转机械故障诊断复现 论文参考:Zhao, M., et al. "Deep Residual Shrinkage Networks for Fault Diagnosis," IEEE TII, 2020. 算法核心逻辑: 1. 软阈值化 (Soft Thresholding):通过非线性映射,将接近于零的噪声特征置为零,保留强特征。 2. 注意力机制 (Attention):利用小型子网络自动学习每个通道的收缩阈值,实现自适应去噪。 3. 残差学习 (Residual Learning):解决深层网络梯度消失问题,确保特征传递的稳定性。 """ import os import sys import logging import numpy as np import scipy.io as sio import tensorflow as tf from tensorflow.keras import layers, Model, regularizers from sklearn.model_selection import train_test_split as split_data # ============================================================================= # 1. 环境与资源配置模块 # ============================================================================= logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s') class GPUConfig: """ 计算资源管理器:负责 TensorFlow 运行时环境的初始化与硬件加速配置。 """ @staticmethod def init_tf(): """ 配置计算后端: - 抑制冗余日志:减少非关键性的系统警告。 - 显存按需分配:防止 TensorFlow 启动时预占全部显存,允许与其他进程共用 GPU。 """ os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' physical_gpu_list = tf.config.list_physical_devices('GPU') if physical_gpu_list: try: for gpu_device in physical_gpu_list: # 开启显存动态增长模式 tf.config.experimental.set_memory_growth(gpu_device, True) logging.info("GPU 硬件加速就绪:检测到 {0} 个计算单元,已启用动态显存模式。".format(len(physical_gpu_list))) except RuntimeError as hardware_error: logging.warning("GPU 后端配置失败(可能已被占用): %s", hardware_error) else: logging.info("未检测到 GPU,系统将使用 CPU 进行计算(训练速度可能受限)。") # 执行全局初始化 GPUConfig.init_tf() # ============================================================================= # 2. 数据工程模块 (ETL - Extract, Transform, Load) # ============================================================================= class CWRULoader: """ CWRU 数据集解析器:负责原始 .mat 振动信号的读取、分段与特征重构。 """ def __init__(self, dataset_root, window_size=1024): """ :param dataset_root: 数据集存储根目录 :param window_size: 样本长度(窗口步长,通常设为 1024 或 2048) """ self.base_directory = os.path.abspath(dataset_root) self.sample_length = window_size self.sampling_interval = window_size def _parse_mat_content(self, target_file): """ 从 MATLAB 容器中提取驱动端(DE)时间序列数据。 """ try: storage = sio.loadmat(target_file) for identifier in storage.keys(): # 匹配驱动端加速度计信号键名 if 'DE_time' in identifier: return storage[identifier].flatten() except Exception as parse_error: logging.debug("读取文件 %s 异常: %s", target_file, parse_error) return None return None def load_data(self, category_dictionary): """ 构建训练数据集。 :param category_dictionary: 标签与文件名的映射关系字典。 :return: (X_data, y_label) 的 Numpy 数组。 """ feature_collection, label_collection = [], [] is_data_found = False for class_idx, name_list in category_dictionary.items(): for filename in name_list: full_path = os.path.join(self.base_directory, "{0}.mat".format(filename)) if not os.path.exists(full_path): continue vibration_series = self._parse_mat_content(full_path) if vibration_series is None: continue is_data_found = True # 非重叠滑动窗口采样:将长序列切割为定长的样本块 for pointer in range(0, len(vibration_series) - self.sample_length + 1, self.sampling_interval): sub_sequence = vibration_series[pointer : pointer + self.sample_length] feature_collection.append(sub_sequence) label_collection.append(class_idx) if not is_data_found: raise FileNotFoundError("路径下未找到 CWRU 相关 .mat 文件,请检查路径。") return np.array(feature_collection, dtype='float32'), np.array(label_collection, dtype='int32') def add_awgn(signal_input, snr_value): """ 加性高斯白噪声 (AWGN) 注入模块。 用于模拟真实工业场景下的背景噪声,测试模型的鲁棒性。 计算公式:P_noise = P_signal / 10^(SNR/10) """ signal_input = np.array(signal_input) random_engine = np.random.default_rng() # 支持固定 SNR 或 SNR 范围随机采样 target_snr = snr_value if not isinstance(snr_value, (list, tuple)) else random_engine.uniform(snr_value[0], snr_value[1]) # 计算信号功率并推导噪声标准差 signal_power = np.mean(np.square(signal_input), axis=1, keepdims=True) noise_variance = signal_power / (10 ** (target_snr / 10.0)) noise_component = random_engine.normal(0, np.sqrt(noise_variance), signal_input.shape) return (signal_input + noise_component).astype('float32') # ============================================================================= # 3. 神经网络组件定义 (DRSN Core) # ============================================================================= class SoftThresholdOperator(layers.Layer): """ 软阈值化算子 (Custom Layer): DRSN 的非线性核心,通过阈值 tau 对特征映射进行收缩处理。 公式:y = sign(x) * max(|x| - tau, 0) """ def __init__(self, **kwargs): super(SoftThresholdOperator, self).__init__(**kwargs) def call(self, inputs): """ x_conv: 输入特征图 (Batch, Steps, Channels) tau: 学习到的阈值 (Batch, Channels) """ x_conv, tau = inputs # 将阈值扩展至与特征图空间维度匹配 expanded_tau = tf.expand_dims(tau, axis=1) return tf.sign(x_conv) * tf.maximum(tf.abs(x_conv) - expanded_tau, 0.0) class RSBU_CW(layers.Layer): """ 残差收缩构建块 (Residual Shrinkage Building Unit with Channel-wise thresholds): 集成了多通道注意力机制的残差块,能够为每个通道独立生成阈值。 """ def __init__(self, filters, kernel_size, strides=1, **kwargs): super(RSBU_CW, self).__init__(**kwargs) self.num_kernels = filters self.step_size = strides self.width = kernel_size self.weight_decay = regularizers.l2(1e-4) # 恒等映射路径 (Residual Shortcut) self.shortcut = None # 主变换分支:采用经典的 BN-ReLU-Conv 结构 self.bn_alpha = layers.BatchNormalization() self.relu_alpha = layers.Activation('relu') self.conv_alpha = layers.Conv1D(filters, kernel_size, strides=strides, padding='same', kernel_initializer='he_normal', kernel_regularizer=self.weight_decay) self.bn_beta = layers.BatchNormalization() self.relu_beta = layers.Activation('relu') self.conv_beta = layers.Conv1D(filters, kernel_size, strides=1, padding='same', kernel_initializer='he_normal', kernel_regularizer=self.weight_decay) # 注意力子网络:计算通道级收缩阈值 self.gap = layers.GlobalAveragePooling1D() self.fc1 = layers.Dense(filters, kernel_initializer='he_normal') self.bn_gamma = layers.BatchNormalization() self.relu_gamma = layers.Activation('relu') self.fc2 = layers.Dense(filters, activation='sigmoid') # 归一化缩放因子 self.threshold_op = SoftThresholdOperator() def build(self, input_dim): """ 动态调整 Shortcut:当步长不为 1 或通道数变化时,使用 1x1 卷积对齐残差。 """ if self.step_size != 1 or input_dim[-1] != self.num_kernels: self.shortcut = tf.keras.Sequential([ layers.Conv1D(self.num_kernels, 1, strides=self.step_size, padding='same', use_bias=False), layers.BatchNormalization() ]) super(RSBU_CW, self).build(input_dim) def call(self, layer_inputs): """ 逻辑流:特征提取 -> 通道全局特征感知 -> 动态阈值计算 -> 软阈值降噪 -> 残差相加 """ identity = layer_inputs if self.shortcut: identity = self.shortcut(layer_inputs) # 两次卷积处理得到中间特征图 x_conv x_conv = self.bn_alpha(layer_inputs) x_conv = self.relu_alpha(x_conv) x_conv = self.conv_alpha(x_conv) x_conv = self.bn_beta(x_conv) x_conv = self.relu_beta(x_conv) x_conv = self.conv_beta(x_conv) # 计算特征图各通道的绝对值均值作为全局统计量 x_abs = tf.abs(x_conv) abs_mean = self.gap(x_abs) # 通过子网络输出 alpha (0,1),阈值 tau = alpha * abs_mean z = self.fc1(abs_mean) z = self.bn_gamma(z) z = self.relu_gamma(z) alpha = self.fc2(z) tau = tf.multiply(alpha, abs_mean) # 应用软阈值收缩并进行残差融合 denoised_output = self.threshold_op([x_conv, tau]) return layers.Add()([denoised_output, identity]) class DRSN_CW(Model): """ DRSN-CW 完整架构: 将多个 RSBU 模块顺序堆叠,最后通过全连接层进行故障分类。 """ def __init__(self, num_classes): super(DRSN_CW, self).__init__(name="Bearing_Fault_DRSN") self.weight_decay = regularizers.l2(1e-4) # 输入层:初步感知一维时序信号 self.conv1 = layers.Conv1D(32, 15, strides=2, padding='same', kernel_initializer='he_normal', kernel_regularizer=self.weight_decay) self.bn1 = layers.BatchNormalization() self.relu1 = layers.Activation('relu') # 构建收缩残差块序列 (特征维度由 32 逐渐扩展至 128) self.rsbu_blocks = [ RSBU_CW(32, 5, strides=2), RSBU_CW(32, 5, strides=1), RSBU_CW(64, 5, strides=2), RSBU_CW(64, 5, strides=1), RSBU_CW(128, 5, strides=2), RSBU_CW(128, 5, strides=1) ] # 输出头:降维后映射至分类空间 self.post_norm = layers.BatchNormalization() self.post_relu = layers.Activation('relu') self.gap_layer = layers.GlobalAveragePooling1D() self.classifier = layers.Dense(num_classes, activation='softmax', kernel_regularizer=self.weight_decay) def call(self, network_input): """ 端到端正向推理流程。 """ x = self.conv1(network_input) x = self.bn1(x) x = self.relu1(x) for block in self.rsbu_blocks: x = block(x) x = self.post_norm(x) x = self.post_relu(x) x = self.gap_layer(x) return self.classifier(x) # ============================================================================= # 4. 训练、增强与性能评估流 # ============================================================================= def train_and_test(dataset_path, seq_len=1024): """ 全流程控制器:涵盖数据预处理、在线增强、模型训练及极端环境(-8dB)评估。 """ # 定义故障类别(基于 CWRU 文件命名规则) label_map = { 0: ['Normal_0', 'Normal_1', 'Normal_2', 'Normal_3'], 1: ['IR007_0', 'IR007_1', 'IR007_2', 'IR007_3'], 2: ['IR014_0', 'IR014_1', 'IR014_2', 'IR014_3'], 3: ['IR021_0', 'IR021_1', 'IR021_2', 'IR021_3'], 4: ['B007_0', 'B007_1', 'B007_2', 'B007_3'], 5: ['B014_0', 'B014_1', 'B014_2', 'B014_3'], 6: ['B021_0', 'B021_1', 'B021_2', 'B021_3'], 7: ['OR007@6_0', 'OR007@6_1', 'OR007@6_2', 'OR007@6_3'], 8: ['OR014@6_0', 'OR014@6_1', 'OR014@6_2', 'OR014@6_3'], 9: ['OR021@6_0', 'OR021@6_1', 'OR021@6_2', 'OR021@6_3'] } data_engine = CWRULoader(dataset_root=dataset_path, window_size=seq_len) try: signals, labels = data_engine.load_data(label_map) except Exception as data_err: logging.error("数据加载失败: %s", data_err) return # 随机划分:70% 训练,15% 验证,15% 测试 train_x_pre, temp_x, train_y_pre, temp_y = split_data( signals, labels, test_size=0.3, random_state=42 ) val_x_pre, test_x_pre, val_y_pre, test_y_pre = split_data( temp_x, temp_y, test_size=0.5, random_state=42 ) # 标准化处理:使用训练集均值和标准差,防止测试信息泄露 mu, sigma = np.mean(train_x_pre), np.std(train_x_pre) def normalize(obs): return ((obs - mu) / sigma).reshape(-1, seq_len, 1) train_set_x = normalize(train_x_pre) val_set_x = normalize(val_x_pre) test_set_x = normalize(test_x_pre) # 标签进行 One-hot 编码 num_classes = len(label_map) train_set_y = tf.keras.utils.to_categorical(train_y_pre, num_classes).astype('float32') val_set_y = tf.keras.utils.to_categorical(val_y_pre, num_classes).astype('float32') test_set_y = tf.keras.utils.to_categorical(test_y_pre, num_classes).astype('float32') # 测试环境:注入极强噪声(-8dB)以验证模型在极端工业背景下的表现 val_x_awgn = add_awgn(val_set_x, snr_value=-8) test_x_awgn = add_awgn(test_set_x, snr_value=-8) def augment_batch(feat_batch, label_batch): """ 在线实时增强 (Online Data Augmentation): 1. 循环移位:模拟采样时刻的不确定性。 2. 瞬态冲击:模拟偶然出现的机器磕碰声。 3. 混合噪声:提升模型的抗噪阈值。 """ rand_gen = np.random.default_rng() augmented_x = feat_batch.copy() batch_n, steps_n, _ = augmented_x.shape # 随机相位平移 for sample_idx in range(batch_n): offset = rand_gen.integers(0, steps_n) augmented_x[sample_idx, :, 0] = np.roll(augmented_x[sample_idx, :, 0], offset) # 脉冲冲击噪声注入 (10% 概率) if rand_gen.random() > 0.9: for sample_idx in range(batch_n): if rand_gen.random() > 0.5: num_spikes = rand_gen.integers(1, 3) positions = rand_gen.integers(0, steps_n, num_spikes) spike_mag = np.std(augmented_x[sample_idx]) * rand_gen.uniform(1.5, 2.5) augmented_x[sample_idx, positions, 0] += spike_mag * rand_gen.choice([-1, 1], size=num_spikes) # 动态 SNR 混合 (50% 概率) if rand_gen.random() > 0.5: augmented_x = add_awgn(augmented_x, snr_value=(-8, 8)) return augmented_x.astype(np.float32), label_batch.astype(np.float32) def _tensor_spec_binding(f_tensor, l_tensor): """ 为 tf.data 显式绑定形状信息 """ f_tensor.set_shape([None, seq_len, 1]) l_tensor.set_shape([None, num_classes]) return f_tensor, l_tensor # 利用 tf.data 构建高吞吐数据流水线 training_pipeline = tf.data.Dataset.from_tensor_slices((train_set_x.astype('float32'), train_set_y)) training_pipeline = training_pipeline.shuffle(len(train_set_x)).batch(64) training_pipeline = training_pipeline.map( lambda x, y: tf.numpy_function(augment_batch, [x, y], [tf.float32, tf.float32]), num_parallel_calls=tf.data.AUTOTUNE ).map(_tensor_spec_binding).prefetch(tf.data.AUTOTUNE) # 模型实例化与编译 model_instance = DRSN_CW(num_classes=num_classes) model_instance.compile( optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3), loss=tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.0), # 交叉熵损失 metrics=['accuracy'] ) logging.info("诊断系统启动:分类数=%d, 序列长度=%d", num_classes, seq_len) # 动态学习率调整与早停保护 optimization_callbacks = [ tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=7, min_lr=1e-6, verbose=1), tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True) ] # 模型拟合 model_instance.fit( training_pipeline, epochs=100, validation_data=(val_x_awgn, val_set_y), callbacks=optimization_callbacks, verbose=2 ) # 在极低信噪比环境下最终验证性能 final_loss, final_acc = model_instance.evaluate(test_x_awgn, test_set_y, verbose=0) print("n" + "="*50) print("模型评估报告 (DRSN-CW)") print("评估背景:-8dB SNR (强噪声干扰环境)") print("最终识别准确率: {0:.2f}%".format(final_acc * 100)) print("="*50) # ============================================================================= # 程序入口 # ============================================================================= if __name__ == "__main__": # 配置默认的数据搜索目录 DATA_PATH = os.path.join(os.getcwd(), 'data_path') if not os.path.exists(DATA_PATH): logging.warning("未找到默认数据目录: %s", DATA_PATH) user_input_path = input("请输入 CWRU 原始数据集 (.mat) 所在的完整路径: ").strip() if user_input_path: DATA_PATH = user_input_path else: logging.critical("未提供有效路径,程序退出。") sys.exit(0) # 启动训练与测试 train_and_test(DATA_PATH, seq_len=1024)
3.强噪声环境下的诊断性能分析与复现总结
在复现实验中,使用了Adam优化器进行训练,并结合了学习率动态调整 (ReduceLROnPlateau) 策略。在注入了-8dB的高斯噪声后,DRSN-CW依然保持了90%以上的测试准确率。
图3. 实验结果
论文原文:
论文标题: Deep residual shrinkage networks for fault diagnosis
出版期刊: IEEE Transactions on Industrial Informatics. 2020, 16(7): 4681-4690.
DOI: 10.1109/TII.2019.2943898
https://ieeexplore.ieee.org/document/8850096
审核编辑 黄宇
-
神经网络
+关注
关注
42文章
4847浏览量
108388 -
python
+关注
关注
59文章
4892浏览量
90426
发布评论请先 登录
工业级 AI 神经网络语音处理模组 A-59 设计与应用研究
人工智能-Python深度学习进阶与应用技术:工程师高培解读
面向嵌入式部署的神经网络优化:模型压缩深度解析
自动驾驶中常提的卷积神经网络是个啥?
NMSIS神经网络库使用介绍
在Ubuntu20.04系统中训练神经网络模型的一些经验
CICC2033神经网络部署相关操作
液态神经网络(LNN):时间连续性与动态适应性的神经网络
如何在机器视觉中部署深度学习神经网络
无刷电机小波神经网络转子位置检测方法的研究
神经网络专家系统在电机故障诊断中的应用
神经网络RAS在异步电机转速估计中的仿真研究
基于FPGA搭建神经网络的步骤解析
面向强噪声数据的深度神经网络:深度残差收缩网络的Python编程复现
评论