Kaynağa Gözat

【功能】ModbusTCP重连机制与高特BMS

mikasa 2 hafta önce
ebeveyn
işleme
cf43596a8a

+ 8 - 1
emu-config.yaml

@@ -6,7 +6,14 @@ emu:
     # 读取间隔, 单位ms
     interval: 5000
   bms:
-    name: 'vcan0'
+    # 读取间隔, 单位ms
+    interval: 5000
+    test_bms:
+      name: 'vcan0'
+    gold_bms:
+      host: '127.0.0.1'
+      port: 33434
+
 rabbitmq:
   host: '122.51.163.61'
   port: 5672

+ 13 - 0
src/cmd/config.rs

@@ -33,8 +33,21 @@ pub struct PcsConfig {
 
 #[derive(Serialize, Deserialize, Debug)]
 pub struct BmsConfig {
+    pub interval: u64,
+    pub test_bms: TestBms,
+    pub gold_bms: GoldBms,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct TestBms {
     pub name: String,
 }
+/// 高特BMS配置
+#[derive(Serialize, Deserialize, Debug)]
+pub struct GoldBms {
+    pub host: String,
+    pub port: u16,
+}
 
 #[derive(Serialize, Deserialize, Debug)]
 pub struct RabbitMQConfig {

+ 2 - 2
src/ems/bms/bms.rs

@@ -20,12 +20,12 @@ impl Bms {
         let id = utils::generate_random_str(12);
         utils::log::init_log("inpower_iot_mgc_rs::ems::bms::*", "bms/bms.log").await;
         info!("BMS [{}] 初始化成功", id);
-        let can_socket = match CanSocket::open(app_config().emu.bms.name.as_str()) {
+        let can_socket = match CanSocket::open(app_config().emu.bms.test_bms.name.as_str()) {
             Ok(can) => can,
             Err(e) => {
                 return Err(anyhow!(
                     "can: {}打开失败: {}",
-                    app_config().emu.bms.name.as_str(),
+                    app_config().emu.bms.test_bms.name.as_str(),
                     e.to_string()
                 ));
             }

+ 77 - 0
src/ems/bms/gold_bms.rs

@@ -0,0 +1,77 @@
+use crate::cmd::config::app_config;
+use crate::connect_or_retry;
+use crate::ems::{Consumer, Producer, Service};
+use crate::internal::modbus::code::ModbusCode;
+use crate::internal::utils;
+use async_trait::async_trait;
+use log::{error, info};
+use std::cmp::min;
+use std::net::SocketAddr;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::sync::Mutex;
+use tokio_modbus::client::Reader;
+
+///
+/// 高特BMS
+/// Modbus-TCP
+pub struct GoldBms {
+    pub id: String,
+    pub producer: Producer,
+    pub consumer: Arc<Mutex<Consumer>>,
+    pub modbus_code: Arc<Mutex<Vec<ModbusCode>>>,
+    pub ctx: Arc<Mutex<tokio_modbus::client::Context>>,
+}
+
+impl GoldBms {
+    pub async fn new(producer: Producer, consumer: Consumer) -> anyhow::Result<Self> {
+        let config = &app_config().emu.bms.gold_bms;
+        let id = utils::generate_random_str(12);
+        utils::log::init_log("inpower_iot_mgc_rs::ems::bms::*", "bms/bms.log").await;
+        let context = connect_modbus_tcp().await?;
+        info!("BMS[{}]初始化成功!", id);
+        Ok(GoldBms {
+            id,
+            producer,
+            consumer: Arc::new(Mutex::new(consumer)),
+            modbus_code: Arc::new(Default::default()),
+            ctx: Arc::new(Mutex::new(context)),
+        })
+    }
+    async fn read_input_register(&self) -> anyhow::Result<()> {
+        self.ctx
+            .lock()
+            .await
+            .read_holding_registers(0, 10)
+            .await??;
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl Service for GoldBms {
+    async fn read_task(&self) {
+        let config = &app_config().emu.bms.gold_bms;
+        loop {
+            tokio::time::sleep(Duration::from_millis(app_config().emu.bms.interval)).await;
+            match self.read_input_register().await {
+                Ok(()) => {
+                    println!("读取数据成功!");
+                }
+                Err(e) => {
+                    println!("Modbus 读取失败: {:?},断开连接", e);
+                    let mut x = self.ctx.lock().await;
+                    *x = connect_modbus_tcp().await.unwrap();
+                }
+            }
+        }
+    }
+
+    async fn write_task(&self) {}
+}
+
+async fn connect_modbus_tcp() -> anyhow::Result<tokio_modbus::client::Context> {
+    let config = &app_config().emu.bms.gold_bms;
+    let addr: SocketAddr = format!("{}:{}", config.host, config.port).parse()?;
+    connect_or_retry!(tokio_modbus::client::tcp::connect(addr), "ModbusTCP")
+}

+ 1 - 0
src/ems/bms/mod.rs

@@ -1 +1,2 @@
 pub mod bms;
+pub mod gold_bms;

+ 16 - 15
src/ems/emu/emu.rs

@@ -1,5 +1,5 @@
 use crate::cmd::config::app_config;
-use crate::ems::bms::bms::Bms;
+use crate::ems::bms::gold_bms::GoldBms;
 use crate::ems::emu::device::{DevType, Device};
 use crate::ems::emu::BroadcastMessage;
 use crate::ems::pcs::pcs::Pcs;
@@ -11,6 +11,7 @@ use regex::Regex;
 use std::collections::HashMap;
 use std::sync::Arc;
 use tokio::sync::broadcast;
+use tokio::task::JoinHandle;
 use tokio::{join, sync};
 
 /// EMU 能量管理处理单元
@@ -37,7 +38,7 @@ impl Emu {
 
             //↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓ 创建BMS
             let (bms_tx, bms_rx) = sync::mpsc::channel::<String>(8);
-            let bms = Bms::new(bms_tx, tx.subscribe());
+            let bms = GoldBms::new(bms_tx, tx.subscribe());
             //↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
             let (pcs, bms) = join!(pcs, bms);
 
@@ -48,12 +49,12 @@ impl Emu {
                 );
             }
 
-            if let Ok(p) = pcs {
-                devices.insert(
-                    DevType::PCS,
-                    Device::new(p.id.clone(), "PCS", Arc::new(p), pcs_rx),
-                );
-            }
+            // if let Ok(p) = pcs {
+            //     devices.insert(
+            //         DevType::PCS,
+            //         Device::new(p.id.clone(), "PCS", Arc::new(p), pcs_rx),
+            //     );
+            // }
         }
         info!("EMU初始化完成");
         Emu {
@@ -64,15 +65,16 @@ impl Emu {
 
     pub async fn run(self: Arc<Self>) {
         let mut handles = Vec::new();
+        match self.mqtt().await {
+            Ok(h) => handles.push(h),
+            Err(e) => error!("MQTT 启动失败: {}", e),
+        }
         for dev in self.devices.values() {
             let device_clone = Arc::clone(dev);
             handles.push(tokio::spawn(async move {
                 device_clone.start().await;
             }))
         }
-        if let Err(e) = self.mqtt().await {
-            error!("MQTT 启动失败: {}", e);
-        }
         // let x = &self.devices;
         // let a = x.get(&"pcs".to_string()).unwrap().clone();
         // tokio::spawn(async move {
@@ -106,7 +108,7 @@ impl Emu {
         }
     }
 
-    async fn mqtt(&self) -> anyhow::Result<()> {
+    async fn mqtt(&self) -> anyhow::Result<JoinHandle<()>> {
         //mqtt Client
         let client = &mqtt_client().await?.mosq;
         let subscriptions = client.subscriber().unwrap();
@@ -118,7 +120,7 @@ impl Emu {
         let sender = self.tx.clone();
         let devices = Arc::clone(&self.devices);
         //异步处理mqtt推送的消息
-        tokio::spawn(async move {
+        Ok(tokio::spawn(async move {
             loop {
                 if let Ok(message) = subscriptions.recv().await {
                     match message {
@@ -136,8 +138,7 @@ impl Emu {
                     }
                 }
             }
-        });
-        Ok(())
+        }))
     }
 }
 

+ 31 - 1
src/ems/emu/mod.rs

@@ -1,6 +1,6 @@
+use serde_json::Number;
 use std::fmt::{Display, Formatter};
 use std::sync::Arc;
-use serde_json::Number;
 
 pub mod device;
 pub mod emu;
@@ -23,3 +23,33 @@ impl Display for BroadcastMessage {
         }
     }
 }
+
+/// ModbusTCP开启连接,如果失败重试
+/// 初始等待重连时间1秒,每次递增,最大60秒
+#[macro_export]
+macro_rules! connect_or_retry {
+    ($connect_expr:expr,$label:expr) => {{
+        use std::cmp::min;
+        use tokio::time::{sleep, Duration};
+        let mut retry_delay = Duration::from_secs(1);
+        let max_delay = Duration::from_secs(60);
+        loop {
+            match $connect_expr.await {
+                Ok(conn) => {
+                    log::info!("[{}] 连接成功!", $label);
+                    break Ok(conn);
+                }
+                Err(e) => {
+                    log::error!(
+                        "[{}] 连接失败: {}, 将在 {:?} 后重试...",
+                        $label,
+                        e,
+                        retry_delay
+                    );
+                    sleep(retry_delay).await;
+                    retry_delay = min(retry_delay * 2, max_delay);
+                }
+            }
+        }
+    }};
+}

+ 1 - 0
src/ems/mod.rs

@@ -4,6 +4,7 @@ use tokio::join;
 pub mod bms;
 pub mod emu;
 pub mod pcs;
+pub mod tms;
 
 pub type Producer = tokio::sync::mpsc::Sender<String>;
 pub type Consumer = tokio::sync::broadcast::Receiver<BroadcastMessage>;

+ 1 - 1
src/ems/pcs/mod.rs

@@ -19,7 +19,7 @@ pub(crate) struct PcsCsvData {
 
 /// pcs config
 pub(crate) fn pcs_conf() -> anyhow::Result<Vec<ModbusCode>> {
-    let mut rdr = csv::Reader::from_path("config/pcs.csv")?;
+    let mut rdr = csv::Reader::from_path("./config/pcs.csv")?;
     let mut vec = Vec::new();
     for result in rdr.deserialize() {
         let csv_data: PcsCsvData = result?;

+ 0 - 0
src/ems/tms/mod.rs