浏览代码

【功能】BMS接收南向MQTT并报文下发

mikasa 1 周之前
父节点
当前提交
3b6335eb8c
共有 5 个文件被更改,包括 135 次插入92 次删除
  1. 1 1
      config/gold_bms.csv
  2. 48 8
      src/ems/bms/gold/gold_bms.rs
  3. 27 63
      src/ems/emu/emu.rs
  4. 3 11
      src/ems/emu/mod.rs
  5. 56 9
      src/internal/modbus/code.rs

+ 1 - 1
config/gold_bms.csv

@@ -215,7 +215,7 @@ address,code,name,type,rw,factor,offset,key
 91,2,极柱温差大 3 级报警阈值,u16,rw,0.1,-40,packBusbarDiffTempHighAlarmThd
 92,2,极柱温差大报警回差值,u16,rw,0.1,0,packBusbarDiffTempHighFaultReturn Diff
 99,2,控制指令模式,u16,rw,1,,containerControMode
-100,2,上下电控制指令,u16,rw,1,,powerOnOffControl 
+100,2,上下电控制指令,u16,rw,1,,powerOnOffControl
 101,2,DO 控制,u16,rw,1,,doControl
 102,2,风扇启动温度,u16,rw,1,,
 103,2,风扇关闭温度,u16,rw,1,,

+ 48 - 8
src/ems/bms/gold/gold_bms.rs

@@ -1,6 +1,6 @@
 use crate::cmd::config::app_config;
 use crate::ems::{Consumer, Producer, Service};
-use crate::internal::modbus::code::{ModbusCode, RegisterType, Rw};
+use crate::internal::modbus::code::{ModbusCode, ModbusReqCode, RegisterType, Rw};
 use crate::internal::modbus::{read_csv_to_code, slice_sequential, MAX_BIT_CNT, MAX_WORD_CNT};
 use crate::internal::utils;
 use crate::internal::utils::mqtt::mqtt_client;
@@ -12,17 +12,16 @@ use mosquitto_rs::QoS;
 use std::net::SocketAddr;
 use std::sync::Arc;
 use std::time::Duration;
-use tokio::sync::Mutex;
-use tokio::time;
+use tokio::sync::{Mutex, MutexGuard};
 use tokio::time::Instant;
-use tokio_modbus::client::Reader;
+use tokio_modbus::client::{Reader, Writer};
 
 ///
 /// 高特BMS
 /// Modbus-TCP
 pub struct GoldBms {
     pub id: String,
-    pub producer: Producer,
+    pub _producer: Producer,
     pub consumer: Arc<Mutex<Consumer>>,
     pub modbus_code: Arc<Mutex<Vec<ModbusCode>>>,
     pub ctx: Arc<Mutex<tokio_modbus::client::Context>>,
@@ -42,7 +41,7 @@ impl GoldBms {
         info!("BMS[{}]初始化成功!", id);
         Ok(GoldBms {
             id,
-            producer,
+            _producer: producer,
             consumer: Arc::new(Mutex::new(consumer)),
             modbus_code: Arc::new(Mutex::new(config)),
             ctx: Arc::new(Mutex::new(ctx)),
@@ -83,9 +82,9 @@ impl GoldBms {
 impl Service for GoldBms {
     async fn south(&self) {
         let code_clone = self.modbus_code.clone();
-        let mut modbus_guard = code_clone.lock().await;
         loop {
             tokio::time::sleep(Duration::from_millis(app_config().emu.bms.interval)).await;
+            let mut modbus_guard = code_clone.lock().await;
             let time = Instant::now();
             if let Err(e) = self
                 .read_discrete_input(
@@ -130,7 +129,29 @@ impl Service for GoldBms {
         }
     }
 
-    async fn north(&self) {}
+    async fn north(&self) {
+        let consumer = self.consumer.clone();
+        loop {
+            let ctx = self.ctx.clone();
+            let modbus_code_clone = self.modbus_code.clone();
+            if let Ok(msg) = consumer.lock().await.recv().await {
+                if msg.id == self.id {
+                    info!("[BMS]接收到指令: {}", msg);
+                    let instant = Instant::now();
+                    let modbus_code = modbus_code_clone.lock().await;
+                    let mut req_code = ModbusReqCode::build(modbus_code.as_slice(), &msg);
+                    let vec = slice_sequential(req_code.as_mut_slice(), MAX_WORD_CNT);
+                    let context_guard = ctx.lock().await;
+                    match down_link(vec, context_guard).await {
+                        Ok(()) => info!("数据下行成功, 耗时 {:?}", instant.elapsed()),
+                        Err(e) => {
+                            error!("数据下行失败:{}", e)
+                        }
+                    }
+                }
+            }
+        }
+    }
 }
 
 async fn connect_modbus_tcp() -> anyhow::Result<tokio_modbus::client::Context> {
@@ -150,3 +171,22 @@ async fn handle_modbus_error(e: &Error, ctx: &Mutex<tokio_modbus::client::Contex
         error!("{log_str}: {}", e);
     }
 }
+
+async fn down_link(
+    req_codes: Vec<&mut [ModbusReqCode<'_>]>,
+    mut context_guard: MutexGuard<'_, tokio_modbus::client::Context>,
+) -> anyhow::Result<()> {
+    for codes in req_codes.into_iter() {
+        if codes.len() > 1 {
+            let words: Vec<_> = codes.iter().map(|it| it.trans_val()).collect();
+            context_guard
+                .write_multiple_registers(codes[0].addr, words.as_slice())
+                .await??;
+        } else {
+            context_guard
+                .write_single_register(codes[0].addr, codes[0].trans_val())
+                .await??
+        }
+    }
+    Ok(())
+}

+ 27 - 63
src/ems/emu/emu.rs

@@ -75,28 +75,6 @@ impl Emu {
                 device_clone.start().await;
             }))
         }
-        // let x = &self.devices;
-        // let a = x.get(&"pcs".to_string()).unwrap().clone();
-        // tokio::spawn(async move {
-        //     loop {
-        //         let str = a.channel.lock().await.recv().await.unwrap();
-        //         info!("[PCS] {}", str)
-        //     }
-        // });
-        // let b = x.get(&"bms".to_string()).unwrap().clone();
-        // tokio::spawn(async move {
-        //     loop {
-        //         let str = b.channel.lock().await.recv().await.unwrap();
-        //         info!("[BMS] {}", str)
-        //     }
-        // });
-        // let sender = self.tx.clone();
-        // tokio::spawn(async move {
-        //     loop {
-        //         sender.send(String::from("BMS发出了信号")).unwrap();
-        //         tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
-        //     }
-        // });
         for handler in handles {
             match handler.await {
                 Ok(_) => {}
@@ -147,51 +125,37 @@ async fn mqtt_msg_handler(
     sender: &broadcast::Sender<BroadcastMessage>,
     devices: &Arc<HashMap<DevType, Arc<Device>>>,
 ) -> anyhow::Result<()> {
-    use serde_json::Value;
     let json = String::from_utf8(msg.payload)?;
-    let v: Value = serde_json::from_str(json.as_str())?;
-    //{"key":"open_pcs","value":12}
-    if let (Some(k), Some(v)) = (v.get("key"), v.get("value")) {
-        //key must string, value must number
-        if k.is_string() && v.is_number() {
-            if let (Some(key), Some(value)) = (k.as_str(), v.as_number()) {
-                //topic 必须匹配 /asw/${dev}/${index}/setyt
-                if let Some((dev, _index)) = extract_with_regex(msg.topic.as_str()) {
-                    //广播给设备带上dev_id
-                    let id = match dev {
-                        "bms" => {
-                            let bms = devices
-                                .get(&DevType::BMS)
-                                .ok_or(anyhow::anyhow!("设备容器中找不到BMS"))?;
-                            Some(bms.id.as_str())
-                        }
-                        "pcs" => {
-                            let pcs = devices
-                                .get(&DevType::PCS)
-                                .ok_or(anyhow::anyhow!("设备容器中找不到PCS"))?;
-                            Some(pcs.id.as_str())
-                        }
-                        other => {
-                            error!("[mqtt] 目前没有<{}>这个设备", other);
-                            None
-                        }
-                    };
-                    if let Some(id) = id {
-                        sender.send(BroadcastMessage {
-                            id: id.to_string(),
-                            key: key.to_string(),
-                            value: Arc::new(value.clone()),
-                        })?;
-                    }
-                } else {
-                    error!("[mqtt] topic解析错误: {}", msg.topic)
-                }
+    let v: HashMap<String, f32> = serde_json::from_str(json.as_str())?;
+    //topic 必须匹配 /asw/${dev}/${index}/setyt
+    if let Some((dev, _index)) = extract_with_regex(msg.topic.as_str()) {
+        //广播给设备带上dev_id
+        let id = match dev {
+            "bms" => {
+                let bms = devices
+                    .get(&DevType::BMS)
+                    .ok_or(anyhow::anyhow!("设备容器中找不到BMS"))?;
+                Some(bms.id.as_str())
+            }
+            "pcs" => {
+                let pcs = devices
+                    .get(&DevType::PCS)
+                    .ok_or(anyhow::anyhow!("设备容器中找不到PCS"))?;
+                Some(pcs.id.as_str())
+            }
+            other => {
+                error!("[mqtt] 目前没有<{}>这个设备", other);
+                None
             }
-        } else {
-            error!("[mqtt] JSON解析错误, key is {}, value is {}", k, v)
+        };
+        if let Some(id) = id {
+            sender.send(BroadcastMessage {
+                id: id.to_string(),
+                value: v,
+            })?;
         }
     } else {
-        error!("[mqtt] JSON解析错误")
+        error!("[mqtt] topic解析错误: {}", msg.topic)
     }
     Ok(())
 }

+ 3 - 11
src/ems/emu/mod.rs

@@ -1,6 +1,5 @@
-use serde_json::Number;
+use std::collections::HashMap;
 use std::fmt::{Display, Formatter};
-use std::sync::Arc;
 
 pub mod device;
 pub mod emu;
@@ -8,19 +7,12 @@ pub mod emu;
 #[derive(Clone, Debug)]
 pub struct BroadcastMessage {
     pub id: String,
-    pub key: String,
-    pub value: Arc<dyn std::any::Any + Send + Sync>,
+    pub value: HashMap<String, f32>,
 }
 
 impl Display for BroadcastMessage {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        if let Some(val) = self.value.downcast_ref::<Number>() {
-            write!(f, "({}, {})", self.key, val)
-        } else if let Some(val) = self.value.downcast_ref::<String>() {
-            write!(f, "({}, {})", self.key, val)
-        } else {
-            write!(f, "({}, {:?})", self.key, self.value.type_id())
-        }
+        write!(f, "{}: {:?}", self.id, self.value)
     }
 }
 

+ 56 - 9
src/internal/modbus/code.rs

@@ -1,3 +1,4 @@
+use crate::ems::emu::BroadcastMessage;
 use crate::internal::modbus::{BaseModbusCode, CsvData};
 use anyhow::anyhow;
 use serde_json::{Number, Value};
@@ -5,14 +6,10 @@ use serde_json::{Number, Value};
 /// 寄存器类型
 #[derive(Debug, Eq, PartialEq)]
 pub enum RegisterType {
-    //线圈寄存器 0
-    Coil,
-    //离散输入寄存器 1
-    DiscreteInput,
-    //保持寄存器 2
-    HoldingRegister,
-    //输入寄存器 3
-    InputRegister,
+    Coil,            //线圈寄存器 0
+    DiscreteInput,   //离散输入寄存器 1
+    HoldingRegister, //保持寄存器 2
+    InputRegister,   //输入寄存器 3
 }
 
 impl TryFrom<u8> for RegisterType {
@@ -84,7 +81,6 @@ pub struct ModbusCode {
 }
 
 impl ModbusCode {
-
     /// 计算rawValue = data * factor + offset
     pub fn json_value(&self) -> Option<Value> {
         if self.data_type.is_some() && self.data.is_some() {
@@ -165,3 +161,54 @@ impl TryFrom<CsvData> for ModbusCode {
         })
     }
 }
+
+///modbus下发的值
+#[derive(Debug)]
+pub struct ModbusReqCode<'a> {
+    pub key: &'a str,
+    pub addr: u16,
+    pub value: f32,
+    pub factor: &'a Option<f32>,
+    pub offset: &'a Option<i16>,
+}
+
+impl BaseModbusCode for ModbusReqCode<'_> {
+    fn addr(&self) -> u16 {
+        self.addr
+    }
+}
+
+impl ModbusReqCode<'_> {
+    pub fn build<'a>(
+        modbus_code: &'a [ModbusCode],
+        msg: &'a BroadcastMessage,
+    ) -> Vec<ModbusReqCode<'a>> {
+        let mut requests = vec![];
+        let codes: Vec<_> = modbus_code
+            .iter()
+            .filter(|code| code.code == RegisterType::HoldingRegister)
+            .collect();
+        for code in codes.iter() {
+            if code.key.is_some() {
+                let msg_key = code.key.as_ref().unwrap().as_str();
+                for (k, v) in msg.value.iter() {
+                    if msg_key == k {
+                        requests.push(ModbusReqCode {
+                            key: k,
+                            addr: code.addr,
+                            value: *v,
+                            factor: &code.factor,
+                            offset: &code.offset,
+                        })
+                    }
+                }
+            }
+        }
+        requests
+    }
+
+    pub fn trans_val(&self) -> u16 {
+        let v = (self.value - self.offset.unwrap_or(0) as f32) / self.factor.unwrap_or(1f32);
+        v as u16
+    }
+}