123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193 |
- use crate::cmd::config::app_config;
- use crate::connect_or_retry;
- use crate::ems::{Consumer, Producer, Service};
- use crate::internal::modbus::code::{ModbusCode, ModbusData, RegisterType, Rw};
- use crate::internal::modbus::{read_csv_to_code, slice_sequential, MAX_BIT_CNT, MAX_WORD_CNT};
- use crate::internal::utils;
- use anyhow::{bail, Error};
- use async_trait::async_trait;
- use log::{error, info};
- use std::net::SocketAddr;
- use std::sync::Arc;
- use std::time::Duration;
- use tokio::sync::Mutex;
- use tokio_modbus::client::Reader;
- ///
- /// 高特BMS
- /// Modbus-TCP
- pub struct GoldBms {
- pub id: String,
- pub producer: Producer,
- pub consumer: Arc<Mutex<Consumer>>,
- pub modbus_code: Arc<Mutex<Vec<ModbusCode>>>,
- pub ctx: Arc<Mutex<tokio_modbus::client::Context>>,
- }
- impl GoldBms {
- pub async fn new(producer: Producer, consumer: Consumer) -> anyhow::Result<Self> {
- let id = utils::generate_random_str(12);
- utils::log::init_log("inpower_iot_mgc_rs::ems::bms::*", "bms/bms.log").await;
- let config = read_csv_to_code("./config/gold_bms.csv").map_err(|e| {
- error!("协议生成失败:{}", e);
- anyhow::anyhow!(e)
- })?;
- let ctx = connect_modbus_tcp().await?;
- info!("BMS[{}]初始化成功!", id);
- Ok(GoldBms {
- id,
- producer,
- consumer: Arc::new(Mutex::new(consumer)),
- modbus_code: Arc::new(Mutex::new(config)),
- ctx: Arc::new(Mutex::new(ctx)),
- })
- }
- //读取输入寄存器
- async fn read_input_register(&self, mut vec: Vec<&mut ModbusCode>) -> anyhow::Result<()> {
- let seq_slice = slice_sequential(vec.as_mut_slice(), MAX_WORD_CNT);
- for codes in seq_slice.into_iter() {
- if let Some(start_code) = codes.first() {
- let words = self
- .ctx
- .lock()
- .await
- .read_input_registers(start_code.addr, codes.len() as u16)
- .await??;
- if words.len() == codes.len() {
- words.iter().enumerate().for_each(|(i, x)| {
- codes[i].data = Some(ModbusData::Word(*x));
- });
- } else {
- bail!(
- "返回的数据长度不正确, register长度:{}, data长度:{}",
- vec.len(),
- words.len()
- )
- }
- }
- }
- Ok(())
- }
- //读取离散输入寄存器
- async fn read_discrete_input(&self, mut vec: Vec<&mut ModbusCode>) -> anyhow::Result<()> {
- let seq_slice = slice_sequential(vec.as_mut_slice(), MAX_BIT_CNT);
- for codes in seq_slice.into_iter() {
- if let Some(start_code) = codes.first() {
- let words = self
- .ctx
- .lock()
- .await
- .read_discrete_inputs(start_code.addr, codes.len() as u16)
- .await??;
- if words.len() == codes.len() {
- for (i, x) in words.into_iter().enumerate() {
- codes[i].data = Some(ModbusData::Bit(x))
- }
- } else {
- bail!(
- "返回的数据长度不正确, register长度:{}, data长度:{}",
- vec.len(),
- words.len()
- )
- }
- }
- }
- Ok(())
- }
- async fn read_holding_register(&self, mut vec: Vec<&mut ModbusCode>) -> anyhow::Result<()> {
- let seq_slice = slice_sequential(vec.as_mut_slice(), MAX_WORD_CNT);
- for codes in seq_slice.into_iter() {
- if let Some(start_code) = codes.first() {
- let words = self
- .ctx
- .lock()
- .await
- .read_holding_registers(start_code.addr, codes.len() as u16)
- .await??;
- if words.len() == codes.len() {
- words.iter().enumerate().for_each(|(i, x)| {
- codes[i].data = Some(ModbusData::Word(*x));
- });
- } else {
- bail!(
- "返回的数据长度不正确, register长度:{}, data长度:{}",
- vec.len(),
- words.len()
- )
- }
- }
- }
- Ok(())
- }
- }
- #[async_trait]
- impl Service for GoldBms {
- async fn south(&self) {
- loop {
- tokio::time::sleep(Duration::from_millis(app_config().emu.bms.interval)).await;
- let code_clone = self.modbus_code.clone();
- let mut modbus_guard = code_clone.lock().await;
- if let Err(e) = self
- .read_discrete_input(
- modbus_guard
- .iter_mut()
- .filter(|it| it.code == RegisterType::DiscreteInput)
- .collect(),
- )
- .await
- {
- handle_modbus_error(&e, &self.ctx, "读取离散寄存器失败").await;
- }
- if let Err(e) = self
- .read_input_register(
- modbus_guard
- .iter_mut()
- .filter(|it| it.code == RegisterType::InputRegister)
- .collect(),
- )
- .await
- {
- handle_modbus_error(&e, &self.ctx, "读取输入寄存器失败").await;
- }
- if let Err(e) = self
- .read_holding_register(
- modbus_guard
- .iter_mut()
- .filter(|it| {
- it.code == RegisterType::HoldingRegister && it.rw == Some(Rw::RW)
- })
- .collect(),
- )
- .await
- {
- handle_modbus_error(&e, &self.ctx, "读取保持寄存器失败").await;
- }
- }
- }
- async fn north(&self) {}
- }
- async fn connect_modbus_tcp() -> anyhow::Result<tokio_modbus::client::Context> {
- let config = &app_config().emu.bms.gold_bms;
- let addr: SocketAddr = format!("{}:{}", config.host, config.port).parse()?;
- connect_or_retry!(tokio_modbus::client::tcp::connect(addr), "[BMS] ModbusTCP")
- }
- async fn handle_modbus_error(e: &Error, ctx: &Mutex<tokio_modbus::client::Context>, log_str: &str) {
- //错误是tokio_modbus的则重新连接
- if let Some(_) = e.downcast_ref::<tokio_modbus::Error>() {
- let mut ctx = ctx.lock().await;
- if let Ok(context) = connect_modbus_tcp().await {
- *ctx = context;
- }
- } else {
- error!("{log_str}: {}", e);
- }
- }
|