From 916f2148f89b0a68ea6d7137e2d1ab010377c375 Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Fri, 21 Jul 2023 12:06:58 +0000 Subject: [PATCH] Simplify API by combining Matter, Transport and TransportRunner; simplify Mdns and Psm runners --- .github/workflows/test-linux.yml | 6 +- Cargo.toml | 3 +- README.md | 13 +- examples/onoff_light/src/main.rs | 99 +++---- matter/Cargo.toml | 28 +- matter/src/core.rs | 36 ++- matter/src/crypto/crypto_mbedtls.rs | 8 + matter/src/crypto/crypto_openssl.rs | 8 + matter/src/crypto/mod.rs | 28 +- matter/src/data_model/objects/metadata.rs | 17 ++ matter/src/error.rs | 6 +- matter/src/interaction_model/core.rs | 4 +- matter/src/mdns.rs | 8 +- matter/src/mdns/astro.rs | 44 ++- matter/src/mdns/builtin.rs | 68 ++--- matter/src/pairing/mod.rs | 2 +- matter/src/persist.rs | 54 +++- matter/src/secure_channel/crypto.rs | 14 +- matter/src/secure_channel/mod.rs | 14 +- matter/src/transport/core.rs | 270 +++++++++++++++--- matter/src/transport/exchange.rs | 56 ++-- matter/src/transport/mod.rs | 1 - matter/src/transport/runner.rs | 319 ---------------------- matter/src/transport/session.rs | 5 + matter/src/transport/udp.rs | 8 +- matter/tests/common/im_engine.rs | 22 +- matter/tests/data_model/timed_requests.rs | 4 +- matter_macro_derive/Cargo.toml | 1 + tools/tlv_tool/Cargo.toml | 3 +- 29 files changed, 520 insertions(+), 629 deletions(-) delete mode 100644 matter/src/transport/runner.rs diff --git a/.github/workflows/test-linux.yml b/.github/workflows/test-linux.yml index 82e2425..e08c84b 100644 --- a/.github/workflows/test-linux.yml +++ b/.github/workflows/test-linux.yml @@ -15,11 +15,11 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - crypto-backend: ['crypto_openssl', 'crypto_rustcrypto', 'crypto_mbedtls'] + crypto-backend: ['rustcrypto', 'mbedtls', 'openssl'] steps: - uses: actions/checkout@v2 - name: Build - run: cd matter; cargo build --verbose --no-default-features --features ${{matrix.crypto-backend}} + run: cd matter; cargo build --no-default-features --features ${{matrix.crypto-backend}} - name: Run tests - run: cd matter; cargo test --verbose --no-default-features --features ${{matrix.crypto-backend}} -- --test-threads=1 + run: cd matter; cargo test --no-default-features --features os,${{matrix.crypto-backend}} -- --test-threads=1 diff --git a/Cargo.toml b/Cargo.toml index 0ec37ce..6c6d58c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,8 @@ [workspace] +resolver = "2" members = ["matter", "matter_macro_derive"] -exclude = ["examples/*"] +exclude = ["examples/*", "tools/tlv_tool"] # For compatibility with ESP IDF [patch.crates-io] diff --git a/README.md b/README.md index 86de880..3de42e9 100644 --- a/README.md +++ b/README.md @@ -41,23 +41,24 @@ The `async` metaphor however comes with a bit higher memory usage, due to not en ## Build -Building the library: +### Building the library ``` $ cargo build ``` -Building and running the example (Linux, MacOS X): +### Building and running the example (Linux, MacOS X) ``` $ cargo run --example onoff_light ``` -Building the example (Espressif's ESP-IDF): +### Building the example (Espressif's ESP-IDF) + * Install all build prerequisites described [here](https://github.com/esp-rs/esp-idf-template#prerequisites) * Build with the following command line: ``` -export MCU=esp32; export CARGO_TARGET_XTENSA_ESP32_ESPIDF_LINKER=ldproxy; export RUSTFLAGS="-C default-linker-libraries"; export WIFI_SSID=ssid;export WIFI_PASS=pass; cargo build --example onoff_light --no-default-features --features std,crypto_rustcrypto --target xtensa-esp32-espidf -Zbuild-std=std,panic_abort +export MCU=esp32; export CARGO_TARGET_XTENSA_ESP32_ESPIDF_LINKER=ldproxy; export RUSTFLAGS="-C default-linker-libraries"; export WIFI_SSID=ssid;export WIFI_PASS=pass; cargo build --example onoff_light --no-default-features --features esp-idf --target xtensa-esp32-espidf -Zbuild-std=std,panic_abort ``` * If you are building for a different Espressif MCU, change the `MCU` variable, the `xtensa-esp32-espidf` target and the name of the `CARGO_TARGET__LINKER` variable to match your MCU and its Rust target. Available Espressif MCUs and targets are: * esp32 / xtensa-esp32-espidf @@ -69,6 +70,10 @@ export MCU=esp32; export CARGO_TARGET_XTENSA_ESP32_ESPIDF_LINKER=ldproxy; export * Put in `WIFI_SSID` / `WIFI_PASS` the SSID & password for your wireless router * Flash using the `espflash` utility described in the build prerequsites' link above +### Building the example (ESP32-XX baremetal or RP2040) + +Coming soon! + ## Test With the `chip-tool` (the current tool for testing Matter) use the Ethernet commissioning mechanism: diff --git a/examples/onoff_light/src/main.rs b/examples/onoff_light/src/main.rs index 3f2d890..e6de9b7 100644 --- a/examples/onoff_light/src/main.rs +++ b/examples/onoff_light/src/main.rs @@ -18,6 +18,7 @@ use core::borrow::Borrow; use core::pin::pin; +use embassy_futures::select::select3; use log::info; use matter::core::{CommissioningData, Matter}; use matter::data_model::cluster_basic_information::BasicInfoConfig; @@ -27,18 +28,18 @@ use matter::data_model::objects::*; use matter::data_model::root_endpoint; use matter::data_model::system_model::descriptor; use matter::error::Error; -use matter::mdns::MdnsService; -use matter::persist::FilePsm; +use matter::mdns::{MdnsRunBuffers, MdnsService}; use matter::secure_channel::spake2p::VerifierData; +use matter::transport::core::RunBuffers; use matter::transport::network::{Ipv4Addr, Ipv6Addr, NetworkStack}; -use matter::transport::runner::{AllUdpBuffers, TransportRunner}; +use matter::utils::select::EitherUnwrap; mod dev_att; #[cfg(feature = "std")] fn main() -> Result<(), Error> { let thread = std::thread::Builder::new() - .stack_size(140 * 1024) + .stack_size(150 * 1024) .spawn(run) .unwrap(); @@ -56,11 +57,11 @@ fn run() -> Result<(), Error> { initialize_logger(); info!( - "Matter memory: mDNS={}, Matter={}, TransportRunner={}, UdpBuffers={}", + "Matter memory: mDNS={}, Matter={}, MdnsBuffers={}, RunBuffers={}", core::mem::size_of::(), core::mem::size_of::(), - core::mem::size_of::(), - core::mem::size_of::(), + core::mem::size_of::(), + core::mem::size_of::(), ); let dev_det = BasicInfoConfig { @@ -73,12 +74,6 @@ fn run() -> Result<(), Error> { device_name: "OnOff Light", }; - let psm_path = std::env::temp_dir().join("matter-iot"); - info!("Persisting from/to {}", psm_path.display()); - - #[cfg(all(feature = "std", not(target_os = "espidf")))] - let psm = matter::persist::FilePsm::new(psm_path)?; - let (ipv4_addr, ipv6_addr, interface) = initialize_network()?; let dev_att = dev_att::HardCodedDevAtt::new(); @@ -106,7 +101,7 @@ fn run() -> Result<(), Error> { matter::MATTER_PORT, ); - info!("mDNS initialized: {:p}", &mdns); + info!("mDNS initialized"); let matter = Matter::new( // vid/pid should match those in the DAC @@ -118,36 +113,28 @@ fn run() -> Result<(), Error> { matter::MATTER_PORT, ); - info!("Matter initialized: {:p}", &matter); + info!("Matter initialized"); #[cfg(all(feature = "std", not(target_os = "espidf")))] - { - let mut buf = [0; 4096]; - let buf = &mut buf; - if let Some(data) = psm.load("acls", buf)? { - matter.load_acls(data)?; - } - - if let Some(data) = psm.load("fabrics", buf)? { - matter.load_fabrics(data)?; - } - } - - let mut runner = TransportRunner::new(&matter); - - info!("Transport runner initialized: {:p}", &runner); + let mut psm = matter::persist::Psm::new(&matter, std::env::temp_dir().join("matter-iot"))?; let handler = HandlerCompat(handler(&matter)); - // NOTE (no_std): If using the `embassy-net` UDP implementation, replace this dummy stack with the `embassy-net` one - // When using a custom UDP stack, remove this + // When using a custom UDP stack, remove the network stack initialization below + // and call `Matter::run_piped()` instead, by utilizing the TX & RX `Pipe` structs + // to push/pull your UDP packets from/to the Matter stack. + // Ditto for `MdnsService`. + // + // When using the `embassy-net` feature (as opposed to the Rust Standard Library network stack), + // this initialization would be more complex. let stack = NetworkStack::new(); - let mut buffers = AllUdpBuffers::new(); + let mut mdns_buffers = MdnsRunBuffers::new(); + let mut mdns_runner = pin!(mdns.run(&stack, &mut mdns_buffers)); - let mut fut = pin!(runner.run_udp_all( + let mut buffers = RunBuffers::new(); + let mut runner = matter.run( &stack, - &mdns, &mut buffers, CommissioningData { // TODO: Hard-coded for now @@ -155,16 +142,30 @@ fn run() -> Result<(), Error> { discriminator: 250, }, &handler, - )); + ); + + info!( + "Matter transport runner memory: {}", + core::mem::size_of_val(&runner) + ); + + let mut runner = pin!(runner); + + #[cfg(all(feature = "std", not(target_os = "espidf")))] + let mut psm_runner = pin!(psm.run()); + + #[cfg(not(all(feature = "std", not(target_os = "espidf"))))] + let mut psm_runner = pin!(core::future::pending()); + + let mut runner = select3(&mut runner, &mut mdns_runner, &mut psm_runner); - // NOTE: For no_std, replace with your own no_std way of polling the future #[cfg(feature = "std")] - async_io::block_on(&mut fut)?; + async_io::block_on(&mut runner).unwrap()?; // NOTE (no_std): For no_std, replace with your own more efficient no_std executor, // because the executor used below is a simple busy-loop poller #[cfg(not(feature = "std"))] - embassy_futures::block_on(&mut fut)?; + embassy_futures::block_on(&mut runner).unwrap()?; Ok(()) } @@ -268,26 +269,6 @@ fn initialize_network() -> Result<(Ipv4Addr, Ipv6Addr, u32), Error> { Ok((ip, ipv6, 0 as _)) } -#[cfg(all(feature = "std", not(target_os = "espidf")))] -#[inline(never)] -async fn save(matter: &Matter<'_>, psm: &FilePsm) -> Result<(), Error> { - let mut buf = [0; 4096]; - let buf = &mut buf; - - loop { - matter.wait_changed().await; - if matter.is_changed() { - if let Some(data) = matter.store_acls(buf)? { - psm.store("acls", data)?; - } - - if let Some(data) = matter.store_fabrics(buf)? { - psm.store("fabrics", data)?; - } - } - } -} - #[cfg(target_os = "espidf")] #[inline(never)] fn initialize_logger() { diff --git a/matter/Cargo.toml b/matter/Cargo.toml index 69ad4e8..9b5c5cc 100644 --- a/matter/Cargo.toml +++ b/matter/Cargo.toml @@ -8,23 +8,23 @@ repository = "https://github.com/kedars/matter-rs" readme = "README.md" keywords = ["matter", "smart", "smart-home", "IoT", "ESP32"] categories = ["embedded", "network-programming"] -license = "MIT" +license = "Apache-2.0" [lib] name = "matter" path = "src/lib.rs" [features] -default = ["os", "crypto_rustcrypto"] +default = ["os", "mbedtls"] os = ["std", "backtrace", "env_logger", "nix", "critical-section/std", "embassy-sync/std", "embassy-time/std"] -esp-idf = ["std", "crypto_rustcrypto", "esp-idf-sys", "esp-idf-hal", "esp-idf-svc"] +esp-idf = ["std", "rustcrypto", "esp-idf-sys"] std = ["alloc", "rand", "qrcode", "async-io", "esp-idf-sys?/std", "embassy-time/generic-queue-16"] backtrace = [] alloc = [] nightly = [] -crypto_openssl = ["alloc", "openssl", "foreign-types", "hmac", "sha2"] -crypto_mbedtls = ["alloc", "mbedtls"] -crypto_rustcrypto = ["alloc", "sha2", "hmac", "pbkdf2", "hkdf", "aes", "ccm", "p256", "elliptic-curve", "crypto-bigint", "x509-cert", "rand_core"] +openssl = ["alloc", "dep:openssl", "foreign-types", "hmac", "sha2"] +mbedtls = ["alloc", "dep:mbedtls"] +rustcrypto = ["alloc", "sha2", "hmac", "pbkdf2", "hkdf", "aes", "ccm", "p256", "elliptic-curve", "crypto-bigint", "x509-cert", "rand_core"] embassy-net = ["dep:embassy-net", "dep:embassy-net-driver", "smoltcp"] [dependencies] @@ -58,10 +58,10 @@ smoltcp = { version = "0.10", default-features = false, optional = true } # STD-only dependencies rand = { version = "0.8.5", optional = true } qrcode = { version = "0.12", default-features = false, optional = true } # Print QR code -async-io = { version = "=1.12", optional = true } # =1.2 for compatibility with ESP IDF +async-io = { version = "=1.12", optional = true } # =1.12 for compatibility with ESP IDF # crypto -openssl = { git = "https://github.com/sfackler/rust-openssl", optional = true } +openssl = { version = "0.10.55", optional = true } foreign-types = { version = "0.3.2", optional = true } # rust-crypto @@ -81,18 +81,22 @@ x509-cert = { version = "0.2.0", default-features = false, features = ["pem"], o astro-dnssd = { version = "0.3" } [target.'cfg(not(target_os = "espidf"))'.dependencies] -mbedtls = { git = "https://github.com/fortanix/rust-mbedtls", optional = true } +mbedtls = { version = "0.9", optional = true } env_logger = { version = "0.10.0", optional = true } nix = { version = "0.26", features = ["net"], optional = true } [target.'cfg(target_os = "espidf")'.dependencies] -esp-idf-sys = { version = "0.33", optional = true, default-features = false, features = ["native", "binstart"] } -esp-idf-hal = { version = "0.41", optional = true, features = ["embassy-sync", "critical-section"] } # TODO: Only necessary for the examples -esp-idf-svc = { version = "0.46", optional = true, features = ["embassy-time-driver"] } # TODO: Only necessary for the examples +esp-idf-sys = { version = "0.33", optional = true, default-features = false, features = ["native"] } [build-dependencies] embuild = "0.31.2" +[target.'cfg(target_os = "espidf")'.dev-dependencies] +esp-idf-sys = { version = "0.33", default-features = false, features = ["binstart"] } +esp-idf-hal = { version = "0.41", features = ["embassy-sync", "critical-section"] } +esp-idf-svc = { version = "0.46", features = ["embassy-time-driver"] } +embedded-svc = { version = "0.25" } + [[example]] name = "onoff_light" path = "../examples/onoff_light/src/main.rs" diff --git a/matter/src/core.rs b/matter/src/core.rs index 13c0930..f019652 100644 --- a/matter/src/core.rs +++ b/matter/src/core.rs @@ -28,8 +28,11 @@ use crate::{ mdns::Mdns, pairing::{print_pairing_code_and_qr, DiscoveryCapabilities}, secure_channel::{pake::PaseMgr, spake2p::VerifierData}, - transport::exchange::Notification, - utils::{epoch::Epoch, rand::Rand}, + transport::{ + exchange::{ExchangeCtx, MAX_EXCHANGES}, + session::SessionMgr, + }, + utils::{epoch::Epoch, rand::Rand, select::Notification}, }; /* The Matter Port */ @@ -45,17 +48,20 @@ pub struct CommissioningData { /// The primary Matter Object pub struct Matter<'a> { - pub fabric_mgr: RefCell, - pub acl_mgr: RefCell, - pub pase_mgr: RefCell, - pub failsafe: RefCell, - pub persist_notification: Notification, - pub mdns: &'a dyn Mdns, - pub epoch: Epoch, - pub rand: Rand, - pub dev_det: &'a BasicInfoConfig<'a>, - pub dev_att: &'a dyn DevAttDataFetcher, - pub port: u16, + fabric_mgr: RefCell, + pub acl_mgr: RefCell, // Public for tests + pase_mgr: RefCell, + failsafe: RefCell, + persist_notification: Notification, + pub(crate) send_notification: Notification, + mdns: &'a dyn Mdns, + pub(crate) epoch: Epoch, + pub(crate) rand: Rand, + dev_det: &'a BasicInfoConfig<'a>, + dev_att: &'a dyn DevAttDataFetcher, + pub(crate) port: u16, + pub(crate) exchanges: RefCell>, + pub session_mgr: RefCell, // Public for tests } impl<'a> Matter<'a> { @@ -94,12 +100,15 @@ impl<'a> Matter<'a> { pase_mgr: RefCell::new(PaseMgr::new(epoch, rand)), failsafe: RefCell::new(FailSafe::new()), persist_notification: Notification::new(), + send_notification: Notification::new(), mdns, epoch, rand, dev_det, dev_att, port, + exchanges: RefCell::new(heapless::Vec::new()), + session_mgr: RefCell::new(SessionMgr::new(epoch, rand)), } } @@ -160,6 +169,7 @@ impl<'a> Matter<'a> { Ok(false) } } + pub fn notify_changed(&self) { if self.is_changed() { self.persist_notification.signal(()); diff --git a/matter/src/crypto/crypto_mbedtls.rs b/matter/src/crypto/crypto_mbedtls.rs index 1eb7a88..7403e57 100644 --- a/matter/src/crypto/crypto_mbedtls.rs +++ b/matter/src/crypto/crypto_mbedtls.rs @@ -17,6 +17,8 @@ extern crate alloc; +use core::fmt::{self, Debug}; + use alloc::sync::Arc; use log::{error, info}; @@ -355,3 +357,9 @@ impl Sha256 { Ok(()) } } + +impl Debug for Sha256 { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + write!(f, "Sha256") + } +} diff --git a/matter/src/crypto/crypto_openssl.rs b/matter/src/crypto/crypto_openssl.rs index 24fa267..a29df5c 100644 --- a/matter/src/crypto/crypto_openssl.rs +++ b/matter/src/crypto/crypto_openssl.rs @@ -15,6 +15,8 @@ * limitations under the License. */ +use core::fmt::{self, Debug}; + use crate::error::{Error, ErrorCode}; use crate::utils::rand::Rand; @@ -391,3 +393,9 @@ impl Sha256 { Ok(()) } } + +impl Debug for Sha256 { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + write!(f, "Sha256") + } +} diff --git a/matter/src/crypto/mod.rs b/matter/src/crypto/mod.rs index 85c40b0..04584c8 100644 --- a/matter/src/crypto/mod.rs +++ b/matter/src/crypto/mod.rs @@ -37,37 +37,29 @@ pub const ECDH_SHARED_SECRET_LEN_BYTES: usize = 32; pub const EC_SIGNATURE_LEN_BYTES: usize = 64; -#[cfg(all(feature = "crypto_mbedtls", target_os = "espidf"))] +#[cfg(all(feature = "mbedtls", target_os = "espidf"))] mod crypto_esp_mbedtls; -#[cfg(all(feature = "crypto_mbedtls", target_os = "espidf"))] +#[cfg(all(feature = "mbedtls", target_os = "espidf"))] pub use self::crypto_esp_mbedtls::*; -#[cfg(all(feature = "crypto_mbedtls", not(target_os = "espidf")))] +#[cfg(all(feature = "mbedtls", not(target_os = "espidf")))] mod crypto_mbedtls; -#[cfg(all(feature = "crypto_mbedtls", not(target_os = "espidf")))] +#[cfg(all(feature = "mbedtls", not(target_os = "espidf")))] pub use self::crypto_mbedtls::*; -#[cfg(feature = "crypto_openssl")] +#[cfg(feature = "openssl")] mod crypto_openssl; -#[cfg(feature = "crypto_openssl")] +#[cfg(feature = "openssl")] pub use self::crypto_openssl::*; -#[cfg(feature = "crypto_rustcrypto")] +#[cfg(feature = "rustcrypto")] mod crypto_rustcrypto; -#[cfg(feature = "crypto_rustcrypto")] +#[cfg(feature = "rustcrypto")] pub use self::crypto_rustcrypto::*; -#[cfg(not(any( - feature = "crypto_openssl", - feature = "crypto_mbedtls", - feature = "crypto_rustcrypto" -)))] +#[cfg(not(any(feature = "openssl", feature = "mbedtls", feature = "rustcrypto")))] pub mod crypto_dummy; -#[cfg(not(any( - feature = "crypto_openssl", - feature = "crypto_mbedtls", - feature = "crypto_rustcrypto" -)))] +#[cfg(not(any(feature = "openssl", feature = "mbedtls", feature = "rustcrypto")))] pub use self::crypto_dummy::*; impl<'a> FromTLV<'a> for KeyPair { diff --git a/matter/src/data_model/objects/metadata.rs b/matter/src/data_model/objects/metadata.rs index 368ff9b..3e15612 100644 --- a/matter/src/data_model/objects/metadata.rs +++ b/matter/src/data_model/objects/metadata.rs @@ -1,3 +1,20 @@ +/* + * + * Copyright (c) 2020-2022 Project CHIP Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + use crate::data_model::objects::Node; #[cfg(feature = "nightly")] diff --git a/matter/src/error.rs b/matter/src/error.rs index c8da820..91ba77e 100644 --- a/matter/src/error.rs +++ b/matter/src/error.rs @@ -149,7 +149,7 @@ impl From> for Error { } } -#[cfg(feature = "crypto_openssl")] +#[cfg(feature = "openssl")] impl From for Error { fn from(e: openssl::error::ErrorStack) -> Self { ::log::error!("Error in TLS: {}", e); @@ -157,7 +157,7 @@ impl From for Error { } } -#[cfg(all(feature = "crypto_mbedtls", not(target_os = "espidf")))] +#[cfg(all(feature = "mbedtls", not(target_os = "espidf")))] impl From for Error { fn from(e: mbedtls::Error) -> Self { ::log::error!("Error in TLS: {}", e); @@ -173,7 +173,7 @@ impl From for Error { } } -#[cfg(feature = "crypto_rustcrypto")] +#[cfg(feature = "rustcrypto")] impl From for Error { fn from(_e: ccm::aead::Error) -> Self { Self::new(ErrorCode::Crypto) diff --git a/matter/src/interaction_model/core.rs b/matter/src/interaction_model/core.rs index 4ce3583..b2d3b8c 100644 --- a/matter/src/interaction_model/core.rs +++ b/matter/src/interaction_model/core.rs @@ -609,7 +609,7 @@ impl<'a, 'r, 'p> Interaction<'a, 'r, 'p> { rx: &mut Packet<'_>, tx: &mut Packet<'_>, ) -> Result, Error> { - let epoch = exchange.transport().matter().epoch; + let epoch = exchange.matter.epoch; let mut opcode: OpCode = rx.get_proto_opcode()?; @@ -641,7 +641,7 @@ impl<'a, 'r, 'p> Interaction<'a, 'r, 'p> { where S: FnOnce() -> u32, { - let epoch = exchange.transport().matter().epoch; + let epoch = exchange.matter.epoch; let opcode = rx.get_proto_opcode()?; let rx_data = rx.as_slice(); diff --git a/matter/src/mdns.rs b/matter/src/mdns.rs index 47a0861..c66d12f 100644 --- a/matter/src/mdns.rs +++ b/matter/src/mdns.rs @@ -55,19 +55,15 @@ where } } -#[cfg(all(feature = "std", target_os = "macos"))] -pub use astro::MdnsRunner; #[cfg(all(feature = "std", target_os = "macos"))] pub use astro::MdnsService; #[cfg(all(feature = "std", target_os = "macos"))] pub use astro::MdnsUdpBuffers; -#[cfg(not(all(feature = "std", target_os = "macos")))] -pub use builtin::MdnsRunner; +#[cfg(any(feature = "std", feature = "embassy-net"))] +pub use builtin::MdnsRunBuffers; #[cfg(not(all(feature = "std", target_os = "macos")))] pub use builtin::MdnsService; -#[cfg(not(all(feature = "std", target_os = "macos")))] -pub use builtin::MdnsUdpBuffers; pub struct DummyMdns; diff --git a/matter/src/mdns/astro.rs b/matter/src/mdns/astro.rs index 1ac4331..afb933d 100644 --- a/matter/src/mdns/astro.rs +++ b/matter/src/mdns/astro.rs @@ -11,6 +11,17 @@ use log::info; use super::ServiceMode; +/// Only for API-compatibility with builtin::MdnsRunner +pub struct MdnsUdpBuffers(()); + +/// Only for API-compatibility with builtin::MdnsRunner +impl MdnsUdpBuffers { + #[inline(always)] + pub const fn new() -> Self { + Self(()) + } +} + pub struct MdnsService<'a> { dev_det: &'a BasicInfoConfig<'a>, matter_port: u16, @@ -78,16 +89,15 @@ impl<'a> MdnsService<'a> { Ok(()) } -} -/// Only for API-compatibility with builtin::MdnsRunner -pub struct MdnsUdpBuffers(()); + /// Only for API-compatibility with builtin::MdnsRunner + pub async fn run_udp(&mut self, buffers: &mut MdnsUdpBuffers) -> Result<(), Error> { + core::future::pending::>().await + } -/// Only for API-compatibility with builtin::MdnsRunner -impl MdnsUdpBuffers { - #[inline(always)] - pub const fn new() -> Self { - Self(()) + /// Only for API-compatibility with builtin::MdnsRunner + pub async fn run(&self, _tx_pipe: &Pipe<'_>, _rx_pipe: &Pipe<'_>) -> Result<(), Error> { + core::future::pending::>().await } } @@ -100,21 +110,3 @@ impl<'a> super::Mdns for MdnsService<'a> { MdnsService::remove(self, service) } } - -/// Only for API-compatibility with builtin::MdnsRunner -pub struct MdnsRunner<'a>(&'a MdnsService<'a>); - -/// Only for API-compatibility with builtin::MdnsRunner -impl<'a> MdnsRunner<'a> { - pub const fn new(mdns: &'a MdnsService<'a>) -> Self { - Self(mdns) - } - - pub async fn run_udp(&mut self, buffers: &mut MdnsUdpBuffers) -> Result<(), Error> { - core::future::pending::>().await - } - - pub async fn run(&self, _tx_pipe: &Pipe<'_>, _rx_pipe: &Pipe<'_>) -> Result<(), Error> { - core::future::pending::>().await - } -} diff --git a/matter/src/mdns/builtin.rs b/matter/src/mdns/builtin.rs index 9845b20..e799879 100644 --- a/matter/src/mdns/builtin.rs +++ b/matter/src/mdns/builtin.rs @@ -22,6 +22,25 @@ const IPV6_BROADCAST_ADDR: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0x const PORT: u16 = 5353; +#[cfg(any(feature = "std", feature = "embassy-net"))] +pub struct MdnsRunBuffers { + udp: crate::transport::udp::UdpBuffers, + tx_buf: core::mem::MaybeUninit<[u8; crate::transport::packet::MAX_TX_BUF_SIZE]>, + rx_buf: core::mem::MaybeUninit<[u8; crate::transport::packet::MAX_RX_BUF_SIZE]>, +} + +#[cfg(any(feature = "std", feature = "embassy-net"))] +impl MdnsRunBuffers { + #[inline(always)] + pub const fn new() -> Self { + Self { + udp: crate::transport::udp::UdpBuffers::new(), + tx_buf: core::mem::MaybeUninit::uninit(), + rx_buf: core::mem::MaybeUninit::uninit(), + } + } +} + pub struct MdnsService<'a> { host: Host<'a>, #[allow(unused)] @@ -100,39 +119,12 @@ impl<'a> MdnsService<'a> { Ok(()) } -} - -#[cfg(any(feature = "std", feature = "embassy-net"))] -pub struct MdnsUdpBuffers { - udp: crate::transport::udp::UdpBuffers, - tx_buf: core::mem::MaybeUninit<[u8; crate::transport::packet::MAX_TX_BUF_SIZE]>, - rx_buf: core::mem::MaybeUninit<[u8; crate::transport::packet::MAX_RX_BUF_SIZE]>, -} - -#[cfg(any(feature = "std", feature = "embassy-net"))] -impl MdnsUdpBuffers { - #[inline(always)] - pub const fn new() -> Self { - Self { - udp: crate::transport::udp::UdpBuffers::new(), - tx_buf: core::mem::MaybeUninit::uninit(), - rx_buf: core::mem::MaybeUninit::uninit(), - } - } -} - -pub struct MdnsRunner<'a>(&'a MdnsService<'a>); - -impl<'a> MdnsRunner<'a> { - pub const fn new(mdns: &'a MdnsService<'a>) -> Self { - Self(mdns) - } #[cfg(any(feature = "std", feature = "embassy-net"))] - pub async fn run_udp( - &mut self, + pub async fn run( + &self, stack: &crate::transport::network::NetworkStack, - buffers: &mut MdnsUdpBuffers, + buffers: &mut MdnsRunBuffers, ) -> Result<(), Error> where D: crate::transport::network::NetworkStackDriver, @@ -146,14 +138,14 @@ impl<'a> MdnsRunner<'a> { // V6 multicast does not work with smoltcp yet (see https://github.com/smoltcp-rs/smoltcp/pull/602) #[cfg(not(feature = "embassy-net"))] - if let Some(interface) = self.0.interface { + if let Some(interface) = self.interface { udp.join_multicast_v6(IPV6_BROADCAST_ADDR, interface) .await?; } udp.join_multicast_v4( IP_BROADCAST_ADDR, - crate::transport::network::Ipv4Addr::from(self.0.host.ip), + crate::transport::network::Ipv4Addr::from(self.host.ip), ) .await?; @@ -202,14 +194,14 @@ impl<'a> MdnsRunner<'a> { } }); - let mut run = pin!(async move { self.run(tx_pipe, rx_pipe).await }); + let mut run = pin!(async move { self.run_piped(tx_pipe, rx_pipe).await }); embassy_futures::select::select3(&mut tx, &mut rx, &mut run) .await .unwrap() } - pub async fn run(&self, tx_pipe: &Pipe<'_>, rx_pipe: &Pipe<'_>) -> Result<(), Error> { + pub async fn run_piped(&self, tx_pipe: &Pipe<'_>, rx_pipe: &Pipe<'_>) -> Result<(), Error> { let mut broadcast = pin!(self.broadcast(tx_pipe)); let mut respond = pin!(self.respond(rx_pipe, tx_pipe)); @@ -220,7 +212,7 @@ impl<'a> MdnsRunner<'a> { async fn broadcast(&self, tx_pipe: &Pipe<'_>) -> Result<(), Error> { loop { select( - self.0.notification.wait(), + self.notification.wait(), Timer::after(Duration::from_secs(30)), ) .await; @@ -229,13 +221,13 @@ impl<'a> MdnsRunner<'a> { IpAddr::V4(IP_BROADCAST_ADDR), IpAddr::V6(IPV6_BROADCAST_ADDR), ] { - if self.0.interface.is_some() || addr == IpAddr::V4(IP_BROADCAST_ADDR) { + if self.interface.is_some() || addr == IpAddr::V4(IP_BROADCAST_ADDR) { loop { let sent = { let mut data = tx_pipe.data.lock().await; if data.chunk.is_none() { - let len = self.0.host.broadcast(&self.0, data.buf, 60)?; + let len = self.host.broadcast(self, data.buf, 60)?; if len > 0 { info!("Broadasting mDNS entry to {}:{}", addr, PORT); @@ -280,7 +272,7 @@ impl<'a> MdnsRunner<'a> { let mut tx_data = tx_pipe.data.lock().await; if tx_data.chunk.is_none() { - let len = self.0.host.respond(&self.0, data, tx_data.buf, 60)?; + let len = self.host.respond(self, data, tx_data.buf, 60)?; if len > 0 { info!("Replying to mDNS query from {}", rx_chunk.addr); diff --git a/matter/src/pairing/mod.rs b/matter/src/pairing/mod.rs index 253062e..f5cb05d 100644 --- a/matter/src/pairing/mod.rs +++ b/matter/src/pairing/mod.rs @@ -96,7 +96,7 @@ pub fn print_pairing_code_and_qr( Ok(()) } -pub(self) fn passwd_from_comm_data(comm_data: &CommissioningData) -> u32 { +fn passwd_from_comm_data(comm_data: &CommissioningData) -> u32 { // todo: should this be part of the comm_data implementation? match comm_data.verifier.data { VerifierOption::Password(pwd) => pwd, diff --git a/matter/src/persist.rs b/matter/src/persist.rs index d9a2733..a25b13a 100644 --- a/matter/src/persist.rs +++ b/matter/src/persist.rs @@ -15,31 +15,63 @@ * limitations under the License. */ #[cfg(feature = "std")] -pub use file_psm::*; +pub use fileio::*; #[cfg(feature = "std")] -mod file_psm { +pub mod fileio { use std::fs; use std::io::{Read, Write}; - use std::path::PathBuf; + use std::path::{Path, PathBuf}; use log::info; use crate::error::{Error, ErrorCode}; + use crate::Matter; - pub struct FilePsm { + pub struct Psm<'a> { + matter: &'a Matter<'a>, dir: PathBuf, + buf: [u8; 4096], } - impl FilePsm { - pub fn new(dir: PathBuf) -> Result { + impl<'a> Psm<'a> { + #[inline(always)] + pub fn new(matter: &'a Matter<'a>, dir: PathBuf) -> Result { fs::create_dir_all(&dir)?; - Ok(Self { dir }) + info!("Persisting from/to {}", dir.display()); + + let mut buf = [0; 4096]; + + if let Some(data) = Self::load(&dir, "acls", &mut buf)? { + matter.load_acls(data)?; + } + + if let Some(data) = Self::load(&dir, "fabrics", &mut buf)? { + matter.load_fabrics(data)?; + } + + Ok(Self { matter, dir, buf }) } - pub fn load<'a>(&self, key: &str, buf: &'a mut [u8]) -> Result, Error> { - let path = self.dir.join(key); + pub async fn run(&mut self) -> Result<(), Error> { + loop { + self.matter.wait_changed().await; + + if self.matter.is_changed() { + if let Some(data) = self.matter.store_acls(&mut self.buf)? { + Self::store(&self.dir, "acls", data)?; + } + + if let Some(data) = self.matter.store_fabrics(&mut self.buf)? { + Self::store(&self.dir, "fabrics", data)?; + } + } + } + } + + fn load<'b>(dir: &Path, key: &str, buf: &'b mut [u8]) -> Result, Error> { + let path = dir.join(key); match fs::File::open(path) { Ok(mut file) => { @@ -69,8 +101,8 @@ mod file_psm { } } - pub fn store(&self, key: &str, data: &[u8]) -> Result<(), Error> { - let path = self.dir.join(key); + fn store(dir: &Path, key: &str, data: &[u8]) -> Result<(), Error> { + let path = dir.join(key); let mut file = fs::File::create(path)?; diff --git a/matter/src/secure_channel/crypto.rs b/matter/src/secure_channel/crypto.rs index 027db69..8fa9ec5 100644 --- a/matter/src/secure_channel/crypto.rs +++ b/matter/src/secure_channel/crypto.rs @@ -15,17 +15,13 @@ * limitations under the License. */ -#[cfg(not(any( - feature = "crypto_openssl", - feature = "crypto_mbedtls", - feature = "crypto_rustcrypto" -)))] +#[cfg(not(any(feature = "openssl", feature = "mbedtls", feature = "rustcrypto")))] pub use super::crypto_dummy::CryptoSpake2; -#[cfg(all(feature = "crypto_mbedtls", target_os = "espidf"))] +#[cfg(all(feature = "mbedtls", target_os = "espidf"))] pub use super::crypto_esp_mbedtls::CryptoSpake2; -#[cfg(all(feature = "crypto_mbedtls", not(target_os = "espidf")))] +#[cfg(all(feature = "mbedtls", not(target_os = "espidf")))] pub use super::crypto_mbedtls::CryptoSpake2; -#[cfg(feature = "crypto_openssl")] +#[cfg(feature = "openssl")] pub use super::crypto_openssl::CryptoSpake2; -#[cfg(feature = "crypto_rustcrypto")] +#[cfg(feature = "rustcrypto")] pub use super::crypto_rustcrypto::CryptoSpake2; diff --git a/matter/src/secure_channel/mod.rs b/matter/src/secure_channel/mod.rs index 58020b4..9b538b6 100644 --- a/matter/src/secure_channel/mod.rs +++ b/matter/src/secure_channel/mod.rs @@ -17,19 +17,15 @@ pub mod case; pub mod common; -#[cfg(not(any( - feature = "crypto_openssl", - feature = "crypto_mbedtls", - feature = "crypto_rustcrypto" -)))] +#[cfg(not(any(feature = "openssl", feature = "mbedtls", feature = "rustcrypto")))] mod crypto_dummy; -#[cfg(all(feature = "crypto_mbedtls", target_os = "espidf"))] +#[cfg(all(feature = "mbedtls", target_os = "espidf"))] mod crypto_esp_mbedtls; -#[cfg(all(feature = "crypto_mbedtls", not(target_os = "espidf")))] +#[cfg(all(feature = "mbedtls", not(target_os = "espidf")))] mod crypto_mbedtls; -#[cfg(feature = "crypto_openssl")] +#[cfg(feature = "openssl")] pub mod crypto_openssl; -#[cfg(feature = "crypto_rustcrypto")] +#[cfg(feature = "rustcrypto")] pub mod crypto_rustcrypto; pub mod core; diff --git a/matter/src/transport/core.rs b/matter/src/transport/core.rs index 1a51b91..0874736 100644 --- a/matter/src/transport/core.rs +++ b/matter/src/transport/core.rs @@ -15,14 +15,18 @@ * limitations under the License. */ -use core::{borrow::Borrow, cell::RefCell}; +use core::borrow::Borrow; +use core::mem::MaybeUninit; +use core::pin::pin; -use embassy_futures::select::select; +use embassy_futures::select::{select, select_slice, Either}; use embassy_sync::{blocking_mutex::raw::NoopRawMutex, channel::Channel}; use embassy_time::{Duration, Timer}; use log::{error, info, warn}; +use crate::utils::select::Notification; +use crate::CommissioningData; use crate::{ alloc, data_model::{core::DataModel, objects::DataModelHandler}, @@ -33,18 +37,17 @@ use crate::{ core::SecureChannel, }, transport::packet::Packet, + utils::select::EitherUnwrap, Matter, }; use super::{ exchange::{ - Exchange, ExchangeCtr, ExchangeCtx, ExchangeId, ExchangeState, Notification, Role, - MAX_EXCHANGES, + Exchange, ExchangeCtr, ExchangeCtx, ExchangeId, ExchangeState, Role, MAX_EXCHANGES, }, mrp::ReliableMessage, packet::{MAX_RX_BUF_SIZE, MAX_RX_STATUS_BUF_SIZE, MAX_TX_BUF_SIZE}, pipe::{Chunk, Pipe}, - session::SessionMgr, }; #[derive(Debug)] @@ -66,33 +69,216 @@ impl From for OpCodeDescriptor { } } -pub struct Transport<'a> { - matter: &'a Matter<'a>, - pub(crate) exchanges: RefCell>, - pub(crate) send_notification: Notification, - pub session_mgr: RefCell, +type TxBuf = MaybeUninit<[u8; MAX_TX_BUF_SIZE]>; +type RxBuf = MaybeUninit<[u8; MAX_RX_BUF_SIZE]>; +type SxBuf = MaybeUninit<[u8; MAX_RX_STATUS_BUF_SIZE]>; + +#[cfg(any(feature = "std", feature = "embassy-net"))] +pub struct RunBuffers { + udp_bufs: crate::transport::udp::UdpBuffers, + run_bufs: PacketBuffers, + tx_buf: TxBuf, + rx_buf: RxBuf, } -impl<'a> Transport<'a> { +#[cfg(any(feature = "std", feature = "embassy-net"))] +impl RunBuffers { #[inline(always)] - pub fn new(matter: &'a Matter<'a>) -> Self { - let epoch = matter.epoch; - let rand = matter.rand; - + pub const fn new() -> Self { Self { - matter, - exchanges: RefCell::new(heapless::Vec::new()), - send_notification: Notification::new(), - session_mgr: RefCell::new(SessionMgr::new(epoch, rand)), + udp_bufs: crate::transport::udp::UdpBuffers::new(), + run_bufs: PacketBuffers::new(), + tx_buf: core::mem::MaybeUninit::uninit(), + rx_buf: core::mem::MaybeUninit::uninit(), } } +} - pub fn matter(&self) -> &'a Matter<'a> { - self.matter +pub struct PacketBuffers { + tx: [TxBuf; MAX_EXCHANGES], + rx: [RxBuf; MAX_EXCHANGES], + sx: [SxBuf; MAX_EXCHANGES], +} + +impl PacketBuffers { + const TX_ELEM: TxBuf = MaybeUninit::uninit(); + const RX_ELEM: RxBuf = MaybeUninit::uninit(); + const SX_ELEM: SxBuf = MaybeUninit::uninit(); + + const TX_INIT: [TxBuf; MAX_EXCHANGES] = [Self::TX_ELEM; MAX_EXCHANGES]; + const RX_INIT: [RxBuf; MAX_EXCHANGES] = [Self::RX_ELEM; MAX_EXCHANGES]; + const SX_INIT: [SxBuf; MAX_EXCHANGES] = [Self::SX_ELEM; MAX_EXCHANGES]; + + #[inline(always)] + pub const fn new() -> Self { + Self { + tx: Self::TX_INIT, + rx: Self::RX_INIT, + sx: Self::SX_INIT, + } + } +} + +impl<'a> Matter<'a> { + #[cfg(any(feature = "std", feature = "embassy-net"))] + pub async fn run( + &self, + stack: &crate::transport::network::NetworkStack, + buffers: &mut RunBuffers, + dev_comm: CommissioningData, + handler: &H, + ) -> Result<(), Error> + where + D: crate::transport::network::NetworkStackDriver, + H: DataModelHandler, + { + let udp = crate::transport::udp::UdpListener::new( + stack, + crate::transport::network::SocketAddr::new( + crate::transport::network::IpAddr::V6( + crate::transport::network::Ipv6Addr::UNSPECIFIED, + ), + self.port, + ), + &mut buffers.udp_bufs, + ) + .await?; + + let tx_pipe = Pipe::new(unsafe { buffers.tx_buf.assume_init_mut() }); + let rx_pipe = Pipe::new(unsafe { buffers.rx_buf.assume_init_mut() }); + + let tx_pipe = &tx_pipe; + let rx_pipe = &rx_pipe; + let udp = &udp; + let run_bufs = &mut buffers.run_bufs; + + let mut tx = pin!(async move { + loop { + { + let mut data = tx_pipe.data.lock().await; + + if let Some(chunk) = data.chunk { + udp.send(chunk.addr.unwrap_udp(), &data.buf[chunk.start..chunk.end]) + .await?; + data.chunk = None; + tx_pipe.data_consumed_notification.signal(()); + } + } + + tx_pipe.data_supplied_notification.wait().await; + } + }); + + let mut rx = pin!(async move { + loop { + { + let mut data = rx_pipe.data.lock().await; + + if data.chunk.is_none() { + let (len, addr) = udp.recv(data.buf).await?; + + data.chunk = Some(Chunk { + start: 0, + end: len, + addr: crate::transport::network::Address::Udp(addr), + }); + rx_pipe.data_supplied_notification.signal(()); + } + } + + rx_pipe.data_consumed_notification.wait().await; + } + }); + + let mut run = pin!(async move { + self.run_piped(run_bufs, tx_pipe, rx_pipe, dev_comm, handler) + .await + }); + + embassy_futures::select::select3(&mut tx, &mut rx, &mut run) + .await + .unwrap() } - pub async fn initiate(&self, _fabric_id: u64, _node_id: u64) -> Result, Error> { - unimplemented!() + pub async fn run_piped( + &self, + buffers: &mut PacketBuffers, + tx_pipe: &Pipe<'_>, + rx_pipe: &Pipe<'_>, + dev_comm: CommissioningData, + handler: &H, + ) -> Result<(), Error> + where + H: DataModelHandler, + { + info!("Running Matter transport"); + + let buf = unsafe { buffers.rx[0].assume_init_mut() }; + + if self.start_comissioning(dev_comm, buf)? { + info!("Comissioning started"); + } + + let construction_notification = Notification::new(); + + let mut rx = pin!(self.handle_rx(buffers, rx_pipe, &construction_notification, handler)); + let mut tx = pin!(self.handle_tx(tx_pipe)); + + select(&mut rx, &mut tx).await.unwrap() + } + + #[inline(always)] + async fn handle_rx( + &self, + buffers: &mut PacketBuffers, + rx_pipe: &Pipe<'_>, + construction_notification: &Notification, + handler: &H, + ) -> Result<(), Error> + where + H: DataModelHandler, + { + info!("Creating queue for {} exchanges", 1); + + let channel = Channel::::new(); + + info!("Creating {} handlers", MAX_EXCHANGES); + let mut handlers = heapless::Vec::<_, MAX_EXCHANGES>::new(); + + info!("Handlers size: {}", core::mem::size_of_val(&handlers)); + + // Unsafely allow mutable aliasing in the packet pools by different indices + let pools: *mut PacketBuffers = buffers; + + for index in 0..MAX_EXCHANGES { + let channel = &channel; + let handler_id = index; + + let pools = unsafe { pools.as_mut() }.unwrap(); + + let tx_buf = unsafe { pools.tx[handler_id].assume_init_mut() }; + let rx_buf = unsafe { pools.rx[handler_id].assume_init_mut() }; + let sx_buf = unsafe { pools.sx[handler_id].assume_init_mut() }; + + handlers + .push(self.exchange_handler(tx_buf, rx_buf, sx_buf, handler_id, channel, handler)) + .map_err(|_| ()) + .unwrap(); + } + + let mut rx = pin!(self.handle_rx_multiplex(rx_pipe, construction_notification, &channel)); + + let result = select(&mut rx, select_slice(&mut handlers)).await; + + if let Either::First(result) = result { + if let Err(e) = &result { + error!("Exitting RX loop due to an error: {:?}", e); + } + + result?; + } + + Ok(()) } #[inline(always)] @@ -230,11 +416,11 @@ impl<'a> Transport<'a> { match rx.get_proto_id() { PROTO_ID_SECURE_CHANNEL => { - let sc = SecureChannel::new(self.matter()); + let sc = SecureChannel::new(self); sc.handle(&mut exchange, &mut rx, &mut tx).await?; - self.matter().notify_changed(); + self.notify_changed(); } PROTO_ID_INTERACTION_MODEL => { let dm = DataModel::new(handler); @@ -244,7 +430,7 @@ impl<'a> Transport<'a> { dm.handle(&mut exchange, &mut rx, &mut tx, &mut rx_status) .await?; - self.matter().notify_changed(); + self.notify_changed(); } other => { error!("Unknown Proto-ID: {}", other); @@ -254,6 +440,11 @@ impl<'a> Transport<'a> { Ok(()) } + pub fn reset_transport(&self) { + self.exchanges.borrow_mut().clear(); + self.session_mgr.borrow_mut().reset(); + } + pub fn process_rx<'r>( &'r self, construction_notification: &'r Notification, @@ -297,7 +488,7 @@ impl<'a> Transport<'a> { } } - self.matter().notify_changed(); + self.notify_changed(); } } @@ -305,13 +496,13 @@ impl<'a> Transport<'a> { let constructor = ExchangeCtr { exchange: Exchange { id: ctx.id.clone(), - transport: self, + matter: self, notification: Notification::new(), }, construction_notification, }; - self.matter().notify_changed(); + self.notify_changed(); Ok(Some(constructor)) } else if src_rx.proto.proto_id == PROTO_ID_SECURE_CHANNEL @@ -338,7 +529,7 @@ impl<'a> Transport<'a> { } } - self.matter().notify_changed(); + self.notify_changed(); Ok(None) } @@ -354,7 +545,7 @@ impl<'a> Transport<'a> { let mut exchanges = self.exchanges.borrow_mut(); - let ctx = Self::get(&mut exchanges, exchange_id).unwrap(); + let ctx = ExchangeCtx::get(&mut exchanges, exchange_id).unwrap(); let state = &mut ctx.state; @@ -397,11 +588,11 @@ impl<'a> Transport<'a> { // .. // } | ExchangeState::Complete { .. } // | ExchangeState::CompleteAcknowledge { .. } - ) || ctx.mrp.is_ack_ready(*self.matter.borrow()) + ) || ctx.mrp.is_ack_ready(*self.borrow()) }); if let Some(ctx) = ctx { - self.matter().notify_changed(); + self.notify_changed(); let state = &mut ctx.state; @@ -460,7 +651,7 @@ impl<'a> Transport<'a> { dest_tx.log("Sending packet"); self.pre_send(ctx, dest_tx)?; - self.matter().notify_changed(); + self.notify_changed(); return Ok(true); } @@ -500,7 +691,7 @@ impl<'a> Transport<'a> { let session = session_mgr.mut_by_index(sess_index).unwrap(); // Decrypt the message - session.recv(self.matter.epoch, rx)?; + session.recv(self.epoch, rx)?; // Get the exchange // TODO: Handle out of space @@ -513,7 +704,7 @@ impl<'a> Transport<'a> { )?; // Message Reliability Protocol - exch.mrp.recv(rx, self.matter.epoch)?; + exch.mrp.recv(rx, self.epoch)?; Ok((exch, new)) } @@ -576,11 +767,4 @@ impl<'a> Transport<'a> { Err(ErrorCode::NoExchange.into()) } } - - pub(crate) fn get<'r>( - exchanges: &'r mut heapless::Vec, - id: &ExchangeId, - ) -> Option<&'r mut ExchangeCtx> { - exchanges.iter_mut().find(|exchange| exchange.id == *id) - } } diff --git a/matter/src/transport/exchange.rs b/matter/src/transport/exchange.rs index fbe3d7a..585e458 100644 --- a/matter/src/transport/exchange.rs +++ b/matter/src/transport/exchange.rs @@ -1,13 +1,11 @@ -use embassy_sync::blocking_mutex::raw::NoopRawMutex; - use crate::{ acl::Accessor, error::{Error, ErrorCode}, + utils::select::Notification, Matter, }; use super::{ - core::Transport, mrp::ReliableMessage, network::Address, packet::Packet, @@ -16,8 +14,6 @@ use super::{ pub const MAX_EXCHANGES: usize = 8; -pub type Notification = embassy_sync::signal::Signal; - #[derive(Debug, PartialEq, Eq, Copy, Clone, Default)] pub(crate) enum Role { #[default] @@ -43,6 +39,15 @@ pub(crate) struct ExchangeCtx { pub(crate) state: ExchangeState, } +impl ExchangeCtx { + pub(crate) fn get<'r>( + exchanges: &'r mut heapless::Vec, + id: &ExchangeId, + ) -> Option<&'r mut ExchangeCtx> { + exchanges.iter_mut().find(|exchange| exchange.id == *id) + } +} + #[derive(Debug, Clone)] pub(crate) enum ExchangeState { Construction { @@ -144,7 +149,7 @@ impl SessionId { } pub struct Exchange<'a> { pub(crate) id: ExchangeId, - pub(crate) transport: &'a Transport<'a>, + pub(crate) matter: &'a Matter<'a>, pub(crate) notification: Notification, } @@ -153,21 +158,8 @@ impl<'a> Exchange<'a> { &self.id } - pub fn matter(&self) -> &Matter<'a> { - self.transport.matter() - } - - pub fn transport(&self) -> &Transport<'a> { - self.transport - } - pub fn accessor(&self) -> Result, Error> { - self.with_session(|sess| { - Ok(Accessor::for_session( - sess, - &self.transport.matter().acl_mgr, - )) - }) + self.with_session(|sess| Ok(Accessor::for_session(sess, &self.matter.acl_mgr))) } pub fn with_session_mut(&self, f: F) -> Result @@ -175,7 +167,7 @@ impl<'a> Exchange<'a> { F: FnOnce(&mut Session) -> Result, { self.with_ctx(|_self, ctx| { - let mut session_mgr = _self.transport.session_mgr.borrow_mut(); + let mut session_mgr = _self.matter.session_mgr.borrow_mut(); let sess_index = session_mgr .get( @@ -201,15 +193,11 @@ impl<'a> Exchange<'a> { where F: FnOnce(&mut SessionMgr) -> Result, { - let mut session_mgr = self.transport.session_mgr.borrow_mut(); + let mut session_mgr = self.matter.session_mgr.borrow_mut(); f(&mut session_mgr) } - pub async fn initiate(&mut self, fabric_id: u64, node_id: u64) -> Result, Error> { - self.transport.initiate(fabric_id, node_id).await - } - pub async fn acknowledge(&mut self) -> Result<(), Error> { let wait = self.with_ctx_mut(|_self, ctx| { if !matches!(ctx.state, ExchangeState::Active) { @@ -222,7 +210,7 @@ impl<'a> Exchange<'a> { ctx.state = ExchangeState::Acknowledge { notification: &_self.notification as *const _, }; - _self.transport.send_notification.signal(()); + _self.matter.send_notification.signal(()); Ok(true) } @@ -249,7 +237,7 @@ impl<'a> Exchange<'a> { rx: rx as *mut _, notification: &_self.notification as *const _, }; - _self.transport.send_notification.signal(()); + _self.matter.send_notification.signal(()); Ok(()) })?; @@ -275,7 +263,7 @@ impl<'a> Exchange<'a> { tx: tx as *const _, notification: &_self.notification as *const _, }; - _self.transport.send_notification.signal(()); + _self.matter.send_notification.signal(()); Ok(()) })?; @@ -289,9 +277,9 @@ impl<'a> Exchange<'a> { where F: FnOnce(&Self, &ExchangeCtx) -> Result, { - let mut exchanges = self.transport.exchanges.borrow_mut(); + let mut exchanges = self.matter.exchanges.borrow_mut(); - let exchange = Transport::get(&mut exchanges, &self.id).ok_or(ErrorCode::NoExchange)?; // TODO + let exchange = ExchangeCtx::get(&mut exchanges, &self.id).ok_or(ErrorCode::NoExchange)?; // TODO f(self, exchange) } @@ -300,9 +288,9 @@ impl<'a> Exchange<'a> { where F: FnOnce(&mut Self, &mut ExchangeCtx) -> Result, { - let mut exchanges = self.transport.exchanges.borrow_mut(); + let mut exchanges = self.matter.exchanges.borrow_mut(); - let exchange = Transport::get(&mut exchanges, &self.id).ok_or(ErrorCode::NoExchange)?; // TODO + let exchange = ExchangeCtx::get(&mut exchanges, &self.id).ok_or(ErrorCode::NoExchange)?; // TODO f(self, exchange) } @@ -312,7 +300,7 @@ impl<'a> Drop for Exchange<'a> { fn drop(&mut self) { let _ = self.with_ctx_mut(|_self, ctx| { ctx.state = ExchangeState::Closed; - _self.transport.send_notification.signal(()); + _self.matter.send_notification.signal(()); Ok(()) }); diff --git a/matter/src/transport/mod.rs b/matter/src/transport/mod.rs index 6c5601e..e968adb 100644 --- a/matter/src/transport/mod.rs +++ b/matter/src/transport/mod.rs @@ -24,6 +24,5 @@ pub mod packet; pub mod pipe; pub mod plain_hdr; pub mod proto_hdr; -pub mod runner; pub mod session; pub mod udp; diff --git a/matter/src/transport/runner.rs b/matter/src/transport/runner.rs deleted file mode 100644 index 554721b..0000000 --- a/matter/src/transport/runner.rs +++ /dev/null @@ -1,319 +0,0 @@ -/* - * - * Copyright (c) 2020-2022 Project CHIP Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use core::{mem::MaybeUninit, pin::pin}; - -use embassy_futures::select::{select, select_slice, Either}; -use embassy_sync::{blocking_mutex::raw::NoopRawMutex, channel::Channel}; - -use log::{error, info}; - -use crate::{data_model::objects::DataModelHandler, CommissioningData, Matter}; -use crate::{error::Error, transport::packet::MAX_RX_BUF_SIZE, utils::select::EitherUnwrap}; - -use super::{ - core::Transport, - exchange::{Notification, MAX_EXCHANGES}, - packet::{MAX_RX_STATUS_BUF_SIZE, MAX_TX_BUF_SIZE}, - pipe::{Chunk, Pipe}, -}; - -type TxBuf = MaybeUninit<[u8; MAX_TX_BUF_SIZE]>; -type RxBuf = MaybeUninit<[u8; MAX_RX_BUF_SIZE]>; -type SxBuf = MaybeUninit<[u8; MAX_RX_STATUS_BUF_SIZE]>; - -struct PacketPools { - tx: [TxBuf; MAX_EXCHANGES], - rx: [RxBuf; MAX_EXCHANGES], - sx: [SxBuf; MAX_EXCHANGES], -} - -impl PacketPools { - const TX_ELEM: TxBuf = MaybeUninit::uninit(); - const RX_ELEM: RxBuf = MaybeUninit::uninit(); - const SX_ELEM: SxBuf = MaybeUninit::uninit(); - - const TX_INIT: [TxBuf; MAX_EXCHANGES] = [Self::TX_ELEM; MAX_EXCHANGES]; - const RX_INIT: [RxBuf; MAX_EXCHANGES] = [Self::RX_ELEM; MAX_EXCHANGES]; - const SX_INIT: [SxBuf; MAX_EXCHANGES] = [Self::SX_ELEM; MAX_EXCHANGES]; - - #[inline(always)] - pub const fn new() -> Self { - Self { - tx: Self::TX_INIT, - rx: Self::RX_INIT, - sx: Self::SX_INIT, - } - } -} - -#[cfg(any(feature = "std", feature = "embassy-net"))] -pub struct AllUdpBuffers { - transport: TransportUdpBuffers, - mdns: crate::mdns::MdnsUdpBuffers, -} - -#[cfg(any(feature = "std", feature = "embassy-net"))] -impl AllUdpBuffers { - #[inline(always)] - pub const fn new() -> Self { - Self { - transport: TransportUdpBuffers::new(), - mdns: crate::mdns::MdnsUdpBuffers::new(), - } - } -} - -#[cfg(any(feature = "std", feature = "embassy-net"))] -pub struct TransportUdpBuffers { - udp: crate::transport::udp::UdpBuffers, - tx_buf: TxBuf, - rx_buf: RxBuf, -} - -#[cfg(any(feature = "std", feature = "embassy-net"))] -impl TransportUdpBuffers { - #[inline(always)] - pub const fn new() -> Self { - Self { - udp: crate::transport::udp::UdpBuffers::new(), - tx_buf: core::mem::MaybeUninit::uninit(), - rx_buf: core::mem::MaybeUninit::uninit(), - } - } -} - -/// This struct implements an executor-agnostic option to run the Matter transport stack end-to-end. -/// -/// Since it is not possible to use executor tasks spawning in an executor-agnostic way (yet), -/// the async loops are arranged as one giant future. Therefore, the cost is a slightly slower execution -/// due to the generated future being relatively big and deeply nested. -/// -/// Users are free to implement their own async execution loop, by utilizing the `Transport` -/// struct directly with their async executor of choice. -pub struct TransportRunner<'a> { - transport: Transport<'a>, - pools: PacketPools, -} - -impl<'a> TransportRunner<'a> { - #[inline(always)] - pub fn new(matter: &'a Matter<'a>) -> Self { - Self::wrap(Transport::new(matter)) - } - - #[inline(always)] - pub const fn wrap(transport: Transport<'a>) -> Self { - Self { - transport, - pools: PacketPools::new(), - } - } - - pub fn transport(&self) -> &Transport { - &self.transport - } - - #[cfg(any(feature = "std", feature = "embassy-net"))] - pub async fn run_udp_all( - &mut self, - stack: &crate::transport::network::NetworkStack, - mdns: &crate::mdns::MdnsService<'_>, - buffers: &mut AllUdpBuffers, - dev_comm: CommissioningData, - handler: &H, - ) -> Result<(), Error> - where - D: crate::transport::network::NetworkStackDriver, - H: DataModelHandler, - { - let mut mdns_runner = crate::mdns::MdnsRunner::new(mdns); - - let mut mdns = pin!(mdns_runner.run_udp(stack, &mut buffers.mdns)); - let mut transport = pin!(self.run_udp(stack, &mut buffers.transport, dev_comm, handler)); - - embassy_futures::select::select(&mut mdns, &mut transport) - .await - .unwrap() - } - - #[cfg(any(feature = "std", feature = "embassy-net"))] - pub async fn run_udp( - &mut self, - stack: &crate::transport::network::NetworkStack, - buffers: &mut TransportUdpBuffers, - dev_comm: CommissioningData, - handler: &H, - ) -> Result<(), Error> - where - D: crate::transport::network::NetworkStackDriver, - H: DataModelHandler, - { - let udp = crate::transport::udp::UdpListener::new( - stack, - crate::transport::network::SocketAddr::new( - crate::transport::network::IpAddr::V6( - crate::transport::network::Ipv6Addr::UNSPECIFIED, - ), - self.transport.matter().port, - ), - &mut buffers.udp, - ) - .await?; - - let tx_pipe = Pipe::new(unsafe { buffers.tx_buf.assume_init_mut() }); - let rx_pipe = Pipe::new(unsafe { buffers.rx_buf.assume_init_mut() }); - - let tx_pipe = &tx_pipe; - let rx_pipe = &rx_pipe; - let udp = &udp; - - let mut tx = pin!(async move { - loop { - { - let mut data = tx_pipe.data.lock().await; - - if let Some(chunk) = data.chunk { - udp.send(chunk.addr.unwrap_udp(), &data.buf[chunk.start..chunk.end]) - .await?; - data.chunk = None; - tx_pipe.data_consumed_notification.signal(()); - } - } - - tx_pipe.data_supplied_notification.wait().await; - } - }); - - let mut rx = pin!(async move { - loop { - { - let mut data = rx_pipe.data.lock().await; - - if data.chunk.is_none() { - let (len, addr) = udp.recv(data.buf).await?; - - data.chunk = Some(Chunk { - start: 0, - end: len, - addr: crate::transport::network::Address::Udp(addr), - }); - rx_pipe.data_supplied_notification.signal(()); - } - } - - rx_pipe.data_consumed_notification.wait().await; - } - }); - - let mut run = pin!(async move { self.run(tx_pipe, rx_pipe, dev_comm, handler).await }); - - embassy_futures::select::select3(&mut tx, &mut rx, &mut run) - .await - .unwrap() - } - - pub async fn run( - &mut self, - tx_pipe: &Pipe<'_>, - rx_pipe: &Pipe<'_>, - dev_comm: CommissioningData, - handler: &H, - ) -> Result<(), Error> - where - H: DataModelHandler, - { - info!("Running Matter transport"); - - let buf = unsafe { self.pools.rx[0].assume_init_mut() }; - - if self.transport.matter().start_comissioning(dev_comm, buf)? { - info!("Comissioning started"); - } - - let construction_notification = Notification::new(); - - let mut rx = pin!(Self::handle_rx( - &self.transport, - &mut self.pools, - rx_pipe, - &construction_notification, - handler - )); - let mut tx = pin!(self.transport.handle_tx(tx_pipe)); - - select(&mut rx, &mut tx).await.unwrap() - } - - #[inline(always)] - async fn handle_rx( - transport: &Transport<'_>, - pools: &mut PacketPools, - rx_pipe: &Pipe<'_>, - construction_notification: &Notification, - handler: &H, - ) -> Result<(), Error> - where - H: DataModelHandler, - { - info!("Creating queue for {} exchanges", 1); - - let channel = Channel::::new(); - - info!("Creating {} handlers", MAX_EXCHANGES); - let mut handlers = heapless::Vec::<_, MAX_EXCHANGES>::new(); - - info!("Handlers size: {}", core::mem::size_of_val(&handlers)); - - // Unsafely allow mutable aliasing in the packet pools by different indices - let pools: *mut PacketPools = pools; - - for index in 0..MAX_EXCHANGES { - let channel = &channel; - let handler_id = index; - - let pools = unsafe { pools.as_mut() }.unwrap(); - - let tx_buf = unsafe { pools.tx[handler_id].assume_init_mut() }; - let rx_buf = unsafe { pools.rx[handler_id].assume_init_mut() }; - let sx_buf = unsafe { pools.sx[handler_id].assume_init_mut() }; - - handlers - .push( - transport - .exchange_handler(tx_buf, rx_buf, sx_buf, handler_id, channel, handler), - ) - .map_err(|_| ()) - .unwrap(); - } - - let mut rx = - pin!(transport.handle_rx_multiplex(rx_pipe, &construction_notification, &channel)); - - let result = select(&mut rx, select_slice(&mut handlers)).await; - - if let Either::First(result) = result { - if let Err(e) = &result { - error!("Exitting RX loop due to an error: {:?}", e); - } - - result?; - } - - Ok(()) - } -} diff --git a/matter/src/transport/session.rs b/matter/src/transport/session.rs index c421244..41fbc49 100644 --- a/matter/src/transport/session.rs +++ b/matter/src/transport/session.rs @@ -306,6 +306,11 @@ impl SessionMgr { } } + pub fn reset(&mut self) { + self.sessions.clear(); + self.next_sess_id = 1; + } + pub fn mut_by_index(&mut self, index: usize) -> Option<&mut Session> { self.sessions.get_mut(index).and_then(Option::as_mut) } diff --git a/matter/src/transport/udp.rs b/matter/src/transport/udp.rs index 3d27d2d..602dc12 100644 --- a/matter/src/transport/udp.rs +++ b/matter/src/transport/udp.rs @@ -16,10 +16,10 @@ */ #[cfg(all(feature = "std", not(feature = "embassy-net")))] -pub use async_io::*; +pub use self::async_io::*; #[cfg(feature = "embassy-net")] -pub use embassy_net::*; +pub use self::embassy_net::*; #[cfg(feature = "std")] pub mod async_io { @@ -88,7 +88,7 @@ pub mod async_io { #[cfg(target_os = "espidf")] { fn esp_setsockopt( - socket: &mut UdpSocket, + socket: &UdpSocket, proto: u32, option: u32, value: T, @@ -119,7 +119,7 @@ pub mod async_io { }; esp_setsockopt( - &mut self.0, + &mut self.0.get_ref(), esp_idf_sys::IPPROTO_IP, esp_idf_sys::IP_ADD_MEMBERSHIP, mreq, diff --git a/matter/tests/common/im_engine.rs b/matter/tests/common/im_engine.rs index 8efb2c9..1cd26bd 100644 --- a/matter/tests/common/im_engine.rs +++ b/matter/tests/common/im_engine.rs @@ -48,16 +48,15 @@ use matter::{ secure_channel::{self, common::PROTO_ID_SECURE_CHANNEL, spake2p::VerifierData}, tlv::{TLVWriter, TagType, ToTLV}, transport::{ - exchange::Notification, + core::PacketBuffers, packet::{Packet, MAX_RX_BUF_SIZE, MAX_TX_BUF_SIZE}, pipe::Pipe, - runner::TransportRunner, }, transport::{ network::Address, session::{CaseDetails, CloneData, NocCatIds, SessionMode}, }, - utils::select::EitherUnwrap, + utils::select::{EitherUnwrap, Notification}, CommissioningData, Matter, MATTER_PORT, }; @@ -248,7 +247,7 @@ impl<'a> ImEngine<'a> { input: &[&ImInput], out: &mut heapless::Vec, ) -> Result<(), Error> { - let mut runner = TransportRunner::new(&self.matter); + self.matter.reset_transport(); let clone_data = CloneData::new( IM_ENGINE_REMOTE_PEER_ID, @@ -259,8 +258,8 @@ impl<'a> ImEngine<'a> { SessionMode::Case(CaseDetails::new(1, &self.cat_ids)), ); - let sess_idx = runner - .transport() + let sess_idx = self + .matter .session_mgr .borrow_mut() .clone_session(&clone_data) @@ -281,10 +280,9 @@ impl<'a> ImEngine<'a> { let rx_pipe_buf = &mut rx_pipe_buf; let handler = &handler; - let runner = &mut runner; - let mut msg_ctr = runner - .transport() + let mut msg_ctr = self + .matter .session_mgr .borrow_mut() .mut_by_index(sess_idx) @@ -294,9 +292,13 @@ impl<'a> ImEngine<'a> { let resp_notif = Notification::new(); let resp_notif = &resp_notif; + let mut buffers = PacketBuffers::new(); + let buffers = &mut buffers; + embassy_futures::block_on(async move { select3( - runner.run( + self.matter.run_piped( + buffers, tx_pipe, rx_pipe, CommissioningData { diff --git a/matter/tests/data_model/timed_requests.rs b/matter/tests/data_model/timed_requests.rs index e4eb960..c255506 100644 --- a/matter/tests/data_model/timed_requests.rs +++ b/matter/tests/data_model/timed_requests.rs @@ -100,7 +100,7 @@ fn test_timed_cmd_success() { ImEngine::timed_commands( input, &TimedInvResponse::TransactionSuccess(expected), - 400, + 2000, 0, true, ); @@ -130,7 +130,7 @@ fn test_timed_cmd_timedout_mismatch() { ImEngine::timed_commands( input, &TimedInvResponse::TransactionError(IMStatusCode::TimedRequestMisMatch), - 400, + 2000, 0, false, ); diff --git a/matter_macro_derive/Cargo.toml b/matter_macro_derive/Cargo.toml index 163ff50..5bf29bb 100644 --- a/matter_macro_derive/Cargo.toml +++ b/matter_macro_derive/Cargo.toml @@ -2,6 +2,7 @@ name = "matter_macro_derive" version = "0.1.0" edition = "2021" +license = "Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [lib] diff --git a/tools/tlv_tool/Cargo.toml b/tools/tlv_tool/Cargo.toml index f8c1e23..f4c1035 100644 --- a/tools/tlv_tool/Cargo.toml +++ b/tools/tlv_tool/Cargo.toml @@ -1,7 +1,8 @@ [package] name = "tlv_tool" version = "0.1.0" -edition = "2018" +edition = "2021" +license = "Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html