Browse Source

【框架搭建】update

yuanan 2 tháng trước cách đây
mục cha
commit
1dc9802016
11 tập tin đã thay đổi với 327 bổ sung103 xóa
  1. 83 0
      Cargo.lock
  2. 4 0
      Cargo.toml
  3. 3 6
      src/cmd/cmd.rs
  4. 6 25
      src/ems/bms/bms.rs
  5. 59 0
      src/ems/emu/emu.rs
  6. 1 0
      src/ems/emu/mod.rs
  7. 6 3
      src/ems/mod.rs
  8. 16 18
      src/ems/pcs/pcs.rs
  9. 0 50
      src/ems/service.rs
  10. 11 0
      src/internal/utils/mod.rs
  11. 138 1
      src/main.rs

+ 83 - 0
Cargo.lock

@@ -462,9 +462,11 @@ name = "inpower-iot-mgc-rs"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "async-trait",
  "chrono",
  "clap",
  "log",
+ "rand",
  "serde",
  "serde_yml",
  "tklog",
@@ -680,6 +682,15 @@ version = "0.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
 
+[[package]]
+name = "ppv-lite86"
+version = "0.2.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04"
+dependencies = [
+ "zerocopy 0.7.35",
+]
+
 [[package]]
 name = "proc-macro2"
 version = "1.0.93"
@@ -698,6 +709,37 @@ dependencies = [
  "proc-macro2",
 ]
 
+[[package]]
+name = "rand"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94"
+dependencies = [
+ "rand_chacha",
+ "rand_core",
+ "zerocopy 0.8.17",
+]
+
+[[package]]
+name = "rand_chacha"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
+dependencies = [
+ "ppv-lite86",
+ "rand_core",
+]
+
+[[package]]
+name = "rand_core"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b08f3c9802962f7e1b25113931d94f43ed9725bebc59db9d0c3e9a23b67e15ff"
+dependencies = [
+ "getrandom",
+ "zerocopy 0.8.17",
+]
+
 [[package]]
 name = "redox_syscall"
 version = "0.5.8"
@@ -1275,3 +1317,44 @@ checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c"
 dependencies = [
  "bitflags 2.8.0",
 ]
+
+[[package]]
+name = "zerocopy"
+version = "0.7.35"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0"
+dependencies = [
+ "byteorder",
+ "zerocopy-derive 0.7.35",
+]
+
+[[package]]
+name = "zerocopy"
+version = "0.8.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aa91407dacce3a68c56de03abe2760159582b846c6a4acd2f456618087f12713"
+dependencies = [
+ "zerocopy-derive 0.8.17",
+]
+
+[[package]]
+name = "zerocopy-derive"
+version = "0.7.35"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "zerocopy-derive"
+version = "0.8.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "06718a168365cad3d5ff0bb133aad346959a2074bd4a85c121255a11304a8626"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]

+ 4 - 0
Cargo.toml

@@ -20,11 +20,15 @@ tokio-cron-scheduler = "0.13.0"
 tokio-modbus = { version = "0.16.1", features = ["rtu", "tcp"] }
 # tokio 异步串口支持
 tokio-serial = "5.4.5"
+# CAN协议支持,开启tokio特性
+#socketcan = { version = "3.5.0", features = ["tokio"] }
 # 时间工具
 chrono = "0.4.39"
 # 日志框架
 tklog = "0.2.9"
 # 日志框架
 log = "0.4.25"
+rand = "0.9.0"
+async-trait = "0.1.86"
 
 

+ 3 - 6
src/cmd/cmd.rs

@@ -1,6 +1,7 @@
 use crate::cmd::config::{AppConfig, EMU_CONFIG};
 use crate::ems;
 use clap::Parser;
+use std::sync::Arc;
 use tokio::fs::read_to_string;
 
 #[derive(Parser, Debug)]
@@ -14,12 +15,8 @@ pub async fn cmd() {
     let args = Args::parse();
     match init_config(args.config).await {
         Ok(()) => {
-            println!("{:?}", EMU_CONFIG.get().unwrap());
-            ems::service::Ems::new()
-                .await
-                .start_services()
-                .await
-                .unwrap()
+            let emu = Arc::new(ems::emu::emu::Emu::new().await);
+            let _ = emu.run().await;
         }
         Err(e) => {
             eprintln!("配置加载失败: {}", e.to_string());

+ 6 - 25
src/ems/bms/bms.rs

@@ -1,35 +1,16 @@
-use crate::ems::service::Service;
 use crate::internal::utils;
-use anyhow::Error;
 use log::info;
-use std::time::Duration;
-use tokio::task::JoinHandle;
 
-pub struct Bms {}
+pub struct Bms {
+    pub id: String,
+}
 
 impl Bms {
     /// 初始化BMS
     pub async fn new() -> Self {
+        let id = utils::generate_random_str(12);
         utils::log::init_log("inpower_iot_mgc_rs::ems::bms::*", "logs/bms.log").await;
-        info!("BMS初始化成功");
-        Bms {}
-    }
-}
-
-impl Service for Bms {
-    fn run(&self) -> Result<JoinHandle<()>, Error> {
-        let handle: JoinHandle<()> = tokio::spawn(async {
-            let read_handle = tokio::spawn(async {
-                loop {
-                    tokio::time::sleep(Duration::from_secs(3)).await;
-                }
-            });
-            let write_handle = tokio::spawn(async {
-                loop {
-                    tokio::time::sleep(Duration::from_secs(1)).await;
-                }
-            });
-        });
-        Ok(handle)
+        info!("BMS [{}] 初始化成功", id);
+        Bms { id }
     }
 }

+ 59 - 0
src/ems/emu/emu.rs

@@ -0,0 +1,59 @@
+use crate::ems::pcs::pcs::Pcs;
+use crate::ems::Service;
+use crate::internal::utils;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::sync;
+use tokio::sync::Mutex;
+
+type ServiceChannels = Arc<HashMap<String, Mutex<sync::mpsc::Receiver<String>>>>;
+
+/// services 是所有实现了Service接口的设备集合
+/// tx 是EMU向其他设备广播消息的发送器
+/// service_channels 是所有设备向EMU发送消息的通道集合
+pub struct Emu {
+    pub services: Vec<Arc<Mutex<dyn Service>>>,
+    pub tx: sync::broadcast::Sender<String>,
+    pub service_channels: ServiceChannels,
+}
+impl Emu {
+    pub async fn new() -> Self {
+        //初始化日志
+        utils::log::init_log("inpower_iot_mgc_rs::ems::emu::*", "logs/emu.log").await;
+        let (tx, _) = sync::broadcast::channel::<String>(8);
+        let mut services: Vec<Arc<Mutex<dyn Service>>> = Vec::new();
+        let mut service_channels = HashMap::new();
+        log::info!("EMU初始化完成");
+        //↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓ 创建PCS
+        let (pcs_tx, pcs_rx) = sync::mpsc::channel::<String>(8);
+        let pcs = Arc::new(Mutex::new(Pcs::new(pcs_tx, tx.subscribe()).await));
+        services.push(pcs);
+        service_channels.insert(1.to_string(), Mutex::new(pcs_rx));
+        //↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
+        Emu {
+            services,
+            service_channels: Arc::new(service_channels),
+            tx,
+        }
+    }
+
+    pub async fn run(self: Arc<Self>) {
+        let mut handles = Vec::new();
+        for service in self.services.iter() {
+            let service_clone = service.clone();
+            let handle = tokio::spawn(async move {
+                service_clone.lock().await.serve().await;
+            });
+            handles.push(handle);
+        }
+        for handler in handles {
+            match handler.await {
+                Ok(_) => {}
+                Err(e) => {
+                    log::error!("内部错误: {}", e.to_string());
+                    continue;
+                }
+            }
+        }
+    }
+}

+ 1 - 0
src/ems/emu/mod.rs

@@ -0,0 +1 @@
+pub mod emu;

+ 6 - 3
src/ems/mod.rs

@@ -1,7 +1,10 @@
 pub mod bms;
+pub mod emu;
 pub mod pcs;
-pub mod service;
 
-pub trait Job: Send + Sync {
-    fn do_some(&self);
+pub type Producer = tokio::sync::mpsc::Sender<String>;
+pub type Consumer = tokio::sync::broadcast::Receiver<String>;
+#[async_trait::async_trait]
+pub trait Service: Send + Sync {
+    async fn serve(&mut self);
 }

+ 16 - 18
src/ems/pcs/pcs.rs

@@ -1,30 +1,28 @@
-use crate::cmd::config::EMU_CONFIG;
-use crate::ems::service::Service;
+use crate::ems::{Consumer, Producer, Service};
 use crate::internal::utils;
-use anyhow::Error;
 use log::info;
-use std::time::Duration;
-use tokio::task::JoinHandle;
 
-pub struct Pcs {}
+pub struct Pcs {
+    pub id: String,
+    pub producer: Producer,
+    pub consumer: Consumer,
+}
 
 impl Pcs {
     /// 初始化PCS
-    pub async fn new() -> Self {
+    pub async fn new(producer: Producer, consumer: Consumer) -> Self {
+        let id = utils::generate_random_str(12);
         utils::log::init_log("inpower_iot_mgc_rs::ems::pcs::*", "logs/pcs.log").await;
-        info!("PCS初始化成功");
-        Pcs {}
+        info!("PCS [{}] 初始化成功", id);
+        Pcs {
+            id,
+            producer,
+            consumer,
+        }
     }
 }
 
+#[async_trait::async_trait]
 impl Service for Pcs {
-    fn run(&self) -> Result<JoinHandle<()>, Error> {
-        let emu_config = EMU_CONFIG.get().unwrap();
-        let handle = tokio::spawn(async {
-            loop {
-                tokio::time::sleep(Duration::from_millis(emu_config.emu.ems.read_interval)).await;
-            }
-        });
-        Ok(handle)
-    }
+    async fn serve(&mut self) {}
 }

+ 0 - 50
src/ems/service.rs

@@ -1,50 +0,0 @@
-use crate::ems::bms::bms::Bms;
-use crate::ems::pcs::pcs::Pcs;
-use crate::ems::service::Device::{BMS, PCS};
-use std::collections::HashMap;
-use tokio::task::JoinHandle;
-
-pub trait Service {
-    fn run(&self) -> Result<JoinHandle<()>, anyhow::Error>;
-}
-
-/// 设备类型
-#[derive(Eq, Hash, PartialEq)]
-pub enum Device<'a> {
-    PCS(&'a str),
-    BMS(&'a str),
-}
-
-pub struct Ctx {}
-
-/// EMS
-/// devices: 实现了Service trait的列表
-pub struct Ems<'a> {
-    pub(crate) devices: HashMap<Device<'a>, Box<dyn Service>>,
-    pub context: Ctx,
-}
-
-impl Ems<'_> {
-    pub async fn new() -> Self {
-        let mut devices: HashMap<Device, Box<dyn Service>> = HashMap::new();
-        let pcs = Pcs::new().await;
-        let bms = Bms::new().await;
-        devices.insert(PCS("PCS"), Box::new(pcs));
-        devices.insert(BMS("BMS"), Box::new(bms));
-        devices.get(&PCS("PCS")).unwrap().run().expect("");
-        let context = Ctx {};
-        Ems { devices, context }
-    }
-
-    pub async fn start_services(&self) -> Result<(), anyhow::Error> {
-        let mut handles = Vec::new();
-        for (_name, dev) in &self.devices {
-            let handle = dev.run()?;
-            handles.push(handle);
-        }
-        for handle in handles {
-            handle.await?;
-        }
-        Ok(())
-    }
-}

+ 11 - 0
src/internal/utils/mod.rs

@@ -1 +1,12 @@
+use rand::distr::Alphanumeric;
+use rand::{rng, Rng};
+
 pub mod log;
+
+pub fn generate_random_str(len: usize) -> String {
+    rng()
+        .sample_iter(&Alphanumeric) // 随机从 Alphanumeric(字母和数字)中取样
+        .take(len) // 取前 len 个字符
+        .map(char::from) // 将 u8 转换为 char
+        .collect() // 收集为一个字符串
+}

+ 138 - 1
src/main.rs

@@ -1,6 +1,143 @@
+// use async_trait::async_trait;
+// use std::{sync::Arc, time::Duration};
+// use tokio::{
+//     sync::{broadcast, mpsc},
+//     time,
+// };
+//
+// #[async_trait]
+// trait Service: Send + Sync {
+//     async fn serve(
+//         self: Arc<Self>,
+//         output_tx: broadcast::Sender<String>,
+//         input_rx: mpsc::Receiver<String>,
+//     );
+// }
+//
+// struct Pcs;
+//
+// impl Pcs {
+//     fn new() -> Self {
+//         Pcs
+//     }
+// }
+//
+// #[async_trait]
+// impl Service for Pcs {
+//     async fn serve(
+//         self: Arc<Self>,
+//         output_tx: broadcast::Sender<String>,
+//         mut input_rx: mpsc::Receiver<String>,
+//     ) {
+//         // 消息发送任务
+//         let send_task = tokio::spawn({
+//             let output_tx = output_tx.clone();
+//             async move {
+//                 loop {
+//                     time::sleep(Duration::from_secs(2)).await;
+//                     let _ = output_tx.send("我是PCS".to_string());
+//                 }
+//             }
+//         });
+//
+//         // 消息处理任务
+//         let handle_task = tokio::spawn(async move {
+//             while let Some(msg) = input_rx.recv().await {
+//                 println!("回复: {}", msg);
+//             }
+//         });
+//
+//         let _ = tokio::join!(send_task, handle_task);
+//     }
+// }
+//
+// struct Bms;
+//
+// impl Bms {
+//     fn new() -> Self {
+//         Bms
+//     }
+// }
+//
+// #[async_trait]
+// impl Service for Bms {
+//     async fn serve(
+//         self: Arc<Self>,
+//         output_tx: broadcast::Sender<String>,
+//         _input_rx: mpsc::Receiver<String>,
+//     ) {
+//         tokio::spawn(async move {
+//             loop {
+//                 time::sleep(Duration::from_secs(2)).await;
+//                 let _ = output_tx.send("我是BMS".to_string());
+//             }
+//         })
+//         .await
+//         .unwrap();
+//     }
+// }
+//
+// struct Emu {
+//     outputs: Vec<broadcast::Receiver<String>>,
+//     inputs: Vec<mpsc::Sender<String>>,
+// }
+//
+// impl Emu {
+//     fn new() -> Self {
+//         // 初始化PCS
+//         let (pcs_output_tx, pcs_output_rx) = broadcast::channel(32);
+//         let (pcs_input_tx, pcs_input_rx) = mpsc::channel(32);
+//         let pcs = Arc::new(Pcs::new());
+//         tokio::spawn(pcs.clone().serve(pcs_output_tx, pcs_input_rx));
+//
+//         // 初始化BMS
+//         let (bms_output_tx, bms_output_rx) = broadcast::channel(32);
+//         let (bms_input_tx, bms_input_rx) = mpsc::channel(32);
+//         let bms = Arc::new(Bms::new());
+//         tokio::spawn(bms.clone().serve(bms_output_tx, bms_input_rx));
+//
+//         Emu {
+//             outputs: vec![pcs_output_rx, bms_output_rx],
+//             inputs: vec![pcs_input_tx, bms_input_tx],
+//         }
+//     }
+//
+//     async fn start_server(&mut self) {
+//         // 处理输出消息
+//         for rx in &mut self.outputs {
+//             let mut rx = rx.resubscribe();
+//             tokio::spawn(async move {
+//                 while let Ok(msg) = rx.recv().await {
+//                     println!("{}", msg);
+//                 }
+//             });
+//         }
+//
+//         // 模拟发送消息到PCS
+//         let pcs_input = self.inputs[0].clone();
+//         tokio::spawn(async move {
+//             loop {
+//                 time::sleep(Duration::from_secs(3)).await;
+//                 pcs_input.send("你好".to_string()).await.unwrap();
+//             }
+//         });
+//
+//         // 等待Ctrl-C
+//         tokio::signal::ctrl_c().await.unwrap();
+//         println!("EMU系统关闭");
+//     }
+// }
+//
+// #[tokio::main]
+// async fn main() {
+//     println!("EMU系统启动");
+//     let mut emu = Emu::new();
+//     emu.start_server().await;
+// }
+
 use inpower_iot_mgc_rs::cmd::cmd::cmd;
 
 #[tokio::main]
 async fn main() {
     cmd().await;
-}
+}