2026-06-16 11:35:00
Physical Layer: TTL 5V Baud Rate: 115200 Data Bits: 8 Parity Bit: None Stop Bit: 1
Modbus-RTU, Slave Address: 1
| Address | Length | Read/Write | Data Type | Name | Value Range | Unit |
|---|---|---|---|---|---|---|
| 0x0000 | 1 | RW | uint16 | EDFA Switch | 0 - Off, 1 - On | - |
| 0x0001 | 1 | RW | uint16 | EDFA Current Setpoint | [0, 1000] | DAC Code |
| 0x0002 | 1 | RW | uint16 | EDFA Responsivity | [0, 4095] | μA/W |
| 0x0003 | 1 | RW | uint16 | Raman 1 Switch | 0 - Off, 1 - On | - |
| 0x0004 | 1 | RW | uint16 | Raman 1 Current Setpoint | [0, 2500] | DAC Code |
| 0x0005 | 1 | RW | int16 | Raman 1 Responsivity | [0, 4095] | μA/W |
| 0x0006 | 1 | RW | uint16 | Raman 2 Switch | 0 - Off, 1 - On | - |
| 0x0007 | 1 | RW | uint16 | Raman 2 Current Setpoint | [0, 2500] | DAC Code |
| 0x0008 | 1 | RW | int16 | Raman 2 Responsivity | [0, 4095] | μA/W |
| 0x0009 | 1 | RW | uint16 | VOA1 Setpoint | [0, 4095] | DAC Code |
| 0x000A | 1 | RW | uint16 | VOA2 Setpoint | [0, 4095] | DAC Code |
| 0x000B | 1 | RW | uint16 | VOA3 Setpoint | [0, 4095] | DAC Code |
| 0x000C | 1 | R | int16 | Module Temperature | - | 0.1℃ |
| 0x000D | 1 | R | uint16 | EDFA Temperature Variation | - | DAC Code |
| 0x000E | 1 | R | uint16 | EDFA Output Optical Power | - | mW |
| 0x000F | 1 | R | uint16 | EDFA Current Reading | - | mA |
| 0x0010 | 1 | R | uint16 | Raman 1 Temperature Variation | - | DAC Code |
| 0x0011 | 1 | R | uint16 | Raman 1 Output Optical Power | - | mW |
| 0x0012 | 1 | R | uint16 | Raman 1 Current Reading | - | mA |
| 0x0013 | 1 | R | uint16 | Raman 2 Temperature Variation | - | DAC Code |
| 0x0014 | 1 | R | uint16 | Raman 2 Output Optical Power | - | mW |
| 0x0015 | 1 | R | uint16 | Raman 2 Current Reading | - | mA |
main.py
import sys,os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from com import *
if __name__ == '__main__':
    # read_config() returns False when the port cannot be opened and None when
    # the response fails validation, so check before calling methods on it.
    device = read_config()
    if not device:
        print("读取设备配置失败")
        sys.exit(1)
    print(device.to_string())
    # Example: write a new EDFA current setpoint (uncomment to use).
    #result = update_laser_config('edfa_current', 256)
    #if result != "":
    #    print("修改失败:", result)
com.py
import serial
from device import DeviceRegisters
#from config.config import global_config
# Modbus-RTU read request without its CRC: slave 0x01, function 0x03,
# start address 0x0000, quantity 0x0016 (22 registers, 0x0000-0x0015).
select_cmd = bytearray(b"\x01\x03\x00\x00\x00\x16")
def calculate_crc(data):
    """Compute the 16-bit Modbus-RTU CRC over an iterable of byte values.

    The register starts at 0xFFFF. Each byte is XORed into it, followed by
    eight right shifts; whenever the bit shifted out is set, the register is
    XORed with the reflected polynomial 0xA001.

    Returns:
        The checksum as an int (0xFFFF for empty input).
    """
    register = 0xFFFF
    for byte in data:
        register ^= byte
        for _bit in range(8):
            carry = register & 1
            register >>= 1
            if carry:
                register ^= 0xA001
    return register
def parse(response_data):
    """Validate a Modbus-RTU read response and decode it into DeviceRegisters.

    The frame layout is: slave address, function code, byte count, payload,
    and a 2-byte CRC transmitted low byte first.

    Args:
        response_data: Raw bytes received from the device.

    Returns:
        A populated DeviceRegisters instance, or None when the frame is too
        short, fails the CRC check, is a Modbus exception response, declares
        a byte count that does not match the payload, or carries too little
        payload to decode.
    """
    # Smallest frame worth inspecting: address + function + one byte + CRC.
    if response_data is None or len(response_data) < 5:
        print("响应数据长度不足,无法解析。", response_data)
        return None

    crc_received = bytes(response_data[-2:])
    crc_calculated_byte = calculate_crc(response_data[:-2]).to_bytes(2, byteorder='little')
    if crc_received != crc_calculated_byte:
        print(f"CRC 校验失败,接收到的 CRC 为 {crc_received.hex()},计算出的 CRC 为 {crc_calculated_byte.hex()}")
        return None

    # A set high bit in the function code marks a Modbus exception response;
    # the following byte is then an exception code, not a byte count.
    if response_data[1] & 0x80:
        print(f"设备返回异常响应,异常码: 0x{response_data[2]:02X}")
        return None

    data = response_data[3:-2]
    if response_data[2] != len(data):
        print(f"响应字节数不匹配,声明 {response_data[2]} 字节,实际 {len(data)} 字节")
        return None

    device = DeviceRegisters()
    try:
        device.from_modbus_response(data)
    except IndexError:
        # The payload is shorter than the registers the decoder reads.
        print("响应数据不足以解析全部寄存器。")
        return None
    return device
def connect(port='/dev/ttyUSB0', baudrate=115200, timeout=1):
    """Open the serial link to the device with 8 data bits, no parity, 1 stop bit.

    Args:
        port: Serial device path.
        baudrate: Line speed in baud.
        timeout: Read timeout in seconds.

    Returns:
        An open serial.Serial instance, or None when the port cannot be
        opened. Callers in this module test for None to report failure.
    """
    try:
        ser = serial.Serial(
            port=port,
            baudrate=baudrate,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=timeout,
        )
    except serial.SerialException as e:
        # pySerial opens the port inside the constructor and raises when the
        # device is missing or cannot be configured; turn that into the None
        # result callers already check for.
        print(f"打开串行端口 {port} 失败: {e}")
        return None
    if ser.is_open:
        return ser
    return None
def combo_cmd(command):
    """Append the CRC from calculate_crc to ``command`` and return it.

    The checksum is added low byte first, then high byte. ``command`` itself
    is modified in place, so pass a copy when the original must stay intact.
    """
    checksum = calculate_crc(command)
    low, high = checksum & 0xFF, (checksum >> 8) & 0xFF
    command.extend((low, high))
    return command
def read_config():
    """Read the whole register block from the device and decode it.

    Opens the port, sends ``select_cmd`` with its CRC appended, reads the
    response, and always closes the port afterwards.

    Returns:
        The result of ``parse`` for a full-length response, False when the
        serial port could not be opened, or None when the exchange raised or
        the response was shorter than expected.
    """
    ser = connect()
    if ser is None:
        return False

    # Expected frame: address + function + byte count, two bytes per requested
    # register (quantity is in the last two bytes of select_cmd), then 2 CRC
    # bytes. For 22 registers this is 3 + 44 + 2 = 49 bytes.
    register_count = (select_cmd[4] << 8) | select_cmd[5]
    expected_len = 3 + 2 * register_count + 2

    try:
        ser.write(combo_cmd(select_cmd.copy()))
        response_data = ser.read(expected_len)
    except Exception as e:
        print(f"发送或读取数据时出错: {e}")
        return None
    finally:
        # Release the port whether or not the exchange succeeded.
        ser.close()

    if len(response_data) < expected_len:
        print("响应数据长度不足,可能没有收到完整的数据。", response_data)
        return None
    return parse(response_data)
def set_update_command(value, type):
    """Build a Modbus write-single-register (0x06) frame for one setpoint.

    Args:
        value: New setpoint, sent as a 16-bit big-endian value.
        type: Parameter name: 'edfa_current', 'voa1_voltage', 'voa2_voltage'
            or 'voa3_voltage'. (The name shadows the builtin but is kept for
            compatibility with keyword callers.)

    Returns:
        The complete frame including CRC as a bytearray, or None when the
        parameter name is not supported or the value is outside the range
        allowed for that register.
    """
    # Register address and maximum setpoint per parameter, taken from the
    # device register map (EDFA current [0, 1000], VOA setpoints [0, 4095]).
    registers = {
        'edfa_current': (0x01, 1000),
        'voa1_voltage': (0x09, 4095),
        'voa2_voltage': (0x0A, 4095),
        'voa3_voltage': (0x0B, 4095),
    }
    entry = registers.get(type)
    if entry is None:
        return None

    address, max_value = entry
    # Reject instead of masking: truncating to 16 bits would silently write a
    # different setpoint than the one requested.
    if not 0 <= value <= max_value:
        print(f"{type} 的取值 {value} 超出范围 [0, {max_value}]")
        return None

    high_byte = (value >> 8) & 0xFF
    low_byte = value & 0xFF
    command = bytearray([0x01, 0x06, 0x00, address, high_byte, low_byte])
    return combo_cmd(command)
def update_laser_config(type, value):
    """Write one setpoint to the device.

    Args:
        type: Parameter name accepted by ``set_update_command``.
        value: New setpoint value.

    Returns:
        An empty string on success, otherwise a message describing the
        failure (unsupported parameter, port not available, or send failed).

    Note:
        The device's reply is not read back, so success means the frame was
        sent in full, not that the device acknowledged it.
    """
    cmd = set_update_command(value, type)
    if cmd is None:
        return "不支持的参数"
    ser = connect()
    if ser is None:
        return "串口连接失败"
    try:
        n = ser.write(cmd)
    except Exception as e:
        # Report I/O failures through the string result like every other
        # error path of this function.
        print(f"发送或读取数据时出错: {e}")
        return "发送命令失败"
    finally:
        # Release the port even when the write raised.
        ser.close()
    # A partial write is a failure too: the device would see a broken frame.
    if not n or n < len(cmd):
        return "发送命令失败"
    print(type, "更新成功")
    return ""
device.py
class DeviceRegisters:
    """Decoded snapshot of the EDFA, VOA and module-temperature registers."""

    # Bytes needed to reach the last decoded register (0x000F): 16 * 2.
    _MIN_PAYLOAD_LEN = 32

    def __init__(self):
        # EDFA switch (0 = off, 1 = on)
        self.edfa_status = 0
        # EDFA current setpoint
        self.edfa_current = 0
        # EDFA responsivity
        self.edfa_responsivity = 0
        # EDFA temperature variation
        self.edfa_temperature = 0
        # EDFA output optical power
        self.edfa_pump_power = 0
        # EDFA current reading
        self.edfa_pump_current = 0
        # VOA setpoints
        self.voa1_voltage = 0
        self.voa2_voltage = 0
        self.voa3_voltage = 0
        # Module temperature in degrees Celsius
        self.module_temperature = 0

    @staticmethod
    def _word(payload, register, signed=False):
        """Return the big-endian 16-bit value stored for ``register``."""
        offset = 2 * register
        return int.from_bytes(payload[offset:offset + 2], byteorder='big', signed=signed)

    def from_modbus_response(self, response_data):
        """Populate the fields from a Modbus read-response payload.

        Args:
            response_data: Register payload starting at register 0x0000, with
                the slave address, function code, byte count and CRC already
                removed. Each register occupies two bytes, high byte first.

        Raises:
            IndexError: If the payload is too short to hold register 0x000F.
                No field is modified in that case.
        """
        # Check up front so a short payload cannot leave the object half-updated.
        if len(response_data) < self._MIN_PAYLOAD_LEN:
            raise IndexError(
                f"payload has {len(response_data)} bytes, "
                f"need at least {self._MIN_PAYLOAD_LEN}"
            )
        word = self._word
        self.edfa_status = word(response_data, 0x00)
        self.edfa_current = word(response_data, 0x01)
        self.edfa_responsivity = word(response_data, 0x02)
        self.voa1_voltage = word(response_data, 0x09)
        self.voa2_voltage = word(response_data, 0x0A)
        self.voa3_voltage = word(response_data, 0x0B)
        # Register 0x000C is a signed int16 in units of 0.1 degrees Celsius;
        # decoding it unsigned would turn sub-zero readings into ~6500.
        self.module_temperature = word(response_data, 0x0C, signed=True) / 10
        # Register 0x000D sits at payload bytes 26-27.
        self.edfa_temperature = word(response_data, 0x0D)
        self.edfa_pump_power = word(response_data, 0x0E)
        self.edfa_pump_current = word(response_data, 0x0F)

    def to_string(self):
        """Return a multi-line, human-readable listing of all fields."""
        return (
            f"EDFA开关: {self.edfa_status}\n"
            f"EDFA电流设定值: {self.edfa_current}\n"
            f"EDFA响应度: {self.edfa_responsivity}\n"
            f"VOA1 电压值: {self.voa1_voltage}\n"
            f"VOA2 电压值: {self.voa2_voltage}\n"
            f"VOA3 电压值: {self.voa3_voltage}\n"
            f"模块温度: {self.module_temperature}\n"
            f"EDFA温度变化量: {self.edfa_temperature}\n"
            f"EDFA出光功率: {self.edfa_pump_power}\n"
            f"EDFA电流读取值: {self.edfa_pump_current}\n"
        )

    def as_dict(self):
        """Return the fields as a new dict; changing it does not affect this object."""
        return dict(self.__dict__)
the value read