Simplify API by combining Matter, Transport and TransportRunner; simplify Mdns and Psm runners

This commit is contained in:
ivmarkov 2023-07-21 12:06:58 +00:00
parent 71b9a578d0
commit 916f2148f8
29 changed files with 520 additions and 629 deletions

View file

@ -15,11 +15,11 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
crypto-backend: ['crypto_openssl', 'crypto_rustcrypto', 'crypto_mbedtls']
crypto-backend: ['rustcrypto', 'mbedtls', 'openssl']
steps:
- uses: actions/checkout@v2
- name: Build
run: cd matter; cargo build --verbose --no-default-features --features ${{matrix.crypto-backend}}
run: cd matter; cargo build --no-default-features --features ${{matrix.crypto-backend}}
- name: Run tests
run: cd matter; cargo test --verbose --no-default-features --features ${{matrix.crypto-backend}} -- --test-threads=1
run: cd matter; cargo test --no-default-features --features os,${{matrix.crypto-backend}} -- --test-threads=1

View file

@ -1,7 +1,8 @@
[workspace]
resolver = "2"
members = ["matter", "matter_macro_derive"]
exclude = ["examples/*"]
exclude = ["examples/*", "tools/tlv_tool"]
# For compatibility with ESP IDF
[patch.crates-io]

View file

@ -41,23 +41,24 @@ The `async` metaphor however comes with a bit higher memory usage, due to not en
## Build
Building the library:
### Building the library
```
$ cargo build
```
Building and running the example (Linux, MacOS X):
### Building and running the example (Linux, MacOS X)
```
$ cargo run --example onoff_light
```
Building the example (Espressif's ESP-IDF):
### Building the example (Espressif's ESP-IDF)
* Install all build prerequisites described [here](https://github.com/esp-rs/esp-idf-template#prerequisites)
* Build with the following command line:
```
export MCU=esp32; export CARGO_TARGET_XTENSA_ESP32_ESPIDF_LINKER=ldproxy; export RUSTFLAGS="-C default-linker-libraries"; export WIFI_SSID=ssid;export WIFI_PASS=pass; cargo build --example onoff_light --no-default-features --features std,crypto_rustcrypto --target xtensa-esp32-espidf -Zbuild-std=std,panic_abort
export MCU=esp32; export CARGO_TARGET_XTENSA_ESP32_ESPIDF_LINKER=ldproxy; export RUSTFLAGS="-C default-linker-libraries"; export WIFI_SSID=ssid;export WIFI_PASS=pass; cargo build --example onoff_light --no-default-features --features esp-idf --target xtensa-esp32-espidf -Zbuild-std=std,panic_abort
```
* If you are building for a different Espressif MCU, change the `MCU` variable, the `xtensa-esp32-espidf` target and the name of the `CARGO_TARGET_<esp-idf-target-uppercase>_LINKER` variable to match your MCU and its Rust target. Available Espressif MCUs and targets are:
* esp32 / xtensa-esp32-espidf
@ -69,6 +70,10 @@ export MCU=esp32; export CARGO_TARGET_XTENSA_ESP32_ESPIDF_LINKER=ldproxy; export
* Put in `WIFI_SSID` / `WIFI_PASS` the SSID & password for your wireless router
* Flash using the `espflash` utility described in the build prerequsites' link above
### Building the example (ESP32-XX baremetal or RP2040)
Coming soon!
## Test
With the `chip-tool` (the current tool for testing Matter) use the Ethernet commissioning mechanism:

View file

@ -18,6 +18,7 @@
use core::borrow::Borrow;
use core::pin::pin;
use embassy_futures::select::select3;
use log::info;
use matter::core::{CommissioningData, Matter};
use matter::data_model::cluster_basic_information::BasicInfoConfig;
@ -27,18 +28,18 @@ use matter::data_model::objects::*;
use matter::data_model::root_endpoint;
use matter::data_model::system_model::descriptor;
use matter::error::Error;
use matter::mdns::MdnsService;
use matter::persist::FilePsm;
use matter::mdns::{MdnsRunBuffers, MdnsService};
use matter::secure_channel::spake2p::VerifierData;
use matter::transport::core::RunBuffers;
use matter::transport::network::{Ipv4Addr, Ipv6Addr, NetworkStack};
use matter::transport::runner::{AllUdpBuffers, TransportRunner};
use matter::utils::select::EitherUnwrap;
mod dev_att;
#[cfg(feature = "std")]
fn main() -> Result<(), Error> {
let thread = std::thread::Builder::new()
.stack_size(140 * 1024)
.stack_size(150 * 1024)
.spawn(run)
.unwrap();
@ -56,11 +57,11 @@ fn run() -> Result<(), Error> {
initialize_logger();
info!(
"Matter memory: mDNS={}, Matter={}, TransportRunner={}, UdpBuffers={}",
"Matter memory: mDNS={}, Matter={}, MdnsBuffers={}, RunBuffers={}",
core::mem::size_of::<MdnsService>(),
core::mem::size_of::<Matter>(),
core::mem::size_of::<TransportRunner>(),
core::mem::size_of::<AllUdpBuffers>(),
core::mem::size_of::<MdnsRunBuffers>(),
core::mem::size_of::<RunBuffers>(),
);
let dev_det = BasicInfoConfig {
@ -73,12 +74,6 @@ fn run() -> Result<(), Error> {
device_name: "OnOff Light",
};
let psm_path = std::env::temp_dir().join("matter-iot");
info!("Persisting from/to {}", psm_path.display());
#[cfg(all(feature = "std", not(target_os = "espidf")))]
let psm = matter::persist::FilePsm::new(psm_path)?;
let (ipv4_addr, ipv6_addr, interface) = initialize_network()?;
let dev_att = dev_att::HardCodedDevAtt::new();
@ -106,7 +101,7 @@ fn run() -> Result<(), Error> {
matter::MATTER_PORT,
);
info!("mDNS initialized: {:p}", &mdns);
info!("mDNS initialized");
let matter = Matter::new(
// vid/pid should match those in the DAC
@ -118,36 +113,28 @@ fn run() -> Result<(), Error> {
matter::MATTER_PORT,
);
info!("Matter initialized: {:p}", &matter);
info!("Matter initialized");
#[cfg(all(feature = "std", not(target_os = "espidf")))]
{
let mut buf = [0; 4096];
let buf = &mut buf;
if let Some(data) = psm.load("acls", buf)? {
matter.load_acls(data)?;
}
if let Some(data) = psm.load("fabrics", buf)? {
matter.load_fabrics(data)?;
}
}
let mut runner = TransportRunner::new(&matter);
info!("Transport runner initialized: {:p}", &runner);
let mut psm = matter::persist::Psm::new(&matter, std::env::temp_dir().join("matter-iot"))?;
let handler = HandlerCompat(handler(&matter));
// NOTE (no_std): If using the `embassy-net` UDP implementation, replace this dummy stack with the `embassy-net` one
// When using a custom UDP stack, remove this
// When using a custom UDP stack, remove the network stack initialization below
// and call `Matter::run_piped()` instead, by utilizing the TX & RX `Pipe` structs
// to push/pull your UDP packets from/to the Matter stack.
// Ditto for `MdnsService`.
//
// When using the `embassy-net` feature (as opposed to the Rust Standard Library network stack),
// this initialization would be more complex.
let stack = NetworkStack::new();
let mut buffers = AllUdpBuffers::new();
let mut mdns_buffers = MdnsRunBuffers::new();
let mut mdns_runner = pin!(mdns.run(&stack, &mut mdns_buffers));
let mut fut = pin!(runner.run_udp_all(
let mut buffers = RunBuffers::new();
let mut runner = matter.run(
&stack,
&mdns,
&mut buffers,
CommissioningData {
// TODO: Hard-coded for now
@ -155,16 +142,30 @@ fn run() -> Result<(), Error> {
discriminator: 250,
},
&handler,
));
);
info!(
"Matter transport runner memory: {}",
core::mem::size_of_val(&runner)
);
let mut runner = pin!(runner);
#[cfg(all(feature = "std", not(target_os = "espidf")))]
let mut psm_runner = pin!(psm.run());
#[cfg(not(all(feature = "std", not(target_os = "espidf"))))]
let mut psm_runner = pin!(core::future::pending());
let mut runner = select3(&mut runner, &mut mdns_runner, &mut psm_runner);
// NOTE: For no_std, replace with your own no_std way of polling the future
#[cfg(feature = "std")]
async_io::block_on(&mut fut)?;
async_io::block_on(&mut runner).unwrap()?;
// NOTE (no_std): For no_std, replace with your own more efficient no_std executor,
// because the executor used below is a simple busy-loop poller
#[cfg(not(feature = "std"))]
embassy_futures::block_on(&mut fut)?;
embassy_futures::block_on(&mut runner).unwrap()?;
Ok(())
}
@ -268,26 +269,6 @@ fn initialize_network() -> Result<(Ipv4Addr, Ipv6Addr, u32), Error> {
Ok((ip, ipv6, 0 as _))
}
#[cfg(all(feature = "std", not(target_os = "espidf")))]
#[inline(never)]
async fn save(matter: &Matter<'_>, psm: &FilePsm) -> Result<(), Error> {
let mut buf = [0; 4096];
let buf = &mut buf;
loop {
matter.wait_changed().await;
if matter.is_changed() {
if let Some(data) = matter.store_acls(buf)? {
psm.store("acls", data)?;
}
if let Some(data) = matter.store_fabrics(buf)? {
psm.store("fabrics", data)?;
}
}
}
}
#[cfg(target_os = "espidf")]
#[inline(never)]
fn initialize_logger() {

View file

@ -8,23 +8,23 @@ repository = "https://github.com/kedars/matter-rs"
readme = "README.md"
keywords = ["matter", "smart", "smart-home", "IoT", "ESP32"]
categories = ["embedded", "network-programming"]
license = "MIT"
license = "Apache-2.0"
[lib]
name = "matter"
path = "src/lib.rs"
[features]
default = ["os", "crypto_rustcrypto"]
default = ["os", "mbedtls"]
os = ["std", "backtrace", "env_logger", "nix", "critical-section/std", "embassy-sync/std", "embassy-time/std"]
esp-idf = ["std", "crypto_rustcrypto", "esp-idf-sys", "esp-idf-hal", "esp-idf-svc"]
esp-idf = ["std", "rustcrypto", "esp-idf-sys"]
std = ["alloc", "rand", "qrcode", "async-io", "esp-idf-sys?/std", "embassy-time/generic-queue-16"]
backtrace = []
alloc = []
nightly = []
crypto_openssl = ["alloc", "openssl", "foreign-types", "hmac", "sha2"]
crypto_mbedtls = ["alloc", "mbedtls"]
crypto_rustcrypto = ["alloc", "sha2", "hmac", "pbkdf2", "hkdf", "aes", "ccm", "p256", "elliptic-curve", "crypto-bigint", "x509-cert", "rand_core"]
openssl = ["alloc", "dep:openssl", "foreign-types", "hmac", "sha2"]
mbedtls = ["alloc", "dep:mbedtls"]
rustcrypto = ["alloc", "sha2", "hmac", "pbkdf2", "hkdf", "aes", "ccm", "p256", "elliptic-curve", "crypto-bigint", "x509-cert", "rand_core"]
embassy-net = ["dep:embassy-net", "dep:embassy-net-driver", "smoltcp"]
[dependencies]
@ -58,10 +58,10 @@ smoltcp = { version = "0.10", default-features = false, optional = true }
# STD-only dependencies
rand = { version = "0.8.5", optional = true }
qrcode = { version = "0.12", default-features = false, optional = true } # Print QR code
async-io = { version = "=1.12", optional = true } # =1.2 for compatibility with ESP IDF
async-io = { version = "=1.12", optional = true } # =1.12 for compatibility with ESP IDF
# crypto
openssl = { git = "https://github.com/sfackler/rust-openssl", optional = true }
openssl = { version = "0.10.55", optional = true }
foreign-types = { version = "0.3.2", optional = true }
# rust-crypto
@ -81,18 +81,22 @@ x509-cert = { version = "0.2.0", default-features = false, features = ["pem"], o
astro-dnssd = { version = "0.3" }
[target.'cfg(not(target_os = "espidf"))'.dependencies]
mbedtls = { git = "https://github.com/fortanix/rust-mbedtls", optional = true }
mbedtls = { version = "0.9", optional = true }
env_logger = { version = "0.10.0", optional = true }
nix = { version = "0.26", features = ["net"], optional = true }
[target.'cfg(target_os = "espidf")'.dependencies]
esp-idf-sys = { version = "0.33", optional = true, default-features = false, features = ["native", "binstart"] }
esp-idf-hal = { version = "0.41", optional = true, features = ["embassy-sync", "critical-section"] } # TODO: Only necessary for the examples
esp-idf-svc = { version = "0.46", optional = true, features = ["embassy-time-driver"] } # TODO: Only necessary for the examples
esp-idf-sys = { version = "0.33", optional = true, default-features = false, features = ["native"] }
[build-dependencies]
embuild = "0.31.2"
[target.'cfg(target_os = "espidf")'.dev-dependencies]
esp-idf-sys = { version = "0.33", default-features = false, features = ["binstart"] }
esp-idf-hal = { version = "0.41", features = ["embassy-sync", "critical-section"] }
esp-idf-svc = { version = "0.46", features = ["embassy-time-driver"] }
embedded-svc = { version = "0.25" }
[[example]]
name = "onoff_light"
path = "../examples/onoff_light/src/main.rs"

View file

@ -28,8 +28,11 @@ use crate::{
mdns::Mdns,
pairing::{print_pairing_code_and_qr, DiscoveryCapabilities},
secure_channel::{pake::PaseMgr, spake2p::VerifierData},
transport::exchange::Notification,
utils::{epoch::Epoch, rand::Rand},
transport::{
exchange::{ExchangeCtx, MAX_EXCHANGES},
session::SessionMgr,
},
utils::{epoch::Epoch, rand::Rand, select::Notification},
};
/* The Matter Port */
@ -45,17 +48,20 @@ pub struct CommissioningData {
/// The primary Matter Object
pub struct Matter<'a> {
pub fabric_mgr: RefCell<FabricMgr>,
pub acl_mgr: RefCell<AclMgr>,
pub pase_mgr: RefCell<PaseMgr>,
pub failsafe: RefCell<FailSafe>,
pub persist_notification: Notification,
pub mdns: &'a dyn Mdns,
pub epoch: Epoch,
pub rand: Rand,
pub dev_det: &'a BasicInfoConfig<'a>,
pub dev_att: &'a dyn DevAttDataFetcher,
pub port: u16,
fabric_mgr: RefCell<FabricMgr>,
pub acl_mgr: RefCell<AclMgr>, // Public for tests
pase_mgr: RefCell<PaseMgr>,
failsafe: RefCell<FailSafe>,
persist_notification: Notification,
pub(crate) send_notification: Notification,
mdns: &'a dyn Mdns,
pub(crate) epoch: Epoch,
pub(crate) rand: Rand,
dev_det: &'a BasicInfoConfig<'a>,
dev_att: &'a dyn DevAttDataFetcher,
pub(crate) port: u16,
pub(crate) exchanges: RefCell<heapless::Vec<ExchangeCtx, MAX_EXCHANGES>>,
pub session_mgr: RefCell<SessionMgr>, // Public for tests
}
impl<'a> Matter<'a> {
@ -94,12 +100,15 @@ impl<'a> Matter<'a> {
pase_mgr: RefCell::new(PaseMgr::new(epoch, rand)),
failsafe: RefCell::new(FailSafe::new()),
persist_notification: Notification::new(),
send_notification: Notification::new(),
mdns,
epoch,
rand,
dev_det,
dev_att,
port,
exchanges: RefCell::new(heapless::Vec::new()),
session_mgr: RefCell::new(SessionMgr::new(epoch, rand)),
}
}
@ -160,6 +169,7 @@ impl<'a> Matter<'a> {
Ok(false)
}
}
pub fn notify_changed(&self) {
if self.is_changed() {
self.persist_notification.signal(());

View file

@ -17,6 +17,8 @@
extern crate alloc;
use core::fmt::{self, Debug};
use alloc::sync::Arc;
use log::{error, info};
@ -355,3 +357,9 @@ impl Sha256 {
Ok(())
}
}
impl Debug for Sha256 {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(f, "Sha256")
}
}

View file

@ -15,6 +15,8 @@
* limitations under the License.
*/
use core::fmt::{self, Debug};
use crate::error::{Error, ErrorCode};
use crate::utils::rand::Rand;
@ -391,3 +393,9 @@ impl Sha256 {
Ok(())
}
}
impl Debug for Sha256 {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(f, "Sha256")
}
}

View file

@ -37,37 +37,29 @@ pub const ECDH_SHARED_SECRET_LEN_BYTES: usize = 32;
pub const EC_SIGNATURE_LEN_BYTES: usize = 64;
#[cfg(all(feature = "crypto_mbedtls", target_os = "espidf"))]
#[cfg(all(feature = "mbedtls", target_os = "espidf"))]
mod crypto_esp_mbedtls;
#[cfg(all(feature = "crypto_mbedtls", target_os = "espidf"))]
#[cfg(all(feature = "mbedtls", target_os = "espidf"))]
pub use self::crypto_esp_mbedtls::*;
#[cfg(all(feature = "crypto_mbedtls", not(target_os = "espidf")))]
#[cfg(all(feature = "mbedtls", not(target_os = "espidf")))]
mod crypto_mbedtls;
#[cfg(all(feature = "crypto_mbedtls", not(target_os = "espidf")))]
#[cfg(all(feature = "mbedtls", not(target_os = "espidf")))]
pub use self::crypto_mbedtls::*;
#[cfg(feature = "crypto_openssl")]
#[cfg(feature = "openssl")]
mod crypto_openssl;
#[cfg(feature = "crypto_openssl")]
#[cfg(feature = "openssl")]
pub use self::crypto_openssl::*;
#[cfg(feature = "crypto_rustcrypto")]
#[cfg(feature = "rustcrypto")]
mod crypto_rustcrypto;
#[cfg(feature = "crypto_rustcrypto")]
#[cfg(feature = "rustcrypto")]
pub use self::crypto_rustcrypto::*;
#[cfg(not(any(
feature = "crypto_openssl",
feature = "crypto_mbedtls",
feature = "crypto_rustcrypto"
)))]
#[cfg(not(any(feature = "openssl", feature = "mbedtls", feature = "rustcrypto")))]
pub mod crypto_dummy;
#[cfg(not(any(
feature = "crypto_openssl",
feature = "crypto_mbedtls",
feature = "crypto_rustcrypto"
)))]
#[cfg(not(any(feature = "openssl", feature = "mbedtls", feature = "rustcrypto")))]
pub use self::crypto_dummy::*;
impl<'a> FromTLV<'a> for KeyPair {

View file

@ -1,3 +1,20 @@
/*
*
* Copyright (c) 2020-2022 Project CHIP Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::data_model::objects::Node;
#[cfg(feature = "nightly")]

View file

@ -149,7 +149,7 @@ impl<T> From<std::sync::PoisonError<T>> for Error {
}
}
#[cfg(feature = "crypto_openssl")]
#[cfg(feature = "openssl")]
impl From<openssl::error::ErrorStack> for Error {
fn from(e: openssl::error::ErrorStack) -> Self {
::log::error!("Error in TLS: {}", e);
@ -157,7 +157,7 @@ impl From<openssl::error::ErrorStack> for Error {
}
}
#[cfg(all(feature = "crypto_mbedtls", not(target_os = "espidf")))]
#[cfg(all(feature = "mbedtls", not(target_os = "espidf")))]
impl From<mbedtls::Error> for Error {
fn from(e: mbedtls::Error) -> Self {
::log::error!("Error in TLS: {}", e);
@ -173,7 +173,7 @@ impl From<esp_idf_sys::EspError> for Error {
}
}
#[cfg(feature = "crypto_rustcrypto")]
#[cfg(feature = "rustcrypto")]
impl From<ccm::aead::Error> for Error {
fn from(_e: ccm::aead::Error) -> Self {
Self::new(ErrorCode::Crypto)

View file

@ -609,7 +609,7 @@ impl<'a, 'r, 'p> Interaction<'a, 'r, 'p> {
rx: &mut Packet<'_>,
tx: &mut Packet<'_>,
) -> Result<Option<Duration>, Error> {
let epoch = exchange.transport().matter().epoch;
let epoch = exchange.matter.epoch;
let mut opcode: OpCode = rx.get_proto_opcode()?;
@ -641,7 +641,7 @@ impl<'a, 'r, 'p> Interaction<'a, 'r, 'p> {
where
S: FnOnce() -> u32,
{
let epoch = exchange.transport().matter().epoch;
let epoch = exchange.matter.epoch;
let opcode = rx.get_proto_opcode()?;
let rx_data = rx.as_slice();

View file

@ -55,19 +55,15 @@ where
}
}
#[cfg(all(feature = "std", target_os = "macos"))]
pub use astro::MdnsRunner;
#[cfg(all(feature = "std", target_os = "macos"))]
pub use astro::MdnsService;
#[cfg(all(feature = "std", target_os = "macos"))]
pub use astro::MdnsUdpBuffers;
#[cfg(not(all(feature = "std", target_os = "macos")))]
pub use builtin::MdnsRunner;
#[cfg(any(feature = "std", feature = "embassy-net"))]
pub use builtin::MdnsRunBuffers;
#[cfg(not(all(feature = "std", target_os = "macos")))]
pub use builtin::MdnsService;
#[cfg(not(all(feature = "std", target_os = "macos")))]
pub use builtin::MdnsUdpBuffers;
pub struct DummyMdns;

View file

@ -11,6 +11,17 @@ use log::info;
use super::ServiceMode;
/// Only for API-compatibility with builtin::MdnsRunner
pub struct MdnsUdpBuffers(());
/// Only for API-compatibility with builtin::MdnsRunner
impl MdnsUdpBuffers {
#[inline(always)]
pub const fn new() -> Self {
Self(())
}
}
pub struct MdnsService<'a> {
dev_det: &'a BasicInfoConfig<'a>,
matter_port: u16,
@ -78,16 +89,15 @@ impl<'a> MdnsService<'a> {
Ok(())
}
/// Only for API-compatibility with builtin::MdnsRunner
pub async fn run_udp(&mut self, buffers: &mut MdnsUdpBuffers) -> Result<(), Error> {
core::future::pending::<Result<(), Error>>().await
}
/// Only for API-compatibility with builtin::MdnsRunner
pub struct MdnsUdpBuffers(());
/// Only for API-compatibility with builtin::MdnsRunner
impl MdnsUdpBuffers {
#[inline(always)]
pub const fn new() -> Self {
Self(())
pub async fn run(&self, _tx_pipe: &Pipe<'_>, _rx_pipe: &Pipe<'_>) -> Result<(), Error> {
core::future::pending::<Result<(), Error>>().await
}
}
@ -100,21 +110,3 @@ impl<'a> super::Mdns for MdnsService<'a> {
MdnsService::remove(self, service)
}
}
/// Only for API-compatibility with builtin::MdnsRunner
pub struct MdnsRunner<'a>(&'a MdnsService<'a>);
/// Only for API-compatibility with builtin::MdnsRunner
impl<'a> MdnsRunner<'a> {
pub const fn new(mdns: &'a MdnsService<'a>) -> Self {
Self(mdns)
}
pub async fn run_udp(&mut self, buffers: &mut MdnsUdpBuffers) -> Result<(), Error> {
core::future::pending::<Result<(), Error>>().await
}
pub async fn run(&self, _tx_pipe: &Pipe<'_>, _rx_pipe: &Pipe<'_>) -> Result<(), Error> {
core::future::pending::<Result<(), Error>>().await
}
}

View file

@ -22,6 +22,25 @@ const IPV6_BROADCAST_ADDR: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0x
const PORT: u16 = 5353;
#[cfg(any(feature = "std", feature = "embassy-net"))]
pub struct MdnsRunBuffers {
udp: crate::transport::udp::UdpBuffers,
tx_buf: core::mem::MaybeUninit<[u8; crate::transport::packet::MAX_TX_BUF_SIZE]>,
rx_buf: core::mem::MaybeUninit<[u8; crate::transport::packet::MAX_RX_BUF_SIZE]>,
}
#[cfg(any(feature = "std", feature = "embassy-net"))]
impl MdnsRunBuffers {
#[inline(always)]
pub const fn new() -> Self {
Self {
udp: crate::transport::udp::UdpBuffers::new(),
tx_buf: core::mem::MaybeUninit::uninit(),
rx_buf: core::mem::MaybeUninit::uninit(),
}
}
}
pub struct MdnsService<'a> {
host: Host<'a>,
#[allow(unused)]
@ -100,39 +119,12 @@ impl<'a> MdnsService<'a> {
Ok(())
}
}
#[cfg(any(feature = "std", feature = "embassy-net"))]
pub struct MdnsUdpBuffers {
udp: crate::transport::udp::UdpBuffers,
tx_buf: core::mem::MaybeUninit<[u8; crate::transport::packet::MAX_TX_BUF_SIZE]>,
rx_buf: core::mem::MaybeUninit<[u8; crate::transport::packet::MAX_RX_BUF_SIZE]>,
}
#[cfg(any(feature = "std", feature = "embassy-net"))]
impl MdnsUdpBuffers {
#[inline(always)]
pub const fn new() -> Self {
Self {
udp: crate::transport::udp::UdpBuffers::new(),
tx_buf: core::mem::MaybeUninit::uninit(),
rx_buf: core::mem::MaybeUninit::uninit(),
}
}
}
pub struct MdnsRunner<'a>(&'a MdnsService<'a>);
impl<'a> MdnsRunner<'a> {
pub const fn new(mdns: &'a MdnsService<'a>) -> Self {
Self(mdns)
}
#[cfg(any(feature = "std", feature = "embassy-net"))]
pub async fn run_udp<D>(
&mut self,
pub async fn run<D>(
&self,
stack: &crate::transport::network::NetworkStack<D>,
buffers: &mut MdnsUdpBuffers,
buffers: &mut MdnsRunBuffers,
) -> Result<(), Error>
where
D: crate::transport::network::NetworkStackDriver,
@ -146,14 +138,14 @@ impl<'a> MdnsRunner<'a> {
// V6 multicast does not work with smoltcp yet (see https://github.com/smoltcp-rs/smoltcp/pull/602)
#[cfg(not(feature = "embassy-net"))]
if let Some(interface) = self.0.interface {
if let Some(interface) = self.interface {
udp.join_multicast_v6(IPV6_BROADCAST_ADDR, interface)
.await?;
}
udp.join_multicast_v4(
IP_BROADCAST_ADDR,
crate::transport::network::Ipv4Addr::from(self.0.host.ip),
crate::transport::network::Ipv4Addr::from(self.host.ip),
)
.await?;
@ -202,14 +194,14 @@ impl<'a> MdnsRunner<'a> {
}
});
let mut run = pin!(async move { self.run(tx_pipe, rx_pipe).await });
let mut run = pin!(async move { self.run_piped(tx_pipe, rx_pipe).await });
embassy_futures::select::select3(&mut tx, &mut rx, &mut run)
.await
.unwrap()
}
pub async fn run(&self, tx_pipe: &Pipe<'_>, rx_pipe: &Pipe<'_>) -> Result<(), Error> {
pub async fn run_piped(&self, tx_pipe: &Pipe<'_>, rx_pipe: &Pipe<'_>) -> Result<(), Error> {
let mut broadcast = pin!(self.broadcast(tx_pipe));
let mut respond = pin!(self.respond(rx_pipe, tx_pipe));
@ -220,7 +212,7 @@ impl<'a> MdnsRunner<'a> {
async fn broadcast(&self, tx_pipe: &Pipe<'_>) -> Result<(), Error> {
loop {
select(
self.0.notification.wait(),
self.notification.wait(),
Timer::after(Duration::from_secs(30)),
)
.await;
@ -229,13 +221,13 @@ impl<'a> MdnsRunner<'a> {
IpAddr::V4(IP_BROADCAST_ADDR),
IpAddr::V6(IPV6_BROADCAST_ADDR),
] {
if self.0.interface.is_some() || addr == IpAddr::V4(IP_BROADCAST_ADDR) {
if self.interface.is_some() || addr == IpAddr::V4(IP_BROADCAST_ADDR) {
loop {
let sent = {
let mut data = tx_pipe.data.lock().await;
if data.chunk.is_none() {
let len = self.0.host.broadcast(&self.0, data.buf, 60)?;
let len = self.host.broadcast(self, data.buf, 60)?;
if len > 0 {
info!("Broadasting mDNS entry to {}:{}", addr, PORT);
@ -280,7 +272,7 @@ impl<'a> MdnsRunner<'a> {
let mut tx_data = tx_pipe.data.lock().await;
if tx_data.chunk.is_none() {
let len = self.0.host.respond(&self.0, data, tx_data.buf, 60)?;
let len = self.host.respond(self, data, tx_data.buf, 60)?;
if len > 0 {
info!("Replying to mDNS query from {}", rx_chunk.addr);

View file

@ -96,7 +96,7 @@ pub fn print_pairing_code_and_qr(
Ok(())
}
pub(self) fn passwd_from_comm_data(comm_data: &CommissioningData) -> u32 {
fn passwd_from_comm_data(comm_data: &CommissioningData) -> u32 {
// todo: should this be part of the comm_data implementation?
match comm_data.verifier.data {
VerifierOption::Password(pwd) => pwd,

View file

@ -15,31 +15,63 @@
* limitations under the License.
*/
#[cfg(feature = "std")]
pub use file_psm::*;
pub use fileio::*;
#[cfg(feature = "std")]
mod file_psm {
pub mod fileio {
use std::fs;
use std::io::{Read, Write};
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use log::info;
use crate::error::{Error, ErrorCode};
use crate::Matter;
pub struct FilePsm {
pub struct Psm<'a> {
matter: &'a Matter<'a>,
dir: PathBuf,
buf: [u8; 4096],
}
impl FilePsm {
pub fn new(dir: PathBuf) -> Result<Self, Error> {
impl<'a> Psm<'a> {
#[inline(always)]
pub fn new(matter: &'a Matter<'a>, dir: PathBuf) -> Result<Self, Error> {
fs::create_dir_all(&dir)?;
Ok(Self { dir })
info!("Persisting from/to {}", dir.display());
let mut buf = [0; 4096];
if let Some(data) = Self::load(&dir, "acls", &mut buf)? {
matter.load_acls(data)?;
}
pub fn load<'a>(&self, key: &str, buf: &'a mut [u8]) -> Result<Option<&'a [u8]>, Error> {
let path = self.dir.join(key);
if let Some(data) = Self::load(&dir, "fabrics", &mut buf)? {
matter.load_fabrics(data)?;
}
Ok(Self { matter, dir, buf })
}
pub async fn run(&mut self) -> Result<(), Error> {
loop {
self.matter.wait_changed().await;
if self.matter.is_changed() {
if let Some(data) = self.matter.store_acls(&mut self.buf)? {
Self::store(&self.dir, "acls", data)?;
}
if let Some(data) = self.matter.store_fabrics(&mut self.buf)? {
Self::store(&self.dir, "fabrics", data)?;
}
}
}
}
fn load<'b>(dir: &Path, key: &str, buf: &'b mut [u8]) -> Result<Option<&'b [u8]>, Error> {
let path = dir.join(key);
match fs::File::open(path) {
Ok(mut file) => {
@ -69,8 +101,8 @@ mod file_psm {
}
}
pub fn store(&self, key: &str, data: &[u8]) -> Result<(), Error> {
let path = self.dir.join(key);
fn store(dir: &Path, key: &str, data: &[u8]) -> Result<(), Error> {
let path = dir.join(key);
let mut file = fs::File::create(path)?;

View file

@ -15,17 +15,13 @@
* limitations under the License.
*/
#[cfg(not(any(
feature = "crypto_openssl",
feature = "crypto_mbedtls",
feature = "crypto_rustcrypto"
)))]
#[cfg(not(any(feature = "openssl", feature = "mbedtls", feature = "rustcrypto")))]
pub use super::crypto_dummy::CryptoSpake2;
#[cfg(all(feature = "crypto_mbedtls", target_os = "espidf"))]
#[cfg(all(feature = "mbedtls", target_os = "espidf"))]
pub use super::crypto_esp_mbedtls::CryptoSpake2;
#[cfg(all(feature = "crypto_mbedtls", not(target_os = "espidf")))]
#[cfg(all(feature = "mbedtls", not(target_os = "espidf")))]
pub use super::crypto_mbedtls::CryptoSpake2;
#[cfg(feature = "crypto_openssl")]
#[cfg(feature = "openssl")]
pub use super::crypto_openssl::CryptoSpake2;
#[cfg(feature = "crypto_rustcrypto")]
#[cfg(feature = "rustcrypto")]
pub use super::crypto_rustcrypto::CryptoSpake2;

View file

@ -17,19 +17,15 @@
pub mod case;
pub mod common;
#[cfg(not(any(
feature = "crypto_openssl",
feature = "crypto_mbedtls",
feature = "crypto_rustcrypto"
)))]
#[cfg(not(any(feature = "openssl", feature = "mbedtls", feature = "rustcrypto")))]
mod crypto_dummy;
#[cfg(all(feature = "crypto_mbedtls", target_os = "espidf"))]
#[cfg(all(feature = "mbedtls", target_os = "espidf"))]
mod crypto_esp_mbedtls;
#[cfg(all(feature = "crypto_mbedtls", not(target_os = "espidf")))]
#[cfg(all(feature = "mbedtls", not(target_os = "espidf")))]
mod crypto_mbedtls;
#[cfg(feature = "crypto_openssl")]
#[cfg(feature = "openssl")]
pub mod crypto_openssl;
#[cfg(feature = "crypto_rustcrypto")]
#[cfg(feature = "rustcrypto")]
pub mod crypto_rustcrypto;
pub mod core;

View file

@ -15,14 +15,18 @@
* limitations under the License.
*/
use core::{borrow::Borrow, cell::RefCell};
use core::borrow::Borrow;
use core::mem::MaybeUninit;
use core::pin::pin;
use embassy_futures::select::select;
use embassy_futures::select::{select, select_slice, Either};
use embassy_sync::{blocking_mutex::raw::NoopRawMutex, channel::Channel};
use embassy_time::{Duration, Timer};
use log::{error, info, warn};
use crate::utils::select::Notification;
use crate::CommissioningData;
use crate::{
alloc,
data_model::{core::DataModel, objects::DataModelHandler},
@ -33,18 +37,17 @@ use crate::{
core::SecureChannel,
},
transport::packet::Packet,
utils::select::EitherUnwrap,
Matter,
};
use super::{
exchange::{
Exchange, ExchangeCtr, ExchangeCtx, ExchangeId, ExchangeState, Notification, Role,
MAX_EXCHANGES,
Exchange, ExchangeCtr, ExchangeCtx, ExchangeId, ExchangeState, Role, MAX_EXCHANGES,
},
mrp::ReliableMessage,
packet::{MAX_RX_BUF_SIZE, MAX_RX_STATUS_BUF_SIZE, MAX_TX_BUF_SIZE},
pipe::{Chunk, Pipe},
session::SessionMgr,
};
#[derive(Debug)]
@ -66,33 +69,216 @@ impl From<u8> for OpCodeDescriptor {
}
}
pub struct Transport<'a> {
matter: &'a Matter<'a>,
pub(crate) exchanges: RefCell<heapless::Vec<ExchangeCtx, MAX_EXCHANGES>>,
pub(crate) send_notification: Notification,
pub session_mgr: RefCell<SessionMgr>,
type TxBuf = MaybeUninit<[u8; MAX_TX_BUF_SIZE]>;
type RxBuf = MaybeUninit<[u8; MAX_RX_BUF_SIZE]>;
type SxBuf = MaybeUninit<[u8; MAX_RX_STATUS_BUF_SIZE]>;
#[cfg(any(feature = "std", feature = "embassy-net"))]
pub struct RunBuffers {
udp_bufs: crate::transport::udp::UdpBuffers,
run_bufs: PacketBuffers,
tx_buf: TxBuf,
rx_buf: RxBuf,
}
impl<'a> Transport<'a> {
#[cfg(any(feature = "std", feature = "embassy-net"))]
impl RunBuffers {
#[inline(always)]
pub fn new(matter: &'a Matter<'a>) -> Self {
let epoch = matter.epoch;
let rand = matter.rand;
pub const fn new() -> Self {
Self {
matter,
exchanges: RefCell::new(heapless::Vec::new()),
send_notification: Notification::new(),
session_mgr: RefCell::new(SessionMgr::new(epoch, rand)),
udp_bufs: crate::transport::udp::UdpBuffers::new(),
run_bufs: PacketBuffers::new(),
tx_buf: core::mem::MaybeUninit::uninit(),
rx_buf: core::mem::MaybeUninit::uninit(),
}
}
}
pub fn matter(&self) -> &'a Matter<'a> {
self.matter
pub struct PacketBuffers {
tx: [TxBuf; MAX_EXCHANGES],
rx: [RxBuf; MAX_EXCHANGES],
sx: [SxBuf; MAX_EXCHANGES],
}
pub async fn initiate(&self, _fabric_id: u64, _node_id: u64) -> Result<Exchange<'a>, Error> {
unimplemented!()
impl PacketBuffers {
const TX_ELEM: TxBuf = MaybeUninit::uninit();
const RX_ELEM: RxBuf = MaybeUninit::uninit();
const SX_ELEM: SxBuf = MaybeUninit::uninit();
const TX_INIT: [TxBuf; MAX_EXCHANGES] = [Self::TX_ELEM; MAX_EXCHANGES];
const RX_INIT: [RxBuf; MAX_EXCHANGES] = [Self::RX_ELEM; MAX_EXCHANGES];
const SX_INIT: [SxBuf; MAX_EXCHANGES] = [Self::SX_ELEM; MAX_EXCHANGES];
#[inline(always)]
pub const fn new() -> Self {
Self {
tx: Self::TX_INIT,
rx: Self::RX_INIT,
sx: Self::SX_INIT,
}
}
}
impl<'a> Matter<'a> {
#[cfg(any(feature = "std", feature = "embassy-net"))]
pub async fn run<D, H>(
&self,
stack: &crate::transport::network::NetworkStack<D>,
buffers: &mut RunBuffers,
dev_comm: CommissioningData,
handler: &H,
) -> Result<(), Error>
where
D: crate::transport::network::NetworkStackDriver,
H: DataModelHandler,
{
let udp = crate::transport::udp::UdpListener::new(
stack,
crate::transport::network::SocketAddr::new(
crate::transport::network::IpAddr::V6(
crate::transport::network::Ipv6Addr::UNSPECIFIED,
),
self.port,
),
&mut buffers.udp_bufs,
)
.await?;
let tx_pipe = Pipe::new(unsafe { buffers.tx_buf.assume_init_mut() });
let rx_pipe = Pipe::new(unsafe { buffers.rx_buf.assume_init_mut() });
let tx_pipe = &tx_pipe;
let rx_pipe = &rx_pipe;
let udp = &udp;
let run_bufs = &mut buffers.run_bufs;
let mut tx = pin!(async move {
loop {
{
let mut data = tx_pipe.data.lock().await;
if let Some(chunk) = data.chunk {
udp.send(chunk.addr.unwrap_udp(), &data.buf[chunk.start..chunk.end])
.await?;
data.chunk = None;
tx_pipe.data_consumed_notification.signal(());
}
}
tx_pipe.data_supplied_notification.wait().await;
}
});
let mut rx = pin!(async move {
loop {
{
let mut data = rx_pipe.data.lock().await;
if data.chunk.is_none() {
let (len, addr) = udp.recv(data.buf).await?;
data.chunk = Some(Chunk {
start: 0,
end: len,
addr: crate::transport::network::Address::Udp(addr),
});
rx_pipe.data_supplied_notification.signal(());
}
}
rx_pipe.data_consumed_notification.wait().await;
}
});
let mut run = pin!(async move {
self.run_piped(run_bufs, tx_pipe, rx_pipe, dev_comm, handler)
.await
});
embassy_futures::select::select3(&mut tx, &mut rx, &mut run)
.await
.unwrap()
}
pub async fn run_piped<H>(
&self,
buffers: &mut PacketBuffers,
tx_pipe: &Pipe<'_>,
rx_pipe: &Pipe<'_>,
dev_comm: CommissioningData,
handler: &H,
) -> Result<(), Error>
where
H: DataModelHandler,
{
info!("Running Matter transport");
let buf = unsafe { buffers.rx[0].assume_init_mut() };
if self.start_comissioning(dev_comm, buf)? {
info!("Comissioning started");
}
let construction_notification = Notification::new();
let mut rx = pin!(self.handle_rx(buffers, rx_pipe, &construction_notification, handler));
let mut tx = pin!(self.handle_tx(tx_pipe));
select(&mut rx, &mut tx).await.unwrap()
}
#[inline(always)]
async fn handle_rx<H>(
&self,
buffers: &mut PacketBuffers,
rx_pipe: &Pipe<'_>,
construction_notification: &Notification,
handler: &H,
) -> Result<(), Error>
where
H: DataModelHandler,
{
info!("Creating queue for {} exchanges", 1);
let channel = Channel::<NoopRawMutex, _, 1>::new();
info!("Creating {} handlers", MAX_EXCHANGES);
let mut handlers = heapless::Vec::<_, MAX_EXCHANGES>::new();
info!("Handlers size: {}", core::mem::size_of_val(&handlers));
// Unsafely allow mutable aliasing in the packet pools by different indices
let pools: *mut PacketBuffers = buffers;
for index in 0..MAX_EXCHANGES {
let channel = &channel;
let handler_id = index;
let pools = unsafe { pools.as_mut() }.unwrap();
let tx_buf = unsafe { pools.tx[handler_id].assume_init_mut() };
let rx_buf = unsafe { pools.rx[handler_id].assume_init_mut() };
let sx_buf = unsafe { pools.sx[handler_id].assume_init_mut() };
handlers
.push(self.exchange_handler(tx_buf, rx_buf, sx_buf, handler_id, channel, handler))
.map_err(|_| ())
.unwrap();
}
let mut rx = pin!(self.handle_rx_multiplex(rx_pipe, construction_notification, &channel));
let result = select(&mut rx, select_slice(&mut handlers)).await;
if let Either::First(result) = result {
if let Err(e) = &result {
error!("Exitting RX loop due to an error: {:?}", e);
}
result?;
}
Ok(())
}
#[inline(always)]
@ -230,11 +416,11 @@ impl<'a> Transport<'a> {
match rx.get_proto_id() {
PROTO_ID_SECURE_CHANNEL => {
let sc = SecureChannel::new(self.matter());
let sc = SecureChannel::new(self);
sc.handle(&mut exchange, &mut rx, &mut tx).await?;
self.matter().notify_changed();
self.notify_changed();
}
PROTO_ID_INTERACTION_MODEL => {
let dm = DataModel::new(handler);
@ -244,7 +430,7 @@ impl<'a> Transport<'a> {
dm.handle(&mut exchange, &mut rx, &mut tx, &mut rx_status)
.await?;
self.matter().notify_changed();
self.notify_changed();
}
other => {
error!("Unknown Proto-ID: {}", other);
@ -254,6 +440,11 @@ impl<'a> Transport<'a> {
Ok(())
}
pub fn reset_transport(&self) {
self.exchanges.borrow_mut().clear();
self.session_mgr.borrow_mut().reset();
}
pub fn process_rx<'r>(
&'r self,
construction_notification: &'r Notification,
@ -297,7 +488,7 @@ impl<'a> Transport<'a> {
}
}
self.matter().notify_changed();
self.notify_changed();
}
}
@ -305,13 +496,13 @@ impl<'a> Transport<'a> {
let constructor = ExchangeCtr {
exchange: Exchange {
id: ctx.id.clone(),
transport: self,
matter: self,
notification: Notification::new(),
},
construction_notification,
};
self.matter().notify_changed();
self.notify_changed();
Ok(Some(constructor))
} else if src_rx.proto.proto_id == PROTO_ID_SECURE_CHANNEL
@ -338,7 +529,7 @@ impl<'a> Transport<'a> {
}
}
self.matter().notify_changed();
self.notify_changed();
Ok(None)
}
@ -354,7 +545,7 @@ impl<'a> Transport<'a> {
let mut exchanges = self.exchanges.borrow_mut();
let ctx = Self::get(&mut exchanges, exchange_id).unwrap();
let ctx = ExchangeCtx::get(&mut exchanges, exchange_id).unwrap();
let state = &mut ctx.state;
@ -397,11 +588,11 @@ impl<'a> Transport<'a> {
// ..
// }
| ExchangeState::Complete { .. } // | ExchangeState::CompleteAcknowledge { .. }
) || ctx.mrp.is_ack_ready(*self.matter.borrow())
) || ctx.mrp.is_ack_ready(*self.borrow())
});
if let Some(ctx) = ctx {
self.matter().notify_changed();
self.notify_changed();
let state = &mut ctx.state;
@ -460,7 +651,7 @@ impl<'a> Transport<'a> {
dest_tx.log("Sending packet");
self.pre_send(ctx, dest_tx)?;
self.matter().notify_changed();
self.notify_changed();
return Ok(true);
}
@ -500,7 +691,7 @@ impl<'a> Transport<'a> {
let session = session_mgr.mut_by_index(sess_index).unwrap();
// Decrypt the message
session.recv(self.matter.epoch, rx)?;
session.recv(self.epoch, rx)?;
// Get the exchange
// TODO: Handle out of space
@ -513,7 +704,7 @@ impl<'a> Transport<'a> {
)?;
// Message Reliability Protocol
exch.mrp.recv(rx, self.matter.epoch)?;
exch.mrp.recv(rx, self.epoch)?;
Ok((exch, new))
}
@ -576,11 +767,4 @@ impl<'a> Transport<'a> {
Err(ErrorCode::NoExchange.into())
}
}
pub(crate) fn get<'r>(
exchanges: &'r mut heapless::Vec<ExchangeCtx, MAX_EXCHANGES>,
id: &ExchangeId,
) -> Option<&'r mut ExchangeCtx> {
exchanges.iter_mut().find(|exchange| exchange.id == *id)
}
}

View file

@ -1,13 +1,11 @@
use embassy_sync::blocking_mutex::raw::NoopRawMutex;
use crate::{
acl::Accessor,
error::{Error, ErrorCode},
utils::select::Notification,
Matter,
};
use super::{
core::Transport,
mrp::ReliableMessage,
network::Address,
packet::Packet,
@ -16,8 +14,6 @@ use super::{
pub const MAX_EXCHANGES: usize = 8;
pub type Notification = embassy_sync::signal::Signal<NoopRawMutex, ()>;
#[derive(Debug, PartialEq, Eq, Copy, Clone, Default)]
pub(crate) enum Role {
#[default]
@ -43,6 +39,15 @@ pub(crate) struct ExchangeCtx {
pub(crate) state: ExchangeState,
}
impl ExchangeCtx {
pub(crate) fn get<'r>(
exchanges: &'r mut heapless::Vec<ExchangeCtx, MAX_EXCHANGES>,
id: &ExchangeId,
) -> Option<&'r mut ExchangeCtx> {
exchanges.iter_mut().find(|exchange| exchange.id == *id)
}
}
#[derive(Debug, Clone)]
pub(crate) enum ExchangeState {
Construction {
@ -144,7 +149,7 @@ impl SessionId {
}
pub struct Exchange<'a> {
pub(crate) id: ExchangeId,
pub(crate) transport: &'a Transport<'a>,
pub(crate) matter: &'a Matter<'a>,
pub(crate) notification: Notification,
}
@ -153,21 +158,8 @@ impl<'a> Exchange<'a> {
&self.id
}
pub fn matter(&self) -> &Matter<'a> {
self.transport.matter()
}
pub fn transport(&self) -> &Transport<'a> {
self.transport
}
pub fn accessor(&self) -> Result<Accessor<'a>, Error> {
self.with_session(|sess| {
Ok(Accessor::for_session(
sess,
&self.transport.matter().acl_mgr,
))
})
self.with_session(|sess| Ok(Accessor::for_session(sess, &self.matter.acl_mgr)))
}
pub fn with_session_mut<F, T>(&self, f: F) -> Result<T, Error>
@ -175,7 +167,7 @@ impl<'a> Exchange<'a> {
F: FnOnce(&mut Session) -> Result<T, Error>,
{
self.with_ctx(|_self, ctx| {
let mut session_mgr = _self.transport.session_mgr.borrow_mut();
let mut session_mgr = _self.matter.session_mgr.borrow_mut();
let sess_index = session_mgr
.get(
@ -201,15 +193,11 @@ impl<'a> Exchange<'a> {
where
F: FnOnce(&mut SessionMgr) -> Result<T, Error>,
{
let mut session_mgr = self.transport.session_mgr.borrow_mut();
let mut session_mgr = self.matter.session_mgr.borrow_mut();
f(&mut session_mgr)
}
pub async fn initiate(&mut self, fabric_id: u64, node_id: u64) -> Result<Exchange<'a>, Error> {
self.transport.initiate(fabric_id, node_id).await
}
pub async fn acknowledge(&mut self) -> Result<(), Error> {
let wait = self.with_ctx_mut(|_self, ctx| {
if !matches!(ctx.state, ExchangeState::Active) {
@ -222,7 +210,7 @@ impl<'a> Exchange<'a> {
ctx.state = ExchangeState::Acknowledge {
notification: &_self.notification as *const _,
};
_self.transport.send_notification.signal(());
_self.matter.send_notification.signal(());
Ok(true)
}
@ -249,7 +237,7 @@ impl<'a> Exchange<'a> {
rx: rx as *mut _,
notification: &_self.notification as *const _,
};
_self.transport.send_notification.signal(());
_self.matter.send_notification.signal(());
Ok(())
})?;
@ -275,7 +263,7 @@ impl<'a> Exchange<'a> {
tx: tx as *const _,
notification: &_self.notification as *const _,
};
_self.transport.send_notification.signal(());
_self.matter.send_notification.signal(());
Ok(())
})?;
@ -289,9 +277,9 @@ impl<'a> Exchange<'a> {
where
F: FnOnce(&Self, &ExchangeCtx) -> Result<T, Error>,
{
let mut exchanges = self.transport.exchanges.borrow_mut();
let mut exchanges = self.matter.exchanges.borrow_mut();
let exchange = Transport::get(&mut exchanges, &self.id).ok_or(ErrorCode::NoExchange)?; // TODO
let exchange = ExchangeCtx::get(&mut exchanges, &self.id).ok_or(ErrorCode::NoExchange)?; // TODO
f(self, exchange)
}
@ -300,9 +288,9 @@ impl<'a> Exchange<'a> {
where
F: FnOnce(&mut Self, &mut ExchangeCtx) -> Result<T, Error>,
{
let mut exchanges = self.transport.exchanges.borrow_mut();
let mut exchanges = self.matter.exchanges.borrow_mut();
let exchange = Transport::get(&mut exchanges, &self.id).ok_or(ErrorCode::NoExchange)?; // TODO
let exchange = ExchangeCtx::get(&mut exchanges, &self.id).ok_or(ErrorCode::NoExchange)?; // TODO
f(self, exchange)
}
@ -312,7 +300,7 @@ impl<'a> Drop for Exchange<'a> {
fn drop(&mut self) {
let _ = self.with_ctx_mut(|_self, ctx| {
ctx.state = ExchangeState::Closed;
_self.transport.send_notification.signal(());
_self.matter.send_notification.signal(());
Ok(())
});

View file

@ -24,6 +24,5 @@ pub mod packet;
pub mod pipe;
pub mod plain_hdr;
pub mod proto_hdr;
pub mod runner;
pub mod session;
pub mod udp;

View file

@ -1,319 +0,0 @@
/*
*
* Copyright (c) 2020-2022 Project CHIP Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use core::{mem::MaybeUninit, pin::pin};
use embassy_futures::select::{select, select_slice, Either};
use embassy_sync::{blocking_mutex::raw::NoopRawMutex, channel::Channel};
use log::{error, info};
use crate::{data_model::objects::DataModelHandler, CommissioningData, Matter};
use crate::{error::Error, transport::packet::MAX_RX_BUF_SIZE, utils::select::EitherUnwrap};
use super::{
core::Transport,
exchange::{Notification, MAX_EXCHANGES},
packet::{MAX_RX_STATUS_BUF_SIZE, MAX_TX_BUF_SIZE},
pipe::{Chunk, Pipe},
};
type TxBuf = MaybeUninit<[u8; MAX_TX_BUF_SIZE]>;
type RxBuf = MaybeUninit<[u8; MAX_RX_BUF_SIZE]>;
type SxBuf = MaybeUninit<[u8; MAX_RX_STATUS_BUF_SIZE]>;
struct PacketPools {
tx: [TxBuf; MAX_EXCHANGES],
rx: [RxBuf; MAX_EXCHANGES],
sx: [SxBuf; MAX_EXCHANGES],
}
impl PacketPools {
const TX_ELEM: TxBuf = MaybeUninit::uninit();
const RX_ELEM: RxBuf = MaybeUninit::uninit();
const SX_ELEM: SxBuf = MaybeUninit::uninit();
const TX_INIT: [TxBuf; MAX_EXCHANGES] = [Self::TX_ELEM; MAX_EXCHANGES];
const RX_INIT: [RxBuf; MAX_EXCHANGES] = [Self::RX_ELEM; MAX_EXCHANGES];
const SX_INIT: [SxBuf; MAX_EXCHANGES] = [Self::SX_ELEM; MAX_EXCHANGES];
#[inline(always)]
pub const fn new() -> Self {
Self {
tx: Self::TX_INIT,
rx: Self::RX_INIT,
sx: Self::SX_INIT,
}
}
}
#[cfg(any(feature = "std", feature = "embassy-net"))]
pub struct AllUdpBuffers {
transport: TransportUdpBuffers,
mdns: crate::mdns::MdnsUdpBuffers,
}
#[cfg(any(feature = "std", feature = "embassy-net"))]
impl AllUdpBuffers {
#[inline(always)]
pub const fn new() -> Self {
Self {
transport: TransportUdpBuffers::new(),
mdns: crate::mdns::MdnsUdpBuffers::new(),
}
}
}
#[cfg(any(feature = "std", feature = "embassy-net"))]
pub struct TransportUdpBuffers {
udp: crate::transport::udp::UdpBuffers,
tx_buf: TxBuf,
rx_buf: RxBuf,
}
#[cfg(any(feature = "std", feature = "embassy-net"))]
impl TransportUdpBuffers {
#[inline(always)]
pub const fn new() -> Self {
Self {
udp: crate::transport::udp::UdpBuffers::new(),
tx_buf: core::mem::MaybeUninit::uninit(),
rx_buf: core::mem::MaybeUninit::uninit(),
}
}
}
/// This struct implements an executor-agnostic option to run the Matter transport stack end-to-end.
///
/// Since it is not possible to use executor tasks spawning in an executor-agnostic way (yet),
/// the async loops are arranged as one giant future. Therefore, the cost is a slightly slower execution
/// due to the generated future being relatively big and deeply nested.
///
/// Users are free to implement their own async execution loop, by utilizing the `Transport`
/// struct directly with their async executor of choice.
pub struct TransportRunner<'a> {
transport: Transport<'a>,
pools: PacketPools,
}
impl<'a> TransportRunner<'a> {
#[inline(always)]
pub fn new(matter: &'a Matter<'a>) -> Self {
Self::wrap(Transport::new(matter))
}
#[inline(always)]
pub const fn wrap(transport: Transport<'a>) -> Self {
Self {
transport,
pools: PacketPools::new(),
}
}
pub fn transport(&self) -> &Transport {
&self.transport
}
#[cfg(any(feature = "std", feature = "embassy-net"))]
pub async fn run_udp_all<D, H>(
&mut self,
stack: &crate::transport::network::NetworkStack<D>,
mdns: &crate::mdns::MdnsService<'_>,
buffers: &mut AllUdpBuffers,
dev_comm: CommissioningData,
handler: &H,
) -> Result<(), Error>
where
D: crate::transport::network::NetworkStackDriver,
H: DataModelHandler,
{
let mut mdns_runner = crate::mdns::MdnsRunner::new(mdns);
let mut mdns = pin!(mdns_runner.run_udp(stack, &mut buffers.mdns));
let mut transport = pin!(self.run_udp(stack, &mut buffers.transport, dev_comm, handler));
embassy_futures::select::select(&mut mdns, &mut transport)
.await
.unwrap()
}
#[cfg(any(feature = "std", feature = "embassy-net"))]
pub async fn run_udp<D, H>(
&mut self,
stack: &crate::transport::network::NetworkStack<D>,
buffers: &mut TransportUdpBuffers,
dev_comm: CommissioningData,
handler: &H,
) -> Result<(), Error>
where
D: crate::transport::network::NetworkStackDriver,
H: DataModelHandler,
{
let udp = crate::transport::udp::UdpListener::new(
stack,
crate::transport::network::SocketAddr::new(
crate::transport::network::IpAddr::V6(
crate::transport::network::Ipv6Addr::UNSPECIFIED,
),
self.transport.matter().port,
),
&mut buffers.udp,
)
.await?;
let tx_pipe = Pipe::new(unsafe { buffers.tx_buf.assume_init_mut() });
let rx_pipe = Pipe::new(unsafe { buffers.rx_buf.assume_init_mut() });
let tx_pipe = &tx_pipe;
let rx_pipe = &rx_pipe;
let udp = &udp;
let mut tx = pin!(async move {
loop {
{
let mut data = tx_pipe.data.lock().await;
if let Some(chunk) = data.chunk {
udp.send(chunk.addr.unwrap_udp(), &data.buf[chunk.start..chunk.end])
.await?;
data.chunk = None;
tx_pipe.data_consumed_notification.signal(());
}
}
tx_pipe.data_supplied_notification.wait().await;
}
});
let mut rx = pin!(async move {
loop {
{
let mut data = rx_pipe.data.lock().await;
if data.chunk.is_none() {
let (len, addr) = udp.recv(data.buf).await?;
data.chunk = Some(Chunk {
start: 0,
end: len,
addr: crate::transport::network::Address::Udp(addr),
});
rx_pipe.data_supplied_notification.signal(());
}
}
rx_pipe.data_consumed_notification.wait().await;
}
});
let mut run = pin!(async move { self.run(tx_pipe, rx_pipe, dev_comm, handler).await });
embassy_futures::select::select3(&mut tx, &mut rx, &mut run)
.await
.unwrap()
}
pub async fn run<H>(
&mut self,
tx_pipe: &Pipe<'_>,
rx_pipe: &Pipe<'_>,
dev_comm: CommissioningData,
handler: &H,
) -> Result<(), Error>
where
H: DataModelHandler,
{
info!("Running Matter transport");
let buf = unsafe { self.pools.rx[0].assume_init_mut() };
if self.transport.matter().start_comissioning(dev_comm, buf)? {
info!("Comissioning started");
}
let construction_notification = Notification::new();
let mut rx = pin!(Self::handle_rx(
&self.transport,
&mut self.pools,
rx_pipe,
&construction_notification,
handler
));
let mut tx = pin!(self.transport.handle_tx(tx_pipe));
select(&mut rx, &mut tx).await.unwrap()
}
#[inline(always)]
async fn handle_rx<H>(
transport: &Transport<'_>,
pools: &mut PacketPools,
rx_pipe: &Pipe<'_>,
construction_notification: &Notification,
handler: &H,
) -> Result<(), Error>
where
H: DataModelHandler,
{
info!("Creating queue for {} exchanges", 1);
let channel = Channel::<NoopRawMutex, _, 1>::new();
info!("Creating {} handlers", MAX_EXCHANGES);
let mut handlers = heapless::Vec::<_, MAX_EXCHANGES>::new();
info!("Handlers size: {}", core::mem::size_of_val(&handlers));
// Unsafely allow mutable aliasing in the packet pools by different indices
let pools: *mut PacketPools = pools;
for index in 0..MAX_EXCHANGES {
let channel = &channel;
let handler_id = index;
let pools = unsafe { pools.as_mut() }.unwrap();
let tx_buf = unsafe { pools.tx[handler_id].assume_init_mut() };
let rx_buf = unsafe { pools.rx[handler_id].assume_init_mut() };
let sx_buf = unsafe { pools.sx[handler_id].assume_init_mut() };
handlers
.push(
transport
.exchange_handler(tx_buf, rx_buf, sx_buf, handler_id, channel, handler),
)
.map_err(|_| ())
.unwrap();
}
let mut rx =
pin!(transport.handle_rx_multiplex(rx_pipe, &construction_notification, &channel));
let result = select(&mut rx, select_slice(&mut handlers)).await;
if let Either::First(result) = result {
if let Err(e) = &result {
error!("Exitting RX loop due to an error: {:?}", e);
}
result?;
}
Ok(())
}
}

View file

@ -306,6 +306,11 @@ impl SessionMgr {
}
}
pub fn reset(&mut self) {
self.sessions.clear();
self.next_sess_id = 1;
}
pub fn mut_by_index(&mut self, index: usize) -> Option<&mut Session> {
self.sessions.get_mut(index).and_then(Option::as_mut)
}

View file

@ -16,10 +16,10 @@
*/
#[cfg(all(feature = "std", not(feature = "embassy-net")))]
pub use async_io::*;
pub use self::async_io::*;
#[cfg(feature = "embassy-net")]
pub use embassy_net::*;
pub use self::embassy_net::*;
#[cfg(feature = "std")]
pub mod async_io {
@ -88,7 +88,7 @@ pub mod async_io {
#[cfg(target_os = "espidf")]
{
fn esp_setsockopt<T>(
socket: &mut UdpSocket,
socket: &UdpSocket,
proto: u32,
option: u32,
value: T,
@ -119,7 +119,7 @@ pub mod async_io {
};
esp_setsockopt(
&mut self.0,
&mut self.0.get_ref(),
esp_idf_sys::IPPROTO_IP,
esp_idf_sys::IP_ADD_MEMBERSHIP,
mreq,

View file

@ -48,16 +48,15 @@ use matter::{
secure_channel::{self, common::PROTO_ID_SECURE_CHANNEL, spake2p::VerifierData},
tlv::{TLVWriter, TagType, ToTLV},
transport::{
exchange::Notification,
core::PacketBuffers,
packet::{Packet, MAX_RX_BUF_SIZE, MAX_TX_BUF_SIZE},
pipe::Pipe,
runner::TransportRunner,
},
transport::{
network::Address,
session::{CaseDetails, CloneData, NocCatIds, SessionMode},
},
utils::select::EitherUnwrap,
utils::select::{EitherUnwrap, Notification},
CommissioningData, Matter, MATTER_PORT,
};
@ -248,7 +247,7 @@ impl<'a> ImEngine<'a> {
input: &[&ImInput],
out: &mut heapless::Vec<ImOutput, N>,
) -> Result<(), Error> {
let mut runner = TransportRunner::new(&self.matter);
self.matter.reset_transport();
let clone_data = CloneData::new(
IM_ENGINE_REMOTE_PEER_ID,
@ -259,8 +258,8 @@ impl<'a> ImEngine<'a> {
SessionMode::Case(CaseDetails::new(1, &self.cat_ids)),
);
let sess_idx = runner
.transport()
let sess_idx = self
.matter
.session_mgr
.borrow_mut()
.clone_session(&clone_data)
@ -281,10 +280,9 @@ impl<'a> ImEngine<'a> {
let rx_pipe_buf = &mut rx_pipe_buf;
let handler = &handler;
let runner = &mut runner;
let mut msg_ctr = runner
.transport()
let mut msg_ctr = self
.matter
.session_mgr
.borrow_mut()
.mut_by_index(sess_idx)
@ -294,9 +292,13 @@ impl<'a> ImEngine<'a> {
let resp_notif = Notification::new();
let resp_notif = &resp_notif;
let mut buffers = PacketBuffers::new();
let buffers = &mut buffers;
embassy_futures::block_on(async move {
select3(
runner.run(
self.matter.run_piped(
buffers,
tx_pipe,
rx_pipe,
CommissioningData {

View file

@ -100,7 +100,7 @@ fn test_timed_cmd_success() {
ImEngine::timed_commands(
input,
&TimedInvResponse::TransactionSuccess(expected),
400,
2000,
0,
true,
);
@ -130,7 +130,7 @@ fn test_timed_cmd_timedout_mismatch() {
ImEngine::timed_commands(
input,
&TimedInvResponse::TransactionError(IMStatusCode::TimedRequestMisMatch),
400,
2000,
0,
false,
);

View file

@ -2,6 +2,7 @@
name = "matter_macro_derive"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]

View file

@ -1,7 +1,8 @@
[package]
name = "tlv_tool"
version = "0.1.0"
edition = "2018"
edition = "2021"
license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html