From 24cdf079a6e18261e99589cdb85a1093cd3b66c8 Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Mon, 17 Jul 2023 16:53:14 +0000 Subject: [PATCH] New helper APIs in Transport --- matter/src/transport/core.rs | 177 ++++++++++++++++++++++++++++++- matter/src/transport/runner.rs | 184 ++++----------------------------- 2 files changed, 196 insertions(+), 165 deletions(-) diff --git a/matter/src/transport/core.rs b/matter/src/transport/core.rs index 98c2fba..1a51b91 100644 --- a/matter/src/transport/core.rs +++ b/matter/src/transport/core.rs @@ -17,13 +17,23 @@ use core::{borrow::Borrow, cell::RefCell}; -use crate::{error::ErrorCode, secure_channel::common::OpCode, Matter}; use embassy_futures::select::select; +use embassy_sync::{blocking_mutex::raw::NoopRawMutex, channel::Channel}; use embassy_time::{Duration, Timer}; -use log::info; + +use log::{error, info, warn}; use crate::{ - error::Error, secure_channel::common::PROTO_ID_SECURE_CHANNEL, transport::packet::Packet, + alloc, + data_model::{core::DataModel, objects::DataModelHandler}, + error::{Error, ErrorCode}, + interaction_model::core::PROTO_ID_INTERACTION_MODEL, + secure_channel::{ + common::{OpCode, PROTO_ID_SECURE_CHANNEL}, + core::SecureChannel, + }, + transport::packet::Packet, + Matter, }; use super::{ @@ -32,6 +42,8 @@ use super::{ MAX_EXCHANGES, }, mrp::ReliableMessage, + packet::{MAX_RX_BUF_SIZE, MAX_RX_STATUS_BUF_SIZE, MAX_TX_BUF_SIZE}, + pipe::{Chunk, Pipe}, session::SessionMgr, }; @@ -83,6 +95,165 @@ impl<'a> Transport<'a> { unimplemented!() } + #[inline(always)] + pub async fn handle_tx(&self, tx_pipe: &Pipe<'_>) -> Result<(), Error> { + loop { + loop { + { + let mut data = tx_pipe.data.lock().await; + + if data.chunk.is_none() { + let mut tx = alloc!(Packet::new_tx(data.buf)); + + if self.pull_tx(&mut tx).await? { + data.chunk = Some(Chunk { + start: tx.get_writebuf()?.get_start(), + end: tx.get_writebuf()?.get_tail(), + addr: tx.peer, + }); + tx_pipe.data_supplied_notification.signal(()); + } else { + break; + } + } + } + + tx_pipe.data_consumed_notification.wait().await; + } + + self.wait_tx().await?; + } + } + + #[inline(always)] + pub async fn handle_rx_multiplex<'t, 'e, const N: usize>( + &'t self, + rx_pipe: &Pipe<'_>, + construction_notification: &'e Notification, + channel: &Channel, N>, + ) -> Result<(), Error> + where + 't: 'e, + { + loop { + info!("Transport: waiting for incoming packets"); + + { + let mut data = rx_pipe.data.lock().await; + + if let Some(chunk) = data.chunk { + let mut rx = alloc!(Packet::new_rx(&mut data.buf[chunk.start..chunk.end])); + rx.peer = chunk.addr; + + if let Some(exchange_ctr) = + self.process_rx(construction_notification, &mut rx)? + { + let exchange_id = exchange_ctr.id().clone(); + + info!("Transport: got new exchange: {:?}", exchange_id); + + channel.send(exchange_ctr).await; + info!("Transport: exchange sent"); + + self.wait_construction(construction_notification, &rx, &exchange_id) + .await?; + + info!("Transport: exchange started"); + } + + data.chunk = None; + rx_pipe.data_consumed_notification.signal(()); + } + } + + rx_pipe.data_supplied_notification.wait().await + } + + #[allow(unreachable_code)] + Ok::<_, Error>(()) + } + + #[inline(always)] + pub async fn exchange_handler( + &self, + tx_buf: &mut [u8; MAX_TX_BUF_SIZE], + rx_buf: &mut [u8; MAX_RX_BUF_SIZE], + sx_buf: &mut [u8; MAX_RX_STATUS_BUF_SIZE], + handler_id: impl core::fmt::Display, + channel: &Channel, N>, + handler: &H, + ) -> Result<(), Error> + where + H: DataModelHandler, + { + loop { + let exchange_ctr: ExchangeCtr<'_> = channel.recv().await; + + info!( + "Handler {}: Got exchange {:?}", + handler_id, + exchange_ctr.id() + ); + + let result = self + .handle_exchange(tx_buf, rx_buf, sx_buf, exchange_ctr, handler) + .await; + + if let Err(err) = result { + warn!( + "Handler {}: Exchange closed because of error: {:?}", + handler_id, err + ); + } else { + info!("Handler {}: Exchange completed", handler_id); + } + } + } + + #[inline(always)] + #[cfg_attr(feature = "nightly", allow(clippy::await_holding_refcell_ref))] // Fine because of the async mutex + pub async fn handle_exchange( + &self, + tx_buf: &mut [u8; MAX_TX_BUF_SIZE], + rx_buf: &mut [u8; MAX_RX_BUF_SIZE], + sx_buf: &mut [u8; MAX_RX_STATUS_BUF_SIZE], + exchange_ctr: ExchangeCtr<'_>, + handler: &H, + ) -> Result<(), Error> + where + H: DataModelHandler, + { + let mut tx = alloc!(Packet::new_tx(tx_buf.as_mut())); + let mut rx = alloc!(Packet::new_rx(rx_buf.as_mut())); + + let mut exchange = alloc!(exchange_ctr.get(&mut rx).await?); + + match rx.get_proto_id() { + PROTO_ID_SECURE_CHANNEL => { + let sc = SecureChannel::new(self.matter()); + + sc.handle(&mut exchange, &mut rx, &mut tx).await?; + + self.matter().notify_changed(); + } + PROTO_ID_INTERACTION_MODEL => { + let dm = DataModel::new(handler); + + let mut rx_status = alloc!(Packet::new_rx(sx_buf)); + + dm.handle(&mut exchange, &mut rx, &mut tx, &mut rx_status) + .await?; + + self.matter().notify_changed(); + } + other => { + error!("Unknown Proto-ID: {}", other); + } + } + + Ok(()) + } + pub fn process_rx<'r>( &'r self, construction_notification: &'r Notification, diff --git a/matter/src/transport/runner.rs b/matter/src/transport/runner.rs index 373021d..d46b3e3 100644 --- a/matter/src/transport/runner.rs +++ b/matter/src/transport/runner.rs @@ -17,26 +17,17 @@ use core::{mem::MaybeUninit, pin::pin}; -use crate::{ - alloc, - data_model::{core::DataModel, objects::DataModelHandler}, - interaction_model::core::PROTO_ID_INTERACTION_MODEL, - CommissioningData, Matter, -}; use embassy_futures::select::{select, select_slice, Either}; use embassy_sync::{blocking_mutex::raw::NoopRawMutex, channel::Channel}; -use log::{error, info, warn}; -use crate::{ - error::Error, - secure_channel::{common::PROTO_ID_SECURE_CHANNEL, core::SecureChannel}, - transport::packet::{Packet, MAX_RX_BUF_SIZE}, - utils::select::EitherUnwrap, -}; +use log::{error, info}; + +use crate::{data_model::objects::DataModelHandler, CommissioningData, Matter}; +use crate::{error::Error, transport::packet::MAX_RX_BUF_SIZE, utils::select::EitherUnwrap}; use super::{ core::Transport, - exchange::{ExchangeCtr, Notification, MAX_EXCHANGES}, + exchange::{Notification, MAX_EXCHANGES}, packet::{MAX_RX_STATUS_BUF_SIZE, MAX_TX_BUF_SIZE}, pipe::{Chunk, Pipe}, }; @@ -157,7 +148,7 @@ impl<'a> TransportRunner<'a> { let mut mdns = pin!(mdns_runner.run_udp(stack, &mut buffers.mdns)); let mut transport = pin!(self.run_udp(stack, &mut buffers.transport, dev_comm, handler)); - embassy_futures::select::select(&mut transport, &mut mdns) + embassy_futures::select::select(&mut mdns, &mut transport) .await .unwrap() } @@ -265,11 +256,12 @@ impl<'a> TransportRunner<'a> { &construction_notification, handler )); - let mut tx = pin!(Self::handle_tx(&self.transport, tx_pipe)); + let mut tx = pin!(self.transport.handle_tx(tx_pipe)); select(&mut rx, &mut tx).await.unwrap() } + #[inline(always)] async fn handle_rx( transport: &Transport<'_>, pools: &mut PacketPools, @@ -289,85 +281,30 @@ impl<'a> TransportRunner<'a> { info!("Handlers size: {}", core::mem::size_of_val(&handlers)); - let pools = &mut *pools as *mut _; + // Unsafely allow mutable aliasing in the packet pools by different indices + let pools: *mut PacketPools = pools; for index in 0..MAX_EXCHANGES { let channel = &channel; let handler_id = index; + let pools = unsafe { pools.as_mut() }.unwrap(); + + let tx_buf = unsafe { pools.tx[handler_id].assume_init_mut() }; + let rx_buf = unsafe { pools.rx[handler_id].assume_init_mut() }; + let sx_buf = unsafe { pools.sx[handler_id].assume_init_mut() }; + handlers - .push(async move { - loop { - let exchange_ctr: ExchangeCtr<'_> = channel.recv().await; - - info!( - "Handler {}: Got exchange {:?}", - handler_id, - exchange_ctr.id() - ); - - let result = Self::handle_exchange( - transport, - pools, - handler_id, - exchange_ctr, - handler, - ) - .await; - - if let Err(err) = result { - warn!( - "Handler {}: Exchange closed because of error: {:?}", - handler_id, err - ); - } else { - info!("Handler {}: Exchange completed", handler_id); - } - } - }) + .push( + transport + .exchange_handler(tx_buf, rx_buf, sx_buf, handler_id, channel, handler), + ) .map_err(|_| ()) .unwrap(); } - let mut rx = pin!(async { - loop { - info!("Transport: waiting for incoming packets"); - - { - let mut data = rx_pipe.data.lock().await; - - if let Some(chunk) = data.chunk { - let mut rx = alloc!(Packet::new_rx(&mut data.buf[chunk.start..chunk.end])); - rx.peer = chunk.addr; - - if let Some(exchange_ctr) = - transport.process_rx(construction_notification, &mut rx)? - { - let exchange_id = exchange_ctr.id().clone(); - - info!("Transport: got new exchange: {:?}", exchange_id); - - channel.send(exchange_ctr).await; - info!("Transport: exchange sent"); - - transport - .wait_construction(construction_notification, &rx, &exchange_id) - .await?; - - info!("Transport: exchange started"); - } - - data.chunk = None; - rx_pipe.data_consumed_notification.signal(()); - } - } - - rx_pipe.data_supplied_notification.wait().await - } - - #[allow(unreachable_code)] - Ok::<_, Error>(()) - }); + let mut rx = + pin!(transport.handle_rx_multiplex(rx_pipe, &construction_notification, &channel)); let result = select(&mut rx, select_slice(&mut handlers)).await; @@ -381,81 +318,4 @@ impl<'a> TransportRunner<'a> { Ok(()) } - - async fn handle_tx(transport: &Transport<'_>, tx_pipe: &Pipe<'_>) -> Result<(), Error> { - loop { - loop { - { - let mut data = tx_pipe.data.lock().await; - - if data.chunk.is_none() { - let mut tx = alloc!(Packet::new_tx(data.buf)); - - if transport.pull_tx(&mut tx).await? { - data.chunk = Some(Chunk { - start: tx.get_writebuf()?.get_start(), - end: tx.get_writebuf()?.get_tail(), - addr: tx.peer, - }); - tx_pipe.data_supplied_notification.signal(()); - } else { - break; - } - } - } - - tx_pipe.data_consumed_notification.wait().await; - } - - transport.wait_tx().await?; - } - } - - #[cfg_attr(feature = "nightly", allow(clippy::await_holding_refcell_ref))] // Fine because of the async mutex - async fn handle_exchange( - transport: &Transport<'_>, - pools: *mut PacketPools, - handler_id: usize, - exchange_ctr: ExchangeCtr<'_>, - handler: &H, - ) -> Result<(), Error> - where - H: DataModelHandler, - { - let pools = unsafe { pools.as_mut() }.unwrap(); - - let tx_buf = unsafe { pools.tx[handler_id].assume_init_mut() }; - let rx_buf = unsafe { pools.rx[handler_id].assume_init_mut() }; - let rx_status_buf = unsafe { pools.sx[handler_id].assume_init_mut() }; - - let mut rx = alloc!(Packet::new_rx(rx_buf.as_mut())); - let mut tx = alloc!(Packet::new_tx(tx_buf.as_mut())); - - let mut exchange = alloc!(exchange_ctr.get(&mut rx).await?); - - match rx.get_proto_id() { - PROTO_ID_SECURE_CHANNEL => { - let sc = SecureChannel::new(transport.matter()); - - sc.handle(&mut exchange, &mut rx, &mut tx).await?; - - transport.matter().notify_changed(); - } - PROTO_ID_INTERACTION_MODEL => { - let dm = DataModel::new(handler); - - let mut rx_status = alloc!(Packet::new_rx(rx_status_buf)); - - dm.handle(&mut exchange, &mut rx, &mut tx, &mut rx_status) - .await?; - - transport.matter().notify_changed(); - } - other => { - error!("Unknown Proto-ID: {}", other); - } - } - - Ok(()) - } }