Browse Source

Merge remote-tracking branch 'origin/master'

yuanan 2 tuần trước cách đây
mục cha
commit
58dd535903
5 tập tin đã thay đổi với 180 bổ sung81 xóa
  1. 141 23
      src/ems/bms/gold/gold_bms.rs
  2. 6 6
      src/ems/emu/emu.rs
  3. 12 14
      src/ems/pcs/pcs.rs
  4. 11 29
      src/internal/modbus/code.rs
  5. 10 9
      src/internal/modbus/mod.rs

+ 141 - 23
src/ems/bms/gold/gold_bms.rs

@@ -1,10 +1,12 @@
 use crate::cmd::config::app_config;
 use crate::connect_or_retry;
 use crate::ems::{Consumer, Producer, Service};
-use crate::internal::modbus::code::ModbusCode;
+use crate::internal::modbus::code::{ModbusCode, ModbusData, RegisterType, Rw};
+use crate::internal::modbus::{read_csv_to_code, slice_sequential, MAX_BIT_CNT, MAX_WORD_CNT};
 use crate::internal::utils;
+use anyhow::{bail, Error};
 use async_trait::async_trait;
-use log::info;
+use log::{error, info};
 use std::net::SocketAddr;
 use std::sync::Arc;
 use std::time::Duration;
@@ -26,22 +28,98 @@ impl GoldBms {
     pub async fn new(producer: Producer, consumer: Consumer) -> anyhow::Result<Self> {
         let id = utils::generate_random_str(12);
         utils::log::init_log("inpower_iot_mgc_rs::ems::bms::*", "bms/bms.log").await;
-        let context = connect_modbus_tcp().await?;
+        let config = read_csv_to_code("./config/gold_bms.csv").map_err(|e| {
+            error!("协议生成失败:{}", e);
+            anyhow::anyhow!(e)
+        })?;
+        let ctx = connect_modbus_tcp().await?;
         info!("BMS[{}]初始化成功!", id);
         Ok(GoldBms {
             id,
             producer,
             consumer: Arc::new(Mutex::new(consumer)),
-            modbus_code: Arc::new(Default::default()),
-            ctx: Arc::new(Mutex::new(context)),
+            modbus_code: Arc::new(Mutex::new(config)),
+            ctx: Arc::new(Mutex::new(ctx)),
         })
     }
-    async fn read_input_register(&self) -> anyhow::Result<()> {
-        self.ctx
-            .lock()
-            .await
-            .read_discrete_inputs(0, 10)
-            .await??;
+
+    //读取输入寄存器
+    async fn read_input_register(&self, mut vec: Vec<&mut ModbusCode>) -> anyhow::Result<()> {
+        let seq_slice = slice_sequential(vec.as_mut_slice(), MAX_WORD_CNT);
+        for codes in seq_slice.into_iter() {
+            if let Some(start_code) = codes.first() {
+                let words = self
+                    .ctx
+                    .lock()
+                    .await
+                    .read_input_registers(start_code.addr, codes.len() as u16)
+                    .await??;
+                if words.len() == codes.len() {
+                    words.iter().enumerate().for_each(|(i, x)| {
+                        codes[i].data = Some(ModbusData::Word(*x));
+                    });
+                } else {
+                    bail!(
+                        "返回的数据长度不正确, register长度:{}, data长度:{}",
+                        vec.len(),
+                        words.len()
+                    )
+                }
+            }
+        }
+        Ok(())
+    }
+
+    //读取离散输入寄存器
+    async fn read_discrete_input(&self, mut vec: Vec<&mut ModbusCode>) -> anyhow::Result<()> {
+        let seq_slice = slice_sequential(vec.as_mut_slice(), MAX_BIT_CNT);
+        for codes in seq_slice.into_iter() {
+            if let Some(start_code) = codes.first() {
+                let words = self
+                    .ctx
+                    .lock()
+                    .await
+                    .read_discrete_inputs(start_code.addr, codes.len() as u16)
+                    .await??;
+                if words.len() == codes.len() {
+                    for (i, x) in words.into_iter().enumerate() {
+                        codes[i].data = Some(ModbusData::Bit(x))
+                    }
+                } else {
+                    bail!(
+                        "返回的数据长度不正确, register长度:{}, data长度:{}",
+                        vec.len(),
+                        words.len()
+                    )
+                }
+            }
+        }
+        Ok(())
+    }
+
+    async fn read_holding_register(&self, mut vec: Vec<&mut ModbusCode>) -> anyhow::Result<()> {
+        let seq_slice = slice_sequential(vec.as_mut_slice(), MAX_WORD_CNT);
+        for codes in seq_slice.into_iter() {
+            if let Some(start_code) = codes.first() {
+                let words = self
+                    .ctx
+                    .lock()
+                    .await
+                    .read_holding_registers(start_code.addr, codes.len() as u16)
+                    .await??;
+                if words.len() == codes.len() {
+                    words.iter().enumerate().for_each(|(i, x)| {
+                        codes[i].data = Some(ModbusData::Word(*x));
+                    });
+                } else {
+                    bail!(
+                        "返回的数据长度不正确, register长度:{}, data长度:{}",
+                        vec.len(),
+                        words.len()
+                    )
+                }
+            }
+        }
         Ok(())
     }
 }
@@ -51,25 +129,65 @@ impl Service for GoldBms {
     async fn south(&self) {
         loop {
             tokio::time::sleep(Duration::from_millis(app_config().emu.bms.interval)).await;
-            match self.read_input_register().await {
-                Ok(()) => {
-                    println!("读取数据成功!");
-                }
-                Err(e) => {
-                    println!("Modbus 读取失败: {:?},断开连接", e);
-                    let mut x = self.ctx.lock().await;
-                    *x = connect_modbus_tcp().await.unwrap();
-                }
+            let code_clone = self.modbus_code.clone();
+            let mut modbus_guard = code_clone.lock().await;
+            if let Err(e) = self
+                .read_discrete_input(
+                    modbus_guard
+                        .iter_mut()
+                        .filter(|it| it.code == RegisterType::DiscreteInput)
+                        .collect(),
+                )
+                .await
+            {
+                handle_modbus_error(&e, &self.ctx, "读取离散寄存器失败").await;
+            }
+
+            if let Err(e) = self
+                .read_input_register(
+                    modbus_guard
+                        .iter_mut()
+                        .filter(|it| it.code == RegisterType::InputRegister)
+                        .collect(),
+                )
+                .await
+            {
+                handle_modbus_error(&e, &self.ctx, "读取输入寄存器失败").await;
+            }
+
+            if let Err(e) = self
+                .read_holding_register(
+                    modbus_guard
+                        .iter_mut()
+                        .filter(|it| {
+                            it.code == RegisterType::HoldingRegister && it.rw == Some(Rw::RW)
+                        })
+                        .collect(),
+                )
+                .await
+            {
+                handle_modbus_error(&e, &self.ctx, "读取保持寄存器失败").await;
             }
         }
     }
 
-    async fn north(&self) {
-    }
+    async fn north(&self) {}
 }
 
 async fn connect_modbus_tcp() -> anyhow::Result<tokio_modbus::client::Context> {
     let config = &app_config().emu.bms.gold_bms;
     let addr: SocketAddr = format!("{}:{}", config.host, config.port).parse()?;
-    connect_or_retry!(tokio_modbus::client::tcp::connect(addr), "ModbusTCP")
+    connect_or_retry!(tokio_modbus::client::tcp::connect(addr), "[BMS] ModbusTCP")
+}
+
+async fn handle_modbus_error(e: &Error, ctx: &Mutex<tokio_modbus::client::Context>, log_str: &str) {
+    //错误是tokio_modbus的则重新连接
+    if let Some(_) = e.downcast_ref::<tokio_modbus::Error>() {
+        let mut ctx = ctx.lock().await;
+        if let Ok(context) = connect_modbus_tcp().await {
+            *ctx = context;
+        }
+    } else {
+        error!("{log_str}: {}", e);
+    }
 }

+ 6 - 6
src/ems/emu/emu.rs

@@ -49,12 +49,12 @@ impl Emu {
                 );
             }
 
-            // if let Ok(p) = pcs {
-            //     devices.insert(
-            //         DevType::PCS,
-            //         Device::new(p.id.clone(), "PCS", Arc::new(p), pcs_rx),
-            //     );
-            // }
+            if let Ok(p) = pcs {
+                devices.insert(
+                    DevType::PCS,
+                    Device::new(p.id.clone(), "PCS", Arc::new(p), pcs_rx),
+                );
+            }
         }
         info!("EMU初始化完成");
         Emu {

+ 12 - 14
src/ems/pcs/pcs.rs

@@ -2,7 +2,7 @@ use crate::cmd::config::app_config;
 use crate::connect_or_retry;
 use crate::ems::{Consumer, Producer, Service};
 use crate::internal::modbus::code::{ModbusCode, RegisterType};
-use crate::internal::modbus::{read_csv_to_code, slice_sequential};
+use crate::internal::modbus::{ slice_sequential, MAX_WORD_CNT};
 use crate::internal::utils;
 use anyhow::bail;
 use log::{error, info};
@@ -22,7 +22,7 @@ pub struct Pcs {
 async fn connect_modbus_tcp() -> anyhow::Result<tokio_modbus::client::Context> {
     let config = &app_config().emu.pcs;
     let addr: SocketAddr = format!("{}:{}", config.host, config.port).parse()?;
-    connect_or_retry!(tokio_modbus::client::tcp::connect(addr), "ModbusTCP")
+    connect_or_retry!(tokio_modbus::client::tcp::connect(addr), "[PCS] ModbusTCP")
 }
 
 impl Pcs {
@@ -31,13 +31,14 @@ impl Pcs {
         let id = utils::generate_random_str(12);
         utils::log::init_log("inpower_iot_mgc_rs::ems::pcs::*", "pcs/pcs.log").await;
         let ctx = connect_modbus_tcp().await?;
-        let modbus_code = match read_csv_to_code("./config/pcs.csv") {
-            Ok(c) => c,
-            Err(e) => {
-                error!("【PCS】协议生成失败: {}", e.to_string());
-                return Err(e);
-            }
-        };
+        let modbus_code = vec![];
+        // match read_csv_to_code("./config/pcs.csv") {
+        //     Ok(c) => c,
+        //     Err(e) => {
+        //         error!("【PCS】协议生成失败: {}", e.to_string());
+        //         return Err(e);
+        //     }
+        // };
         info!("PCS [{}] 初始化成功", id);
         Ok(Pcs {
             id,
@@ -57,7 +58,7 @@ impl Pcs {
             .filter(|it| it.code == RegisterType::InputRegister)
             .collect();
         //连续子序列分组
-        let seq_slice = slice_sequential(input_registers.as_mut_slice());
+        let seq_slice = slice_sequential(input_registers.as_mut_slice(), MAX_WORD_CNT);
         //遍历分组
         for codes in seq_slice.into_iter() {
             if let Some(start_code) = codes.first() {
@@ -69,10 +70,7 @@ impl Pcs {
                     .read_input_registers(start_code.addr, codes.len() as u16)
                     .await??;
                 if words.len() == codes.len() {
-                    for (i, word) in words.chunks(2).enumerate() {
-                        //大端序
-                        codes[i].data = Some([word[0] as u8, word[1] as u8]);
-                    }
+
                 } else {
                     bail!(
                         "返回的数据长度不正确, register长度:{}, data长度:{}",

+ 11 - 29
src/internal/modbus/code.rs

@@ -1,6 +1,5 @@
 use crate::internal::modbus::{BaseModbusCode, CsvData};
 use anyhow::anyhow;
-use std::collections::HashMap;
 
 /// 寄存器类型
 #[derive(Debug, Eq, PartialEq)]
@@ -58,7 +57,7 @@ impl TryFrom<String> for ModbusDataType {
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Eq, PartialEq)]
 pub enum Rw {
     R,
     W,
@@ -78,18 +77,22 @@ impl TryFrom<String> for Rw {
     }
 }
 
+#[derive(Debug)]
+pub enum ModbusData {
+    Word(u16),
+    Bit(bool),
+}
+
 #[derive(Debug)]
 pub struct ModbusCode {
     pub addr: u16,
     pub code: RegisterType,
-    pub data: Option<[u8; 2]>,
+    pub data: Option<ModbusData>,
     pub name: String,
     pub data_type: Option<ModbusDataType>,
     pub factor: Option<f32>,
     pub rw: Option<Rw>,
-    pub unit: Option<String>,
-    pub note: Option<HashMap<i32, String>>,
-    pub desc: Option<String>,
+    pub offset: Option<i16>,
 }
 
 impl ModbusCode {}
@@ -121,27 +124,8 @@ impl TryFrom<CsvData> for ModbusCode {
             None => None,
             Some(it) => Some(it.try_into()?),
         };
+        let offset = value.offset;
         let factor = value.factor;
-        let unit = value.unit;
-        let desc = value.desc;
-        let mut note = None;
-        if let Some(n) = value.note {
-            let mut hash_map = HashMap::new();
-            for x in n.split('\n').into_iter() {
-                let kv = x.split(':').collect::<Vec<&str>>();
-                if let Some(key) = kv.get(0) {
-                    let k = if key.starts_with("0x") {
-                        i32::from_str_radix(key.trim().replace("0x", "").as_ref(), 16)?
-                    } else {
-                        key.parse::<i32>()?
-                    };
-                    if let Some(val) = kv.get(1) {
-                        hash_map.insert(k, val.trim().to_string());
-                    }
-                }
-            }
-            note = Some(hash_map);
-        }
         Ok(ModbusCode {
             addr,
             code,
@@ -150,9 +134,7 @@ impl TryFrom<CsvData> for ModbusCode {
             data_type,
             factor,
             rw,
-            unit,
-            note,
-            desc,
+            offset,
         })
     }
 }

+ 10 - 9
src/internal/modbus/mod.rs

@@ -3,6 +3,11 @@ use serde::{Deserialize, Serialize};
 
 pub mod code;
 
+///最长寄存器字节数量
+pub const MAX_WORD_CNT: usize = 120;
+///最长寄存器位数量
+pub const MAX_BIT_CNT: usize = 2000;
+
 /// CSV对应的结构体
 #[derive(Debug, Serialize, Deserialize)]
 pub struct CsvData {
@@ -13,9 +18,7 @@ pub struct CsvData {
     pub t: Option<String>,
     pub rw: Option<String>,
     pub factor: Option<f32>,
-    pub unit: Option<String>,
-    pub note: Option<String>,
-    pub desc: Option<String>,
+    pub offset: Option<i16>,
 }
 
 /// 从路径读取csv文档并解析为集合
@@ -38,11 +41,9 @@ pub trait BaseModbusCode {
     fn addr(&self) -> u16;
 }
 
-const MAX_REGISTER_COUNT: usize = 120;
-
 /// 将序列按照连续子序列分组
 /// 如: [1,3,2,7,8,11,10] 分割为 [[1,2,3],[7,8],[10,11]]
-pub fn slice_sequential<T>(codes: &mut [T]) -> Vec<&mut [T]>
+pub fn slice_sequential<T>(codes: &mut [T], max_count: usize) -> Vec<&mut [T]>
 where
     T: BaseModbusCode,
 {
@@ -54,7 +55,7 @@ where
         while len < remaining.len() {
             let prev_addr = remaining[len - 1].addr();
             let curr_addr = remaining[len].addr();
-            if prev_addr + 1 == curr_addr && len < MAX_REGISTER_COUNT {
+            if prev_addr + 1 == curr_addr && len < max_count {
                 len += 1;
             } else {
                 break;
@@ -69,7 +70,7 @@ where
 
 /// 将序列按照连续子序列分组
 /// 如: [1,3,2,7,8,11,10] 分割为 [[1,2,3],[7,8],[10,11]]
-pub fn sequential<T: BaseModbusCode>(mut codes: Vec<T>) -> Vec<Vec<T>> {
+pub fn sequential<T: BaseModbusCode>(mut codes: Vec<T>, max_count: usize) -> Vec<Vec<T>> {
     //按地址升序排序
     codes.sort_by(|a, b| a.addr().cmp(&b.addr()));
     if codes.is_empty() {
@@ -85,7 +86,7 @@ pub fn sequential<T: BaseModbusCode>(mut codes: Vec<T>) -> Vec<Vec<T>> {
                 let prev_addr = prev.addr();
                 let curr_addr = current.addr();
                 if prev_addr + 1 == curr_addr {
-                    if current_group.len() >= MAX_REGISTER_COUNT {
+                    if current_group.len() >= max_count {
                         vec.push(current_group);
                         current_group = Vec::new();
                     }