The GY-DAQ-2480-E is a high-performance Ethernet-based DAS data acquisition card dedicated to distributed fibre optic acoustic sensing systems, featuring 250MSps 14-bit dual-channel synchronous trigger sampling and built-in coherent fading suppression demodulation algorithm. It supports Gigabit Ethernet or 10G optical port high-speed transmission, real-time upload of raw/demodulated data, and remote independent deployment for DAS data acquisition over distances exceeding 100 kilometres. The card realizes full-cable demodulation, with phase data directly available as vibration signals, reducing system CPU load and development complexity.
Huang
Email: Hqy@ybphotonics.com
Download>>
The GY-DAQ-2480 is a network interface-based DAS full-demodulation acquisition card featuring dual-channel 250 MSps synchronous trigger sampling.As a network interface DAS high-speed data acquisition card, it communicates with the host computer via Gigabit Ethernet or 10G optical ports. Utilising large-capacity data buffering and high-speed data transmission engine technology, it supports real-time uploading of raw and demodulated data. Data transfer rates reach up to 1000Mb/s, with optical port sampling transmission achieving 10Gb/s.The acquisition card employs a dual-channel 14-bit high-speed ADC with a sampling rate of 250 MSps, supporting DAS data acquisition over distances exceeding 100 kilometres.Featuring an integrated phase demodulation algorithm, it can directly upload amplitude-phase demodulated data. The phase data may be utilised directly by users as vibration signals without further demodulation processing, enabling full-cable demodulation. Alternatively, raw sampled data may be uploaded for user-conducted demodulation.
Compared to conventional IQ demodulation acquisition cards, this acquisition card incorporates an integrated phase demodulation algorithm module. This enables coherent fading suppression and global demodulation, significantly enhancing the accuracy and stability of the demodulated signal. It lays the foundation for subsequent signal processing and recognition. This eliminates the need for users to perform untwisting operations after acquiring data, reduces the system's CPU computational demands, and substantially lowers development complexity. It enables straightforward operation where data is ready for use upon reading, with phase directly representing vibration.

Detailed Specifications
| Functionality | ADC | Transmission Rate | 1Gb/s (Gigabit) or 10Gb/s (fibre port) |
|---|---|---|---|
| Resolution | 14Bit | Analogue Bandwidth | 120MHz |
| Sampling Rate | 250MSps | Resolution | 0.4m, 0.8m, 1.6m, 3.2m, 6.4m available |
| Number of Channels | 2 | Calibration Scale | 2-32 Sampling points |
| Demodulation Algorithm | Coherent Attenuation Suppression for All-Optical Cable Demodulation | Trigger Source | Internal trigger: 3.3V/5V TTL outputExternal trigger: 3.3V TTL input |
| Input Range | 2Vpp | Transmission Interface | Gigabit Ethernet port or 10G optical port available |
| Coupling Method | DC | Board Dimensions | 150mm(L)x100mm(W)x28.2mm(D) |
| Impedance | 50Ω | Operating Temperature | -25℃-70℃ |
| Signal-to-Noise Ratio | 60dBc SFDR | Storage Temperature | -40℃-80℃ |
| Acquisition Method | Triggered collection | Programming Environment | C, C++, Python, MATLAB, etc. |
| Power Consumption | <12W | System Platform | Windows、Linux |
This acquisition card is also widely compatible with DAS fibre optic sensing coherent demodulation solutions, and is compatible with DAS modules from other manufacturers offering similar solutions.
150mm(L)x100mm(W)x28.2mm(D)
.
1. Gigabit Ethernet interface: For data transmission and parameter configuration
2. 10G optical port: For data transmission and parameter configuration
3. Ch1: Acquisition channel 1
4. Ch2: Acquisition channel 2
5. Trig-in: External trigger, 3.3V input
6. Trig-out: Internal acquisition trigger output, 3.3V/5V selectable
Note: Only one model can be selected between Gigabit Ethernet and 10G optical port.
| Model | Description | Remarks |
|---|---|---|
| GY-DAQ-2480-E | Gigabit Ethernet Port Transmission | |
| GY-DAQ-2480-OE | 10 Gigabit Ethernet port transmission | Please specify if you need to use the optical port. |

Capture Card Communication Protocol: UDP.
Supported operating systems: Windows and Linux. Since it uses the UDP network protocol, no drivers need to be installed.
Programming languages: Programming languages: C, C++, Python, and other languages that support UDP programming are all acceptable.
Python call example code
import sys
import socket
import struct
import time
import threading
import numpy as np
import sounddevice as sd
from collections import deque
from typing import Optional, Callable, Any
import pyqtgraph as pg
from PyQt5.QtWidgets import (
QApplication, QMainWindow, QWidget, QHBoxLayout, QVBoxLayout,QTextEdit,
QLabel, QLineEdit, QPushButton, QGroupBox, QFormLayout,QComboBox,
QMessageBox, QSpinBox
)
from PyQt5.QtCore import Qt, QThread, pyqtSignal, QTimer
from PyQt5.QtGui import QFont
# -------------------------- DVS采集卡类 --------------------------
class DVSCollector:
# 通信参数
FPGA_IP = "192.168.137.2"
FPGA_PORT = 6789
PC_DATA_PORT = 6788
PC_CMD_PORT = 6787
PC_IP = "192.168.137.3"
# 帧头定义
SEND_HEADER = b'\xA5\x5A\xAA\x55\x55\xAA' # 0xA5 5A AA 55 55 AA
REV_HEADER = b'\x5A\xA5\x55\xAA\xAA\x55' # 0x5A A5 55 AA AA 55
# 功能码定义
FUNC_SEND_CMD = 0x0001
FUNC_READ_DATA = 0x0002
FUNC_SAMPLE_DATA = 0x0003
# 命令码定义
CMD_START_STOP = 0x0001 # 开始/结束命令
CMD_SAMPLES = 0x0002 # 采样长度
CMD_PULSE_FREQ = 0x0004 # 脉冲频率
CMD_DATA_TYPE = 0x0008 # 设置数据类型
CMD_SAMPLE_DELAY = 0x0010 # 采样延时点数
CMD_PULSE_WIDTH = 0x0011 # 脉冲宽度
CMD_GAUGE = 0x0034 # 设置解调标距
CMD_RESOLUTION = 0x0026 # 设置采样分辨率
CMD_BIAS_VOLTAGE = 0x0023 # 偏置电压设置
CMD_TRIGGER_SELECT = 0x0025 # 设置触发选择
def __init__(self):
#命令发送端口
self.cmd_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.cmd_socket.bind((self.PC_IP, self.PC_CMD_PORT))
#数据接收端口
self.data_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.data_socket.bind((self.PC_IP, self.PC_DATA_PORT))
# 设置超时
self.cmd_socket.settimeout(5.0)
self.data_socket.settimeout(1.0)
# 数据回调函数
self.data_callback = None
self.is_listening = False
self.listen_thread = None
print(f"端口初始化完成")
print(f"FPGA地址: {self.FPGA_IP}:{self.FPGA_PORT}")
print(f"数据端口: {self.PC_IP}:{self.PC_DATA_PORT}")
print(f"命令端口: {self.PC_IP}:{self.PC_CMD_PORT}")
def _build_command_packet(self,cmd_code: int, data_value: int,func_code: int) -> bytes:
"""构建命令报文"""
# 数据字段固定8字节
data_bytes = data_value.to_bytes(8, byteorder='big', signed=False)
data_length = len(data_bytes)
# 备用字段
reserved = 0
# 使用struct打包固定部分
# 格式: 帧头(6s) + 功能码(H) + 命令码(H) + 数据长度(I) + 备用(H)
packet_format = ">6sHHIH"
fixed_part = struct.pack(
packet_format,
self.SEND_HEADER,
func_code,
cmd_code,
data_length,
reserved
)
return fixed_part + data_bytes
def send_command(self, cmd_code: int, data_value: int, func_code: int = FUNC_SEND_CMD) -> bool:
"""
发送命令到FPGA
"""
try:
packet = self._build_command_packet( cmd_code, data_value,func_code)
# 发送命令
self.cmd_socket.sendto(packet, (self.FPGA_IP, self.FPGA_PORT))
# 等待响应
try:
response, addr = self.cmd_socket.recvfrom(1024)
return self._parse_response(response, cmd_code)
except socket.timeout:
print("命令响应超时,尝试重发...")
# 重发一次
self.cmd_socket.sendto(packet, (self.FPGA_IP, self.FPGA_PORT))
try:
response, addr = self.cmd_socket.recvfrom(1024)
return self._parse_response(response, cmd_code)
except socket.timeout:
print("重发后仍无响应,命令发送失败")
return False
except Exception as e:
print(f"发送命令失败: {e}")
return False
def _parse_response(self, response: bytes, expected_cmd: int) -> bool:
"""
解析命令响应
"""
try:
# 响应格式: 帧头(6s) + 功能码(H) + 保留(H) + 数据长度(H) + 命令(H) + 结果(H)
if len(response) < 12:
print("响应长度不足")
return False
header = response[0:6]
if header != self.REV_HEADER:
print(f"响应帧头不匹配: {header.hex()}")
return False
# 解析其他字段
func_code = struct.unpack(">H", response[6:8])[0]
reserved = struct.unpack(">H", response[8:10])[0]
data_length = struct.unpack(">H", response[10:12])[0]
if data_length >= 4:
cmd_code = struct.unpack(">H", response[12:14])[0]
result = struct.unpack(">H", response[14:16])[0]
if cmd_code == expected_cmd:
print(f"命令码匹配: 期望0x{expected_cmd:04X}, 实际0x{cmd_code:04X},结果={result}")
return True,result
else:
print(f"命令码不匹配: 期望0x{expected_cmd:04X}, 实际0x{cmd_code:04X}")
return False,result
else:
print("响应数据长度不足")
return False
except Exception as e:
print(f"解析响应失败: {e}")
return False
def _parse_sample_data(self, packet: bytes) -> dict:
"""
解析采样数据包
"""
try:
# 数据包格式: 帧头(6s) + 功能码(H) + 保留(H) + 数据标志(H) + 包序号(H) + 包长度(H) + 数据
if len(packet) < 16:
return None
header = packet[0:6]
if header != self.REV_HEADER:
return None
func_code = struct.unpack(">H", packet[6:8])[0]
reserved = struct.unpack(">H", packet[8:10])[0]
data_flag = struct.unpack(">H", packet[10:12])[0]
packet_seq = struct.unpack(">H", packet[12:14])[0]
packet_length = struct.unpack(">H", packet[14:16])[0]
# 提取采样数据 (int16格式)
sample_data = packet[16:16+packet_length]
sample_count = len(sample_data) // 2 # 每个采样点2字节
# 转换为numpy数组
sample_data = np.frombuffer(sample_data, dtype=np.int16)
return {
"func_code": func_code,
"data_flag": data_flag,
"packet_seq": packet_seq,
"packet_length": packet_length,
"sample_count": sample_count,
"sample_data": sample_data,
"is_end": (data_flag == 0x1100) # 数据结束标志
}
except Exception as e:
print(f"解析采样数据失败: {e}")
return None
def _data_listener(self):
"""
数据监听线程
"""
buffer = {} # 用于重组分片数据
current_trigger = 0
while self.is_listening:
try:
data, addr = self.data_socket.recvfrom(2048) # 最大1424+16字节
parsed = self._parse_sample_data(data)
if parsed:
# 处理数据包序列
trigger_id = current_trigger
if parsed["is_end"]:
current_trigger += 1
# 调用回调函数
if self.data_callback:
self.data_callback(parsed, trigger_id)
except socket.timeout:
continue
except ConnectionResetError:
print("连接被重置")
continue
except OSError as e:
print(f"套接字错误: {e}")
continue
except Exception as e:
if self.is_listening: # 只有在监听状态下才打印错误
print(f"数据接收错误: {e}")
def start_data_listener(self, callback: Callable[[dict, int], None]):
"""
启动数据监听
"""
self.data_callback = callback
self.is_listening = True
self.listen_thread = threading.Thread(target=self._data_listener)
self.listen_thread.daemon = True
self.listen_thread.start()
print("数据监听已启动")
def stop_data_listener(self):
"""
停止数据监听
"""
self.is_listening = False
if self.listen_thread:
self.listen_thread.join(timeout=2.0)
print("数据监听已停止")
# 设备控制命令方法
def start_collected(self) -> bool:
"""开始采集"""
return self.send_command(self.CMD_START_STOP, 1)
def stop_collected(self) -> bool:
"""停止采集"""
self.stop_data_listener()
return self.send_command(self.CMD_START_STOP, 0)
def set_sample_length(self, length: int) -> bool:
"""设置采样点数 """
if length % 256 != 0:
print("采样点数必须是256的倍数")
return False
return self.send_command(self.CMD_SAMPLES, length)
def set_pulse_frequency(self, freq: int) -> bool:
"""设置脉冲频率 (默认2000)"""
return self.send_command(self.CMD_PULSE_FREQ, freq)
def set_sample_delay(self, delay: int) -> bool:
"""设置采样延时 (默认100)"""
return self.send_command(self.CMD_SAMPLE_DELAY, delay)
def set_pulse_width(self, width_ns: int) -> bool:
"""设置脉冲宽度 (ns, 默认100)"""
return self.send_command(self.CMD_PULSE_WIDTH, width_ns)
def set_data_type(self, data_type: int) -> bool:
"""设置数据类型 """
return self.send_command(self.CMD_DATA_TYPE, data_type)
def set_resolutin(self, rate_index: int) -> bool:
"""设置分辨率 (0-4)"""
if rate_index < 0 or rate_index > 4:
print("分辨率索引必须是0-4)")
return False
return self.send_command(self.CMD_RESOLUTION, rate_index)
def set_bias_voltage(self, voltage_mv: int) -> bool:
"""设置偏置电压 (±1000mV, 0=0V, 1000=+1V, -1000=-1V)"""
if voltage_mv < -1000 or voltage_mv > 1000:
print("偏置电压范围: -1000 到 +1000 mV")
return False
# 转换为无符号表示
if voltage_mv < 0:
voltage_mv = 0xFFFFFFFFFFFFF000 + voltage_mv
return self.send_command(self.CMD_BIAS_VOLTAGE, voltage_mv)
def set_gauge(self,gauge: int) -> bool:
"""设置解调标距 (默认16)"""
return self.send_command(self.CMD_GAUGE, gauge)
def set_trigger_source(self, external: bool) -> bool:
"""设置触发源 (False=内部, True=外部)"""
value =1 if external else 0
return self.send_command(self.CMD_TRIGGER_SELECT, value)
def search_parames(self, parames):
value = 0x0000000000000000
func_code = self.FUNC_READ_DATA
try:
if parames == 1:
return self.send_command(self.CMD_RESOLUTION, value, func_code) #分辨率
elif parames == 2:
return self.send_command(self.CMD_SAMPLES, value, func_code) #采样点数
elif parames == 3:
return self.send_command(self.CMD_PULSE_FREQ, value, func_code) #脉冲频率
elif parames == 4:
return self.send_command(self.CMD_PULSE_WIDTH, value, func_code) #脉冲宽度
elif parames == 5:
return self.send_command(self.CMD_SAMPLE_DELAY, value, func_code) #采样延时
elif parames == 6:
return self.send_command(self.CMD_GAUGE, value, func_code) #解调标距
elif parames == 7:
return self.send_command(self.CMD_DATA_TYPE, value, func_code) #数据类型
elif parames == 8:
return self.send_command(self.CMD_BIAS_VOLTAGE, value, func_code) #偏置电压
elif parames == 9:
return self.send_command(self.CMD_TRIGGER_SELECT, value, func_code) #触发源
else:
print(f"未知参数:{parames}")
return False, "未知参数"
except Exception as e:
print(f"搜索参数时发生错误: {e}")
return False, str(e)
def close(self):
"""关闭连接"""
self.stop_data_listener()
self.cmd_socket.close()
self.data_socket.close()
print("DVS采集卡连接已关闭")
# -------------------------- 数据接收线程(避免阻塞UI) --------------------------
class DataReceiveThread(QThread):
data_signal = pyqtSignal(dict, int) # 发送解析后的数据和trigger_id
def __init__(self, dvs_collector):
super().__init__()
self.dvs = dvs_collector
def data_callback(self, data_packet, trigger_id):
"""转发数据到UI线程"""
self.data_signal.emit(data_packet, trigger_id)
def run(self):
"""启动数据监听"""
self.dvs.start_data_listener(self.data_callback)
# 保持线程运行直到监听停止
while self.dvs.is_listening:
time.sleep(0.1)
# -------------------------- 主窗口界面 --------------------------
class DVSUI(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("DVS采集卡控制界面")
self.setGeometry(350, 100, 1300, 900)
try:
# 初始化DVS采集卡
self.dvs = DVSCollector()
self.collecting = False # 采集状态标记
self.data_receive_thread = None
#绘图更新定时器(控制更新频率,避免UI过载)
self.plot_update_timer = QTimer()
self.plot_update_timer.setInterval(50)
self.plot_update_timer.timeout.connect(self._update_plots)
# 创建UI布局
self._init_ui()
# 初始化所有数据
self._init_all_data()
# 初始化绘图
self._init_plots()
except Exception as e:
print("初始化失败", f"无法初始化DVS采集卡: {str(e)}")
sys.exit(1)
def _init_ui(self):
"""初始化界面布局"""
# 中心部件
central_widget = QWidget()
self.setCentralWidget(central_widget)
# 整体水平布局(左图右参)
main_layout = QHBoxLayout(central_widget)
main_layout.setSpacing(20)
main_layout.setContentsMargins(20, 20, 20, 20)
# -------------------------- 左侧:绘图区域 --------------------------
left_widget = QWidget()
left_layout = QVBoxLayout(left_widget)
left_layout.setSpacing(15)
# 单点相位数据曲线
self.plot1_widget = pg.PlotWidget(title="单点相位数据曲线")
self.plot1_widget.setXRange(0, 4096)
self.plot1_widget.setYRange(0,100)
self.plot1_widget.setLabel('left', '幅度')
self.plot1_widget.setLabel('bottom', '采样位置')
left_layout.addWidget(self.plot1_widget)
# 全相位曲线
self.plot2_widget = pg.PlotWidget(title="全相位曲线")
self.plot2_widget.setXRange(0, 4096)
self.plot2_widget.setYRange(0,50)
self.plot2_widget.setLabel('left', '幅度')
self.plot2_widget.setLabel('bottom', '采样位置')
left_layout.addWidget(self.plot2_widget)
# 全相位瀑布图
self.waterfall_widget = pg.GraphicsLayoutWidget()
# 创建绘图项
self.plot_item = self.waterfall_widget.addPlot(title=f'全相位数据瀑布图')
self.plot_item.setXRange(0, 2048)
self.plot_item.setLabel('bottom', '实际距离 (m)')
self.plot_item.setLabel('left', '脉冲计数 (n)')
self.waterfall_img = pg.ImageItem()
self.plot_item.addItem(self.waterfall_img)
# 颜色条
self.cmap = pg.colormap.get("magma")
self.waterfall_cbar = pg.ColorBarItem(
values=(0, 0.1),
colorMap=self.cmap,
label='幅度'
)
self.waterfall_cbar.setImageItem(self.waterfall_img)
self.waterfall_widget.addItem(self.waterfall_cbar, 0, 1)
left_layout.addWidget(self.waterfall_widget)
# -------------------------- 右侧:参数设置和控制区域 --------------------------
right_widget = QWidget()
right_layout = QVBoxLayout(right_widget)
right_layout.setSpacing(20)
right_layout.setAlignment(Qt.AlignTop)
# 参数设置分组框
sample_param_group = QGroupBox("采集参数设置")
sample_param_group.setFont(QFont("Arial", 12, QFont.Bold))
form_layout = QFormLayout()
form_layout.setSpacing(15)
# 分辨率
self.resolutin_combo = QComboBox()
self.resolutin_combo.setFont(QFont("Arial", 12))
resolution_options = ["0.4 m", "0.8 m", "1.6 m", "3.2 m", "6.4 m"]
self.resolutin_combo.addItems(resolution_options)
self.resolutin_combo.setCurrentIndex(2)
form_layout.addRow(QLabel("分辨率:"), self.resolutin_combo)
# 采样点数
self.samples_spin = QSpinBox()
self.samples_spin.setRange(256, 99999)
self.samples_spin.setSingleStep(256)
self.samples_spin.setValue(4096)
form_layout.addRow(QLabel("采样点数:"), self.samples_spin)
# 脉冲频率
self.pulse_freq_spin = QSpinBox()
self.pulse_freq_spin.setRange(100, 10000000)
self.pulse_freq_spin.setValue(2000)
form_layout.addRow(QLabel("脉冲频率(Hz):"), self.pulse_freq_spin)
# 脉冲宽度(ns)
self.pulse_width_spin = QSpinBox()
self.pulse_width_spin.setRange(0, 99999)
self.pulse_width_spin.setSingleStep(4)
self.pulse_width_spin.setValue(100)
form_layout.addRow(QLabel("脉冲宽度(ns):"), self.pulse_width_spin)
# 采样延时
self.sample_delay_spin = QSpinBox()
self.sample_delay_spin.setRange(0, 1000)
self.sample_delay_spin.setValue(100)
form_layout.addRow(QLabel("采样延时:"), self.sample_delay_spin)
# 解调标距
self.gauge_spin = QSpinBox()
self.gauge_spin.setRange(1, 100)
self.gauge_spin.setValue(16)
form_layout.addRow(QLabel("解调标距:"), self.gauge_spin)
# 数据类型
self.channels_combo = QComboBox()
self.channels_combo.setFont(QFont("Arial", 12))
channels_options = ["CH1","CH2"]
self.channels_combo.addItems(channels_options)
self.resolutin_combo.setCurrentIndex(0)
form_layout.addRow(QLabel("当前通道:"), self.channels_combo)
# 触发源(内部/外部)
self.trigger_external = False
self.trigger_btn = QPushButton("内部触发")
self.trigger_btn.clicked.connect(self._toggle_trigger)
form_layout.addRow(QLabel("触发源:"), self.trigger_btn)
# 偏置电压
self.bias_voltage_spin = QSpinBox()
self.bias_voltage_spin.setRange(-1, 1)
self.bias_voltage_spin.setValue(0)
form_layout.addRow(QLabel("偏置电压(mV):"), self.bias_voltage_spin)
sample_param_group.setLayout(form_layout)
right_layout.addWidget(sample_param_group)
# 音频参数分组框
pos_param_group = QGroupBox("音频参数设置")
pos_param_group.setFont(QFont("Arial", 12, QFont.Bold))
pos_v_layout = QVBoxLayout()
pos_param_group.setLayout(pos_v_layout)
form_layout1 = QFormLayout()
form_layout1.setSpacing(15)
# 音频位置
self.pos_spin = QSpinBox()
self.pos_spin.setRange(0, 4095)
self.pos_spin.setValue(99)
form_layout1.addRow(QLabel("音频位置:"), self.pos_spin)
pos_v_layout.addLayout(form_layout1)
# 开始/暂停播放按钮
self.play_btn = QPushButton("开始播放")
self.play_btn.setFont(QFont("Arial", 12, QFont.Bold))
self.play_btn.clicked.connect(self.toggle_audio_play)
pos_v_layout.addWidget(self.play_btn, alignment=Qt.AlignCenter)
right_layout.addWidget(pos_param_group)
# 开始/暂停采集按钮
self.start_stop_btn = QPushButton("开始采集")
self.start_stop_btn.setFont(QFont("Arial", 12, QFont.Bold))
self.start_stop_btn.clicked.connect(self._toggle_collect)
right_layout.addWidget(self.start_stop_btn)
#查询参数
query_widget = QWidget()
query_layout = QHBoxLayout(query_widget)
# 查询结果显示标签
self.query_result_label = QLabel("查询参数")
self.query_result_label.setFont(QFont("Arial", 12, QFont.Bold))
self.search_parames = QComboBox()
self.search_parames.setFont(QFont("Arial", 12, QFont.Bold))
query_items = [
"分辨率",
"采样点数",
"脉冲频率(Hz)",
"脉冲宽度(ns)",
"采样延时",
"解调标距",
"数据类型",
"偏置电压(mV)",
"触发源"
]
self.search_parames.addItems(query_items)
self.search_parames.activated.connect(self.search_result)
# 添加到查询布局
query_layout.addWidget(self.query_result_label)
query_layout.addWidget(self.search_parames)
# 将容器控件添加到right_layout
right_layout.addWidget(query_widget)
self.textEdit = QTextEdit()
right_layout.addWidget(self.textEdit)
# 填充右侧空白
right_layout.addStretch()
# 添加左右部件到主布局
main_layout.addWidget(left_widget,9)
main_layout.addWidget(right_widget, 1)
def _init_all_data(self):
"""初始化所有数据缓存和绘图相关参数(采集启动时调用)"""
# 原始数据缓存
self.raw_data = np.array([], dtype=np.int16)
# 音频缓冲区
self.audio_buffer_length = 5000
self.audio_buffer = np.array([], dtype=np.float64)
self.is_playing = False
self.audio_queue = deque(maxlen=30)
# 待绘图数据
self.data1 = np.array([], dtype=np.float64)
self.data2 = np.array([], dtype=np.float64)
# 瀑布图数据缓存
self.waterfall_data = np.zeros((100, self.samples_spin.value()))
print("所有数据和参数已初始化完成")
def _init_plots(self):
"""初始化绘图元素"""
#单点相位曲线
self.curve1 = self.plot1_widget.plot(pen='g', name="单点相位曲线")
#全相位曲线
self.curve2 = self.plot2_widget.plot(pen='b', name="全相位曲线")
# 瀑布图初始化
self.waterfall_img.setImage(self.waterfall_data.T)
def _toggle_trigger(self):
"""切换触发源"""
self.trigger_external = not self.trigger_external
if self.trigger_external:
self.trigger_btn.setText("外部触发")
else:
self.trigger_btn.setText("内部触发")
def _apply_parameters(self):
"""应用参数设置到DVS采集卡"""
try:
# 获取参数
self.resolutin = self.resolutin_combo.currentIndex()
self.samples = self.samples_spin.value()
self.pulse_freq = self.pulse_freq_spin.value()
self.pulse_width = self.pulse_width_spin.value()
self.sample_delay = self.sample_delay_spin.value()
self.gauge = self.gauge_spin.value()
self.channels = self.channels_combo.currentIndex()+1
self.bias_voltage = self.bias_voltage_spin.value()
self.trigger_external = self.trigger_external
self.pos = self.pos_spin.value()
# 应用参数
success = self.dvs.set_resolutin(self.resolutin)
self.textEdit.append(f"设置分辨率: {'成功' if success else '失败'}")
success = self.dvs.set_sample_length(self.samples)
self.textEdit.append(f"设置采样长度: {'成功' if success else '失败'}")
success = self.dvs.set_pulse_frequency(self.pulse_freq)
self.textEdit.append(f"设置频率: {'成功' if success else '失败'}")
success = self.dvs.set_pulse_width(self.pulse_width)
self.textEdit.append(f"设置脉宽: {'成功' if success else '失败'}")
success = self.dvs.set_sample_delay(self.sample_delay)
self.textEdit.append(f"设置采样延时: {'成功' if success else '失败'}")
success = self.dvs.set_gauge(self.gauge)
self.textEdit.append(f"设置标距: {'成功' if success else '失败'}")
success = self.dvs.set_data_type(3)
self.textEdit.append(f"设置数据类型: {'成功' if success else '失败'}")
success = self.dvs.set_bias_voltage(self.bias_voltage)
self.textEdit.append(f"设置偏置电压: {'成功' if success else '失败'}")
success = self.dvs.set_trigger_source(self.trigger_external)
self.textEdit.append(f"设置触发源: {'成功' if success else '失败'}")
print("成功", "参数设置已应用")
except Exception as e:
print("失败", f"应用参数失败: {str(e)}")
def _toggle_collect(self):
"""切换采集状态(开始/暂停)"""
if not self.collecting:
# 开始采集
self._start_collect()
else:
# 停止采集
self._stop_collect()
def _start_collect(self):
"""启动采集"""
try:
self._init_all_data()
# 先应用参数
self._apply_parameters()
# 启动采集
if self.dvs.start_collected():
self.collecting = True
self.start_stop_btn.setText("暂停采集")
# 启动数据接收线程
self.data_receive_thread = DataReceiveThread(self.dvs)
self.data_receive_thread.data_signal.connect(self._handle_data)
self.data_receive_thread.start()
# 启动绘图更新定时器
self.plot_update_timer.start()
print("成功", "采集已启动")
else:
print("失败", "无法启动采集")
except Exception as e:
print( "失败", f"启动采集失败: {str(e)}")
def _stop_collect(self):
"""停止采集"""
try:
self.dvs.stop_collected()
self.collecting = False
self.start_stop_btn.setText("开始采集")
# 停止绘图更新定时器
self.plot_update_timer.stop()
# 停止数据接收线程
if self.data_receive_thread:
self.data_receive_thread.terminate()
self.data_receive_thread.wait()
print( "成功", "采集已暂停")
except Exception as e:
print( "失败", f"停止采集失败: {str(e)}")
def search_result(self):
parames = int(self.search_parames.currentIndex()+1)
_,result= self.dvs.search_parames(parames)
if parames == 1:
if result == 0:
result = 0.4
self.textEdit.append(f"分辨率: {result}m")
elif result == 1:
result = 0.8
self.textEdit.append(f"分辨率: {result}m")
elif result == 2:
result = 1.6
self.textEdit.append(f"分辨率: {result}m")
elif result == 3:
result = 3.2
self.textEdit.append(f"分辨率: {result}m")
elif result == 4:
result = 6.4
self.textEdit.append(f"分辨率: {result}m")
elif parames == 2:
self.textEdit.append(f"采样点数: {result}")
elif parames == 3:
self.textEdit.append(f"脉冲频率: {result}ns")
elif parames == 4:
self.textEdit.append(f"脉冲宽度: {result}ns")
elif parames == 5:
self.textEdit.append(f"采样延时: {result}")
elif parames == 6:
self.textEdit.append(f"解调标距: {result}")
elif parames == 7:
self.textEdit.append(f"数据类型: {result}")
elif parames == 8:
self.textEdit.append(f"偏置电压: {result}")
elif parames == 9:
self.textEdit.append(f"触发源: {result}")
else:
self.textEdit.setText("未知参数")
def _handle_data(self, data_packet, trigger_id):
"""
处理接收到的数据(只缓存,不直接更新UI)
:param data_packet: 数据包包典,需包含'sample_data'和'is_end'字段
:param trigger_id: 触发ID(预留扩展使用)
"""
# 边界处理:空数据包包直接返回,避免无效拼接
if not isinstance(data_packet, dict) or 'sample_data' not in data_packet:
return
sample_data = data_packet['sample_data']
self.raw_data = np.append(self.raw_data, sample_data)
# 数据接收完成时,处理数据并清空原始缓存
if data_packet.get('is_end', False):
self._process_raw_data()
self.raw_data = np.array([], dtype=np.int16)
def _process_raw_data(self):
"""
处理原始缓存数据,根据数据类型生成对应的数据1和数据2
:return: 处理后的data1, data2
"""
# 边界处理:原始数据为空时,返回空数组
if len(self.raw_data) == 0:
return np.array([], dtype=np.float64), np.array([], dtype=np.float64)
raw_data = self.raw_data.astype(np.float64)
if self.channels == 1:
data_phy = raw_data[0::2] / 512.0
data1 = self._update_audio_buffer(data_phy[self.pos])
data2 = np.square(data_phy)
else:
data_phy =raw_data[1::2] / 512.0
data1 = self._update_audio_buffer(data_phy[self.pos])
data2 = np.square(data_phy)
if self.is_playing:
self.audio_queue.append(data1.astype(np.float32))
self.pending_data(data1, data2)
def _update_audio_buffer(self, data):
"""
更新音频缓冲区(保持固定长度,超出部分截断头部)
:param data: 归一化后的相位数据
:return: 更新后的缓冲区数据
"""
# 更新缓冲区1,保持最大长度
self.audio_buffer = np.append(self.audio_buffer, data)
if len(self.audio_buffer) > self.audio_buffer_length:
self.audio_buffer = self.audio_buffer[-self.audio_buffer_length:] # 截断头部,保留尾部最新数据
return self.audio_buffer.copy()
def pending_data(self,data1,data2):
self.data1 = data1
self.data2 = data2
def _update_plots(self):
"""定时器触发:统一更新所有绘图(避免UI过载)"""
self.curve1.setData(self.data1)
self.curve2.setData( self.data2)
sample_count = len(self.data2)
# 更新瀑布图:添加新数据,移除最旧数据
if sample_count <= self.waterfall_data.shape[1]:
# 补零(如果采样点数不足)
new_row = np.zeros(self.waterfall_data.shape[1])
new_row[:self.samples_spin.value()] = self.data2
else:
# 截断(如果采样点数超出)
new_row = self.data2[:self.waterfall_data.shape[1]]
# 滚动更新瀑布图数据
self.waterfall_data = np.roll(self.waterfall_data, -1, axis=0)
self.waterfall_data[-1, :] = new_row
cb_values = self.waterfall_cbar.values
cb_min = cb_values[0]
cb_max = cb_values[1]
self.waterfall_img.setImage(self.waterfall_data.T, levels=(cb_min, cb_max))
def _calculate_blocksize(self, pulse_frequency):
"""计算音频块大小"""
blocksize = int(pulse_frequency * 0.05)
blocksize = max(128, min(blocksize, 2048))
return blocksize
def _init_audio_stream(self):
"""初始化音频流"""
try:
audio_sample_rate = int(self.pulse_freq_spin.value())
self.current_blocksize = self._calculate_blocksize(audio_sample_rate)
self.audio_stream = sd.OutputStream(
samplerate=audio_sample_rate,
channels=1,
dtype='float32',
callback=self._audio_callback,
blocksize=self.current_blocksize
)
return True
except Exception as e:
print(f"音频流初始化失败: {str(e)}")
print(f"音频流错误: {e}")
return False
def _audio_callback(self, outdata, frames, time, status):
"""音频回调函数"""
if status:
print(f"音频回调警告: {status}")
if self.audio_queue:
data = self.audio_queue.popleft()
if len(data) < frames:
data = np.pad(data, (0, frames - len(data)), mode='constant')
outdata[:] = data[:frames].reshape(-1, 1)
else:
outdata.fill(0.0)
def toggle_audio_play(self):
"""切换音频播放状态"""
if not self.is_playing:
if self._init_audio_stream():
self.audio_stream.start()
self.is_playing = True
self.play_btn.setText("暂停音频")
else:
if self.audio_stream:
self.audio_stream.stop()
self.audio_stream.close()
self.audio_stream = None
self.audio_queue.clear()
self.is_playing = False
self.play_btn.setText("播放音频")
def closeEvent(self, event):
"""窗口关闭事件:释放资源"""
if self.collecting:
self._stop_collect()
self.dvs.close()
event.accept()
# -------------------------- 程序入口 --------------------------
if __name__ == "__main__":
app = QApplication(sys.argv)
window = DVSUI()
window.show()
sys.exit(app.exec_())