pcs.rs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. use crate::cmd::config::app_config;
  2. use crate::ems::pcs::pcs_conf;
  3. use crate::ems::{Consumer, Producer, Service};
  4. use crate::internal::modbus::code::ModbusCode;
  5. use crate::internal::utils;
  6. use anyhow::bail;
  7. use log::{error, info};
  8. use std::sync::Arc;
  9. use tokio::sync::Mutex;
  10. use tokio_modbus::client::{Reader, Writer};
  11. use tokio_modbus::FunctionCode;
  12. pub struct Pcs {
  13. pub id: String,
  14. pub producer: Producer,
  15. pub consumer: Arc<Mutex<Consumer>>,
  16. pub modbus_code: Arc<Mutex<Vec<ModbusCode>>>,
  17. pub ctx: Arc<Mutex<tokio_modbus::client::Context>>,
  18. }
  19. /// Modbus-TCP连接
  20. /// FIXME 考虑重连机制
  21. async fn connect_modbus_tcp() -> anyhow::Result<tokio_modbus::client::Context> {
  22. let pcs_config = &app_config().emu.pcs;
  23. let ctx = tokio_modbus::client::tcp::connect(
  24. format!("{}:{}", pcs_config.host, pcs_config.port).parse()?,
  25. )
  26. .await?;
  27. Ok(ctx)
  28. }
  29. impl Pcs {
  30. /// 初始化PCS
  31. pub async fn new(producer: Producer, consumer: Consumer) -> anyhow::Result<Self> {
  32. let id = utils::generate_random_str(12);
  33. utils::log::init_log("inpower_iot_mgc_rs::ems::pcs::*", "pcs/pcs.log").await;
  34. let ctx = match connect_modbus_tcp().await {
  35. Ok(c) => c,
  36. Err(e) => {
  37. error!(
  38. "【PCS】Modbus Tcp {}连接失败: {}",
  39. &app_config().emu.pcs.host,
  40. e.to_string()
  41. );
  42. return Err(e);
  43. }
  44. };
  45. let modbus_code = match pcs_conf() {
  46. Ok(c) => c,
  47. Err(e) => {
  48. error!("【PCS】协议生成失败: {}", e.to_string());
  49. return Err(e);
  50. }
  51. };
  52. info!("PCS [{}] 初始化成功", id);
  53. Ok(Pcs {
  54. id,
  55. producer,
  56. consumer: Arc::new(Mutex::new(consumer)),
  57. modbus_code: Arc::new(Mutex::new(modbus_code)),
  58. ctx: Arc::new(Mutex::new(ctx)),
  59. })
  60. }
  61. async fn read_input_register(&self) -> anyhow::Result<()> {
  62. let modbus_code_clone = self.modbus_code.clone();
  63. let mut modbus_guard = modbus_code_clone.lock().await;
  64. // 从整个配置中过滤出输入寄存器集合
  65. let mut input_registers: Vec<_> = modbus_guard
  66. .iter_mut()
  67. .filter(|it| {
  68. it.addr.is_some()
  69. && it.name.is_some()
  70. && it.code == Some(FunctionCode::ReadInputRegisters.value())
  71. })
  72. .collect();
  73. if let Some(start_register) = input_registers.first() {
  74. if let Some(addr) = start_register.addr {
  75. let words = self
  76. .ctx
  77. .lock()
  78. .await
  79. .read_input_registers(addr, input_registers.len() as u16)
  80. .await??;
  81. if words.len() == input_registers.len() {
  82. for (i, word2) in words.chunks(2).enumerate() {
  83. if let Some(tcp_code) = input_registers.get_mut(i) {
  84. //大端序
  85. tcp_code.data = Some([word2[0] as u8, word2[1] as u8]);
  86. }
  87. }
  88. } else {
  89. bail!(
  90. "返回的数据长度不正确, register长度:{}, data长度:{}",
  91. input_registers.len(),
  92. words.len()
  93. )
  94. }
  95. }
  96. }
  97. Ok(())
  98. }
  99. }
  100. #[async_trait::async_trait]
  101. impl Service for Pcs {
  102. async fn read_task(&self) {
  103. let producer = self.producer.clone();
  104. loop {
  105. tokio::time::sleep(tokio::time::Duration::from_millis(
  106. app_config().emu.ems.read_interval,
  107. ))
  108. .await;
  109. match self.read_input_register().await {
  110. Ok(_) => {}
  111. Err(e) => {
  112. error!("{}", e.to_string());
  113. //断线重连
  114. let context = connect_modbus_tcp().await.unwrap();
  115. *self.ctx.lock().await = context;
  116. }
  117. }
  118. }
  119. }
  120. async fn write_task(&self) {
  121. let consumer = self.consumer.clone();
  122. let ctx = self.ctx.clone();
  123. loop {
  124. let string = consumer.lock().await.recv().await.unwrap();
  125. info!("[PCS] {}", string);
  126. let _ = ctx
  127. .lock()
  128. .await
  129. .write_multiple_registers(1000, &[0xAE10])
  130. .await;
  131. }
  132. }
  133. }