Browse Source

【功能】MQTT根据topic分配给dev

mikasa 3 weeks ago
parent
commit
ed3f5fa0d8
10 changed files with 198 additions and 99 deletions
  1. 2 21
      Cargo.lock
  2. 6 3
      Cargo.toml
  3. 5 3
      emu-config.yaml
  4. 3 6
      src/cmd/config.rs
  5. 40 2
      src/ems/emu/device.rs
  6. 105 51
      src/ems/emu/emu.rs
  7. 23 0
      src/ems/emu/mod.rs
  8. 2 1
      src/ems/mod.rs
  9. 7 7
      src/ems/pcs/pcs.rs
  10. 5 5
      src/main.rs

+ 2 - 21
Cargo.lock

@@ -616,11 +616,12 @@ dependencies = [
  "openssl",
  "rand",
  "rayon",
+ "regex",
  "serde",
+ "serde_json",
  "serde_yml",
  "socketcan",
  "thiserror 2.0.12",
- "tikv-jemallocator",
  "tklog",
  "tokio",
  "tokio-cron-scheduler",
@@ -1337,26 +1338,6 @@ dependencies = [
  "syn 2.0.100",
 ]
 
-[[package]]
-name = "tikv-jemalloc-sys"
-version = "0.6.0+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cd3c60906412afa9c2b5b5a48ca6a5abe5736aec9eb48ad05037a677e52e4e2d"
-dependencies = [
- "cc",
- "libc",
-]
-
-[[package]]
-name = "tikv-jemallocator"
-version = "0.6.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4cec5ff18518d81584f477e9bfdf957f5bb0979b0bac3af4ca30b5b3ae2d2865"
-dependencies = [
- "libc",
- "tikv-jemalloc-sys",
-]
-
 [[package]]
 name = "tklog"
 version = "0.2.9"

+ 6 - 3
Cargo.toml

@@ -44,8 +44,11 @@ mosquitto-rs = "0.11.2"
 openssl = { version = "0.10.71", features = ["vendored"] }
 # 多核并行处理
 rayon = "1.10.0"
-# 更好的内存分配器
-[target.'cfg(not(target_env = "msvc"))'.dependencies]
-tikv-jemallocator = "0.6.0"
+# json处理
+serde_json = "1.0.140"
+regex = "1.11.1"
+## 更好的内存分配器
+#[target.'cfg(not(target_env = "msvc"))'.dependencies]
+#tikv-jemallocator = "0.6.0"
 
 

+ 5 - 3
emu-config.yaml

@@ -3,11 +3,10 @@ emu:
   pcs:
     host: '127.0.0.1'
     port: 33434
+    # 读取间隔, 单位ms
+    interval: 5000
   bms:
     name: 'vcan0'
-  ems:
-    # 读取间隔, 单位ms
-    read_interval: 5000
 rabbitmq:
   host: '122.51.163.61'
   port: 5672
@@ -18,5 +17,8 @@ mqtt:
   port: 1883
   username: 'inpower'
   password: 'inpower@123'
+  topic:
+    - 'test'
+    - '/asw/+/+/setyt'
 log:
   path: 'logs'

+ 3 - 6
src/cmd/config.rs

@@ -21,7 +21,6 @@ pub struct AppConfig {
 pub struct EmuConfig {
     pub ver: String,
     pub pcs: PcsConfig,
-    pub ems: EmsConfig,
     pub bms: BmsConfig,
 }
 
@@ -29,12 +28,9 @@ pub struct EmuConfig {
 pub struct PcsConfig {
     pub host: String,
     pub port: u16,
+    pub interval: u64,
 }
-#[derive(Serialize, Deserialize, Debug)]
-pub struct EmsConfig {
-    //读取间隔, 单位ms
-    pub read_interval: u64,
-}
+
 #[derive(Serialize, Deserialize, Debug)]
 pub struct BmsConfig {
     pub name: String,
@@ -58,4 +54,5 @@ pub struct MqttConfig {
     pub port: u16,
     pub username: String,
     pub password: String,
+    pub topic: Vec<String>,
 }

+ 40 - 2
src/ems/emu/device.rs

@@ -1,4 +1,42 @@
-pub enum DeviceType {
+use crate::ems::Service;
+use std::sync::Arc;
+use tokio::sync::{mpsc, Mutex};
+
+/// 设备的类型
+#[derive(Eq, PartialEq, Hash)]
+pub enum DevType {
     PCS,
-    EMS,
+    BMS,
+}
+
+/// 抽象设备结构
+/// id: 设备唯一ID
+/// name: 设备名称
+/// service: 设备的服务
+/// channel: 广播接收器
+pub struct Device {
+    pub id: String,
+    pub name: String,
+    pub service: Arc<dyn Service>,
+    pub channel: Arc<Mutex<mpsc::Receiver<String>>>,
+}
+impl Device {
+    /// 创建设备
+    pub fn new(
+        id: String,
+        name: &str,
+        service: Arc<dyn Service>,
+        channel: mpsc::Receiver<String>,
+    ) -> Arc<Self> {
+        Arc::new(Self {
+            id,
+            name: name.to_string(),
+            service,
+            channel: Arc::new(Mutex::new(channel)),
+        })
+    }
+
+    pub async fn start(self: Arc<Self>) {
+        self.service.serve().await;
+    }
 }

+ 105 - 51
src/ems/emu/emu.rs

@@ -1,33 +1,24 @@
+use crate::cmd::config::app_config;
 use crate::ems::bms::bms::Bms;
+use crate::ems::emu::device::{DevType, Device};
+use crate::ems::emu::BroadcastMessage;
 use crate::ems::pcs::pcs::Pcs;
-use crate::ems::Service;
 use crate::internal::utils;
 use crate::internal::utils::mqtt::mqtt_client;
-use log::info;
-use mosquitto_rs::{Event, QoS};
+use log::{error, info};
+use mosquitto_rs::{Event, Message, QoS};
+use regex::Regex;
 use std::collections::HashMap;
 use std::sync::Arc;
-use tokio::sync::Mutex;
+use tokio::sync::broadcast;
 use tokio::{join, sync};
 
-/// 抽象设备结构
-/// id: 设备唯一ID
-/// name: 设备名称
-/// service: 设备的服务
-/// channel: 广播接收器
-pub struct Device {
-    pub id: String,
-    pub name: String,
-    pub service: Arc<dyn Service>,
-    pub channel: Arc<Mutex<sync::mpsc::Receiver<String>>>,
-}
-
 /// EMU 能量管理处理单元
 /// devices: 设备集合
 /// tx: 广播发生器
 pub struct Emu {
-    pub devices: HashMap<String, Arc<Device>>,
-    pub tx: sync::broadcast::Sender<String>,
+    pub devices: Arc<HashMap<DevType, Arc<Device>>>,
+    pub tx: broadcast::Sender<BroadcastMessage>,
 }
 
 impl Emu {
@@ -36,7 +27,7 @@ impl Emu {
         //初始化日志
         utils::log::init_log("inpower_iot_mgc_rs::ems::emu::*", "emu/emu.log").await;
         //EMU消息广播器
-        let (tx, _) = sync::broadcast::channel::<String>(8);
+        let (tx, _) = broadcast::channel::<BroadcastMessage>(8);
         let mut devices = HashMap::new();
         {
             //↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓ 创建PCS
@@ -51,44 +42,37 @@ impl Emu {
             let (pcs, bms) = join!(pcs, bms);
 
             if let Ok(b) = bms {
-                let bms_dev = Device {
-                    id: "".to_string(),
-                    name: "BMS".to_string(),
-                    service: Arc::new(b),
-                    channel: Arc::new(Mutex::new(bms_rx)),
-                };
-                devices.insert("bms".to_string(), Arc::new(bms_dev));
+                devices.insert(
+                    DevType::BMS,
+                    Device::new(b.id.clone(), "BMS", Arc::new(b), bms_rx),
+                );
             }
 
             if let Ok(p) = pcs {
-                let pcs_dev = Device {
-                    id: "".to_string(),
-                    name: "PCS".to_string(),
-                    service: Arc::new(p),
-                    channel: Arc::new(Mutex::new(pcs_rx)),
-                };
-                devices.insert("pcs".to_string(), Arc::new(pcs_dev));
+                devices.insert(
+                    DevType::PCS,
+                    Device::new(p.id.clone(), "PCS", Arc::new(p), pcs_rx),
+                );
             }
         }
         info!("EMU初始化完成");
-        Emu { devices, tx }
+        Emu {
+            devices: Arc::new(devices),
+            tx,
+        }
     }
 
     pub async fn run(self: Arc<Self>) {
         let mut handles = Vec::new();
-        for (_tp, dev) in self.devices.iter() {
-            //启动协程处理服务
-            let handle = tokio::spawn({
-                //Arc指针计数+1
-                let dev_clone = Arc::clone(dev);
-                async move {
-                    //启动服务
-                    dev_clone.service.clone().serve().await;
-                }
-            });
-            handles.push(handle);
+        for dev in self.devices.values() {
+            let device_clone = Arc::clone(dev);
+            handles.push(tokio::spawn(async move {
+                device_clone.start().await;
+            }))
+        }
+        if let Err(e) = self.mqtt().await {
+            error!("MQTT 启动失败: {}", e);
         }
-        self.mqtt().await.unwrap();
         // let x = &self.devices;
         // let a = x.get(&"pcs".to_string()).unwrap().clone();
         // tokio::spawn(async move {
@@ -115,7 +99,7 @@ impl Emu {
             match handler.await {
                 Ok(_) => {}
                 Err(e) => {
-                    log::error!("内部错误: {}", e.to_string());
+                    log::error!("设备运行错误: {}", e.to_string());
                     continue;
                 }
             }
@@ -123,18 +107,25 @@ impl Emu {
     }
 
     async fn mqtt(&self) -> anyhow::Result<()> {
+        //mqtt Client
         let client = &mqtt_client().await?.mosq;
         let subscriptions = client.subscriber().unwrap();
-        client.subscribe("pcs", QoS::AtMostOnce).await?;
+        let topic_vec = &app_config().mqtt.topic;
+        //订阅这些主题
+        for topic in topic_vec.iter() {
+            client.subscribe(topic, QoS::AtMostOnce).await?;
+        }
         let sender = self.tx.clone();
+        let devices = Arc::clone(&self.devices);
+        //异步处理mqtt推送的消息
         tokio::spawn(async move {
             loop {
                 if let Ok(message) = subscriptions.recv().await {
                     match message {
                         Event::Message(msg) => {
-                            let string = String::from_utf8(msg.payload).unwrap();
-                            info!("topic: {}", msg.topic);
-                            sender.send(string).unwrap();
+                            if let Err(e) = mqtt_msg_handler(msg, &sender, &devices).await {
+                                error!("[mqtt] 订阅消息分发失败: {}", e.to_string());
+                            }
                         }
                         Event::Connected(_) => {
                             info!("[mqtt] 已连接")
@@ -149,3 +140,66 @@ impl Emu {
         Ok(())
     }
 }
+
+async fn mqtt_msg_handler(
+    msg: Message,
+    sender: &broadcast::Sender<BroadcastMessage>,
+    devices: &Arc<HashMap<DevType, Arc<Device>>>,
+) -> anyhow::Result<()> {
+    use serde_json::Value;
+    let json = String::from_utf8(msg.payload)?;
+    let v: Value = serde_json::from_str(json.as_str())?;
+    //{"key":"open_pcs","value":12}
+    if let (Some(k), Some(v)) = (v.get("key"), v.get("value")) {
+        //key must string, value must number
+        if k.is_string() && v.is_number() {
+            if let (Some(key), Some(value)) = (k.as_str(), v.as_number()) {
+                //topic 必须匹配 /asw/${dev}/${index}/setyt
+                if let Some((dev, _index)) = extract_with_regex(msg.topic.as_str()) {
+                    //广播给设备带上dev_id
+                    let id = match dev {
+                        "bms" => {
+                            let bms = devices
+                                .get(&DevType::BMS)
+                                .ok_or(anyhow::anyhow!("设备容器中找不到BMS"))?;
+                            Some(bms.id.as_str())
+                        }
+                        "pcs" => {
+                            let pcs = devices
+                                .get(&DevType::PCS)
+                                .ok_or(anyhow::anyhow!("设备容器中找不到PCS"))?;
+                            Some(pcs.id.as_str())
+                        }
+                        other => {
+                            error!("[mqtt] 目前没有<{}>这个设备", other);
+                            None
+                        }
+                    };
+                    if let Some(id) = id {
+                        sender.send(BroadcastMessage {
+                            id: id.to_string(),
+                            key: key.to_string(),
+                            value: Arc::new(value.clone()),
+                        })?;
+                    }
+                } else {
+                    error!("[mqtt] topic解析错误: {}", msg.topic)
+                }
+            }
+        } else {
+            error!("[mqtt] JSON解析错误, key is {}, value is {}", k, v)
+        }
+    } else {
+        error!("[mqtt] JSON解析错误")
+    }
+    Ok(())
+}
+
+fn extract_with_regex(path: &str) -> Option<(&str, &str)> {
+    let re = Regex::new(r"^/asw/([^/]+)/([^/]+)/setyt$").unwrap();
+    if let Some(caps) = re.captures(path) {
+        Some((caps.get(1)?.as_str(), caps.get(2)?.as_str()))
+    } else {
+        None
+    }
+}

+ 23 - 0
src/ems/emu/mod.rs

@@ -1,2 +1,25 @@
+use std::fmt::{Display, Formatter};
+use std::sync::Arc;
+use serde_json::Number;
+
 pub mod device;
 pub mod emu;
+
+#[derive(Clone, Debug)]
+pub struct BroadcastMessage {
+    pub id: String,
+    pub key: String,
+    pub value: Arc<dyn std::any::Any + Send + Sync>,
+}
+
+impl Display for BroadcastMessage {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        if let Some(val) = self.value.downcast_ref::<Number>() {
+            write!(f, "({}, {})", self.key, val)
+        } else if let Some(val) = self.value.downcast_ref::<String>() {
+            write!(f, "({}, {})", self.key, val)
+        } else {
+            write!(f, "({}, {:?})", self.key, self.value.type_id())
+        }
+    }
+}

+ 2 - 1
src/ems/mod.rs

@@ -1,3 +1,4 @@
+use crate::ems::emu::BroadcastMessage;
 use tokio::join;
 
 pub mod bms;
@@ -5,7 +6,7 @@ pub mod emu;
 pub mod pcs;
 
 pub type Producer = tokio::sync::mpsc::Sender<String>;
-pub type Consumer = tokio::sync::broadcast::Receiver<String>;
+pub type Consumer = tokio::sync::broadcast::Receiver<BroadcastMessage>;
 #[async_trait::async_trait]
 pub trait Service: Send + Sync {
     async fn serve(&self) {

+ 7 - 7
src/ems/pcs/pcs.rs

@@ -8,7 +8,7 @@ use anyhow::bail;
 use log::{error, info};
 use std::sync::Arc;
 use tokio::sync::Mutex;
-use tokio_modbus::client::{Reader};
+use tokio_modbus::client::Reader;
 use tokio_modbus::FunctionCode;
 
 pub struct Pcs {
@@ -106,18 +106,18 @@ impl Pcs {
         Ok(())
     }
 
-    async fn read_holding_registers() -> anyhow::Result<()> {
-        todo!("")
-    }
+    // async fn read_holding_registers() -> anyhow::Result<()> {
+    //     todo!("")
+    // }
 }
 
 #[async_trait::async_trait]
 impl Service for Pcs {
     async fn read_task(&self) {
-        let producer = self.producer.clone();
+        let _producer = self.producer.clone();
         loop {
             tokio::time::sleep(tokio::time::Duration::from_millis(
-                app_config().emu.ems.read_interval,
+                app_config().emu.pcs.interval,
             ))
             .await;
             match self.read_input_register().await {
@@ -134,7 +134,7 @@ impl Service for Pcs {
 
     async fn write_task(&self) {
         let consumer = self.consumer.clone();
-        let ctx = self.ctx.clone();
+        let _ctx = self.ctx.clone();
         loop {
             let string = consumer.lock().await.recv().await.unwrap();
             info!("[PCS] {}", string);

+ 5 - 5
src/main.rs

@@ -1,10 +1,10 @@
 use inpower_iot_mgc_rs::cmd::cmd::cmd;
 
-#[cfg(not(target_env = "msvc"))]
-use tikv_jemallocator::Jemalloc;
-#[cfg(not(target_env = "msvc"))]
-#[global_allocator]
-static GLOBAL: Jemalloc = Jemalloc;
+// #[cfg(not(target_env = "msvc"))]
+// use tikv_jemallocator::Jemalloc;
+// #[cfg(not(target_env = "msvc"))]
+// #[global_allocator]
+// static GLOBAL: Jemalloc = Jemalloc;
 
 #[tokio::main]
 async fn main() {