Selaa lähdekoodia

【功能】BMS点位封装为json并推到MQTT

mikasa 1 viikko sitten
vanhempi
sitoutus
04bf38f817
5 muutettua tiedostoa jossa 148 lisäystä ja 93 poistoa
  1. 3 4
      emu-config.yaml
  2. 31 72
      src/ems/bms/gold/gold_bms.rs
  3. 35 0
      src/ems/mod.rs
  4. 43 16
      src/internal/modbus/code.rs
  5. 36 1
      src/internal/modbus/mod.rs

+ 3 - 4
emu-config.yaml

@@ -20,12 +20,11 @@ rabbitmq:
   username: 'guest'
   password: 'yuanan520.'
 mqtt:
-  host: '122.51.163.61'
+  host: '8.153.70.217'
   port: 1883
-  username: 'inpower'
-  password: 'inpower@123'
+  username: 'root'
+  password: 'Jjj156428!'
   topic:
-    - 'test'
     - '/asw/+/+/setyt'
 log:
   path: 'logs'

+ 31 - 72
src/ems/bms/gold/gold_bms.rs

@@ -1,16 +1,20 @@
 use crate::cmd::config::app_config;
-use crate::connect_or_retry;
 use crate::ems::{Consumer, Producer, Service};
-use crate::internal::modbus::code::{ModbusCode, ModbusData, RegisterType, Rw};
+use crate::internal::modbus::code::{ModbusCode, RegisterType, Rw};
 use crate::internal::modbus::{read_csv_to_code, slice_sequential, MAX_BIT_CNT, MAX_WORD_CNT};
 use crate::internal::utils;
+use crate::internal::utils::mqtt::mqtt_client;
+use crate::{connect_or_retry, publish_data, read_registers_tcp};
 use anyhow::{bail, Error};
 use async_trait::async_trait;
 use log::{error, info};
+use mosquitto_rs::QoS;
 use std::net::SocketAddr;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::Mutex;
+use tokio::time;
+use tokio::time::Instant;
 use tokio_modbus::client::Reader;
 
 ///
@@ -28,10 +32,12 @@ impl GoldBms {
     pub async fn new(producer: Producer, consumer: Consumer) -> anyhow::Result<Self> {
         let id = utils::generate_random_str(12);
         utils::log::init_log("inpower_iot_mgc_rs::ems::bms::*", "bms/bms.log").await;
+        info!("开始初始化BMS[{}]...", id);
         let config = read_csv_to_code("./config/gold_bms.csv").map_err(|e| {
             error!("协议生成失败:{}", e);
             anyhow::anyhow!(e)
         })?;
+        info!("成功加载BMS协议配置,共{}个寄存器", config.len());
         let ctx = connect_modbus_tcp().await?;
         info!("BMS[{}]初始化成功!", id);
         Ok(GoldBms {
@@ -45,81 +51,30 @@ impl GoldBms {
 
     //读取输入寄存器
     async fn read_input_register(&self, mut vec: Vec<&mut ModbusCode>) -> anyhow::Result<()> {
-        let seq_slice = slice_sequential(vec.as_mut_slice(), MAX_WORD_CNT);
-        for codes in seq_slice.into_iter() {
-            if let Some(start_code) = codes.first() {
-                let words = self
-                    .ctx
-                    .lock()
-                    .await
-                    .read_input_registers(start_code.addr, codes.len() as u16)
-                    .await??;
-                if words.len() == codes.len() {
-                    words.iter().enumerate().for_each(|(i, x)| {
-                        codes[i].data = Some(ModbusData::Word(*x));
-                    });
-                } else {
-                    bail!(
-                        "返回的数据长度不正确, register长度:{}, data长度:{}",
-                        vec.len(),
-                        words.len()
-                    )
-                }
-            }
-        }
-        Ok(())
+        read_registers_tcp!(self, vec, MAX_WORD_CNT, read_input_registers, |x| x)
     }
 
     //读取离散输入寄存器
     async fn read_discrete_input(&self, mut vec: Vec<&mut ModbusCode>) -> anyhow::Result<()> {
-        let seq_slice = slice_sequential(vec.as_mut_slice(), MAX_BIT_CNT);
-        for codes in seq_slice.into_iter() {
-            if let Some(start_code) = codes.first() {
-                let words = self
-                    .ctx
-                    .lock()
-                    .await
-                    .read_discrete_inputs(start_code.addr, codes.len() as u16)
-                    .await??;
-                if words.len() == codes.len() {
-                    for (i, x) in words.into_iter().enumerate() {
-                        codes[i].data = Some(ModbusData::Bit(x))
-                    }
-                } else {
-                    bail!(
-                        "返回的数据长度不正确, register长度:{}, data长度:{}",
-                        vec.len(),
-                        words.len()
-                    )
-                }
-            }
-        }
-        Ok(())
+        read_registers_tcp!(self, vec, MAX_BIT_CNT, read_discrete_inputs, |x: bool| x
+            as u16)
     }
 
+    //读取保持寄存器
     async fn read_holding_register(&self, mut vec: Vec<&mut ModbusCode>) -> anyhow::Result<()> {
-        let seq_slice = slice_sequential(vec.as_mut_slice(), MAX_WORD_CNT);
-        for codes in seq_slice.into_iter() {
-            if let Some(start_code) = codes.first() {
-                let words = self
-                    .ctx
-                    .lock()
-                    .await
-                    .read_holding_registers(start_code.addr, codes.len() as u16)
-                    .await??;
-                if words.len() == codes.len() {
-                    words.iter().enumerate().for_each(|(i, x)| {
-                        codes[i].data = Some(ModbusData::Word(*x));
-                    });
-                } else {
-                    bail!(
-                        "返回的数据长度不正确, register长度:{}, data长度:{}",
-                        vec.len(),
-                        words.len()
-                    )
-                }
-            }
-        }
+        read_registers_tcp!(self, vec, MAX_WORD_CNT, read_holding_registers, |x| x)
+    }
+
+    //推送消息
+    async fn push(&self, vec: &[ModbusCode], time: Instant) -> anyhow::Result<()> {
+        let client = &mqtt_client().await?.mosq;
+        // 发布遥测数据 (YC)
+        publish_data!(client, vec, "/pds/bms/0/yc", RegisterType::InputRegister);
+        // 发布遥信数据 (YX)
+        publish_data!(client, vec, "/pds/bms/0/yx", RegisterType::DiscreteInput);
+        // 发布遥调数据 (YT)
+        publish_data!(client, vec, "/pds/bms/0/yt", RegisterType::HoldingRegister);
+        info!("数据上行成功, 耗时 {:?}", time.elapsed());
         Ok(())
     }
 }
@@ -127,10 +82,11 @@ impl GoldBms {
 #[async_trait]
 impl Service for GoldBms {
     async fn south(&self) {
+        let code_clone = self.modbus_code.clone();
+        let mut modbus_guard = code_clone.lock().await;
         loop {
             tokio::time::sleep(Duration::from_millis(app_config().emu.bms.interval)).await;
-            let code_clone = self.modbus_code.clone();
-            let mut modbus_guard = code_clone.lock().await;
+            let time = Instant::now();
             if let Err(e) = self
                 .read_discrete_input(
                     modbus_guard
@@ -168,6 +124,9 @@ impl Service for GoldBms {
             {
                 handle_modbus_error(&e, &self.ctx, "读取保持寄存器失败").await;
             }
+            if let Err(e) = &self.push(modbus_guard.as_slice(), time).await {
+                error!("推送消息失败: {}", e);
+            }
         }
     }
 

+ 35 - 0
src/ems/mod.rs

@@ -1,4 +1,5 @@
 use crate::ems::emu::BroadcastMessage;
+use serde::{Deserialize, Serialize};
 use tokio::join;
 
 pub mod bms;
@@ -20,3 +21,37 @@ pub trait Service: Send + Sync {
 
     async fn north(&self);
 }
+
+#[derive(Serialize, Deserialize)]
+pub(crate) struct MyPayload {
+    pub key: String,
+    pub value: Option<serde_json::Value>,
+}
+
+///
+/// 推送消息的宏
+#[macro_export]
+macro_rules! publish_data {
+    ($client:expr, $vec:expr, $topic:expr, $code_type:path) => {{
+        let payload: Vec<_> = $vec
+            .iter()
+            .filter(|it| it.code == $code_type && it.key.is_some())
+            .map(|it| crate::ems::MyPayload {
+                key: it.key.clone().unwrap(),
+                value: it.json_value(),
+            })
+            .collect();
+
+        if !payload.is_empty() {
+            $client
+                .publish(
+                    $topic,
+                    serde_json::to_string(&payload)?,
+                    QoS::AtMostOnce,
+                    false,
+                )
+                .await?;
+        }
+    }};
+}
+

+ 43 - 16
src/internal/modbus/code.rs

@@ -1,5 +1,6 @@
 use crate::internal::modbus::{BaseModbusCode, CsvData};
 use anyhow::anyhow;
+use serde_json::{Number, Value};
 
 /// 寄存器类型
 #[derive(Debug, Eq, PartialEq)]
@@ -34,10 +35,6 @@ pub enum ModbusDataType {
     Bit,
     U16,
     I16,
-    U32,
-    I32,
-    F32,
-    F64,
 }
 
 impl TryFrom<String> for ModbusDataType {
@@ -48,10 +45,6 @@ impl TryFrom<String> for ModbusDataType {
             "bit" => Ok(ModbusDataType::Bit),
             "u16" => Ok(ModbusDataType::U16),
             "i16" => Ok(ModbusDataType::I16),
-            "u32" => Ok(ModbusDataType::U32),
-            "i32" => Ok(ModbusDataType::I32),
-            "f32" => Ok(ModbusDataType::F32),
-            "f64" => Ok(ModbusDataType::F64),
             _ => anyhow::bail!("无效的数据类型!"),
         }
     }
@@ -77,25 +70,57 @@ impl TryFrom<String> for Rw {
     }
 }
 
-#[derive(Debug)]
-pub enum ModbusData {
-    Word(u16),
-    Bit(bool),
-}
-
 #[derive(Debug)]
 pub struct ModbusCode {
     pub addr: u16,
     pub code: RegisterType,
-    pub data: Option<ModbusData>,
+    pub data: Option<u16>,
     pub name: String,
     pub data_type: Option<ModbusDataType>,
     pub factor: Option<f32>,
     pub rw: Option<Rw>,
     pub offset: Option<i16>,
+    pub key: Option<String>,
 }
 
-impl ModbusCode {}
+impl ModbusCode {
+
+    /// 计算rawValue = data * factor + offset
+    pub fn json_value(&self) -> Option<Value> {
+        if self.data_type.is_some() && self.data.is_some() {
+            let data_type = self.data_type.as_ref()?;
+            let data = *self.data.as_ref()? as f32;
+            let factor = *self.factor.as_ref()?;
+            //真值 = 通讯数据 * 分辨率 + 偏移量
+            let mut raw_data = data * factor;
+            // 处理 offset(如果有)
+            if let Some(offset) = self.offset.as_ref() {
+                raw_data += *offset as f32;
+            }
+            // 计算最终值并四舍五入
+            let raw_data = (raw_data / factor).round() * factor;
+            match data_type {
+                ModbusDataType::Bit => Some(Value::Number(Number::from(data as u8))),
+                _ => {
+                    if factor == 1f32 {
+                        // factor=1 时直接返回整数(不带小数位)
+                        Some(Value::Number(Number::from(raw_data as i64)))
+                    } else {
+                        // 计算小数位数(如 factor=0.1 → 1位小数)
+                        let decimal_places = (-factor.log10()).round() as usize;
+                        // 格式化字符串,确保正确的小数位数
+                        let formatted = format!("{0:.1$}", raw_data, decimal_places);
+                        // 解析回 f64 并转为 Number
+                        let number = formatted.parse::<f64>().ok().and_then(Number::from_f64)?;
+                        Some(Value::Number(number))
+                    }
+                }
+            }
+        } else {
+            None
+        }
+    }
+}
 
 impl BaseModbusCode for ModbusCode {
     fn addr(&self) -> u16 {
@@ -126,6 +151,7 @@ impl TryFrom<CsvData> for ModbusCode {
         };
         let offset = value.offset;
         let factor = value.factor;
+        let key = value.key;
         Ok(ModbusCode {
             addr,
             code,
@@ -135,6 +161,7 @@ impl TryFrom<CsvData> for ModbusCode {
             factor,
             rw,
             offset,
+            key,
         })
     }
 }

+ 36 - 1
src/internal/modbus/mod.rs

@@ -19,6 +19,7 @@ pub struct CsvData {
     pub rw: Option<String>,
     pub factor: Option<f32>,
     pub offset: Option<i16>,
+    pub key: Option<String>,
 }
 
 /// 从路径读取csv文档并解析为集合
@@ -127,7 +128,7 @@ mod test {
             TestModbusCode { 0: 11u16 },
             TestModbusCode { 0: 10u16 },
         ];
-        let vec = slice_sequential(codes.as_mut_slice());
+        let vec = slice_sequential(codes.as_mut_slice(), MAX_WORD_CNT);
         let vec = vec
             .into_iter()
             .map(|it| it.into_iter().map(|i| i.0).collect::<Vec<u16>>())
@@ -135,3 +136,37 @@ mod test {
         assert_eq!(vec, vec![vec![1, 2, 3], vec![7, 8], vec![10, 11],]);
     }
 }
+
+///
+/// modbus_tcp 读取寄存器的宏
+///
+#[macro_export]
+macro_rules! read_registers_tcp {
+    (
+        $self:ident, $vec:ident, $max_cnt:expr, $read_fn:ident, $convert:expr
+    ) => {{
+        let seq_slice = slice_sequential($vec.as_mut_slice(), $max_cnt);
+        for codes in seq_slice.into_iter() {
+            if let Some(start_code) = codes.first() {
+                let words = $self
+                    .ctx
+                    .lock()
+                    .await
+                    .$read_fn(start_code.addr, codes.len() as u16)
+                    .await??;
+                if words.len() == codes.len() {
+                    for (i, x) in words.into_iter().enumerate() {
+                        codes[i].data = Some($convert(x));
+                    }
+                } else {
+                    bail!(
+                        "返回的数据长度不正确, register长度:{}, data长度:{}",
+                        $vec.len(),
+                        words.len()
+                    )
+                }
+            }
+        }
+        Ok(())
+    }};
+}