فهرست منبع

【框架搭建】update

yuanan 2 ماه پیش
والد
کامیت
0ad8c7bbe0
8فایلهای تغییر یافته به همراه139 افزوده شده و 166 حذف شده
  1. 1 0
      Cargo.toml
  2. 35 2
      src/ems/bms/bms.rs
  3. 4 0
      src/ems/emu/device.rs
  4. 71 23
      src/ems/emu/emu.rs
  5. 1 0
      src/ems/emu/mod.rs
  6. 1 1
      src/ems/mod.rs
  7. 26 3
      src/ems/pcs/pcs.rs
  8. 0 137
      src/main.rs

+ 1 - 0
Cargo.toml

@@ -29,6 +29,7 @@ tklog = "0.2.9"
 # 日志框架
 log = "0.4.25"
 rand = "0.9.0"
+# 异步trait支持
 async-trait = "0.1.86"
 
 

+ 35 - 2
src/ems/bms/bms.rs

@@ -1,16 +1,49 @@
+use crate::ems::{Consumer, Producer, Service};
 use crate::internal::utils;
 use log::info;
+use std::sync::Arc;
+use tokio::join;
+use tokio::sync::Mutex;
 
 pub struct Bms {
     pub id: String,
+    pub producer: Producer,
+    pub consumer: Arc<Mutex<Consumer>>,
 }
 
 impl Bms {
     /// 初始化BMS
-    pub async fn new() -> Self {
+    pub async fn new(producer: Producer, consumer: Consumer) -> Self {
         let id = utils::generate_random_str(12);
         utils::log::init_log("inpower_iot_mgc_rs::ems::bms::*", "logs/bms.log").await;
         info!("BMS [{}] 初始化成功", id);
-        Bms { id }
+        Bms {
+            id,
+            producer,
+            consumer: Arc::new(Mutex::new(consumer)),
+        }
+    }
+}
+
+async fn read_task(producer: Producer) {
+    loop {
+        producer.send("PCS 你好".to_string()).await.unwrap();
+        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
+    }
+}
+
+async fn write_task(consumer: Arc<Mutex<Consumer>>) {
+    loop {
+        let string = consumer.lock().await.recv().await.unwrap();
+        info!("[BMS] {}", string);
+    }
+}
+
+#[async_trait::async_trait]
+impl Service for Bms {
+    async fn serve(&self) {
+        let read_handle = tokio::spawn(read_task(self.producer.clone()));
+        let write_handle = tokio::spawn(write_task(self.consumer.clone()));
+        let _ = join!(read_handle, write_handle);
     }
 }

+ 4 - 0
src/ems/emu/device.rs

@@ -0,0 +1,4 @@
+pub enum DeviceType {
+    PCS,
+    EMS,
+}

+ 71 - 23
src/ems/emu/emu.rs

@@ -1,51 +1,99 @@
+use crate::ems::bms::bms::Bms;
 use crate::ems::pcs::pcs::Pcs;
 use crate::ems::Service;
 use crate::internal::utils;
+use log::info;
 use std::collections::HashMap;
 use std::sync::Arc;
 use tokio::sync;
 use tokio::sync::Mutex;
 
-type ServiceChannels = Arc<HashMap<String, Mutex<sync::mpsc::Receiver<String>>>>;
+/// 抽象设备结构
+/// id: 设备唯一ID
+/// name: 设备名称
+/// service: 设备的服务
+/// channel: 广播接收器
+pub struct Device {
+    pub id: String,
+    pub name: String,
+    pub service: Arc<Mutex<dyn Service>>,
+    pub channel: Arc<Mutex<sync::mpsc::Receiver<String>>>,
+}
 
-/// services 是所有实现了Service接口的设备集合
-/// tx 是EMU向其他设备广播消息的发送器
-/// service_channels 是所有设备向EMU发送消息的通道集合
+/// EMU 能量管理处理单元
+/// devices: 设备集合
+/// tx: 广播发生器
 pub struct Emu {
-    pub services: Vec<Arc<Mutex<dyn Service>>>,
+    pub devices: HashMap<String, Arc<Device>>,
     pub tx: sync::broadcast::Sender<String>,
-    pub service_channels: ServiceChannels,
 }
+
 impl Emu {
+    /// 创建EMU实例
     pub async fn new() -> Self {
         //初始化日志
         utils::log::init_log("inpower_iot_mgc_rs::ems::emu::*", "logs/emu.log").await;
+        //EMU消息广播器
         let (tx, _) = sync::broadcast::channel::<String>(8);
-        let mut services: Vec<Arc<Mutex<dyn Service>>> = Vec::new();
-        let mut service_channels = HashMap::new();
-        log::info!("EMU初始化完成");
-        //↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓ 创建PCS
-        let (pcs_tx, pcs_rx) = sync::mpsc::channel::<String>(8);
-        let pcs = Arc::new(Mutex::new(Pcs::new(pcs_tx, tx.subscribe()).await));
-        services.push(pcs);
-        service_channels.insert(1.to_string(), Mutex::new(pcs_rx));
-        //↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
-        Emu {
-            services,
-            service_channels: Arc::new(service_channels),
-            tx,
+        let mut devices = HashMap::new();
+        info!("EMU初始化完成");
+        {
+            //↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓ 创建PCS
+            let (pcs_tx, pcs_rx) = sync::mpsc::channel::<String>(8);
+            let pcs = Arc::new(Mutex::new(Pcs::new(pcs_tx, tx.subscribe()).await));
+            let pcs_dev = Device {
+                id: "".to_string(),
+                name: "PCS".to_string(),
+                service: pcs,
+                channel: Arc::new(Mutex::new(pcs_rx)),
+            };
+            devices.insert("pcs".to_string(), Arc::new(pcs_dev));
+            //↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
+
+            //↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓ 创建BMS
+            let (bms_tx, bms_rx) = sync::mpsc::channel::<String>(8);
+            let bms = Arc::new(Mutex::new(Bms::new(bms_tx, tx.subscribe()).await));
+            let bms_dev = Device {
+                id: "".to_string(),
+                name: "".to_string(),
+                service: bms,
+                channel: Arc::new(Mutex::new(bms_rx)),
+            };
+            devices.insert("bms".to_string(), Arc::new(bms_dev));
+            //↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
         }
+        Emu { devices, tx }
     }
 
     pub async fn run(self: Arc<Self>) {
         let mut handles = Vec::new();
-        for service in self.services.iter() {
-            let service_clone = service.clone();
-            let handle = tokio::spawn(async move {
-                service_clone.lock().await.serve().await;
+        for (_tp, dev) in self.devices.iter() {
+            //启动协程处理服务
+            let handle = tokio::spawn({
+                //Arc指针计数+1
+                let dev_clone = Arc::clone(dev);
+                async move {
+                    //启动服务, serve()函数签名是&self, 暂时可以不用Mutex锁
+                    dev_clone.service.clone().lock().await.serve().await;
+                }
             });
             handles.push(handle);
         }
+        let x = &self.devices;
+        let a = x.get(&"pcs".to_string()).unwrap().clone();
+        tokio::spawn(async move {
+            loop {
+                let str = a.channel.lock().await.recv().await.unwrap();
+                info!("[BMS] {}", str)
+            }
+        });
+        let sender = self.tx.clone();
+        tokio::spawn(async move {
+            loop {
+                sender.send(String::from("BMS发出了信号")).unwrap();
+                tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
+            }
+        });
         for handler in handles {
             match handler.await {
                 Ok(_) => {}

+ 1 - 0
src/ems/emu/mod.rs

@@ -1 +1,2 @@
+pub mod device;
 pub mod emu;

+ 1 - 1
src/ems/mod.rs

@@ -6,5 +6,5 @@ pub type Producer = tokio::sync::mpsc::Sender<String>;
 pub type Consumer = tokio::sync::broadcast::Receiver<String>;
 #[async_trait::async_trait]
 pub trait Service: Send + Sync {
-    async fn serve(&mut self);
+    async fn serve(&self);
 }

+ 26 - 3
src/ems/pcs/pcs.rs

@@ -1,11 +1,14 @@
 use crate::ems::{Consumer, Producer, Service};
 use crate::internal::utils;
 use log::info;
+use std::sync::Arc;
+use tokio::join;
+use tokio::sync::Mutex;
 
 pub struct Pcs {
     pub id: String,
     pub producer: Producer,
-    pub consumer: Consumer,
+    pub consumer: Arc<Mutex<Consumer>>,
 }
 
 impl Pcs {
@@ -17,12 +20,32 @@ impl Pcs {
         Pcs {
             id,
             producer,
-            consumer,
+            consumer: Arc::new(Mutex::new(consumer)),
         }
     }
 }
 
+async fn read_task(producer: Producer) {
+    loop {
+        producer.send("PCS nihao".to_string()).await.unwrap();
+        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
+    }
+}
+
+async fn write_task(consumer: Arc<Mutex<Consumer>>) {
+    loop {
+        let string = consumer.lock().await.recv().await.unwrap();
+        info!("[PCS] {}", string);
+    }
+}
+
 #[async_trait::async_trait]
 impl Service for Pcs {
-    async fn serve(&mut self) {}
+    /// PCS 服务
+    /// 内部具有并发读写寄存器
+    async fn serve(&self) {
+        let read_handle = tokio::spawn(read_task(self.producer.clone()));
+        let write_handle = tokio::spawn(write_task(self.consumer.clone()));
+        let _ = join!(read_handle, write_handle);
+    }
 }

+ 0 - 137
src/main.rs

@@ -1,140 +1,3 @@
-// use async_trait::async_trait;
-// use std::{sync::Arc, time::Duration};
-// use tokio::{
-//     sync::{broadcast, mpsc},
-//     time,
-// };
-//
-// #[async_trait]
-// trait Service: Send + Sync {
-//     async fn serve(
-//         self: Arc<Self>,
-//         output_tx: broadcast::Sender<String>,
-//         input_rx: mpsc::Receiver<String>,
-//     );
-// }
-//
-// struct Pcs;
-//
-// impl Pcs {
-//     fn new() -> Self {
-//         Pcs
-//     }
-// }
-//
-// #[async_trait]
-// impl Service for Pcs {
-//     async fn serve(
-//         self: Arc<Self>,
-//         output_tx: broadcast::Sender<String>,
-//         mut input_rx: mpsc::Receiver<String>,
-//     ) {
-//         // 消息发送任务
-//         let send_task = tokio::spawn({
-//             let output_tx = output_tx.clone();
-//             async move {
-//                 loop {
-//                     time::sleep(Duration::from_secs(2)).await;
-//                     let _ = output_tx.send("我是PCS".to_string());
-//                 }
-//             }
-//         });
-//
-//         // 消息处理任务
-//         let handle_task = tokio::spawn(async move {
-//             while let Some(msg) = input_rx.recv().await {
-//                 println!("回复: {}", msg);
-//             }
-//         });
-//
-//         let _ = tokio::join!(send_task, handle_task);
-//     }
-// }
-//
-// struct Bms;
-//
-// impl Bms {
-//     fn new() -> Self {
-//         Bms
-//     }
-// }
-//
-// #[async_trait]
-// impl Service for Bms {
-//     async fn serve(
-//         self: Arc<Self>,
-//         output_tx: broadcast::Sender<String>,
-//         _input_rx: mpsc::Receiver<String>,
-//     ) {
-//         tokio::spawn(async move {
-//             loop {
-//                 time::sleep(Duration::from_secs(2)).await;
-//                 let _ = output_tx.send("我是BMS".to_string());
-//             }
-//         })
-//         .await
-//         .unwrap();
-//     }
-// }
-//
-// struct Emu {
-//     outputs: Vec<broadcast::Receiver<String>>,
-//     inputs: Vec<mpsc::Sender<String>>,
-// }
-//
-// impl Emu {
-//     fn new() -> Self {
-//         // 初始化PCS
-//         let (pcs_output_tx, pcs_output_rx) = broadcast::channel(32);
-//         let (pcs_input_tx, pcs_input_rx) = mpsc::channel(32);
-//         let pcs = Arc::new(Pcs::new());
-//         tokio::spawn(pcs.clone().serve(pcs_output_tx, pcs_input_rx));
-//
-//         // 初始化BMS
-//         let (bms_output_tx, bms_output_rx) = broadcast::channel(32);
-//         let (bms_input_tx, bms_input_rx) = mpsc::channel(32);
-//         let bms = Arc::new(Bms::new());
-//         tokio::spawn(bms.clone().serve(bms_output_tx, bms_input_rx));
-//
-//         Emu {
-//             outputs: vec![pcs_output_rx, bms_output_rx],
-//             inputs: vec![pcs_input_tx, bms_input_tx],
-//         }
-//     }
-//
-//     async fn start_server(&mut self) {
-//         // 处理输出消息
-//         for rx in &mut self.outputs {
-//             let mut rx = rx.resubscribe();
-//             tokio::spawn(async move {
-//                 while let Ok(msg) = rx.recv().await {
-//                     println!("{}", msg);
-//                 }
-//             });
-//         }
-//
-//         // 模拟发送消息到PCS
-//         let pcs_input = self.inputs[0].clone();
-//         tokio::spawn(async move {
-//             loop {
-//                 time::sleep(Duration::from_secs(3)).await;
-//                 pcs_input.send("你好".to_string()).await.unwrap();
-//             }
-//         });
-//
-//         // 等待Ctrl-C
-//         tokio::signal::ctrl_c().await.unwrap();
-//         println!("EMU系统关闭");
-//     }
-// }
-//
-// #[tokio::main]
-// async fn main() {
-//     println!("EMU系统启动");
-//     let mut emu = Emu::new();
-//     emu.start_server().await;
-// }
-
 use inpower_iot_mgc_rs::cmd::cmd::cmd;
 
 #[tokio::main]