Преглед изворни кода

【功能】接收MQTT消息

mikasa пре 3 недеља
родитељ
комит
f36db54a20
4 измењених фајлова са 34 додато и 14 уклоњено
  1. 1 4
      src/ems/bms/bms.rs
  2. 31 1
      src/ems/emu/emu.rs
  3. 2 8
      src/ems/pcs/pcs.rs
  4. 0 1
      src/internal/utils/mqtt.rs

+ 1 - 4
src/ems/bms/bms.rs

@@ -1,12 +1,9 @@
 use crate::cmd::config::app_config;
 use crate::ems::{Consumer, Producer, Service};
 use crate::internal::utils;
-use crate::internal::utils::mqtt::{mqtt_client, MqttClient};
 use anyhow::anyhow;
 use log::{debug, info};
 use socketcan::tokio::{AsyncCanSocket, CanSocket};
-use socketcan::{EmbeddedFrame, IoResult};
-use std::io::Read;
 use std::sync::Arc;
 use tokio::sync::Mutex;
 
@@ -45,7 +42,7 @@ impl Bms {
 #[async_trait::async_trait]
 impl Service for Bms {
     async fn read_task(&self) {
-        let producer = self.producer.clone();
+        let _producer = self.producer.clone();
         let can_socket = Arc::clone(&self.can_socket);
         loop {
             let can_frame = can_socket.read_frame().await.unwrap();

+ 31 - 1
src/ems/emu/emu.rs

@@ -2,7 +2,9 @@ use crate::ems::bms::bms::Bms;
 use crate::ems::pcs::pcs::Pcs;
 use crate::ems::Service;
 use crate::internal::utils;
+use crate::internal::utils::mqtt::mqtt_client;
 use log::info;
+use mosquitto_rs::{Event, QoS};
 use std::collections::HashMap;
 use std::sync::Arc;
 use tokio::sync::Mutex;
@@ -51,7 +53,7 @@ impl Emu {
             if let Ok(b) = bms {
                 let bms_dev = Device {
                     id: "".to_string(),
-                    name: "".to_string(),
+                    name: "BMS".to_string(),
                     service: Arc::new(b),
                     channel: Arc::new(Mutex::new(bms_rx)),
                 };
@@ -86,6 +88,7 @@ impl Emu {
             });
             handles.push(handle);
         }
+        self.mqtt().await.unwrap();
         // let x = &self.devices;
         // let a = x.get(&"pcs".to_string()).unwrap().clone();
         // tokio::spawn(async move {
@@ -118,4 +121,31 @@ impl Emu {
             }
         }
     }
+
+    async fn mqtt(&self) -> anyhow::Result<()> {
+        let client = &mqtt_client().await?.mosq;
+        let subscriptions = client.subscriber().unwrap();
+        client.subscribe("pcs", QoS::AtMostOnce).await?;
+        let sender = self.tx.clone();
+        tokio::spawn(async move {
+            loop {
+                if let Ok(message) = subscriptions.recv().await {
+                    match message {
+                        Event::Message(msg) => {
+                            let string = String::from_utf8(msg.payload).unwrap();
+                            info!("topic: {}", msg.topic);
+                            sender.send(string).unwrap();
+                        }
+                        Event::Connected(_) => {
+                            info!("[mqtt] 已连接")
+                        }
+                        Event::Disconnected(_) => {
+                            info!("[mqtt] 断开连接")
+                        }
+                    }
+                }
+            }
+        });
+        Ok(())
+    }
 }

+ 2 - 8
src/ems/pcs/pcs.rs

@@ -8,7 +8,7 @@ use anyhow::bail;
 use log::{error, info};
 use std::sync::Arc;
 use tokio::sync::Mutex;
-use tokio_modbus::client::{Reader, Writer};
+use tokio_modbus::client::{Reader};
 use tokio_modbus::FunctionCode;
 
 pub struct Pcs {
@@ -107,9 +107,8 @@ impl Pcs {
     }
 
     async fn read_holding_registers() -> anyhow::Result<()> {
-        Ok(())
+        todo!("")
     }
-
 }
 
 #[async_trait::async_trait]
@@ -139,11 +138,6 @@ impl Service for Pcs {
         loop {
             let string = consumer.lock().await.recv().await.unwrap();
             info!("[PCS] {}", string);
-            let _ = ctx
-                .lock()
-                .await
-                .write_multiple_registers(1000, &[0xAE10])
-                .await;
         }
     }
 }

+ 0 - 1
src/internal/utils/mqtt.rs

@@ -19,7 +19,6 @@ pub async fn mqtt_client() -> anyhow::Result<&'static MqttClient> {
     Ok(&conn)
 }
 
-
 pub struct MqttClient {
     pub mosq: Client,
 }