123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- use crate::cmd::config::app_config;
- use crate::ems::pcs::pcs_conf;
- use crate::ems::{Consumer, Producer, Service};
- use crate::internal::modbus::code::ModbusCode;
- use crate::internal::utils;
- use anyhow::bail;
- use log::{error, info};
- use std::sync::Arc;
- use tokio::sync::Mutex;
- use tokio_modbus::client::{Reader, Writer};
- use tokio_modbus::FunctionCode;
- pub struct Pcs {
- pub id: String,
- pub producer: Producer,
- pub consumer: Arc<Mutex<Consumer>>,
- pub modbus_code: Arc<Mutex<Vec<ModbusCode>>>,
- pub ctx: Arc<Mutex<tokio_modbus::client::Context>>,
- }
- /// Modbus-TCP连接
- /// FIXME 考虑重连机制
- async fn connect_modbus_tcp() -> anyhow::Result<tokio_modbus::client::Context> {
- let pcs_config = &app_config().emu.pcs;
- let ctx = tokio_modbus::client::tcp::connect(
- format!("{}:{}", pcs_config.host, pcs_config.port).parse()?,
- )
- .await?;
- Ok(ctx)
- }
- impl Pcs {
- /// 初始化PCS
- pub async fn new(producer: Producer, consumer: Consumer) -> anyhow::Result<Self> {
- let id = utils::generate_random_str(12);
- utils::log::init_log("inpower_iot_mgc_rs::ems::pcs::*", "pcs/pcs.log").await;
- let ctx = match connect_modbus_tcp().await {
- Ok(c) => c,
- Err(e) => {
- error!(
- "【PCS】Modbus Tcp {}连接失败: {}",
- &app_config().emu.pcs.host,
- e.to_string()
- );
- return Err(e);
- }
- };
- let modbus_code = match pcs_conf() {
- Ok(c) => c,
- Err(e) => {
- error!("【PCS】协议生成失败: {}", e.to_string());
- return Err(e);
- }
- };
- info!("PCS [{}] 初始化成功", id);
- Ok(Pcs {
- id,
- producer,
- consumer: Arc::new(Mutex::new(consumer)),
- modbus_code: Arc::new(Mutex::new(modbus_code)),
- ctx: Arc::new(Mutex::new(ctx)),
- })
- }
- async fn read_input_register(&self) -> anyhow::Result<()> {
- let modbus_code_clone = self.modbus_code.clone();
- let mut modbus_guard = modbus_code_clone.lock().await;
- // 从整个配置中过滤出输入寄存器集合
- let mut input_registers: Vec<_> = modbus_guard
- .iter_mut()
- .filter(|it| {
- it.addr.is_some()
- && it.name.is_some()
- && it.code == Some(FunctionCode::ReadInputRegisters.value())
- })
- .collect();
- if let Some(start_register) = input_registers.first() {
- if let Some(addr) = start_register.addr {
- let words = self
- .ctx
- .lock()
- .await
- .read_input_registers(addr, input_registers.len() as u16)
- .await??;
- if words.len() == input_registers.len() {
- for (i, word2) in words.chunks(2).enumerate() {
- if let Some(tcp_code) = input_registers.get_mut(i) {
- //大端序
- tcp_code.data = Some([word2[0] as u8, word2[1] as u8]);
- }
- }
- } else {
- bail!(
- "返回的数据长度不正确, register长度:{}, data长度:{}",
- input_registers.len(),
- words.len()
- )
- }
- }
- }
- Ok(())
- }
- }
- #[async_trait::async_trait]
- impl Service for Pcs {
- async fn read_task(&self) {
- let producer = self.producer.clone();
- loop {
- tokio::time::sleep(tokio::time::Duration::from_millis(
- app_config().emu.ems.read_interval,
- ))
- .await;
- match self.read_input_register().await {
- Ok(_) => {}
- Err(e) => {
- error!("{}", e.to_string());
- //断线重连
- let context = connect_modbus_tcp().await.unwrap();
- *self.ctx.lock().await = context;
- }
- }
- }
- }
- async fn write_task(&self) {
- let consumer = self.consumer.clone();
- let ctx = self.ctx.clone();
- loop {
- let string = consumer.lock().await.recv().await.unwrap();
- info!("[PCS] {}", string);
- let _ = ctx
- .lock()
- .await
- .write_multiple_registers(1000, &[0xAE10])
- .await;
- }
- }
- }
|