use crate::cmd::config::app_config; use crate::connect_or_retry; use crate::ems::{Consumer, Producer, Service}; use crate::internal::modbus::code::{ModbusCode, ModbusData, RegisterType, Rw}; use crate::internal::modbus::{read_csv_to_code, slice_sequential, MAX_BIT_CNT, MAX_WORD_CNT}; use crate::internal::utils; use anyhow::{bail, Error}; use async_trait::async_trait; use log::{error, info}; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use tokio::sync::Mutex; use tokio_modbus::client::Reader; /// /// 高特BMS /// Modbus-TCP pub struct GoldBms { pub id: String, pub producer: Producer, pub consumer: Arc>, pub modbus_code: Arc>>, pub ctx: Arc>, } impl GoldBms { pub async fn new(producer: Producer, consumer: Consumer) -> anyhow::Result { let id = utils::generate_random_str(12); utils::log::init_log("inpower_iot_mgc_rs::ems::bms::*", "bms/bms.log").await; let config = read_csv_to_code("./config/gold_bms.csv").map_err(|e| { error!("协议生成失败:{}", e); anyhow::anyhow!(e) })?; let ctx = connect_modbus_tcp().await?; info!("BMS[{}]初始化成功!", id); Ok(GoldBms { id, producer, consumer: Arc::new(Mutex::new(consumer)), modbus_code: Arc::new(Mutex::new(config)), ctx: Arc::new(Mutex::new(ctx)), }) } //读取输入寄存器 async fn read_input_register(&self, mut vec: Vec<&mut ModbusCode>) -> anyhow::Result<()> { let seq_slice = slice_sequential(vec.as_mut_slice(), MAX_WORD_CNT); for codes in seq_slice.into_iter() { if let Some(start_code) = codes.first() { let words = self .ctx .lock() .await .read_input_registers(start_code.addr, codes.len() as u16) .await??; if words.len() == codes.len() { words.iter().enumerate().for_each(|(i, x)| { codes[i].data = Some(ModbusData::Word(*x)); }); } else { bail!( "返回的数据长度不正确, register长度:{}, data长度:{}", vec.len(), words.len() ) } } } Ok(()) } //读取离散输入寄存器 async fn read_discrete_input(&self, mut vec: Vec<&mut ModbusCode>) -> anyhow::Result<()> { let seq_slice = slice_sequential(vec.as_mut_slice(), MAX_BIT_CNT); for codes in seq_slice.into_iter() { if let Some(start_code) = codes.first() { let words = self .ctx .lock() .await .read_discrete_inputs(start_code.addr, codes.len() as u16) .await??; if words.len() == codes.len() { for (i, x) in words.into_iter().enumerate() { codes[i].data = Some(ModbusData::Bit(x)) } } else { bail!( "返回的数据长度不正确, register长度:{}, data长度:{}", vec.len(), words.len() ) } } } Ok(()) } async fn read_holding_register(&self, mut vec: Vec<&mut ModbusCode>) -> anyhow::Result<()> { let seq_slice = slice_sequential(vec.as_mut_slice(), MAX_WORD_CNT); for codes in seq_slice.into_iter() { if let Some(start_code) = codes.first() { let words = self .ctx .lock() .await .read_holding_registers(start_code.addr, codes.len() as u16) .await??; if words.len() == codes.len() { words.iter().enumerate().for_each(|(i, x)| { codes[i].data = Some(ModbusData::Word(*x)); }); } else { bail!( "返回的数据长度不正确, register长度:{}, data长度:{}", vec.len(), words.len() ) } } } Ok(()) } } #[async_trait] impl Service for GoldBms { async fn south(&self) { loop { tokio::time::sleep(Duration::from_millis(app_config().emu.bms.interval)).await; let code_clone = self.modbus_code.clone(); let mut modbus_guard = code_clone.lock().await; if let Err(e) = self .read_discrete_input( modbus_guard .iter_mut() .filter(|it| it.code == RegisterType::DiscreteInput) .collect(), ) .await { handle_modbus_error(&e, &self.ctx, "读取离散寄存器失败").await; } if let Err(e) = self .read_input_register( modbus_guard .iter_mut() .filter(|it| it.code == RegisterType::InputRegister) .collect(), ) .await { handle_modbus_error(&e, &self.ctx, "读取输入寄存器失败").await; } if let Err(e) = self .read_holding_register( modbus_guard .iter_mut() .filter(|it| { it.code == RegisterType::HoldingRegister && it.rw == Some(Rw::RW) }) .collect(), ) .await { handle_modbus_error(&e, &self.ctx, "读取保持寄存器失败").await; } } } async fn north(&self) {} } async fn connect_modbus_tcp() -> anyhow::Result { let config = &app_config().emu.bms.gold_bms; let addr: SocketAddr = format!("{}:{}", config.host, config.port).parse()?; connect_or_retry!(tokio_modbus::client::tcp::connect(addr), "[BMS] ModbusTCP") } async fn handle_modbus_error(e: &Error, ctx: &Mutex, log_str: &str) { //错误是tokio_modbus的则重新连接 if let Some(_) = e.downcast_ref::() { let mut ctx = ctx.lock().await; if let Ok(context) = connect_modbus_tcp().await { *ctx = context; } } else { error!("{log_str}: {}", e); } }