pcs.rs 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. use crate::cmd::config::app_config;
  2. use crate::connect_or_retry;
  3. use crate::ems::{Consumer, Producer, Service};
  4. use crate::internal::modbus::code::ModbusCode;
  5. use crate::internal::modbus::{read_csv_to_code, slice_sequential};
  6. use crate::internal::utils;
  7. use anyhow::bail;
  8. use log::{error, info};
  9. use std::net::SocketAddr;
  10. use std::sync::Arc;
  11. use tokio::sync::Mutex;
  12. use tokio_modbus::client::Reader;
  13. use tokio_modbus::FunctionCode;
  14. pub struct Pcs {
  15. pub id: String,
  16. pub producer: Producer,
  17. pub consumer: Arc<Mutex<Consumer>>,
  18. pub modbus_code: Arc<Mutex<Vec<ModbusCode>>>,
  19. pub ctx: Arc<Mutex<tokio_modbus::client::Context>>,
  20. }
  21. async fn connect_modbus_tcp() -> anyhow::Result<tokio_modbus::client::Context> {
  22. let config = &app_config().emu.pcs;
  23. let addr: SocketAddr = format!("{}:{}", config.host, config.port).parse()?;
  24. connect_or_retry!(tokio_modbus::client::tcp::connect(addr), "ModbusTCP")
  25. }
  26. impl Pcs {
  27. /// 初始化PCS
  28. pub async fn new(producer: Producer, consumer: Consumer) -> anyhow::Result<Self> {
  29. let id = utils::generate_random_str(12);
  30. utils::log::init_log("inpower_iot_mgc_rs::ems::pcs::*", "pcs/pcs.log").await;
  31. let ctx = connect_modbus_tcp().await?;
  32. let modbus_code = match read_csv_to_code("./config/pcs.csv") {
  33. Ok(c) => c,
  34. Err(e) => {
  35. error!("【PCS】协议生成失败: {}", e.to_string());
  36. return Err(e);
  37. }
  38. };
  39. info!("PCS [{}] 初始化成功", id);
  40. Ok(Pcs {
  41. id,
  42. producer,
  43. consumer: Arc::new(Mutex::new(consumer)),
  44. modbus_code: Arc::new(Mutex::new(modbus_code)),
  45. ctx: Arc::new(Mutex::new(ctx)),
  46. })
  47. }
  48. async fn read_input_register(&self) -> anyhow::Result<()> {
  49. let modbus_code_clone = self.modbus_code.clone();
  50. let mut modbus_guard = modbus_code_clone.lock().await;
  51. // 从整个配置中过滤出输入寄存器集合
  52. let mut input_registers: Vec<_> = modbus_guard
  53. .iter_mut()
  54. .filter(|it| {
  55. it.addr.is_some()
  56. && it.name.is_some()
  57. && it.code == Some(FunctionCode::ReadInputRegisters.value())
  58. })
  59. .collect();
  60. //连续子序列分组
  61. let seq_slice = slice_sequential(input_registers.as_mut_slice());
  62. //遍历分组
  63. for codes in seq_slice.into_iter() {
  64. if let Some(start_code) = codes.first() {
  65. if let Some(start_addr) = start_code.addr {
  66. let words = self
  67. .ctx
  68. .lock()
  69. .await
  70. //读取输入寄存器
  71. .read_input_registers(start_addr, codes.len() as u16)
  72. .await??;
  73. if words.len() == codes.len() {
  74. for (i, word) in words.chunks(2).enumerate() {
  75. //大端序
  76. codes[i].data = Some([word[0] as u8, word[1] as u8]);
  77. }
  78. } else {
  79. bail!(
  80. "返回的数据长度不正确, register长度:{}, data长度:{}",
  81. input_registers.len(),
  82. words.len()
  83. )
  84. }
  85. }
  86. }
  87. }
  88. Ok(())
  89. }
  90. // async fn read_holding_registers() -> anyhow::Result<()> {
  91. // todo!("")
  92. // }
  93. }
  94. #[async_trait::async_trait]
  95. impl Service for Pcs {
  96. async fn read_task(&self) {
  97. let _producer = self.producer.clone();
  98. loop {
  99. tokio::time::sleep(tokio::time::Duration::from_millis(
  100. app_config().emu.pcs.interval,
  101. ))
  102. .await;
  103. match self.read_input_register().await {
  104. Ok(_) => {}
  105. Err(e) => {
  106. error!("{}", e.to_string());
  107. //断线重连
  108. let context = connect_modbus_tcp().await.unwrap();
  109. *self.ctx.lock().await = context;
  110. }
  111. }
  112. }
  113. }
  114. async fn write_task(&self) {
  115. let consumer = self.consumer.clone();
  116. let _ctx = self.ctx.clone();
  117. loop {
  118. let string = consumer.lock().await.recv().await.unwrap();
  119. info!("[PCS] {}", string);
  120. }
  121. }
  122. }