gold_bms.rs 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. use crate::cmd::config::app_config;
  2. use crate::connect_or_retry;
  3. use crate::ems::{Consumer, Producer, Service};
  4. use crate::internal::modbus::code::{ModbusCode, ModbusData, RegisterType, Rw};
  5. use crate::internal::modbus::{read_csv_to_code, slice_sequential, MAX_BIT_CNT, MAX_WORD_CNT};
  6. use crate::internal::utils;
  7. use anyhow::{bail, Error};
  8. use async_trait::async_trait;
  9. use log::{error, info};
  10. use std::net::SocketAddr;
  11. use std::sync::Arc;
  12. use std::time::Duration;
  13. use tokio::sync::Mutex;
  14. use tokio_modbus::client::Reader;
  15. ///
  16. /// 高特BMS
  17. /// Modbus-TCP
  18. pub struct GoldBms {
  19. pub id: String,
  20. pub producer: Producer,
  21. pub consumer: Arc<Mutex<Consumer>>,
  22. pub modbus_code: Arc<Mutex<Vec<ModbusCode>>>,
  23. pub ctx: Arc<Mutex<tokio_modbus::client::Context>>,
  24. }
  25. impl GoldBms {
  26. pub async fn new(producer: Producer, consumer: Consumer) -> anyhow::Result<Self> {
  27. let id = utils::generate_random_str(12);
  28. utils::log::init_log("inpower_iot_mgc_rs::ems::bms::*", "bms/bms.log").await;
  29. let config = read_csv_to_code("./config/gold_bms.csv").map_err(|e| {
  30. error!("协议生成失败:{}", e);
  31. anyhow::anyhow!(e)
  32. })?;
  33. let ctx = connect_modbus_tcp().await?;
  34. info!("BMS[{}]初始化成功!", id);
  35. Ok(GoldBms {
  36. id,
  37. producer,
  38. consumer: Arc::new(Mutex::new(consumer)),
  39. modbus_code: Arc::new(Mutex::new(config)),
  40. ctx: Arc::new(Mutex::new(ctx)),
  41. })
  42. }
  43. //读取输入寄存器
  44. async fn read_input_register(&self, mut vec: Vec<&mut ModbusCode>) -> anyhow::Result<()> {
  45. let seq_slice = slice_sequential(vec.as_mut_slice(), MAX_WORD_CNT);
  46. for codes in seq_slice.into_iter() {
  47. if let Some(start_code) = codes.first() {
  48. let words = self
  49. .ctx
  50. .lock()
  51. .await
  52. .read_input_registers(start_code.addr, codes.len() as u16)
  53. .await??;
  54. if words.len() == codes.len() {
  55. words.iter().enumerate().for_each(|(i, x)| {
  56. codes[i].data = Some(ModbusData::Word(*x));
  57. });
  58. } else {
  59. bail!(
  60. "返回的数据长度不正确, register长度:{}, data长度:{}",
  61. vec.len(),
  62. words.len()
  63. )
  64. }
  65. }
  66. }
  67. Ok(())
  68. }
  69. //读取离散输入寄存器
  70. async fn read_discrete_input(&self, mut vec: Vec<&mut ModbusCode>) -> anyhow::Result<()> {
  71. let seq_slice = slice_sequential(vec.as_mut_slice(), MAX_BIT_CNT);
  72. for codes in seq_slice.into_iter() {
  73. if let Some(start_code) = codes.first() {
  74. let words = self
  75. .ctx
  76. .lock()
  77. .await
  78. .read_discrete_inputs(start_code.addr, codes.len() as u16)
  79. .await??;
  80. if words.len() == codes.len() {
  81. for (i, x) in words.into_iter().enumerate() {
  82. codes[i].data = Some(ModbusData::Bit(x))
  83. }
  84. } else {
  85. bail!(
  86. "返回的数据长度不正确, register长度:{}, data长度:{}",
  87. vec.len(),
  88. words.len()
  89. )
  90. }
  91. }
  92. }
  93. Ok(())
  94. }
  95. async fn read_holding_register(&self, mut vec: Vec<&mut ModbusCode>) -> anyhow::Result<()> {
  96. let seq_slice = slice_sequential(vec.as_mut_slice(), MAX_WORD_CNT);
  97. for codes in seq_slice.into_iter() {
  98. if let Some(start_code) = codes.first() {
  99. let words = self
  100. .ctx
  101. .lock()
  102. .await
  103. .read_holding_registers(start_code.addr, codes.len() as u16)
  104. .await??;
  105. if words.len() == codes.len() {
  106. words.iter().enumerate().for_each(|(i, x)| {
  107. codes[i].data = Some(ModbusData::Word(*x));
  108. });
  109. } else {
  110. bail!(
  111. "返回的数据长度不正确, register长度:{}, data长度:{}",
  112. vec.len(),
  113. words.len()
  114. )
  115. }
  116. }
  117. }
  118. Ok(())
  119. }
  120. }
  121. #[async_trait]
  122. impl Service for GoldBms {
  123. async fn south(&self) {
  124. loop {
  125. tokio::time::sleep(Duration::from_millis(app_config().emu.bms.interval)).await;
  126. let code_clone = self.modbus_code.clone();
  127. let mut modbus_guard = code_clone.lock().await;
  128. if let Err(e) = self
  129. .read_discrete_input(
  130. modbus_guard
  131. .iter_mut()
  132. .filter(|it| it.code == RegisterType::DiscreteInput)
  133. .collect(),
  134. )
  135. .await
  136. {
  137. handle_modbus_error(&e, &self.ctx, "读取离散寄存器失败").await;
  138. }
  139. if let Err(e) = self
  140. .read_input_register(
  141. modbus_guard
  142. .iter_mut()
  143. .filter(|it| it.code == RegisterType::InputRegister)
  144. .collect(),
  145. )
  146. .await
  147. {
  148. handle_modbus_error(&e, &self.ctx, "读取输入寄存器失败").await;
  149. }
  150. if let Err(e) = self
  151. .read_holding_register(
  152. modbus_guard
  153. .iter_mut()
  154. .filter(|it| {
  155. it.code == RegisterType::HoldingRegister && it.rw == Some(Rw::RW)
  156. })
  157. .collect(),
  158. )
  159. .await
  160. {
  161. handle_modbus_error(&e, &self.ctx, "读取保持寄存器失败").await;
  162. }
  163. }
  164. }
  165. async fn north(&self) {}
  166. }
  167. async fn connect_modbus_tcp() -> anyhow::Result<tokio_modbus::client::Context> {
  168. let config = &app_config().emu.bms.gold_bms;
  169. let addr: SocketAddr = format!("{}:{}", config.host, config.port).parse()?;
  170. connect_or_retry!(tokio_modbus::client::tcp::connect(addr), "[BMS] ModbusTCP")
  171. }
  172. async fn handle_modbus_error(e: &Error, ctx: &Mutex<tokio_modbus::client::Context>, log_str: &str) {
  173. //错误是tokio_modbus的则重新连接
  174. if let Some(_) = e.downcast_ref::<tokio_modbus::Error>() {
  175. let mut ctx = ctx.lock().await;
  176. if let Ok(context) = connect_modbus_tcp().await {
  177. *ctx = context;
  178. }
  179. } else {
  180. error!("{log_str}: {}", e);
  181. }
  182. }