123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- use crate::cmd::config::app_config;
- use crate::connect_or_retry;
- use crate::ems::{Consumer, Producer, Service};
- use crate::internal::modbus::code::ModbusCode;
- use crate::internal::modbus::{read_csv_to_code, slice_sequential};
- use crate::internal::utils;
- use anyhow::bail;
- use log::{error, info};
- use std::net::SocketAddr;
- use std::sync::Arc;
- use tokio::sync::Mutex;
- use tokio_modbus::client::Reader;
- use tokio_modbus::FunctionCode;
- pub struct Pcs {
- pub id: String,
- pub producer: Producer,
- pub consumer: Arc<Mutex<Consumer>>,
- pub modbus_code: Arc<Mutex<Vec<ModbusCode>>>,
- pub ctx: Arc<Mutex<tokio_modbus::client::Context>>,
- }
- async fn connect_modbus_tcp() -> anyhow::Result<tokio_modbus::client::Context> {
- let config = &app_config().emu.pcs;
- let addr: SocketAddr = format!("{}:{}", config.host, config.port).parse()?;
- connect_or_retry!(tokio_modbus::client::tcp::connect(addr), "ModbusTCP")
- }
- impl Pcs {
- /// 初始化PCS
- pub async fn new(producer: Producer, consumer: Consumer) -> anyhow::Result<Self> {
- let id = utils::generate_random_str(12);
- utils::log::init_log("inpower_iot_mgc_rs::ems::pcs::*", "pcs/pcs.log").await;
- let ctx = connect_modbus_tcp().await?;
- let modbus_code = match read_csv_to_code("./config/pcs.csv") {
- Ok(c) => c,
- Err(e) => {
- error!("【PCS】协议生成失败: {}", e.to_string());
- return Err(e);
- }
- };
- info!("PCS [{}] 初始化成功", id);
- Ok(Pcs {
- id,
- producer,
- consumer: Arc::new(Mutex::new(consumer)),
- modbus_code: Arc::new(Mutex::new(modbus_code)),
- ctx: Arc::new(Mutex::new(ctx)),
- })
- }
- async fn read_input_register(&self) -> anyhow::Result<()> {
- let modbus_code_clone = self.modbus_code.clone();
- let mut modbus_guard = modbus_code_clone.lock().await;
- // 从整个配置中过滤出输入寄存器集合
- let mut input_registers: Vec<_> = modbus_guard
- .iter_mut()
- .filter(|it| {
- it.addr.is_some()
- && it.name.is_some()
- && it.code == Some(FunctionCode::ReadInputRegisters.value())
- })
- .collect();
- //连续子序列分组
- let seq_slice = slice_sequential(input_registers.as_mut_slice());
- //遍历分组
- for codes in seq_slice.into_iter() {
- if let Some(start_code) = codes.first() {
- if let Some(start_addr) = start_code.addr {
- let words = self
- .ctx
- .lock()
- .await
- //读取输入寄存器
- .read_input_registers(start_addr, codes.len() as u16)
- .await??;
- if words.len() == codes.len() {
- for (i, word) in words.chunks(2).enumerate() {
- //大端序
- codes[i].data = Some([word[0] as u8, word[1] as u8]);
- }
- } else {
- bail!(
- "返回的数据长度不正确, register长度:{}, data长度:{}",
- input_registers.len(),
- words.len()
- )
- }
- }
- }
- }
- Ok(())
- }
- // async fn read_holding_registers() -> anyhow::Result<()> {
- // todo!("")
- // }
- }
- #[async_trait::async_trait]
- impl Service for Pcs {
- async fn read_task(&self) {
- let _producer = self.producer.clone();
- loop {
- tokio::time::sleep(tokio::time::Duration::from_millis(
- app_config().emu.pcs.interval,
- ))
- .await;
- match self.read_input_register().await {
- Ok(_) => {}
- Err(e) => {
- error!("{}", e.to_string());
- //断线重连
- let context = connect_modbus_tcp().await.unwrap();
- *self.ctx.lock().await = context;
- }
- }
- }
- }
- async fn write_task(&self) {
- let consumer = self.consumer.clone();
- let _ctx = self.ctx.clone();
- loop {
- let string = consumer.lock().await.recv().await.unwrap();
- info!("[PCS] {}", string);
- }
- }
- }
|