From 0eecce5f8d55c2174649f81477d4f3a848f0ced9 Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Fri, 14 Jul 2023 22:26:01 +0000 Subject: [PATCH] UDP stack based on embassy-net --- examples/onoff_light/src/main.rs | 25 +++- matter/Cargo.toml | 4 + matter/src/mdns/builtin.rs | 46 ++++--- matter/src/transport/network.rs | 32 +++++ matter/src/transport/runner.rs | 30 +++-- matter/src/transport/udp.rs | 217 ++++++++++++++++++++++++------- 6 files changed, 276 insertions(+), 78 deletions(-) diff --git a/examples/onoff_light/src/main.rs b/examples/onoff_light/src/main.rs index 627eda6..93eab5f 100644 --- a/examples/onoff_light/src/main.rs +++ b/examples/onoff_light/src/main.rs @@ -31,8 +31,9 @@ use matter::error::Error; use matter::mdns::{DefaultMdns, DefaultMdnsRunner}; use matter::persist::FilePsm; use matter::secure_channel::spake2p::VerifierData; -use matter::transport::network::{Ipv4Addr, Ipv6Addr}; +use matter::transport::network::{Ipv4Addr, Ipv6Addr, NetworkStack}; use matter::transport::runner::{RxBuf, TransportRunner, TxBuf}; +use matter::transport::udp::UdpBuffers; use matter::utils::select::EitherUnwrap; mod dev_att; @@ -131,6 +132,13 @@ fn run() -> Result<(), Error> { let mut tx_buf = TxBuf::uninit(); let mut rx_buf = RxBuf::uninit(); + // NOTE (no_std): If using the `embassy-net` UDP implementation, replace this dummy stack with the `embassy-net` one + // When using a custom UDP stack, remove this + let stack = NetworkStack::new(); + + let mut mdns_udp_buffers = UdpBuffers::new(); + let mut trans_udp_buffers = UdpBuffers::new(); + #[cfg(all(feature = "std", not(target_os = "espidf")))] { let mut buf = [0; 4096]; @@ -164,6 +172,9 @@ fn run() -> Result<(), Error> { let runner = &mut runner; let tx_buf = &mut tx_buf; let rx_buf = &mut rx_buf; + let stack = &stack; + let mdns_udp_buffers = &mut mdns_udp_buffers; + let trans_udp_buffers = &mut trans_udp_buffers; info!( "About to run wth node {:p}, handler {:p}, transport runner {:p}, mdns_runner {:p}", @@ -171,9 +182,11 @@ fn run() -> Result<(), Error> { ); let mut fut = pin!(async move { - // NOTE (no_std): On no_std, the `run_udp` is a no-op so you might want to replace it with `run` and - // connect the pipes of the `run` method with your own UDP stack + // NOTE: If using a custom UDP stack, replace `run_udp` with `run` + // and connect the pipes of the `run` method with the custom UDP stack let mut transport = pin!(runner.run_udp( + stack, + trans_udp_buffers, tx_buf, rx_buf, CommissioningData { @@ -184,9 +197,9 @@ fn run() -> Result<(), Error> { &handler, )); - // NOTE (no_std): On no_std, the `run_udp` is a no-op so you might want to replace it with `run` and - // connect the pipes of the `run` method with your own UDP stack - let mut mdns = pin!(mdns_runner.run_udp()); + // NOTE: If using a custom UDP stack, replace `run_udp` with `run` + // and connect the pipes of the `run` method with the custom UDP stack + let mut mdns = pin!(mdns_runner.run_udp(stack, mdns_udp_buffers)); let mut save = pin!(save(matter, &psm)); select3(&mut transport, &mut mdns, &mut save).await.unwrap() diff --git a/matter/Cargo.toml b/matter/Cargo.toml index 5b8816d..87533b5 100644 --- a/matter/Cargo.toml +++ b/matter/Cargo.toml @@ -24,6 +24,7 @@ nightly = [] crypto_openssl = ["alloc", "openssl", "foreign-types", "hmac", "sha2"] crypto_mbedtls = ["alloc", "mbedtls"] crypto_rustcrypto = ["alloc", "sha2", "hmac", "pbkdf2", "hkdf", "aes", "ccm", "p256", "elliptic-curve", "crypto-bigint", "x509-cert", "rand_core"] +embassy-net = ["dep:embassy-net", "dep:embassy-net-driver", "smoltcp"] [dependencies] matter_macro_derive = { path = "../matter_macro_derive" } @@ -46,6 +47,9 @@ embassy-time = { version = "0.1.1", features = ["generic-queue-8"] } embassy-sync = "0.2" critical-section = "1.1.1" domain = { version = "0.7.2", default_features = false, features = ["heapless"] } +embassy-net = { version = "0.1", features = ["udp", "igmp", "proto-ipv6", "medium-ethernet", "medium-ip"], optional = true } +embassy-net-driver = { version = "0.1", optional = true } +smoltcp = { version = "0.10", default-features = false, optional = true } # STD-only dependencies rand = { version = "0.8.5", optional = true } diff --git a/matter/src/mdns/builtin.rs b/matter/src/mdns/builtin.rs index 7b6f891..c869218 100644 --- a/matter/src/mdns/builtin.rs +++ b/matter/src/mdns/builtin.rs @@ -1,17 +1,15 @@ -use core::{cell::RefCell, mem::MaybeUninit, pin::pin}; +use core::{cell::RefCell, pin::pin}; use domain::base::name::FromStrError; use domain::base::{octets::ParseError, ShortBuf}; -use embassy_futures::select::{select, select3}; +use embassy_futures::select::select; use embassy_time::{Duration, Timer}; use log::info; use crate::data_model::cluster_basic_information::BasicInfoConfig; use crate::error::{Error, ErrorCode}; use crate::transport::network::{Address, IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; -use crate::transport::packet::{MAX_RX_BUF_SIZE, MAX_TX_BUF_SIZE}; use crate::transport::pipe::{Chunk, Pipe}; -use crate::transport::udp::UdpListener; use crate::utils::select::{EitherUnwrap, Notification}; use super::{ @@ -19,18 +17,14 @@ use super::{ Service, ServiceMode, }; -const IP_BIND_ADDR: IpAddr = IpAddr::V6(Ipv6Addr::UNSPECIFIED); - const IP_BROADCAST_ADDR: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251); const IPV6_BROADCAST_ADDR: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0x00fb); const PORT: u16 = 5353; -type MdnsTxBuf = MaybeUninit<[u8; MAX_TX_BUF_SIZE]>; -type MdnsRxBuf = MaybeUninit<[u8; MAX_RX_BUF_SIZE]>; - pub struct Mdns<'a> { host: Host<'a>, + #[allow(unused)] interface: u32, dev_det: &'a BasicInfoConfig<'a>, matter_port: u16, @@ -108,9 +102,21 @@ impl<'a> MdnsRunner<'a> { Self(mdns) } - pub async fn run_udp(&mut self) -> Result<(), Error> { - let mut tx_buf = MdnsTxBuf::uninit(); - let mut rx_buf = MdnsRxBuf::uninit(); + #[cfg(any(feature = "std", feature = "embassy-net"))] + pub async fn run_udp( + &mut self, + stack: &crate::transport::network::NetworkStack, + buffers: &mut crate::transport::udp::UdpBuffers, + ) -> Result<(), Error> + where + D: crate::transport::network::NetworkStackMulticastDriver + + crate::transport::network::NetworkStackDriver + + 'static, + { + let mut tx_buf = + core::mem::MaybeUninit::<[u8; crate::transport::packet::MAX_TX_BUF_SIZE]>::uninit(); + let mut rx_buf = + core::mem::MaybeUninit::<[u8; crate::transport::packet::MAX_RX_BUF_SIZE]>::uninit(); let tx_buf = &mut tx_buf; let rx_buf = &mut rx_buf; @@ -121,10 +127,18 @@ impl<'a> MdnsRunner<'a> { let tx_pipe = &tx_pipe; let rx_pipe = &rx_pipe; - let mut udp = UdpListener::new(SocketAddr::new(IP_BIND_ADDR, PORT)).await?; + let mut udp = crate::transport::udp::UdpListener::new( + stack, + crate::transport::network::SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), PORT), + buffers, + ) + .await?; udp.join_multicast_v6(IPV6_BROADCAST_ADDR, self.0.interface)?; - udp.join_multicast_v4(IP_BROADCAST_ADDR, Ipv4Addr::from(self.0.host.ip))?; + udp.join_multicast_v4( + IP_BROADCAST_ADDR, + crate::transport::network::Ipv4Addr::from(self.0.host.ip), + )?; let udp = &udp; @@ -168,7 +182,9 @@ impl<'a> MdnsRunner<'a> { let mut run = pin!(async move { self.run(tx_pipe, rx_pipe).await }); - select3(&mut tx, &mut rx, &mut run).await.unwrap() + embassy_futures::select::select3(&mut tx, &mut rx, &mut run) + .await + .unwrap() } pub async fn run(&self, tx_pipe: &Pipe<'_>, rx_pipe: &Pipe<'_>) -> Result<(), Error> { diff --git a/matter/src/transport/network.rs b/matter/src/transport/network.rs index ba50386..c3b71ee 100644 --- a/matter/src/transport/network.rs +++ b/matter/src/transport/network.rs @@ -55,3 +55,35 @@ impl Debug for Address { } } } + +#[cfg(all(feature = "std", not(feature = "embassy-net")))] +pub use std_stack::*; + +#[cfg(feature = "embassy-net")] +pub use embassy_stack::*; + +#[cfg(all(feature = "std", not(feature = "embassy-net")))] +mod std_stack { + pub trait NetworkStackDriver {} + + impl NetworkStackDriver for () {} + + pub trait NetworkStackMulticastDriver {} + + impl NetworkStackMulticastDriver for () {} + + pub struct NetworkStack(D); + + impl NetworkStack<()> { + pub const fn new() -> Self { + Self(()) + } + } +} + +#[cfg(feature = "embassy-net")] +mod embassy_stack { + pub use embassy_net::Stack as NetworkStack; + pub use embassy_net_driver::Driver as NetworkStackDriver; + pub use smoltcp::phy::Device as NetworkStackMulticastDriver; +} diff --git a/matter/src/transport/runner.rs b/matter/src/transport/runner.rs index c4cd4ed..ccb1034 100644 --- a/matter/src/transport/runner.rs +++ b/matter/src/transport/runner.rs @@ -21,10 +21,9 @@ use crate::{ alloc, data_model::{core::DataModel, objects::DataModelHandler}, interaction_model::core::PROTO_ID_INTERACTION_MODEL, - transport::network::{Address, IpAddr, Ipv6Addr, SocketAddr}, CommissioningData, Matter, }; -use embassy_futures::select::{select, select3, select_slice, Either}; +use embassy_futures::select::{select, select_slice, Either}; use embassy_sync::{blocking_mutex::raw::NoopRawMutex, channel::Channel}; use log::{error, info, warn}; @@ -40,7 +39,6 @@ use super::{ exchange::{ExchangeCtr, Notification, MAX_EXCHANGES}, packet::{MAX_RX_STATUS_BUF_SIZE, MAX_TX_BUF_SIZE}, pipe::{Chunk, Pipe}, - udp::UdpListener, }; pub type TxBuf = MaybeUninit<[u8; MAX_TX_BUF_SIZE]>; @@ -103,20 +101,30 @@ impl<'a> TransportRunner<'a> { &self.transport } - pub async fn run_udp( + #[cfg(any(feature = "std", feature = "embassy-net"))] + pub async fn run_udp( &mut self, + stack: &crate::transport::network::NetworkStack, + buffers: &mut crate::transport::udp::UdpBuffers, tx_buf: &mut TxBuf, rx_buf: &mut RxBuf, dev_comm: CommissioningData, handler: &H, ) -> Result<(), Error> where + D: crate::transport::network::NetworkStackDriver, H: DataModelHandler, { - let udp = UdpListener::new(SocketAddr::new( - IpAddr::V6(Ipv6Addr::UNSPECIFIED), - self.transport.matter().port, - )) + let udp = crate::transport::udp::UdpListener::new( + stack, + crate::transport::network::SocketAddr::new( + crate::transport::network::IpAddr::V6( + crate::transport::network::Ipv6Addr::UNSPECIFIED, + ), + self.transport.matter().port, + ), + buffers, + ) .await?; let tx_pipe = Pipe::new(unsafe { tx_buf.assume_init_mut() }); @@ -154,7 +162,7 @@ impl<'a> TransportRunner<'a> { data.chunk = Some(Chunk { start: 0, end: len, - addr: Address::Udp(addr), + addr: crate::transport::network::Address::Udp(addr), }); rx_pipe.data_supplied_notification.signal(()); } @@ -166,7 +174,9 @@ impl<'a> TransportRunner<'a> { let mut run = pin!(async move { self.run(tx_pipe, rx_pipe, dev_comm, handler).await }); - select3(&mut tx, &mut rx, &mut run).await.unwrap() + embassy_futures::select::select3(&mut tx, &mut rx, &mut run) + .await + .unwrap() } pub async fn run( diff --git a/matter/src/transport/udp.rs b/matter/src/transport/udp.rs index 9b23489..e9e5811 100644 --- a/matter/src/transport/udp.rs +++ b/matter/src/transport/udp.rs @@ -15,29 +15,45 @@ * limitations under the License. */ -#[cfg(feature = "std")] +#[cfg(all(feature = "std", not(feature = "embassy-net")))] pub use smol_udp::*; -#[cfg(not(feature = "std"))] -pub use dummy_udp::*; +#[cfg(feature = "embassy-net")] +pub use embassy_udp::*; -#[cfg(feature = "std")] +#[cfg(all(feature = "std", not(feature = "embassy-net")))] mod smol_udp { use crate::error::*; use log::{debug, info, warn}; use smol::net::UdpSocket; - use crate::transport::network::{Ipv4Addr, Ipv6Addr, SocketAddr}; + use crate::transport::network::{ + Ipv4Addr, Ipv6Addr, NetworkStack, NetworkStackDriver, NetworkStackMulticastDriver, + SocketAddr, + }; - pub struct UdpListener { - socket: UdpSocket, + pub struct UdpBuffers(()); + + impl UdpBuffers { + pub const fn new() -> Self { + Self(()) + } } - impl UdpListener { - pub async fn new(addr: SocketAddr) -> Result { - let listener = UdpListener { - socket: UdpSocket::bind((addr.ip(), addr.port())).await?, - }; + pub struct UdpListener<'a, D>(UdpSocket, &'a NetworkStack) + where + D: NetworkStackDriver; + + impl<'a, D> UdpListener<'a, D> + where + D: NetworkStackDriver + 'a, + { + pub async fn new( + stack: &'a NetworkStack, + addr: SocketAddr, + _buffers: &'a mut UdpBuffers, + ) -> Result, Error> { + let listener = UdpListener(UdpSocket::bind((addr.ip(), addr.port())).await?, stack); info!("Listening on {:?}", addr); @@ -48,8 +64,11 @@ mod smol_udp { &mut self, multiaddr: Ipv6Addr, interface: u32, - ) -> Result<(), Error> { - self.socket.join_multicast_v6(&multiaddr, interface)?; + ) -> Result<(), Error> + where + D: NetworkStackMulticastDriver + 'static, + { + self.0.join_multicast_v6(&multiaddr, interface)?; info!("Joined IPV6 multicast {}/{}", multiaddr, interface); @@ -60,9 +79,12 @@ mod smol_udp { &mut self, multiaddr: Ipv4Addr, interface: Ipv4Addr, - ) -> Result<(), Error> { + ) -> Result<(), Error> + where + D: NetworkStackMulticastDriver + 'static, + { #[cfg(not(target_os = "espidf"))] - self.socket.join_multicast_v4(multiaddr, interface)?; + self.0.join_multicast_v4(multiaddr, interface)?; // join_multicast_v4() is broken for ESP-IDF, most likely due to wrong `ip_mreq` signature in the `libc` crate // Note that also most *_multicast_v4 and *_multicast_v6 methods are broken as well in Rust STD for the ESP-IDF @@ -101,7 +123,7 @@ mod smol_udp { }; esp_setsockopt( - &mut self.socket, + &mut self.0, esp_idf_sys::IPPROTO_IP, esp_idf_sys::IP_ADD_MEMBERSHIP, mreq, @@ -114,18 +136,18 @@ mod smol_udp { } pub async fn recv(&self, in_buf: &mut [u8]) -> Result<(usize, SocketAddr), Error> { - let (size, addr) = self.socket.recv_from(in_buf).await.map_err(|e| { + let (len, addr) = self.0.recv_from(in_buf).await.map_err(|e| { warn!("Error on the network: {:?}", e); ErrorCode::Network })?; - debug!("Got packet {:?} from addr {:?}", &in_buf[..size], addr); + debug!("Got packet {:?} from addr {:?}", &in_buf[..len], addr); - Ok((size, addr)) + Ok((len, addr)) } pub async fn send(&self, addr: SocketAddr, out_buf: &[u8]) -> Result { - let len = self.socket.send_to(out_buf, addr).await.map_err(|e| { + let len = self.0.send_to(out_buf, addr).await.map_err(|e| { warn!("Error on the network: {:?}", e); ErrorCode::Network })?; @@ -143,35 +165,92 @@ mod smol_udp { } } -#[cfg(not(feature = "std"))] -mod dummy_udp { - use core::future::pending; +#[cfg(feature = "embassy-net")] +mod embassy_udp { + use core::mem::MaybeUninit; + + use embassy_net::udp::{PacketMetadata, UdpSocket}; + + use smoltcp::wire::{IpAddress, IpEndpoint, Ipv4Address, Ipv6Address}; use crate::error::*; - use log::{debug, info}; - use crate::transport::network::{Ipv4Addr, Ipv6Addr, SocketAddr}; + use log::{debug, info, warn}; - pub struct UdpListener {} + use crate::transport::network::{ + IpAddr, Ipv4Addr, Ipv6Addr, NetworkStack, NetworkStackDriver, NetworkStackMulticastDriver, + SocketAddr, + }; - impl UdpListener { - pub async fn new(addr: SocketAddr) -> Result { - let listener = UdpListener {}; + const RX_BUF_SIZE: usize = 4096; + const TX_BUF_SIZE: usize = 4096; - info!("Pretending to listen on {:?}", addr); + pub struct UdpBuffers { + rx_buffer: MaybeUninit<[u8; RX_BUF_SIZE]>, + tx_buffer: MaybeUninit<[u8; TX_BUF_SIZE]>, + rx_meta: [PacketMetadata; 16], + tx_meta: [PacketMetadata; 16], + } - Ok(listener) + impl UdpBuffers { + pub const fn new() -> Self { + Self { + rx_buffer: MaybeUninit::uninit(), + tx_buffer: MaybeUninit::uninit(), + + rx_meta: [PacketMetadata::EMPTY; 16], + tx_meta: [PacketMetadata::EMPTY; 16], + } + } + } + + pub struct UdpListener<'a, D>(UdpSocket<'a>, &'a NetworkStack) + where + D: NetworkStackDriver; + + impl<'a, D> UdpListener<'a, D> + where + D: NetworkStackDriver + 'a, + { + pub async fn new( + stack: &'a NetworkStack, + addr: SocketAddr, + buffers: &'a mut UdpBuffers, + ) -> Result, Error> { + let mut socket = UdpSocket::new( + stack, + &mut buffers.rx_meta, + unsafe { buffers.rx_buffer.assume_init_mut() }, + &mut buffers.tx_meta, + unsafe { buffers.tx_buffer.assume_init_mut() }, + ); + + socket.bind(addr.port()).map_err(|e| { + warn!("Error on the network: {:?}", e); + ErrorCode::Network + })?; + + info!("Listening on {:?}", addr); + + Ok(UdpListener(socket, stack)) } pub fn join_multicast_v6( &mut self, multiaddr: Ipv6Addr, - interface: u32, - ) -> Result<(), Error> { - info!( - "Pretending to join IPV6 multicast {}/{}", - multiaddr, interface - ); + _interface: u32, + ) -> Result<(), Error> + where + D: NetworkStackMulticastDriver + 'static, + { + self.1 + .join_multicast_group(Self::from_ip_addr(IpAddr::V6(multiaddr))) + .map_err(|e| { + warn!("Error on the network: {:?}", e); + ErrorCode::Network + })?; + + info!("Joined IP multicast {}", multiaddr); Ok(()) } @@ -179,23 +258,45 @@ mod dummy_udp { pub fn join_multicast_v4( &mut self, multiaddr: Ipv4Addr, - interface: Ipv4Addr, - ) -> Result<(), Error> { - info!( - "Pretending to join IP multicast {}/{}", - multiaddr, interface - ); + _interface: Ipv4Addr, + ) -> Result<(), Error> + where + D: NetworkStackMulticastDriver + 'static, + { + self.1 + .join_multicast_group(Self::from_ip_addr(IpAddr::V4(multiaddr))) + .map_err(|e| { + warn!("Error on the network: {:?}", e); + ErrorCode::Network + })?; + + info!("Joined IP multicast {}", multiaddr); Ok(()) } - pub async fn recv(&self, _in_buf: &mut [u8]) -> Result<(usize, SocketAddr), Error> { - info!("Pretending to wait for incoming packets (looping forever)"); + pub async fn recv(&self, in_buf: &mut [u8]) -> Result<(usize, SocketAddr), Error> { + let (len, ep) = self.0.recv_from(in_buf).await.map_err(|e| { + warn!("Error on the network: {:?}", e); + ErrorCode::Network + })?; - pending().await + let addr = Self::to_socket_addr(ep); + + debug!("Got packet {:?} from addr {:?}", &in_buf[..len], addr); + + Ok((len, addr)) } pub async fn send(&self, addr: SocketAddr, out_buf: &[u8]) -> Result { + self.0 + .send_to(out_buf, Self::from_socket_addr(addr)) + .await + .map_err(|e| { + warn!("Error on the network: {:?}", e); + ErrorCode::Network + })?; + debug!( "Send packet {:?} ({}/{}) to addr {:?}", out_buf, @@ -206,5 +307,27 @@ mod dummy_udp { Ok(out_buf.len()) } + + fn to_socket_addr(ep: IpEndpoint) -> SocketAddr { + SocketAddr::new(Self::to_ip_addr(ep.addr), ep.port) + } + + fn from_socket_addr(addr: SocketAddr) -> IpEndpoint { + IpEndpoint::new(Self::from_ip_addr(addr.ip()), addr.port()) + } + + fn to_ip_addr(ip: IpAddress) -> IpAddr { + match ip { + IpAddress::Ipv4(addr) => IpAddr::V4(Ipv4Addr::from(addr.0)), + IpAddress::Ipv6(addr) => IpAddr::V6(Ipv6Addr::from(addr.0)), + } + } + + fn from_ip_addr(ip: IpAddr) -> IpAddress { + match ip { + IpAddr::V4(v4) => IpAddress::Ipv4(Ipv4Address::from_bytes(&v4.octets())), + IpAddr::V6(v6) => IpAddress::Ipv6(Ipv6Address::from_bytes(&v6.octets())), + } + } } }