New helper APIs in Transport

This commit is contained in:
ivmarkov 2023-07-17 16:53:14 +00:00
parent 0d73ba74ee
commit 24cdf079a6
2 changed files with 196 additions and 165 deletions

View file

@ -17,13 +17,23 @@
use core::{borrow::Borrow, cell::RefCell};
use crate::{error::ErrorCode, secure_channel::common::OpCode, Matter};
use embassy_futures::select::select;
use embassy_sync::{blocking_mutex::raw::NoopRawMutex, channel::Channel};
use embassy_time::{Duration, Timer};
use log::info;
use log::{error, info, warn};
use crate::{
error::Error, secure_channel::common::PROTO_ID_SECURE_CHANNEL, transport::packet::Packet,
alloc,
data_model::{core::DataModel, objects::DataModelHandler},
error::{Error, ErrorCode},
interaction_model::core::PROTO_ID_INTERACTION_MODEL,
secure_channel::{
common::{OpCode, PROTO_ID_SECURE_CHANNEL},
core::SecureChannel,
},
transport::packet::Packet,
Matter,
};
use super::{
@ -32,6 +42,8 @@ use super::{
MAX_EXCHANGES,
},
mrp::ReliableMessage,
packet::{MAX_RX_BUF_SIZE, MAX_RX_STATUS_BUF_SIZE, MAX_TX_BUF_SIZE},
pipe::{Chunk, Pipe},
session::SessionMgr,
};
@ -83,6 +95,165 @@ impl<'a> Transport<'a> {
unimplemented!()
}
#[inline(always)]
pub async fn handle_tx(&self, tx_pipe: &Pipe<'_>) -> Result<(), Error> {
loop {
loop {
{
let mut data = tx_pipe.data.lock().await;
if data.chunk.is_none() {
let mut tx = alloc!(Packet::new_tx(data.buf));
if self.pull_tx(&mut tx).await? {
data.chunk = Some(Chunk {
start: tx.get_writebuf()?.get_start(),
end: tx.get_writebuf()?.get_tail(),
addr: tx.peer,
});
tx_pipe.data_supplied_notification.signal(());
} else {
break;
}
}
}
tx_pipe.data_consumed_notification.wait().await;
}
self.wait_tx().await?;
}
}
#[inline(always)]
pub async fn handle_rx_multiplex<'t, 'e, const N: usize>(
&'t self,
rx_pipe: &Pipe<'_>,
construction_notification: &'e Notification,
channel: &Channel<NoopRawMutex, ExchangeCtr<'e>, N>,
) -> Result<(), Error>
where
't: 'e,
{
loop {
info!("Transport: waiting for incoming packets");
{
let mut data = rx_pipe.data.lock().await;
if let Some(chunk) = data.chunk {
let mut rx = alloc!(Packet::new_rx(&mut data.buf[chunk.start..chunk.end]));
rx.peer = chunk.addr;
if let Some(exchange_ctr) =
self.process_rx(construction_notification, &mut rx)?
{
let exchange_id = exchange_ctr.id().clone();
info!("Transport: got new exchange: {:?}", exchange_id);
channel.send(exchange_ctr).await;
info!("Transport: exchange sent");
self.wait_construction(construction_notification, &rx, &exchange_id)
.await?;
info!("Transport: exchange started");
}
data.chunk = None;
rx_pipe.data_consumed_notification.signal(());
}
}
rx_pipe.data_supplied_notification.wait().await
}
#[allow(unreachable_code)]
Ok::<_, Error>(())
}
#[inline(always)]
pub async fn exchange_handler<const N: usize, H>(
&self,
tx_buf: &mut [u8; MAX_TX_BUF_SIZE],
rx_buf: &mut [u8; MAX_RX_BUF_SIZE],
sx_buf: &mut [u8; MAX_RX_STATUS_BUF_SIZE],
handler_id: impl core::fmt::Display,
channel: &Channel<NoopRawMutex, ExchangeCtr<'_>, N>,
handler: &H,
) -> Result<(), Error>
where
H: DataModelHandler,
{
loop {
let exchange_ctr: ExchangeCtr<'_> = channel.recv().await;
info!(
"Handler {}: Got exchange {:?}",
handler_id,
exchange_ctr.id()
);
let result = self
.handle_exchange(tx_buf, rx_buf, sx_buf, exchange_ctr, handler)
.await;
if let Err(err) = result {
warn!(
"Handler {}: Exchange closed because of error: {:?}",
handler_id, err
);
} else {
info!("Handler {}: Exchange completed", handler_id);
}
}
}
#[inline(always)]
#[cfg_attr(feature = "nightly", allow(clippy::await_holding_refcell_ref))] // Fine because of the async mutex
pub async fn handle_exchange<H>(
&self,
tx_buf: &mut [u8; MAX_TX_BUF_SIZE],
rx_buf: &mut [u8; MAX_RX_BUF_SIZE],
sx_buf: &mut [u8; MAX_RX_STATUS_BUF_SIZE],
exchange_ctr: ExchangeCtr<'_>,
handler: &H,
) -> Result<(), Error>
where
H: DataModelHandler,
{
let mut tx = alloc!(Packet::new_tx(tx_buf.as_mut()));
let mut rx = alloc!(Packet::new_rx(rx_buf.as_mut()));
let mut exchange = alloc!(exchange_ctr.get(&mut rx).await?);
match rx.get_proto_id() {
PROTO_ID_SECURE_CHANNEL => {
let sc = SecureChannel::new(self.matter());
sc.handle(&mut exchange, &mut rx, &mut tx).await?;
self.matter().notify_changed();
}
PROTO_ID_INTERACTION_MODEL => {
let dm = DataModel::new(handler);
let mut rx_status = alloc!(Packet::new_rx(sx_buf));
dm.handle(&mut exchange, &mut rx, &mut tx, &mut rx_status)
.await?;
self.matter().notify_changed();
}
other => {
error!("Unknown Proto-ID: {}", other);
}
}
Ok(())
}
pub fn process_rx<'r>(
&'r self,
construction_notification: &'r Notification,

View file

@ -17,26 +17,17 @@
use core::{mem::MaybeUninit, pin::pin};
use crate::{
alloc,
data_model::{core::DataModel, objects::DataModelHandler},
interaction_model::core::PROTO_ID_INTERACTION_MODEL,
CommissioningData, Matter,
};
use embassy_futures::select::{select, select_slice, Either};
use embassy_sync::{blocking_mutex::raw::NoopRawMutex, channel::Channel};
use log::{error, info, warn};
use crate::{
error::Error,
secure_channel::{common::PROTO_ID_SECURE_CHANNEL, core::SecureChannel},
transport::packet::{Packet, MAX_RX_BUF_SIZE},
utils::select::EitherUnwrap,
};
use log::{error, info};
use crate::{data_model::objects::DataModelHandler, CommissioningData, Matter};
use crate::{error::Error, transport::packet::MAX_RX_BUF_SIZE, utils::select::EitherUnwrap};
use super::{
core::Transport,
exchange::{ExchangeCtr, Notification, MAX_EXCHANGES},
exchange::{Notification, MAX_EXCHANGES},
packet::{MAX_RX_STATUS_BUF_SIZE, MAX_TX_BUF_SIZE},
pipe::{Chunk, Pipe},
};
@ -157,7 +148,7 @@ impl<'a> TransportRunner<'a> {
let mut mdns = pin!(mdns_runner.run_udp(stack, &mut buffers.mdns));
let mut transport = pin!(self.run_udp(stack, &mut buffers.transport, dev_comm, handler));
embassy_futures::select::select(&mut transport, &mut mdns)
embassy_futures::select::select(&mut mdns, &mut transport)
.await
.unwrap()
}
@ -265,11 +256,12 @@ impl<'a> TransportRunner<'a> {
&construction_notification,
handler
));
let mut tx = pin!(Self::handle_tx(&self.transport, tx_pipe));
let mut tx = pin!(self.transport.handle_tx(tx_pipe));
select(&mut rx, &mut tx).await.unwrap()
}
#[inline(always)]
async fn handle_rx<H>(
transport: &Transport<'_>,
pools: &mut PacketPools,
@ -289,85 +281,30 @@ impl<'a> TransportRunner<'a> {
info!("Handlers size: {}", core::mem::size_of_val(&handlers));
let pools = &mut *pools as *mut _;
// Unsafely allow mutable aliasing in the packet pools by different indices
let pools: *mut PacketPools = pools;
for index in 0..MAX_EXCHANGES {
let channel = &channel;
let handler_id = index;
let pools = unsafe { pools.as_mut() }.unwrap();
let tx_buf = unsafe { pools.tx[handler_id].assume_init_mut() };
let rx_buf = unsafe { pools.rx[handler_id].assume_init_mut() };
let sx_buf = unsafe { pools.sx[handler_id].assume_init_mut() };
handlers
.push(async move {
loop {
let exchange_ctr: ExchangeCtr<'_> = channel.recv().await;
info!(
"Handler {}: Got exchange {:?}",
handler_id,
exchange_ctr.id()
);
let result = Self::handle_exchange(
transport,
pools,
handler_id,
exchange_ctr,
handler,
)
.await;
if let Err(err) = result {
warn!(
"Handler {}: Exchange closed because of error: {:?}",
handler_id, err
);
} else {
info!("Handler {}: Exchange completed", handler_id);
}
}
})
.push(
transport
.exchange_handler(tx_buf, rx_buf, sx_buf, handler_id, channel, handler),
)
.map_err(|_| ())
.unwrap();
}
let mut rx = pin!(async {
loop {
info!("Transport: waiting for incoming packets");
{
let mut data = rx_pipe.data.lock().await;
if let Some(chunk) = data.chunk {
let mut rx = alloc!(Packet::new_rx(&mut data.buf[chunk.start..chunk.end]));
rx.peer = chunk.addr;
if let Some(exchange_ctr) =
transport.process_rx(construction_notification, &mut rx)?
{
let exchange_id = exchange_ctr.id().clone();
info!("Transport: got new exchange: {:?}", exchange_id);
channel.send(exchange_ctr).await;
info!("Transport: exchange sent");
transport
.wait_construction(construction_notification, &rx, &exchange_id)
.await?;
info!("Transport: exchange started");
}
data.chunk = None;
rx_pipe.data_consumed_notification.signal(());
}
}
rx_pipe.data_supplied_notification.wait().await
}
#[allow(unreachable_code)]
Ok::<_, Error>(())
});
let mut rx =
pin!(transport.handle_rx_multiplex(rx_pipe, &construction_notification, &channel));
let result = select(&mut rx, select_slice(&mut handlers)).await;
@ -381,81 +318,4 @@ impl<'a> TransportRunner<'a> {
Ok(())
}
async fn handle_tx(transport: &Transport<'_>, tx_pipe: &Pipe<'_>) -> Result<(), Error> {
loop {
loop {
{
let mut data = tx_pipe.data.lock().await;
if data.chunk.is_none() {
let mut tx = alloc!(Packet::new_tx(data.buf));
if transport.pull_tx(&mut tx).await? {
data.chunk = Some(Chunk {
start: tx.get_writebuf()?.get_start(),
end: tx.get_writebuf()?.get_tail(),
addr: tx.peer,
});
tx_pipe.data_supplied_notification.signal(());
} else {
break;
}
}
}
tx_pipe.data_consumed_notification.wait().await;
}
transport.wait_tx().await?;
}
}
#[cfg_attr(feature = "nightly", allow(clippy::await_holding_refcell_ref))] // Fine because of the async mutex
async fn handle_exchange<H>(
transport: &Transport<'_>,
pools: *mut PacketPools,
handler_id: usize,
exchange_ctr: ExchangeCtr<'_>,
handler: &H,
) -> Result<(), Error>
where
H: DataModelHandler,
{
let pools = unsafe { pools.as_mut() }.unwrap();
let tx_buf = unsafe { pools.tx[handler_id].assume_init_mut() };
let rx_buf = unsafe { pools.rx[handler_id].assume_init_mut() };
let rx_status_buf = unsafe { pools.sx[handler_id].assume_init_mut() };
let mut rx = alloc!(Packet::new_rx(rx_buf.as_mut()));
let mut tx = alloc!(Packet::new_tx(tx_buf.as_mut()));
let mut exchange = alloc!(exchange_ctr.get(&mut rx).await?);
match rx.get_proto_id() {
PROTO_ID_SECURE_CHANNEL => {
let sc = SecureChannel::new(transport.matter());
sc.handle(&mut exchange, &mut rx, &mut tx).await?;
transport.matter().notify_changed();
}
PROTO_ID_INTERACTION_MODEL => {
let dm = DataModel::new(handler);
let mut rx_status = alloc!(Packet::new_rx(rx_status_buf));
dm.handle(&mut exchange, &mut rx, &mut tx, &mut rx_status)
.await?;
transport.matter().notify_changed();
}
other => {
error!("Unknown Proto-ID: {}", other);
}
}
Ok(())
}
}