UDP stack based on embassy-net

This commit is contained in:
ivmarkov 2023-07-14 22:26:01 +00:00
parent 762438ca8e
commit 0eecce5f8d
6 changed files with 276 additions and 78 deletions

View file

@ -31,8 +31,9 @@ use matter::error::Error;
use matter::mdns::{DefaultMdns, DefaultMdnsRunner};
use matter::persist::FilePsm;
use matter::secure_channel::spake2p::VerifierData;
use matter::transport::network::{Ipv4Addr, Ipv6Addr};
use matter::transport::network::{Ipv4Addr, Ipv6Addr, NetworkStack};
use matter::transport::runner::{RxBuf, TransportRunner, TxBuf};
use matter::transport::udp::UdpBuffers;
use matter::utils::select::EitherUnwrap;
mod dev_att;
@ -131,6 +132,13 @@ fn run() -> Result<(), Error> {
let mut tx_buf = TxBuf::uninit();
let mut rx_buf = RxBuf::uninit();
// NOTE (no_std): If using the `embassy-net` UDP implementation, replace this dummy stack with the `embassy-net` one
// When using a custom UDP stack, remove this
let stack = NetworkStack::new();
let mut mdns_udp_buffers = UdpBuffers::new();
let mut trans_udp_buffers = UdpBuffers::new();
#[cfg(all(feature = "std", not(target_os = "espidf")))]
{
let mut buf = [0; 4096];
@ -164,6 +172,9 @@ fn run() -> Result<(), Error> {
let runner = &mut runner;
let tx_buf = &mut tx_buf;
let rx_buf = &mut rx_buf;
let stack = &stack;
let mdns_udp_buffers = &mut mdns_udp_buffers;
let trans_udp_buffers = &mut trans_udp_buffers;
info!(
"About to run wth node {:p}, handler {:p}, transport runner {:p}, mdns_runner {:p}",
@ -171,9 +182,11 @@ fn run() -> Result<(), Error> {
);
let mut fut = pin!(async move {
// NOTE (no_std): On no_std, the `run_udp` is a no-op so you might want to replace it with `run` and
// connect the pipes of the `run` method with your own UDP stack
// NOTE: If using a custom UDP stack, replace `run_udp` with `run`
// and connect the pipes of the `run` method with the custom UDP stack
let mut transport = pin!(runner.run_udp(
stack,
trans_udp_buffers,
tx_buf,
rx_buf,
CommissioningData {
@ -184,9 +197,9 @@ fn run() -> Result<(), Error> {
&handler,
));
// NOTE (no_std): On no_std, the `run_udp` is a no-op so you might want to replace it with `run` and
// connect the pipes of the `run` method with your own UDP stack
let mut mdns = pin!(mdns_runner.run_udp());
// NOTE: If using a custom UDP stack, replace `run_udp` with `run`
// and connect the pipes of the `run` method with the custom UDP stack
let mut mdns = pin!(mdns_runner.run_udp(stack, mdns_udp_buffers));
let mut save = pin!(save(matter, &psm));
select3(&mut transport, &mut mdns, &mut save).await.unwrap()

View file

@ -24,6 +24,7 @@ nightly = []
crypto_openssl = ["alloc", "openssl", "foreign-types", "hmac", "sha2"]
crypto_mbedtls = ["alloc", "mbedtls"]
crypto_rustcrypto = ["alloc", "sha2", "hmac", "pbkdf2", "hkdf", "aes", "ccm", "p256", "elliptic-curve", "crypto-bigint", "x509-cert", "rand_core"]
embassy-net = ["dep:embassy-net", "dep:embassy-net-driver", "smoltcp"]
[dependencies]
matter_macro_derive = { path = "../matter_macro_derive" }
@ -46,6 +47,9 @@ embassy-time = { version = "0.1.1", features = ["generic-queue-8"] }
embassy-sync = "0.2"
critical-section = "1.1.1"
domain = { version = "0.7.2", default_features = false, features = ["heapless"] }
embassy-net = { version = "0.1", features = ["udp", "igmp", "proto-ipv6", "medium-ethernet", "medium-ip"], optional = true }
embassy-net-driver = { version = "0.1", optional = true }
smoltcp = { version = "0.10", default-features = false, optional = true }
# STD-only dependencies
rand = { version = "0.8.5", optional = true }

View file

@ -1,17 +1,15 @@
use core::{cell::RefCell, mem::MaybeUninit, pin::pin};
use core::{cell::RefCell, pin::pin};
use domain::base::name::FromStrError;
use domain::base::{octets::ParseError, ShortBuf};
use embassy_futures::select::{select, select3};
use embassy_futures::select::select;
use embassy_time::{Duration, Timer};
use log::info;
use crate::data_model::cluster_basic_information::BasicInfoConfig;
use crate::error::{Error, ErrorCode};
use crate::transport::network::{Address, IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use crate::transport::packet::{MAX_RX_BUF_SIZE, MAX_TX_BUF_SIZE};
use crate::transport::pipe::{Chunk, Pipe};
use crate::transport::udp::UdpListener;
use crate::utils::select::{EitherUnwrap, Notification};
use super::{
@ -19,18 +17,14 @@ use super::{
Service, ServiceMode,
};
const IP_BIND_ADDR: IpAddr = IpAddr::V6(Ipv6Addr::UNSPECIFIED);
const IP_BROADCAST_ADDR: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
const IPV6_BROADCAST_ADDR: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0x00fb);
const PORT: u16 = 5353;
type MdnsTxBuf = MaybeUninit<[u8; MAX_TX_BUF_SIZE]>;
type MdnsRxBuf = MaybeUninit<[u8; MAX_RX_BUF_SIZE]>;
pub struct Mdns<'a> {
host: Host<'a>,
#[allow(unused)]
interface: u32,
dev_det: &'a BasicInfoConfig<'a>,
matter_port: u16,
@ -108,9 +102,21 @@ impl<'a> MdnsRunner<'a> {
Self(mdns)
}
pub async fn run_udp(&mut self) -> Result<(), Error> {
let mut tx_buf = MdnsTxBuf::uninit();
let mut rx_buf = MdnsRxBuf::uninit();
#[cfg(any(feature = "std", feature = "embassy-net"))]
pub async fn run_udp<D>(
&mut self,
stack: &crate::transport::network::NetworkStack<D>,
buffers: &mut crate::transport::udp::UdpBuffers,
) -> Result<(), Error>
where
D: crate::transport::network::NetworkStackMulticastDriver
+ crate::transport::network::NetworkStackDriver
+ 'static,
{
let mut tx_buf =
core::mem::MaybeUninit::<[u8; crate::transport::packet::MAX_TX_BUF_SIZE]>::uninit();
let mut rx_buf =
core::mem::MaybeUninit::<[u8; crate::transport::packet::MAX_RX_BUF_SIZE]>::uninit();
let tx_buf = &mut tx_buf;
let rx_buf = &mut rx_buf;
@ -121,10 +127,18 @@ impl<'a> MdnsRunner<'a> {
let tx_pipe = &tx_pipe;
let rx_pipe = &rx_pipe;
let mut udp = UdpListener::new(SocketAddr::new(IP_BIND_ADDR, PORT)).await?;
let mut udp = crate::transport::udp::UdpListener::new(
stack,
crate::transport::network::SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), PORT),
buffers,
)
.await?;
udp.join_multicast_v6(IPV6_BROADCAST_ADDR, self.0.interface)?;
udp.join_multicast_v4(IP_BROADCAST_ADDR, Ipv4Addr::from(self.0.host.ip))?;
udp.join_multicast_v4(
IP_BROADCAST_ADDR,
crate::transport::network::Ipv4Addr::from(self.0.host.ip),
)?;
let udp = &udp;
@ -168,7 +182,9 @@ impl<'a> MdnsRunner<'a> {
let mut run = pin!(async move { self.run(tx_pipe, rx_pipe).await });
select3(&mut tx, &mut rx, &mut run).await.unwrap()
embassy_futures::select::select3(&mut tx, &mut rx, &mut run)
.await
.unwrap()
}
pub async fn run(&self, tx_pipe: &Pipe<'_>, rx_pipe: &Pipe<'_>) -> Result<(), Error> {

View file

@ -55,3 +55,35 @@ impl Debug for Address {
}
}
}
#[cfg(all(feature = "std", not(feature = "embassy-net")))]
pub use std_stack::*;
#[cfg(feature = "embassy-net")]
pub use embassy_stack::*;
#[cfg(all(feature = "std", not(feature = "embassy-net")))]
mod std_stack {
pub trait NetworkStackDriver {}
impl NetworkStackDriver for () {}
pub trait NetworkStackMulticastDriver {}
impl NetworkStackMulticastDriver for () {}
pub struct NetworkStack<D>(D);
impl NetworkStack<()> {
pub const fn new() -> Self {
Self(())
}
}
}
#[cfg(feature = "embassy-net")]
mod embassy_stack {
pub use embassy_net::Stack as NetworkStack;
pub use embassy_net_driver::Driver as NetworkStackDriver;
pub use smoltcp::phy::Device as NetworkStackMulticastDriver;
}

View file

@ -21,10 +21,9 @@ use crate::{
alloc,
data_model::{core::DataModel, objects::DataModelHandler},
interaction_model::core::PROTO_ID_INTERACTION_MODEL,
transport::network::{Address, IpAddr, Ipv6Addr, SocketAddr},
CommissioningData, Matter,
};
use embassy_futures::select::{select, select3, select_slice, Either};
use embassy_futures::select::{select, select_slice, Either};
use embassy_sync::{blocking_mutex::raw::NoopRawMutex, channel::Channel};
use log::{error, info, warn};
@ -40,7 +39,6 @@ use super::{
exchange::{ExchangeCtr, Notification, MAX_EXCHANGES},
packet::{MAX_RX_STATUS_BUF_SIZE, MAX_TX_BUF_SIZE},
pipe::{Chunk, Pipe},
udp::UdpListener,
};
pub type TxBuf = MaybeUninit<[u8; MAX_TX_BUF_SIZE]>;
@ -103,20 +101,30 @@ impl<'a> TransportRunner<'a> {
&self.transport
}
pub async fn run_udp<H>(
#[cfg(any(feature = "std", feature = "embassy-net"))]
pub async fn run_udp<D, H>(
&mut self,
stack: &crate::transport::network::NetworkStack<D>,
buffers: &mut crate::transport::udp::UdpBuffers,
tx_buf: &mut TxBuf,
rx_buf: &mut RxBuf,
dev_comm: CommissioningData,
handler: &H,
) -> Result<(), Error>
where
D: crate::transport::network::NetworkStackDriver,
H: DataModelHandler,
{
let udp = UdpListener::new(SocketAddr::new(
IpAddr::V6(Ipv6Addr::UNSPECIFIED),
self.transport.matter().port,
))
let udp = crate::transport::udp::UdpListener::new(
stack,
crate::transport::network::SocketAddr::new(
crate::transport::network::IpAddr::V6(
crate::transport::network::Ipv6Addr::UNSPECIFIED,
),
self.transport.matter().port,
),
buffers,
)
.await?;
let tx_pipe = Pipe::new(unsafe { tx_buf.assume_init_mut() });
@ -154,7 +162,7 @@ impl<'a> TransportRunner<'a> {
data.chunk = Some(Chunk {
start: 0,
end: len,
addr: Address::Udp(addr),
addr: crate::transport::network::Address::Udp(addr),
});
rx_pipe.data_supplied_notification.signal(());
}
@ -166,7 +174,9 @@ impl<'a> TransportRunner<'a> {
let mut run = pin!(async move { self.run(tx_pipe, rx_pipe, dev_comm, handler).await });
select3(&mut tx, &mut rx, &mut run).await.unwrap()
embassy_futures::select::select3(&mut tx, &mut rx, &mut run)
.await
.unwrap()
}
pub async fn run<H>(

View file

@ -15,29 +15,45 @@
* limitations under the License.
*/
#[cfg(feature = "std")]
#[cfg(all(feature = "std", not(feature = "embassy-net")))]
pub use smol_udp::*;
#[cfg(not(feature = "std"))]
pub use dummy_udp::*;
#[cfg(feature = "embassy-net")]
pub use embassy_udp::*;
#[cfg(feature = "std")]
#[cfg(all(feature = "std", not(feature = "embassy-net")))]
mod smol_udp {
use crate::error::*;
use log::{debug, info, warn};
use smol::net::UdpSocket;
use crate::transport::network::{Ipv4Addr, Ipv6Addr, SocketAddr};
use crate::transport::network::{
Ipv4Addr, Ipv6Addr, NetworkStack, NetworkStackDriver, NetworkStackMulticastDriver,
SocketAddr,
};
pub struct UdpListener {
socket: UdpSocket,
pub struct UdpBuffers(());
impl UdpBuffers {
pub const fn new() -> Self {
Self(())
}
}
impl UdpListener {
pub async fn new(addr: SocketAddr) -> Result<UdpListener, Error> {
let listener = UdpListener {
socket: UdpSocket::bind((addr.ip(), addr.port())).await?,
};
pub struct UdpListener<'a, D>(UdpSocket, &'a NetworkStack<D>)
where
D: NetworkStackDriver;
impl<'a, D> UdpListener<'a, D>
where
D: NetworkStackDriver + 'a,
{
pub async fn new(
stack: &'a NetworkStack<D>,
addr: SocketAddr,
_buffers: &'a mut UdpBuffers,
) -> Result<UdpListener<'a, D>, Error> {
let listener = UdpListener(UdpSocket::bind((addr.ip(), addr.port())).await?, stack);
info!("Listening on {:?}", addr);
@ -48,8 +64,11 @@ mod smol_udp {
&mut self,
multiaddr: Ipv6Addr,
interface: u32,
) -> Result<(), Error> {
self.socket.join_multicast_v6(&multiaddr, interface)?;
) -> Result<(), Error>
where
D: NetworkStackMulticastDriver + 'static,
{
self.0.join_multicast_v6(&multiaddr, interface)?;
info!("Joined IPV6 multicast {}/{}", multiaddr, interface);
@ -60,9 +79,12 @@ mod smol_udp {
&mut self,
multiaddr: Ipv4Addr,
interface: Ipv4Addr,
) -> Result<(), Error> {
) -> Result<(), Error>
where
D: NetworkStackMulticastDriver + 'static,
{
#[cfg(not(target_os = "espidf"))]
self.socket.join_multicast_v4(multiaddr, interface)?;
self.0.join_multicast_v4(multiaddr, interface)?;
// join_multicast_v4() is broken for ESP-IDF, most likely due to wrong `ip_mreq` signature in the `libc` crate
// Note that also most *_multicast_v4 and *_multicast_v6 methods are broken as well in Rust STD for the ESP-IDF
@ -101,7 +123,7 @@ mod smol_udp {
};
esp_setsockopt(
&mut self.socket,
&mut self.0,
esp_idf_sys::IPPROTO_IP,
esp_idf_sys::IP_ADD_MEMBERSHIP,
mreq,
@ -114,18 +136,18 @@ mod smol_udp {
}
pub async fn recv(&self, in_buf: &mut [u8]) -> Result<(usize, SocketAddr), Error> {
let (size, addr) = self.socket.recv_from(in_buf).await.map_err(|e| {
let (len, addr) = self.0.recv_from(in_buf).await.map_err(|e| {
warn!("Error on the network: {:?}", e);
ErrorCode::Network
})?;
debug!("Got packet {:?} from addr {:?}", &in_buf[..size], addr);
debug!("Got packet {:?} from addr {:?}", &in_buf[..len], addr);
Ok((size, addr))
Ok((len, addr))
}
pub async fn send(&self, addr: SocketAddr, out_buf: &[u8]) -> Result<usize, Error> {
let len = self.socket.send_to(out_buf, addr).await.map_err(|e| {
let len = self.0.send_to(out_buf, addr).await.map_err(|e| {
warn!("Error on the network: {:?}", e);
ErrorCode::Network
})?;
@ -143,35 +165,92 @@ mod smol_udp {
}
}
#[cfg(not(feature = "std"))]
mod dummy_udp {
use core::future::pending;
#[cfg(feature = "embassy-net")]
mod embassy_udp {
use core::mem::MaybeUninit;
use embassy_net::udp::{PacketMetadata, UdpSocket};
use smoltcp::wire::{IpAddress, IpEndpoint, Ipv4Address, Ipv6Address};
use crate::error::*;
use log::{debug, info};
use crate::transport::network::{Ipv4Addr, Ipv6Addr, SocketAddr};
use log::{debug, info, warn};
pub struct UdpListener {}
use crate::transport::network::{
IpAddr, Ipv4Addr, Ipv6Addr, NetworkStack, NetworkStackDriver, NetworkStackMulticastDriver,
SocketAddr,
};
impl UdpListener {
pub async fn new(addr: SocketAddr) -> Result<UdpListener, Error> {
let listener = UdpListener {};
const RX_BUF_SIZE: usize = 4096;
const TX_BUF_SIZE: usize = 4096;
info!("Pretending to listen on {:?}", addr);
pub struct UdpBuffers {
rx_buffer: MaybeUninit<[u8; RX_BUF_SIZE]>,
tx_buffer: MaybeUninit<[u8; TX_BUF_SIZE]>,
rx_meta: [PacketMetadata; 16],
tx_meta: [PacketMetadata; 16],
}
Ok(listener)
impl UdpBuffers {
pub const fn new() -> Self {
Self {
rx_buffer: MaybeUninit::uninit(),
tx_buffer: MaybeUninit::uninit(),
rx_meta: [PacketMetadata::EMPTY; 16],
tx_meta: [PacketMetadata::EMPTY; 16],
}
}
}
pub struct UdpListener<'a, D>(UdpSocket<'a>, &'a NetworkStack<D>)
where
D: NetworkStackDriver;
impl<'a, D> UdpListener<'a, D>
where
D: NetworkStackDriver + 'a,
{
pub async fn new(
stack: &'a NetworkStack<D>,
addr: SocketAddr,
buffers: &'a mut UdpBuffers,
) -> Result<UdpListener<'a, D>, Error> {
let mut socket = UdpSocket::new(
stack,
&mut buffers.rx_meta,
unsafe { buffers.rx_buffer.assume_init_mut() },
&mut buffers.tx_meta,
unsafe { buffers.tx_buffer.assume_init_mut() },
);
socket.bind(addr.port()).map_err(|e| {
warn!("Error on the network: {:?}", e);
ErrorCode::Network
})?;
info!("Listening on {:?}", addr);
Ok(UdpListener(socket, stack))
}
pub fn join_multicast_v6(
&mut self,
multiaddr: Ipv6Addr,
interface: u32,
) -> Result<(), Error> {
info!(
"Pretending to join IPV6 multicast {}/{}",
multiaddr, interface
);
_interface: u32,
) -> Result<(), Error>
where
D: NetworkStackMulticastDriver + 'static,
{
self.1
.join_multicast_group(Self::from_ip_addr(IpAddr::V6(multiaddr)))
.map_err(|e| {
warn!("Error on the network: {:?}", e);
ErrorCode::Network
})?;
info!("Joined IP multicast {}", multiaddr);
Ok(())
}
@ -179,23 +258,45 @@ mod dummy_udp {
pub fn join_multicast_v4(
&mut self,
multiaddr: Ipv4Addr,
interface: Ipv4Addr,
) -> Result<(), Error> {
info!(
"Pretending to join IP multicast {}/{}",
multiaddr, interface
);
_interface: Ipv4Addr,
) -> Result<(), Error>
where
D: NetworkStackMulticastDriver + 'static,
{
self.1
.join_multicast_group(Self::from_ip_addr(IpAddr::V4(multiaddr)))
.map_err(|e| {
warn!("Error on the network: {:?}", e);
ErrorCode::Network
})?;
info!("Joined IP multicast {}", multiaddr);
Ok(())
}
pub async fn recv(&self, _in_buf: &mut [u8]) -> Result<(usize, SocketAddr), Error> {
info!("Pretending to wait for incoming packets (looping forever)");
pub async fn recv(&self, in_buf: &mut [u8]) -> Result<(usize, SocketAddr), Error> {
let (len, ep) = self.0.recv_from(in_buf).await.map_err(|e| {
warn!("Error on the network: {:?}", e);
ErrorCode::Network
})?;
pending().await
let addr = Self::to_socket_addr(ep);
debug!("Got packet {:?} from addr {:?}", &in_buf[..len], addr);
Ok((len, addr))
}
pub async fn send(&self, addr: SocketAddr, out_buf: &[u8]) -> Result<usize, Error> {
self.0
.send_to(out_buf, Self::from_socket_addr(addr))
.await
.map_err(|e| {
warn!("Error on the network: {:?}", e);
ErrorCode::Network
})?;
debug!(
"Send packet {:?} ({}/{}) to addr {:?}",
out_buf,
@ -206,5 +307,27 @@ mod dummy_udp {
Ok(out_buf.len())
}
fn to_socket_addr(ep: IpEndpoint) -> SocketAddr {
SocketAddr::new(Self::to_ip_addr(ep.addr), ep.port)
}
fn from_socket_addr(addr: SocketAddr) -> IpEndpoint {
IpEndpoint::new(Self::from_ip_addr(addr.ip()), addr.port())
}
fn to_ip_addr(ip: IpAddress) -> IpAddr {
match ip {
IpAddress::Ipv4(addr) => IpAddr::V4(Ipv4Addr::from(addr.0)),
IpAddress::Ipv6(addr) => IpAddr::V6(Ipv6Addr::from(addr.0)),
}
}
fn from_ip_addr(ip: IpAddr) -> IpAddress {
match ip {
IpAddr::V4(v4) => IpAddress::Ipv4(Ipv4Address::from_bytes(&v4.octets())),
IpAddr::V6(v6) => IpAddress::Ipv6(Ipv6Address::from_bytes(&v6.octets())),
}
}
}
}