0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

使用MCC118对交流电进行监测的过程分享

科技观察员 来源:mc_six 作者:mc_six 2022-04-24 17:07 次阅读

我们是通讯行业动力维护专业,因为外市电(三相交流电)的波动会对我们的设备造成一定的影响。于是想做一个交流电监测的应用,监测交流电的过压或欠压,所以我做了这么一个实时监测的工具。它可以在过压和欠压一定程度的时候告警。或者进行长期监测并记录电压数据,分析可能发生过压或欠压的原因。

树莓派作为一个简单易用功能强大的计算平台,很好地支持 Python。而且刚好有 MCC 118 数据采集模块(HAT)可以直接扩展出数据采集功能,MCC 118 提供 8 通道模拟电压输入,基于树莓派的数据采集/数据记录系统。每张 MCC 118 最大采样率为 100kS/s,可以进行单电压点和电压波形采集。

项目的代码就是在 MCC DAQ HATs 提供的 SDK 示例代码中,continuous_scan.py 的基础上修改来的。实现一个高速的采集及告警设备,来监测外市电的波动情况。

长期监测主要通过长期数据记录,分析是否有未超出告警范围的波动,以分析供电质量。

poYBAGJlE0uAF81sAAEbEmV8oM8986.jpg

这只是一个工程样机,可以进行实时(延时3秒+)告警和储存发生告警时的电压波形;可以进行长期监测(不实时告警)、也可以长期监测并实时告警。

内部构造如下图,其中 MCC 118 在图中正上方。

pYYBAGJlE06ADvcVAAI6zCcdlLE958.jpg

警告分为4种:本机的光警告、本机的声警告、继电器输出的外部警告、物联网平台(OneNet)警告。

目前支持两路三相交流电监测,每路三相电的 A、B、C 相分别用:黄、绿、红,发光二极管警告。

物联网平台的警告页面。分别是:一路三相电的一相警告,以及一路三相电的三相警告。

poYBAGJlE1GALJJ7AAEwhkqziQk248.jpg

采集到的正常交流电波形如下图。

pYYBAGJlE1OAKh_mAACXvlzRJkA949.jpg

数据分析部分是用 NumPy 进行交流电每个周期的最大值、最小值判断,然后确定是否报警,是否写入硬盘。

还有一个程序每一做任何判断,把采集到的所有数据都直接记录到硬盘,大概一小时会生成 1.2G 的数据。

也是用 Python 写的脚本,通过 NumPy 把数据做一个转换 ,然后用 NumPy 的 amax 和 amin 对数值进行分析判断。如果数据超过阈值,就写硬盘文件,同时用 GPIO 输出控制 LED 来发出预警信号

以下是完整代码,其中关键代码已经添加了注释。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
MCC 118 Functions Demonstrated:
mcc118.a_in_scan_start
mcc118.a_in_scan_read
mcc118.a_in_scan_stop
mcc118.a_in_scan_cleanup

Purpose:
Perform a continuous acquisition on 1 or more channels.

Description:
Continuously acquires blocks of analog input data for a
user-specified group of channels until the acquisition is
stopped by the user. The last sample of data for each channel
is displayed for each block of data received from the device.
MCC118共8个通道,可以监测两个三相变压器的6路市电,分别为变压器1的A、B、C相和变压器2的A、B、C相
"""
from __future__ import print_function
from daqhats import mcc118, OptionFlags, HatIDs, HatError
from daqhats_utils import select_hat_device, enum_mask_to_string, \
chan_list_to_mask
import os
import threading
import numpy as np
import datetime
import time
from socket import *
import RPi.GPIO as GPIO
# ===GPIO setup=====================================================================
GPIO.setmode(GPIO.BCM)
led_list=[5,6,19,16,20,21] #LED使用的GPIO
GPIO.setup(led_list, GPIO.OUT, initial=GPIO.LOW)
delta = datetime.timedelta(minutes=5)
# ===================================================================================
READ_ALL_AVAILABLE = -1
CURSOR_BACK_2 = '\x1b[2D's
ERASE_TO_END_OF_LINE = '\x1b[0K'
# ===================================================================================
arraydata=[[0 for i in range(72000)] for h in range(10)] #定义一个采样缓存数组,72000为read_request_size*信道数,10为#连续采样的次数c_time

# =======OneNet param===================================================================
device_id = "Your 设备ID" # Your 设备ID
api_key = "Your API_KEY" # Your API_KEY
HOST = 'api.heclouds.com'
PORT = 80
BUFSIZ = 1024
ADDR = (HOST, PORT)
# =======Channel set==================================================================
# Store the channels in a list and convert the list to a channel mask that
# can be passed as a parameter to the MCC 118 functions.
channels = [0, 1, 2, 3, 4, 5]
channel_mask = chan_list_to_mask(channels)
num_channels = len(channels)
samples_per_channel = 0
options = OptionFlags.CONTINUOUS
scan_rate = 10000.0 #采样速率 最大值为100k/信道数
read_request_size = 12000 #每通道此次采样的点数
c_time = 10 #连续采样的次数
timeout = 5.0
# =======上下限=================================================================
volt=220 #根据需要设定标准电压
thread=0.2 #根据需要设定
upline=volt*(1+thread)*1.414*0.013558 #0.013558是根据电阻分压确定的
downline=volt*(1-thread)*1.414*0.013558 #0.013558是根据电阻分压确定的
# =======LED监测=================================================================
base_time=datetime.datetime.now()
#定义时间临时数组
led_stime=[base_time,base_time,base_time,base_time,base_time,base_time]
#定义状态临时数组
led_status=[-1,-1,-1,-1,-1,-1]

#=====================================================================================
def led_detect():
global led_stime,led_status
#变压器的A、B、C相
transno=["Trans1A","Trans1B","Trans1C","Trans2A","Trans2B","Trans2C"]
try:
#判断每个LED的状态
while PORT > 0 :
time.sleep(1)
if GPIO.input(5) == GPIO.HIGH and led_status[0]<0 :       
led_stime[0]=datetime.datetime.now()
led_status[0]=1

if GPIO.input(6) == GPIO.HIGH and led_status[1]<0 :
led_stime[1]=datetime.datetime.now()
led_status[1]=1

if GPIO.input(19) == GPIO.HIGH and led_status[2]<0 :
led_stime[2]=datetime.datetime.now()
led_status[2]=1

if GPIO.input(16) == GPIO.HIGH and led_status[3]<0 :
led_stime[3]=datetime.datetime.now()
led_status[3]=1

if GPIO.input(20) == GPIO.HIGH and led_status[4]<0 :
led_stime[4]=datetime.datetime.now()
led_status[4]=1

if GPIO.input(21) == GPIO.HIGH and led_status[5]<0 :
led_stime[5]=datetime.datetime.now()
led_status[5]=1
#相应引脚延时5分钟熄灭,并向平台发送数据清除告警
for i in range(6):
now=datetime.datetime.now()
if now > ( led_stime[i] + delta ) and led_status[i]>=0 :
GPIO.output(led_list[i], GPIO.LOW)
led_status[i]=-1
t5 = threading.Thread(target=senddata, args=(transno[i],"0"))
t5.start()

except KeyboardInterrupt:
# Clear the '^C' from the display.
print(CURSOR_BACK_2, ERASE_TO_END_OF_LINE, '\n')
print('Stopping')

# ========senddata to OneNet==============================================================
def senddata(T_dev,alm):
Content = ""
Content += "{"datastreams":[{"id": ""+T_dev+"","datapoints": [{"value": " + alm + "}]}"
Content += "]}\r\n"
value = len(Content)
data = ""
data += "POST /devices/" + device_id + "/datapoints HTTP/1.1\r\n"
data += "Host:api.heclouds.com\r\n"
data += "Connection: close\r\n"
data += "api-key:" + api_key + "\r\n"
data += "Content-Length:" + str(value) + "\r\n"
data += "\r\n"
data += Content

tcpCliSock = socket(AF_INET, SOCK_STREAM)
tcpCliSock.connect(ADDR)
tcpCliSock.send(data.encode())
#data1 = tcpCliSock.recv(BUFSIZ).decode()
#print(data1)
# =====writefile==========================================================================
def writefile(arrayatt, fnat):
print("write file ", datetime.datetime.now())
np.savetxt(fnat, arrayatt.T, fmt="%1.2f", delimiter=" ")
print("\033[0;31m","finish write ", datetime.datetime.now(),"\033[0m")

#====Analyse and writefile and Alarm=======================================================
def analyse(array,fnat):
break_1_1 = -1
break_2_1 = -1
break_1_2 = -1
break_2_2 = -1
break_1_3 = -1
break_2_3 = -1
break_1 = -1
break_2 = -1
print("analyse file ", datetime.datetime.now())
#等于read_request_size = 12000
lll = read_request_size
file1 = "1-"+fnat
file2 = "2-"+fnat
a=np.array(array)
b = np.empty([3, c_time*read_request_size], dtype=float) # 变压器1定义一个数组
c = np.empty([3, c_time*read_request_size], dtype=float) # 变压器2定义一个数组
#板子的数据记录格式是:CH0、CH1、CH2、CH3、CH4、CH5、CH0、CH1、CH2、CH3、CH4、CH5、CH0、CH1、CH2、CH3、CH4、CH5、CH0、CH1、CH2、CH3、CH4、CH5、CH0、CH1、CH2、CH3、CH4、CH5、
#通过下面拆分成6个CH每个CH一个list以便进行分CH判断
b[0] = np.concatenate((a[0, 0::6], a[1, 0::6], a[2, 0::6], a[3, 0::6], a[4, 0::6], a[5, 0::6], a[6, 0::6],a[7, 0::6], a[8, 0::6], a[9, 0::6])) #提取变压器1,A相数据
b[1] = np.concatenate((a[0, 1::6], a[1, 1::6], a[2, 1::6], a[3, 1::6], a[4, 1::6], a[5, 1::6], a[6, 1::6],a[7, 1::6], a[8, 1::6], a[9, 1::6])) #提取变压器1,B相数据
b[2] = np.concatenate((a[0, 2::6], a[1, 2::6], a[2, 2::6], a[3, 2::6], a[4, 2::6], a[5, 2::6], a[6, 2::6],a[7, 2::6], a[8, 2::6], a[9, 2::6])) #提取变压器1,C相数据
c[0] = np.concatenate((a[0, 3::6], a[1, 3::6], a[2, 3::6], a[3, 3::6], a[4, 3::6], a[5, 3::6], a[6, 3::6],a[7, 3::6], a[8, 3::6], a[9, 3::6])) #提取变压器2,A相数据
c[1] = np.concatenate((a[0, 4::6], a[1, 4::6], a[2, 4::6], a[3, 4::6], a[4, 4::6], a[5, 4::6], a[6, 4::6],a[7, 4::6], a[8, 4::6], a[9, 4::6])) #提取变压器2,B相数据
c[2] = np.concatenate((a[0, 5::6], a[1, 5::6], a[2, 5::6], a[3, 5::6], a[4, 5::6], a[5, 5::6], a[6, 5::6],a[7, 5::6], a[8, 5::6], a[9, 5::6])) #提取变压器2,C相数据
#=====判断越线=================================================================================
# 200 是 scan_rate = 10000.0 除以 50(交流电的频率)得出 每个周期 采样200个点
# 600 是 120000/200=600 read_request_size*c_time本次总的采样点数,除以每周期200点,总共600个周期
# 如果 scan_rate = 5000.0 就是 100, alens 是 120000/100=1200

for i in range(600):
#每个CH拆分成每个正弦波周期进行峰值判断
a1 = b[0][(i * 200):((i + 1) * 200 - 1)]
b1 = b[1][(i * 200):((i + 1) * 200 - 1)]
c1 = b[2][(i * 200):((i + 1) * 200 - 1)]
a2 = c[0][(i * 200):((i + 1) * 200 - 1)]
b2 = c[1][(i * 200):((i + 1) * 200 - 1)]
c2 = c[2][(i * 200):((i + 1) * 200 - 1)]

#每一项分别判断上下限并分别告警
if np.amax(a1) > upline or np.amax(a1) < downline :
if break_1_1 <0:
GPIO.output(5, GPIO.HIGH) #相应引脚置高电平,点亮LED
break_1_1 = 1
t3 = threading.Thread(target=senddata, args=("Trans1A","100")) #调用上传进程
t3.start()
if np.amax(b1) > upline or np.amax(b1) < downline  :
if break_1_2< 0:
GPIO.output(6, GPIO.HIGH) #相应引脚置高电平,点亮LED
break_1_2 = 1
t3 = threading.Thread(target=senddata, args=("Trans1B","100")) #调用上传进程
t3.start()
if np.amax(c1) > upline or np.amax(c1) < downline:
if break_1_3 < 0:
GPIO.output(19, GPIO.HIGH) #相应引脚置高电平,点亮LED
break_1_3 = 1
t3 = threading.Thread(target=senddata, args=("Trans1C","100")) #调用上传进程
t3.start()
if np.amax(a2) > upline or np.amax(a2) < downline:
if break_2_1 < 0:
GPIO.output(16, GPIO.HIGH) #相应引脚置高电平,点亮LED
break_2_1=1
t3 = threading.Thread(target=senddata, args=("Trans2A","100")) #调用上传进程
t3.start()
if np.amax(b2) > upline or np.amax(b2) < downline:
if break_2_2 < 0:
GPIO.output(20, GPIO.HIGH) #相应引脚置高电平,点亮LED
break_2_1 =1
t3 = threading.Thread(target=senddata, args=("Trans2B","100")) #调用上传进程
t3.start()
if np.amax(c2) > upline or np.amax(c2) < downline:
if break_2_3 < 0:
GPIO.output(21, GPIO.HIGH) #相应引脚置高电平,点亮LED
break_2_2 = 1
t3 = threading.Thread(target=senddata, args=("Trans2C","100")) #调用上传进程
t3.start()
#每个变压器任何一项有告警就调用写文件进程写文件
if np.amax(a1) > upline or np.amax(a1) < downline or np.amax(b1) > upline or np.amax(b1) < downline or np.amax(c1) > upline or np.amax(c1) < downline:
if break_1 < 0:
t1 = threading.Thread(target=writefile, args=(b, file1,)) #调用写文件进程
t1.start()
break_1 = 2

if np.amax(a2) > upline or np.amax(a2) < downline or np.amax(b2) > upline or np.amax(b2) < downline or np.amax(c2) > upline or np.amax(c2) < downline:
if break_2 < 0:
t1 = threading.Thread(target=writefile, args=(c, file2,)) #调用写文件进程
t1.start()
break_2 = 2
print("\033[0;32m","analyse finish ", datetime.datetime.now(),"\033[0m") #font color
# ============================================================================================
def main():
"""
This function is executed automatically when the module is run directly.
"""
# ============================================================================================
try:

# Select an MCC 118 HAT device to use.
address = select_hat_device(HatIDs.MCC_118)
hat = mcc118(address)
'''
print('\nSelected MCC 118 HAT device at address', address)
actual_scan_rate = hat.a_in_scan_actual_rate(num_channels, scan_rate)
print('\nMCC 118 continuous scan example')
print(' Functions demonstrated:')
print(' mcc118.a_in_scan_start')
print(' mcc118.a_in_scan_read')
print(' mcc118.a_in_scan_stop')
print(' Channels: ', end='')
print(', '.join([str(chan) for chan in channels]))
print(' Requested scan rate: ', scan_rate)
print(' Actual scan rate: ', actual_scan_rate)
print(' Options: ', enum_mask_to_string(OptionFlags, options))
# ============================================================================================
try:
input('\nPress ENTER to continue ...')
except (NameError, SyntaxError):
pass
'''
# Configure and start the scan.
# Since the continuous option is being used, the samples_per_channel
# parameter is ignored if the value is less than the default internal
# buffer size (10000 * num_channels in this case). If a larger internal
# buffer size is desired, set the value of this parameter accordingly.
print('Starting scan ... Press Ctrl-C to stop\n')
hat.a_in_scan_start(channel_mask, samples_per_channel, scan_rate,options)
# ============================================================================================
try:
while True:
fna = str(datetime.datetime.now()) + ".txt"
#连续采样10次
for i in range(c_time):
read_result = hat.a_in_scan_read(read_request_size, timeout) # read channel Data
arraydata[i]=read_result.data

if read_result.hardware_overrun:
print('\n\nHardware overrun\n')
break
elif read_result.buffer_overrun:
print('\n\nBuffer overrun\n')
break

hat.a_in_scan_stop()
hat.a_in_scan_cleanup()
hat.a_in_scan_start(channel_mask, samples_per_channel, scan_rate,options)

#调用分析进程
arraydatat = arraydata
t2 = threading.Thread(target=analyse, args=(arraydatat, fna, ))
t2.start()

except KeyboardInterrupt:
# Clear the '^C' from the display.
print(CURSOR_BACK_2, ERASE_TO_END_OF_LINE, '\n')
print('Stopping')
hat.a_in_scan_stop()
hat.a_in_scan_cleanup()

except (HatError, ValueError) as err:
print('\n', err)


if __name__ == '__main__':
#表用进程实时监测告警状态,判断是否到时间消除告警
t4 = threading.Thread(target=led_detect)
t4.start()
main()

程序部署
参考这篇教程安装好 MCC DAQ HATs 的 SDK 和代码示例。
https://shumeipai.nxez.com/2018/10/18/get-started-with-the-evaluation-for-mcc-118.html
将上面的程序保存为 main.py 然后在程序所在目录运行下面的命令即可启动程序。

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 交流电
    +关注

    关注

    14

    文章

    466

    浏览量

    33570
  • 树莓派
    +关注

    关注

    113

    文章

    1639

    浏览量

    104741
收藏 人收藏

    评论

    相关推荐

    树莓派MCC118

    可以使用外部时钟和触发作为输入选项,进行同步采集。用户可以混合和搭配堆栈中的MCCHAT。模拟输入MCC 118 提供8个12位单端模拟输入。模拟信号电压范围为 ±10 V。采样率• 单板: 最大
    发表于 01-21 09:22

    流电交流电的定义

    交流电,是指大小和方向随时间作周期性变化的一种电流。交流电是用交流发电机发出的,在发电过程中,多对磁极是按一定的角度均匀分布在一个圆周上,使得发电
    发表于 01-25 15:01

    什么是直流电什么是交流电

    交流电,是指大小和方向随时间作周期性变化的一种电流。交流电是用交流发电机发出的,在发电过程中,多对磁极是按一定的角度均匀分布在一个圆周上,使得发电
    发表于 09-13 06:57

    交流电变成直流电的转变过程

    3.交流电变成直流电的转变过程称为滤波。...
    发表于 09-17 08:39

    浅析交流电路与正弦交流电

    什么叫交流电?怎样去计算正弦交流电的瞬时值、最大值与最小值?怎样去计算正弦交流电的电动势、电压与电流?
    发表于 10-09 06:50

    交流电和直流电的区别

    交流电,简称为AC。交流电也称“交变电流”,简称“交流”。电流方向随时间作周期性变化的为交流电。它的最基本的形式是正弦电流。当法拉第发现了电磁感应后,产生
    发表于 11-05 15:14 16.4w次阅读

    交流电频率监测方法

    交流电的频率是指它单位时间内周期性变化的次数,单位是赫兹(Hz),与周期成倒数关系。日常生活中的交流电的频率一般为50Hz,而无线电技术中涉及的交流电频率一般较大,达到千赫兹(KHz)甚至兆赫兹(MHz)的度量。
    发表于 12-25 15:34 1.4w次阅读
    <b class='flag-5'>交流电</b>频率<b class='flag-5'>监测</b>方法

    交流电机分为哪几种_交流电机分类有哪些_交流电机的电源和变频调速

    由于交流电力系统的巨大发展,交流电机已成为最常用的电机。本文首先介绍了交流电机分类及原理,其次阐述了交流电机优缺点及交流电机分为哪几种,最后
    发表于 05-10 10:19 3.6w次阅读

    流电交流电的区别与交流电电压计算公式

    流电交流电的区别与交流电电压计算公式(深圳市村田电源技术有限公司武汉分公司)-直流电交流电的区别与
    发表于 09-15 15:59 35次下载
    直<b class='flag-5'>流电</b>和<b class='flag-5'>交流电</b>的区别与<b class='flag-5'>交流电</b>电压计算公式

    交流及正弦交流电

    交流及正弦交流电(深圳市核达中远通电源技术有限公司电话多少)-交流及正弦交流电...........................
    发表于 09-24 13:16 9次下载
    <b class='flag-5'>交流</b>及正弦<b class='flag-5'>交流电</b>

    交流电源的原理和应用范围

    我们生活中有很多不同的电器——手电筒、微波炉、空调、插头等等,那么,这些电器中哪些是使用直流电源的,又有哪些是使用交流电源的呢?交流电源又是如何进行工作的呢?下面,让我们一起对
    发表于 01-06 15:46 7次下载
    <b class='flag-5'>交流电</b>源的原理和应用范围

    一文详解交流电交流电路基本知识

    本文通过图文,分享交流电和分类、常见交流电源、交流发电机基本工作原理、交流电路等交流电交流电
    的头像 发表于 01-30 16:33 5005次阅读
    一文详解<b class='flag-5'>交流电</b>与<b class='flag-5'>交流电</b>路基本知识

    交流电机是谁发明的 交流电机分类

    交流电机是由美籍塞尔维亚裔科学家尼古拉·特斯拉发明的。交流电机是用于实现机械能和交流电能相互转换的机械。由于交流电力系统的巨大发展,交流电
    发表于 03-24 17:39 850次阅读

    交流电机是什么意思

    交流电机简介 交流电机是用于实现机械能和交流电能相互转换的机械。由于交流电力系统的巨大发展,交流电机已成为最常用的电机。
    发表于 07-05 13:38 928次阅读

    浅谈舰船交流电网绝缘监测及故障定位的研究及产品选型

    摘要:交流电网和电气设备的绝缘状况直接影响舰船电力系统安全,其绝缘电阻的下降是一个不可避免的过程,成为了电网安全的严重隐患。电气设备绝缘材料的劣化过程是不可逆的,对舰船交流电
    的头像 发表于 07-06 09:01 228次阅读
    浅谈舰船<b class='flag-5'>交流电</b>网绝缘<b class='flag-5'>监测</b>及故障定位的研究及产品选型