use crate::cmd::config::app_config; use crate::connect_or_retry; use crate::ems::{Consumer, Producer, Service}; use crate::internal::modbus::code::ModbusCode; use crate::internal::modbus::{read_csv_to_code, slice_sequential}; use crate::internal::utils; use anyhow::bail; use log::{error, info}; use std::net::SocketAddr; use std::sync::Arc; use tokio::sync::Mutex; use tokio_modbus::client::Reader; use tokio_modbus::FunctionCode; pub struct Pcs { pub id: String, pub producer: Producer, pub consumer: Arc>, pub modbus_code: Arc>>, pub ctx: Arc>, } async fn connect_modbus_tcp() -> anyhow::Result { let config = &app_config().emu.pcs; let addr: SocketAddr = format!("{}:{}", config.host, config.port).parse()?; connect_or_retry!(tokio_modbus::client::tcp::connect(addr), "ModbusTCP") } impl Pcs { /// 初始化PCS pub async fn new(producer: Producer, consumer: Consumer) -> anyhow::Result { let id = utils::generate_random_str(12); utils::log::init_log("inpower_iot_mgc_rs::ems::pcs::*", "pcs/pcs.log").await; let ctx = connect_modbus_tcp().await?; let modbus_code = match read_csv_to_code("./config/pcs.csv") { Ok(c) => c, Err(e) => { error!("【PCS】协议生成失败: {}", e.to_string()); return Err(e); } }; info!("PCS [{}] 初始化成功", id); Ok(Pcs { id, producer, consumer: Arc::new(Mutex::new(consumer)), modbus_code: Arc::new(Mutex::new(modbus_code)), ctx: Arc::new(Mutex::new(ctx)), }) } async fn read_input_register(&self) -> anyhow::Result<()> { let modbus_code_clone = self.modbus_code.clone(); let mut modbus_guard = modbus_code_clone.lock().await; // 从整个配置中过滤出输入寄存器集合 let mut input_registers: Vec<_> = modbus_guard .iter_mut() .filter(|it| { it.addr.is_some() && it.name.is_some() && it.code == Some(FunctionCode::ReadInputRegisters.value()) }) .collect(); //连续子序列分组 let seq_slice = slice_sequential(input_registers.as_mut_slice()); //遍历分组 for codes in seq_slice.into_iter() { if let Some(start_code) = codes.first() { if let Some(start_addr) = start_code.addr { let words = self .ctx .lock() .await //读取输入寄存器 .read_input_registers(start_addr, codes.len() as u16) .await??; if words.len() == codes.len() { for (i, word) in words.chunks(2).enumerate() { //大端序 codes[i].data = Some([word[0] as u8, word[1] as u8]); } } else { bail!( "返回的数据长度不正确, register长度:{}, data长度:{}", input_registers.len(), words.len() ) } } } } Ok(()) } // async fn read_holding_registers() -> anyhow::Result<()> { // todo!("") // } } #[async_trait::async_trait] impl Service for Pcs { async fn read_task(&self) { let _producer = self.producer.clone(); loop { tokio::time::sleep(tokio::time::Duration::from_millis( app_config().emu.pcs.interval, )) .await; match self.read_input_register().await { Ok(_) => {} Err(e) => { error!("{}", e.to_string()); //断线重连 let context = connect_modbus_tcp().await.unwrap(); *self.ctx.lock().await = context; } } } } async fn write_task(&self) { let consumer = self.consumer.clone(); let _ctx = self.ctx.clone(); loop { let string = consumer.lock().await.recv().await.unwrap(); info!("[PCS] {}", string); } } }