浏览代码

【功能】分段读取寄存器

mikasa 1 月之前
父节点
当前提交
603097f487
共有 5 个文件被更改,包括 124 次插入42 次删除
  1. 46 0
      Cargo.lock
  2. 2 0
      Cargo.toml
  3. 26 21
      src/ems/pcs/pcs.rs
  4. 6 1
      src/internal/modbus/code.rs
  5. 44 20
      src/internal/modbus/mod.rs

+ 46 - 0
Cargo.lock

@@ -329,6 +329,25 @@ dependencies = [
  "crossbeam-utils",
 ]
 
+[[package]]
+name = "crossbeam-deque"
+version = "0.8.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
+dependencies = [
+ "crossbeam-epoch",
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-epoch"
+version = "0.9.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
+dependencies = [
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-utils"
 version = "0.8.21"
@@ -356,6 +375,12 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "either"
+version = "1.15.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
+
 [[package]]
 name = "equivalent"
 version = "1.0.2"
@@ -575,6 +600,7 @@ dependencies = [
  "mosquitto-rs",
  "openssl",
  "rand",
+ "rayon",
  "serde",
  "serde_yml",
  "thiserror 2.0.12",
@@ -949,6 +975,26 @@ dependencies = [
  "getrandom",
 ]
 
+[[package]]
+name = "rayon"
+version = "1.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
+dependencies = [
+ "either",
+ "rayon-core",
+]
+
+[[package]]
+name = "rayon-core"
+version = "1.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
+dependencies = [
+ "crossbeam-deque",
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "redox_syscall"
 version = "0.5.10"

+ 2 - 0
Cargo.toml

@@ -42,5 +42,7 @@ csv = "1.3.1"
 # 异步轻量级mqtt
 mosquitto-rs = "0.11.2"
 openssl = { version = "0.10.71", features = ["vendored"] }
+# 多核并行处理
+rayon = "1.10.0"
 
 

+ 26 - 21
src/ems/pcs/pcs.rs

@@ -2,6 +2,7 @@ use crate::cmd::config::app_config;
 use crate::ems::pcs::pcs_conf;
 use crate::ems::{Consumer, Producer, Service};
 use crate::internal::modbus::code::ModbusCode;
+use crate::internal::modbus::slice_sequential;
 use crate::internal::utils;
 use anyhow::bail;
 use log::{error, info};
@@ -29,7 +30,6 @@ async fn connect_modbus_tcp() -> anyhow::Result<tokio_modbus::client::Context> {
     Ok(ctx)
 }
 
-
 impl Pcs {
     /// 初始化PCS
     pub async fn new(producer: Producer, consumer: Consumer) -> anyhow::Result<Self> {
@@ -75,28 +75,31 @@ impl Pcs {
                     && it.code == Some(FunctionCode::ReadInputRegisters.value())
             })
             .collect();
-
-        if let Some(start_register) = input_registers.first() {
-            if let Some(addr) = start_register.addr {
-                let words = self
-                    .ctx
-                    .lock()
-                    .await
-                    .read_input_registers(addr, input_registers.len() as u16)
-                    .await??;
-                if words.len() == input_registers.len() {
-                    for (i, word2) in words.chunks(2).enumerate() {
-                        if let Some(tcp_code) = input_registers.get_mut(i) {
+        //连续子序列分组
+        let seq_slice = slice_sequential(input_registers.as_mut_slice());
+        //遍历分组
+        for codes in seq_slice.into_iter() {
+            if let Some(start_code) = codes.first() {
+                if let Some(start_addr) = start_code.addr {
+                    let words = self
+                        .ctx
+                        .lock()
+                        .await
+                        //读取输入寄存器
+                        .read_input_registers(start_addr, codes.len() as u16)
+                        .await??;
+                    if words.len() == codes.len() {
+                        for (i, word) in words.chunks(2).enumerate() {
                             //大端序
-                            tcp_code.data = Some([word2[0] as u8, word2[1] as u8]);
+                            codes[i].data = Some([word[0] as u8, word[1] as u8]);
                         }
+                    } else {
+                        bail!(
+                            "返回的数据长度不正确, register长度:{}, data长度:{}",
+                            input_registers.len(),
+                            words.len()
+                        )
                     }
-                } else {
-                    bail!(
-                        "返回的数据长度不正确, register长度:{}, data长度:{}",
-                        input_registers.len(),
-                        words.len()
-                    )
                 }
             }
         }
@@ -114,7 +117,9 @@ impl Service for Pcs {
             ))
             .await;
             match self.read_input_register().await {
-                Ok(_) => {}
+                Ok(_) => {
+
+                }
                 Err(e) => {
                     error!("{}", e.to_string());
                     //断线重连

+ 6 - 1
src/internal/modbus/code.rs

@@ -17,13 +17,18 @@ pub struct ModbusCode {
 
 impl ModbusCode {}
 
-
 impl BaseModbusCode for ModbusCode {
     fn addr(&self) -> Option<u16> {
         self.addr
     }
 }
 
+impl BaseModbusCode for &mut ModbusCode {
+    fn addr(&self) -> Option<u16> {
+        self.addr
+    }
+}
+
 impl TryFrom<PcsCsvData> for ModbusCode {
     type Error = anyhow::Error;
 

+ 44 - 20
src/internal/modbus/mod.rs

@@ -13,7 +13,36 @@ pub enum ModbusDataType {
     S16,
 }
 
-/// 将序列按照连续子序列分割出来
+/// 将序列按照连续子序列分组
+/// 如: [1,3,2,7,8,11,10] 分割为 [[1,2,3],[7,8],[10,11]]
+pub fn slice_sequential<T>(codes: &mut [T]) -> Vec<&mut [T]>
+where
+    T: BaseModbusCode,
+{
+    codes.sort_by(|a, b| a.addr().cmp(&b.addr()));
+    let mut result = Vec::new();
+    let mut remaining = codes;
+    while !remaining.is_empty() {
+        let mut len = 1;
+        while len < remaining.len() {
+            if let (Some(prev_addr), Some(curr_addr)) =
+                (remaining[len - 1].addr(), remaining[len].addr())
+            {
+                if prev_addr + 1 == curr_addr && len < MAX_REGISTER_COUNT {
+                    len += 1;
+                } else {
+                    break;
+                }
+            }
+        }
+        let (group, rest) = remaining.split_at_mut(len);
+        result.push(group);
+        remaining = rest;
+    }
+    result
+}
+
+/// 将序列按照连续子序列分组
 /// 如: [1,3,2,7,8,11,10] 分割为 [[1,2,3],[7,8],[10,11]]
 pub fn sequential<T: BaseModbusCode>(mut codes: Vec<T>) -> Vec<Vec<T>> {
     //按地址升序排序
@@ -50,39 +79,34 @@ pub fn sequential<T: BaseModbusCode>(mut codes: Vec<T>) -> Vec<Vec<T>> {
     vec
 }
 
+#[cfg(test)]
 mod test {
     use super::*;
 
-    #[derive(Debug)]
-    struct TestModbusCode {
-        addr: Option<u16>,
-    }
+    struct TestModbusCode(Option<u16>);
 
     impl BaseModbusCode for TestModbusCode {
         fn addr(&self) -> Option<u16> {
-            self.addr
+            self.0
         }
     }
 
     #[test]
     fn test_sequential() {
-        let base = TestModbusCode { addr: Some(1u16) };
-        let vec = sequential(vec![
+        let base = TestModbusCode { 0: Some(1u16) };
+        let mut codes = vec![
             base,
-            TestModbusCode { addr: Some(3u16) },
-            TestModbusCode { addr: Some(2u16) },
-            TestModbusCode { addr: Some(7u16) },
-            TestModbusCode { addr: Some(8u16) },
-            TestModbusCode { addr: Some(11u16) },
-            TestModbusCode { addr: Some(10u16) },
-        ]);
+            TestModbusCode { 0: Some(3u16) },
+            TestModbusCode { 0: Some(2u16) },
+            TestModbusCode { 0: Some(7u16) },
+            TestModbusCode { 0: Some(8u16) },
+            TestModbusCode { 0: Some(11u16) },
+            TestModbusCode { 0: Some(10u16) },
+        ];
+        let vec = slice_sequential(codes.as_mut_slice());
         let vec = vec
             .into_iter()
-            .map(|it| {
-                it.into_iter()
-                    .map(|i| i.addr.unwrap())
-                    .collect::<Vec<u16>>()
-            })
+            .map(|it| it.into_iter().map(|i| i.0.unwrap()).collect::<Vec<u16>>())
             .collect::<Vec<Vec<u16>>>();
         assert_eq!(vec, vec![vec![1, 2, 3], vec![7, 8], vec![10, 11],]);
     }