diff --git a/matter/src/data_model/core.rs b/matter/src/data_model/core/mod.rs similarity index 68% rename from matter/src/data_model/core.rs rename to matter/src/data_model/core/mod.rs index 55a0e9e..1b257ff 100644 --- a/matter/src/data_model/core.rs +++ b/matter/src/data_model/core/mod.rs @@ -15,6 +15,8 @@ * limitations under the License. */ +use self::subscribe::SubsCtx; + use super::{ cluster_basic_information::BasicInfoConfig, device_types::device_type_add_root_node, @@ -28,7 +30,7 @@ use crate::{ fabric::FabricMgr, interaction_model::{ command::CommandReq, - core::IMStatusCode, + core::{IMStatusCode, OpCode}, messages::{ ib::{self, AttrData, DataVersionFilter}, msg::{self, InvReq, ReadReq, WriteReq}, @@ -37,8 +39,11 @@ use crate::{ InteractionConsumer, Transaction, }, secure_channel::pake::PaseMgr, - tlv::{TLVArray, TLVWriter, TagType, ToTLV}, - transport::session::{Session, SessionMode}, + tlv::{self, FromTLV, TLVArray, TLVWriter, TagType, ToTLV}, + transport::{ + proto_demux::ResponseRequired, + session::{Session, SessionMode}, + }, }; use log::{error, info}; use std::sync::{Arc, RwLock}; @@ -76,17 +81,6 @@ impl DataModel { Ok(dm) } - pub fn read_attribute_raw( - &self, - endpoint: u16, - cluster: u32, - attr: u16, - ) -> Result { - let node = self.node.read().unwrap(); - let cluster = node.get_cluster(endpoint, cluster)?; - cluster.base().read_attribute_raw(attr).map(|a| a.clone()) - } - // Encode a write attribute from a path that may or may not be wildcard fn handle_write_attr_path( node: &mut Node, @@ -153,42 +147,6 @@ impl DataModel { } } - // Encode a read attribute from a path that may or may not be wildcard - fn handle_read_attr_path( - node: &Node, - accessor: &Accessor, - attr_encoder: &mut AttrReadEncoder, - attr_details: &mut AttrDetails, - ) { - let path = attr_encoder.path; - // Skip error reporting for wildcard paths, don't for concrete paths - attr_encoder.skip_error(path.is_wildcard()); - - let result = node.for_each_attribute(&path, |path, c| { - // Ignore processing if data filter matches. - // For a wildcard attribute, this may end happening unnecessarily for all attributes, although - // a single skip for the cluster is sufficient. That requires us to replace this for_each with a - // for_each_cluster - let cluster_data_ver = c.base().get_dataver(); - if Self::data_filter_matches(&attr_encoder.data_ver_filters, path, cluster_data_ver) { - return Ok(()); - } - - attr_details.attr_id = path.leaf.unwrap_or_default() as u16; - // Overwrite the previous path with the concrete path - attr_encoder.set_path(*path); - // Set the cluster's data version - attr_encoder.set_data_ver(cluster_data_ver); - let mut access_req = AccessReq::new(accessor, path, Access::READ); - Cluster::read_attribute(c, &mut access_req, attr_encoder, attr_details); - Ok(()) - }); - if let Err(e) = result { - // We hit this only if this is a non-wildcard path - attr_encoder.encode_status(e, 0); - } - } - // Handle command from a path that may or may not be wildcard fn handle_command_path(node: &mut Node, cmd_req: &mut CommandReq) { let wildcard = cmd_req.cmd.path.is_wildcard(); @@ -266,6 +224,15 @@ impl DataModel { } } +pub mod read; +pub mod subscribe; + +/// Type of Resume Request +enum ResumeReq { + Subscribe(subscribe::SubsCtx), + Read(read::ResumeReadReq), +} + impl objects::ChangeConsumer for DataModel { fn endpoint_added(&self, id: u16, endpoint: &mut Endpoint) -> Result<(), Error> { endpoint.add_cluster(DescriptorCluster::new(id, self.clone())?)?; @@ -294,45 +261,22 @@ impl InteractionConsumer for DataModel { fn consume_read_attr( &self, - read_req: &ReadReq, + rx_buf: &[u8], trans: &mut Transaction, tw: &mut TLVWriter, ) -> Result<(), Error> { - let mut attr_encoder = AttrReadEncoder::new(tw); - if let Some(filters) = &read_req.dataver_filters { - attr_encoder.set_data_ver_filters(filters); - } - - let mut attr_details = AttrDetails { - // This will be updated internally - attr_id: 0, - // This will be updated internally - list_index: None, - // This will be updated internally - fab_idx: 0, - fab_filter: read_req.fabric_filtered, - }; - - if let Some(attr_requests) = &read_req.attr_requests { - let accessor = self.sess_to_accessor(trans.session); - let node = self.node.read().unwrap(); - attr_encoder - .tw - .start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?; - - for attr_path in attr_requests.iter() { - attr_encoder.set_path(attr_path.to_gp()); - // Extract the attr_path fields into various structures - attr_details.list_index = attr_path.list_index; - attr_details.fab_idx = accessor.fab_idx; - DataModel::handle_read_attr_path( - &node, - &accessor, - &mut attr_encoder, - &mut attr_details, - ); + let mut resume_from = None; + let root = tlv::get_root_node(rx_buf)?; + let req = ReadReq::from_tlv(&root)?; + self.handle_read_req(&req, trans, tw, &mut resume_from)?; + if resume_from.is_some() { + // This is a multi-hop read transaction, remember this read request + let resume = read::ResumeReadReq::new(rx_buf, &resume_from)?; + if !trans.exch.is_data_none() { + error!("Exchange data already set, and multi-hop read"); + return Err(Error::InvalidState); } - tw.end_container()?; + trans.exch.set_data_boxed(Box::new(ResumeReq::Read(resume))); } Ok(()) } @@ -367,61 +311,44 @@ impl InteractionConsumer for DataModel { Ok(()) } -} -/// Encoder for generating a response to a read request -pub struct AttrReadEncoder<'a, 'b, 'c> { - tw: &'a mut TLVWriter<'b, 'c>, - data_ver: u32, - path: GenericPath, - skip_error: bool, - data_ver_filters: Option<&'a TLVArray<'a, DataVersionFilter>>, -} + fn consume_status_report( + &self, + req: &msg::StatusResp, + trans: &mut Transaction, + tw: &mut TLVWriter, + ) -> Result<(OpCode, ResponseRequired), Error> { + if let Some(mut resume) = trans.exch.take_data_boxed::() { + let result = match *resume { + ResumeReq::Read(ref mut read) => self.handle_resume_read(read, trans, tw)?, -impl<'a, 'b, 'c> AttrReadEncoder<'a, 'b, 'c> { - pub fn new(tw: &'a mut TLVWriter<'b, 'c>) -> Self { - Self { - tw, - data_ver: 0, - skip_error: false, - path: Default::default(), - data_ver_filters: None, + ResumeReq::Subscribe(ref mut ctx) => ctx.handle_status_report(trans, tw, self)?, + }; + trans.exch.set_data_boxed(resume); + Ok(result) + } else { + // Nothing to do for now + trans.complete(); + info!("Received status report with status {:?}", req.status); + Ok((OpCode::Reserved, ResponseRequired::No)) } } - pub fn skip_error(&mut self, skip: bool) { - self.skip_error = skip; - } - - pub fn set_data_ver(&mut self, data_ver: u32) { - self.data_ver = data_ver; - } - - pub fn set_data_ver_filters(&mut self, filters: &'a TLVArray<'a, DataVersionFilter>) { - self.data_ver_filters = Some(filters); - } - - pub fn set_path(&mut self, path: GenericPath) { - self.path = path; - } -} - -impl<'a, 'b, 'c> Encoder for AttrReadEncoder<'a, 'b, 'c> { - fn encode(&mut self, value: EncodeValue) { - let resp = ib::AttrResp::Data(ib::AttrData::new( - Some(self.data_ver), - ib::AttrPath::new(&self.path), - value, - )); - let _ = resp.to_tlv(self.tw, TagType::Anonymous); - } - - fn encode_status(&mut self, status: IMStatusCode, cluster_status: u16) { - if !self.skip_error { - let resp = - ib::AttrResp::Status(ib::AttrStatus::new(&self.path, status, cluster_status)); - let _ = resp.to_tlv(self.tw, TagType::Anonymous); + fn consume_subscribe( + &self, + rx_buf: &[u8], + trans: &mut Transaction, + tw: &mut TLVWriter, + ) -> Result<(OpCode, ResponseRequired), Error> { + if !trans.exch.is_data_none() { + error!("Exchange data already set!"); + return Err(Error::InvalidState); } + let ctx = SubsCtx::new(rx_buf, trans, tw, self)?; + trans + .exch + .set_data_boxed(Box::new(ResumeReq::Subscribe(ctx))); + Ok((OpCode::ReportData, ResponseRequired::Yes)) } } diff --git a/matter/src/data_model/core/read.rs b/matter/src/data_model/core/read.rs new file mode 100644 index 0000000..eb12a5b --- /dev/null +++ b/matter/src/data_model/core/read.rs @@ -0,0 +1,319 @@ +/* + * + * Copyright (c) 2020-2022 Project CHIP Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::{ + acl::{AccessReq, Accessor}, + data_model::{core::DataModel, objects::*}, + error::*, + interaction_model::{ + core::{IMStatusCode, OpCode}, + messages::{ + ib::{self, DataVersionFilter}, + msg::{self, ReadReq, ReportDataTag::MoreChunkedMsgs, ReportDataTag::SupressResponse}, + GenericPath, + }, + Transaction, + }, + tlv::{self, FromTLV, TLVArray, TLVWriter, TagType, ToTLV}, + transport::{packet::Packet, proto_demux::ResponseRequired}, + utils::writebuf::WriteBuf, + wb_shrink, wb_unshrink, +}; +use log::error; + +/// Encoder for generating a response to a read request +pub struct AttrReadEncoder<'a, 'b, 'c> { + tw: &'a mut TLVWriter<'b, 'c>, + data_ver: u32, + path: GenericPath, + skip_error: bool, + data_ver_filters: Option<&'a TLVArray<'a, DataVersionFilter>>, + is_buffer_full: bool, +} + +impl<'a, 'b, 'c> AttrReadEncoder<'a, 'b, 'c> { + pub fn new(tw: &'a mut TLVWriter<'b, 'c>) -> Self { + Self { + tw, + data_ver: 0, + skip_error: false, + path: Default::default(), + data_ver_filters: None, + is_buffer_full: false, + } + } + + pub fn skip_error(&mut self, skip: bool) { + self.skip_error = skip; + } + + pub fn set_data_ver(&mut self, data_ver: u32) { + self.data_ver = data_ver; + } + + pub fn set_data_ver_filters(&mut self, filters: &'a TLVArray<'a, DataVersionFilter>) { + self.data_ver_filters = Some(filters); + } + + pub fn set_path(&mut self, path: GenericPath) { + self.path = path; + } + + pub fn is_buffer_full(&self) -> bool { + self.is_buffer_full + } +} + +impl<'a, 'b, 'c> Encoder for AttrReadEncoder<'a, 'b, 'c> { + fn encode(&mut self, value: EncodeValue) { + let resp = ib::AttrResp::Data(ib::AttrData::new( + Some(self.data_ver), + ib::AttrPath::new(&self.path), + value, + )); + + let anchor = self.tw.get_tail(); + if resp.to_tlv(self.tw, TagType::Anonymous).is_err() { + self.is_buffer_full = true; + self.tw.rewind_to(anchor); + } + } + + fn encode_status(&mut self, status: IMStatusCode, cluster_status: u16) { + if !self.skip_error { + let resp = + ib::AttrResp::Status(ib::AttrStatus::new(&self.path, status, cluster_status)); + let _ = resp.to_tlv(self.tw, TagType::Anonymous); + } + } +} + +/// State to maintain when a Read Request needs to be resumed +/// resumed - the next chunk of the read needs to be returned +#[derive(Default)] +pub struct ResumeReadReq { + /// The Read Request Attribute Path that caused chunking, and this is the path + /// that needs to be resumed. + pub pending_req: Option>, + + /// The Attribute that couldn't be encoded because our buffer got full. The next chunk + /// will start encoding from this attribute onwards. + /// Note that given wildcard reads, one PendingPath in the member above can generated + /// multiple encode paths. Hence this has to be maintained separately. + pub resume_from: Option, +} +impl ResumeReadReq { + pub fn new(rx_buf: &[u8], resume_from: &Option) -> Result { + let mut packet = Packet::new_rx()?; + let dst = packet.as_borrow_slice(); + + let src_len = rx_buf.len(); + dst[..src_len].copy_from_slice(rx_buf); + packet.get_parsebuf()?.set_len(src_len); + Ok(ResumeReadReq { + pending_req: Some(packet), + resume_from: *resume_from, + }) + } +} + +impl DataModel { + pub fn read_attribute_raw( + &self, + endpoint: u16, + cluster: u32, + attr: u16, + ) -> Result { + let node = self.node.read().unwrap(); + let cluster = node.get_cluster(endpoint, cluster)?; + cluster.base().read_attribute_raw(attr).map(|a| a.clone()) + } + /// Encode a read attribute from a path that may or may not be wildcard + /// + /// If the buffer gets full while generating the read response, we will return + /// an Err(path), where the path is the path that we should resume from, for the next chunk. + /// This facilitates chunk management + fn handle_read_attr_path( + node: &Node, + accessor: &Accessor, + attr_encoder: &mut AttrReadEncoder, + attr_details: &mut AttrDetails, + resume_from: &mut Option, + ) -> Result<(), Error> { + let mut status = Ok(()); + let path = attr_encoder.path; + + // Skip error reporting for wildcard paths, don't for concrete paths + attr_encoder.skip_error(path.is_wildcard()); + + let result = node.for_each_attribute(&path, |path, c| { + // Ignore processing if data filter matches. + // For a wildcard attribute, this may end happening unnecessarily for all attributes, although + // a single skip for the cluster is sufficient. That requires us to replace this for_each with a + // for_each_cluster + let cluster_data_ver = c.base().get_dataver(); + if Self::data_filter_matches(&attr_encoder.data_ver_filters, path, cluster_data_ver) { + return Ok(()); + } + + // The resume_from indicates that this is the next chunk of a previous Read Request. In such cases, we + // need to skip until we hit this path. + if let Some(r) = resume_from { + // If resume_from is valid, and we haven't hit the resume_from yet, skip encoding + if r != path { + return Ok(()); + } else { + // Else, wipe out the resume_from so subsequent paths can be encoded + *resume_from = None; + } + } + + attr_details.attr_id = path.leaf.unwrap_or_default() as u16; + // Overwrite the previous path with the concrete path + attr_encoder.set_path(*path); + // Set the cluster's data version + attr_encoder.set_data_ver(cluster_data_ver); + let mut access_req = AccessReq::new(accessor, path, Access::READ); + Cluster::read_attribute(c, &mut access_req, attr_encoder, attr_details); + if attr_encoder.is_buffer_full() { + // Buffer is full, next time resume from this attribute + *resume_from = Some(*path); + status = Err(Error::NoSpace); + } + Ok(()) + }); + if let Err(e) = result { + // We hit this only if this is a non-wildcard path + attr_encoder.encode_status(e, 0); + } + status + } + + /// Process an array of Attribute Read Requests + /// + /// When the API returns the chunked read is on, if *resume_from is Some(x) otherwise + /// the read is complete + pub(super) fn handle_read_attr_array( + &self, + read_req: &ReadReq, + trans: &mut Transaction, + old_tw: &mut TLVWriter, + resume_from: &mut Option, + ) -> Result<(), Error> { + let old_wb = old_tw.get_buf(); + // Note, this function may be called from multiple places: a) an actual read + // request, a b) resumed read request, c) subscribe request or d) resumed subscribe + // request. Hopefully 18 is sufficient to address all those scenarios. + // + // This is the amount of space we reserve for other things to be attached towards + // the end + const RESERVE_SIZE: usize = 18; + let mut new_wb = wb_shrink!(old_wb, RESERVE_SIZE); + let mut tw = TLVWriter::new(&mut new_wb); + + let mut attr_encoder = AttrReadEncoder::new(&mut tw); + if let Some(filters) = &read_req.dataver_filters { + attr_encoder.set_data_ver_filters(filters); + } + + if let Some(attr_requests) = &read_req.attr_requests { + let accessor = self.sess_to_accessor(trans.session); + let mut attr_details = AttrDetails::new(accessor.fab_idx, read_req.fabric_filtered); + let node = self.node.read().unwrap(); + attr_encoder + .tw + .start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?; + + let mut result = Ok(()); + for attr_path in attr_requests.iter() { + attr_encoder.set_path(attr_path.to_gp()); + // Extract the attr_path fields into various structures + attr_details.list_index = attr_path.list_index; + result = DataModel::handle_read_attr_path( + &node, + &accessor, + &mut attr_encoder, + &mut attr_details, + resume_from, + ); + if result.is_err() { + break; + } + } + // Now that all the read reports are captured, let's use the old_tw that is + // the full writebuf, and hopefully as all the necessary space to store this + wb_unshrink!(old_wb, new_wb); + old_tw.end_container()?; // Finish the AttrReports + + if result.is_err() { + // If there was an error, indicate chunking. The resume_read_req would have been + // already populated in the loop above. + old_tw.bool(TagType::Context(MoreChunkedMsgs as u8), true)?; + } else { + // A None resume_from indicates no chunking + *resume_from = None; + } + } + Ok(()) + } + + /// Handle a read request + /// + /// This could be called from an actual read request or a resumed read request. Subscription + /// requests do not come to this function. + /// When the API returns the chunked read is on, if *resume_from is Some(x) otherwise + /// the read is complete + pub fn handle_read_req( + &self, + read_req: &ReadReq, + trans: &mut Transaction, + tw: &mut TLVWriter, + resume_from: &mut Option, + ) -> Result<(OpCode, ResponseRequired), Error> { + tw.start_struct(TagType::Anonymous)?; + + self.handle_read_attr_array(read_req, trans, tw, resume_from)?; + + if resume_from.is_none() { + tw.bool(TagType::Context(SupressResponse as u8), true)?; + // Mark transaction complete, if not chunked + trans.complete(); + } + tw.end_container()?; + Ok((OpCode::ReportData, ResponseRequired::Yes)) + } + + /// Handle a resumed read request + pub fn handle_resume_read( + &self, + resume_read_req: &mut ResumeReadReq, + trans: &mut Transaction, + tw: &mut TLVWriter, + ) -> Result<(OpCode, ResponseRequired), Error> { + if let Some(packet) = resume_read_req.pending_req.as_mut() { + let rx_buf = packet.get_parsebuf()?.as_borrow_slice(); + let root = tlv::get_root_node(rx_buf)?; + let req = ReadReq::from_tlv(&root)?; + + self.handle_read_req(&req, trans, tw, &mut resume_read_req.resume_from) + } else { + // No pending req, is that even possible? + error!("This shouldn't have happened"); + Ok((OpCode::Reserved, ResponseRequired::No)) + } + } +} diff --git a/matter/src/data_model/core/subscribe.rs b/matter/src/data_model/core/subscribe.rs new file mode 100644 index 0000000..a65ee1f --- /dev/null +++ b/matter/src/data_model/core/subscribe.rs @@ -0,0 +1,142 @@ +/* + * + * Copyright (c) 2023 Project CHIP Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::sync::atomic::{AtomicU32, Ordering}; + +use crate::{ + error::Error, + interaction_model::{ + core::OpCode, + messages::{ + msg::{self, SubscribeReq, SubscribeResp}, + GenericPath, + }, + }, + tlv::{self, get_root_node_struct, FromTLV, TLVWriter, TagType, ToTLV}, + transport::proto_demux::ResponseRequired, +}; + +use super::{read::ResumeReadReq, DataModel, Transaction}; + +static SUBS_ID: AtomicU32 = AtomicU32::new(1); + +#[derive(PartialEq)] +enum SubsState { + Confirming, + Confirmed, +} + +pub struct SubsCtx { + state: SubsState, + id: u32, + resume_read_req: Option, +} + +impl SubsCtx { + pub fn new( + rx_buf: &[u8], + trans: &mut Transaction, + tw: &mut TLVWriter, + dm: &DataModel, + ) -> Result { + let root = get_root_node_struct(rx_buf)?; + let req = SubscribeReq::from_tlv(&root)?; + + let mut ctx = SubsCtx { + state: SubsState::Confirming, + // TODO + id: SUBS_ID.fetch_add(1, Ordering::SeqCst), + resume_read_req: None, + }; + + let mut resume_from = None; + ctx.do_read(&req, trans, tw, dm, &mut resume_from)?; + if resume_from.is_some() { + // This is a multi-hop read transaction, remember this read request + ctx.resume_read_req = Some(ResumeReadReq::new(rx_buf, &resume_from)?); + } + Ok(ctx) + } + + pub fn handle_status_report( + &mut self, + trans: &mut Transaction, + tw: &mut TLVWriter, + dm: &DataModel, + ) -> Result<(OpCode, ResponseRequired), Error> { + if self.state != SubsState::Confirming { + // Not relevant for us + trans.complete(); + return Err(Error::Invalid); + } + + // Is there a previous resume read pending + if self.resume_read_req.is_some() { + let mut resume_read_req = self.resume_read_req.take().unwrap(); + if let Some(packet) = resume_read_req.pending_req.as_mut() { + let rx_buf = packet.get_parsebuf()?.as_borrow_slice(); + let root = tlv::get_root_node(rx_buf)?; + let req = SubscribeReq::from_tlv(&root)?; + + self.do_read(&req, trans, tw, dm, &mut resume_read_req.resume_from)?; + if resume_read_req.resume_from.is_some() { + // More chunks are pending, setup resume_read_req again + self.resume_read_req = Some(resume_read_req); + } + + return Ok((OpCode::ReportData, ResponseRequired::Yes)); + } + } + + // We are here implies that the read is now complete + self.confirm_subscription(trans, tw) + } + + fn confirm_subscription( + &mut self, + trans: &mut Transaction, + tw: &mut TLVWriter, + ) -> Result<(OpCode, ResponseRequired), Error> { + self.state = SubsState::Confirmed; + + // TODO + let resp = SubscribeResp::new(self.id, 40); + resp.to_tlv(tw, TagType::Anonymous)?; + trans.complete(); + Ok((OpCode::SubscriptResponse, ResponseRequired::Yes)) + } + + fn do_read( + &mut self, + req: &SubscribeReq, + trans: &mut Transaction, + tw: &mut TLVWriter, + dm: &DataModel, + resume_from: &mut Option, + ) -> Result<(), Error> { + let read_req = req.to_read_req(); + tw.start_struct(TagType::Anonymous)?; + tw.u32( + TagType::Context(msg::ReportDataTag::SubscriptionId as u8), + self.id, + )?; + dm.handle_read_attr_array(&read_req, trans, tw, resume_from)?; + tw.end_container()?; + + Ok(()) + } +} diff --git a/matter/src/data_model/objects/cluster.rs b/matter/src/data_model/objects/cluster.rs index 68a4860..654a0a5 100644 --- a/matter/src/data_model/objects/cluster.rs +++ b/matter/src/data_model/objects/cluster.rs @@ -46,11 +46,28 @@ pub enum GlobalElements { // TODO: What if we instead of creating this, we just pass the AttrData/AttrPath to the read/write // methods? +/// The Attribute Details structure records the details about the attribute under consideration. +/// Typically this structure is progressively built as we proceed through the request processing. pub struct AttrDetails { - pub attr_id: u16, - pub list_index: Option>, - pub fab_idx: u8, + /// Fabric Filtering Activated pub fab_filter: bool, + /// The current Fabric Index + pub fab_idx: u8, + /// List Index, if any + pub list_index: Option>, + /// The actual attribute ID + pub attr_id: u16, +} + +impl AttrDetails { + pub fn new(fab_idx: u8, fab_filter: bool) -> Self { + Self { + fab_filter, + fab_idx, + list_index: None, + attr_id: 0, + } + } } pub trait ClusterType { diff --git a/matter/src/data_model/system_model/access_control.rs b/matter/src/data_model/system_model/access_control.rs index 2e5a992..b9b8f78 100644 --- a/matter/src/data_model/system_model/access_control.rs +++ b/matter/src/data_model/system_model/access_control.rs @@ -195,7 +195,7 @@ mod tests { use crate::{ acl::{AclEntry, AclMgr, AuthMode}, data_model::{ - core::AttrReadEncoder, + core::read::AttrReadEncoder, objects::{AttrDetails, ClusterType, Privilege}, }, interaction_model::messages::ib::ListOperation, diff --git a/matter/src/interaction_model/core.rs b/matter/src/interaction_model/core.rs index 114e86a..4ae4e52 100644 --- a/matter/src/interaction_model/core.rs +++ b/matter/src/interaction_model/core.rs @@ -100,24 +100,30 @@ impl InteractionModel { InteractionModel { consumer } } + pub fn handle_subscribe_req( + &mut self, + trans: &mut Transaction, + rx_buf: &[u8], + proto_tx: &mut Packet, + ) -> Result { + let mut tw = TLVWriter::new(proto_tx.get_writebuf()?); + let (opcode, resp) = self.consumer.consume_subscribe(rx_buf, trans, &mut tw)?; + proto_tx.set_proto_opcode(opcode as u8); + Ok(resp) + } + pub fn handle_status_resp( &mut self, trans: &mut Transaction, rx_buf: &[u8], proto_tx: &mut Packet, ) -> Result { + let mut tw = TLVWriter::new(proto_tx.get_writebuf()?); let root = get_root_node_struct(rx_buf)?; let req = StatusResp::from_tlv(&root)?; - - let mut handled = false; - let result = self.handle_subscription_confirm(trans, proto_tx, &mut handled); - if handled { - result - } else { - // Nothing to do for now - info!("Received status report with status {:?}", req.status); - Ok(ResponseRequired::No) - } + let (opcode, resp) = self.consumer.consume_status_report(&req, trans, &mut tw)?; + proto_tx.set_proto_opcode(opcode as u8); + Ok(resp) } pub fn handle_timed_req( diff --git a/matter/src/interaction_model/messages.rs b/matter/src/interaction_model/messages.rs index e5c3ba8..3c164ae 100644 --- a/matter/src/interaction_model/messages.rs +++ b/matter/src/interaction_model/messages.rs @@ -223,7 +223,7 @@ pub mod msg { SubscriptionId = 0, AttributeReports = 1, _EventReport = 2, - _MoreChunkedMsgs = 3, + MoreChunkedMsgs = 3, SupressResponse = 4, } diff --git a/matter/src/interaction_model/mod.rs b/matter/src/interaction_model/mod.rs index 5b4996f..c4d401b 100644 --- a/matter/src/interaction_model/mod.rs +++ b/matter/src/interaction_model/mod.rs @@ -18,10 +18,13 @@ use crate::{ error::Error, tlv::TLVWriter, - transport::{exchange::Exchange, session::Session}, + transport::{exchange::Exchange, proto_demux::ResponseRequired, session::Session}, }; -use self::messages::msg::{InvReq, ReadReq, WriteReq}; +use self::{ + core::OpCode, + messages::msg::{InvReq, StatusResp, WriteReq}, +}; #[derive(PartialEq)] pub enum TransactionState { @@ -44,7 +47,9 @@ pub trait InteractionConsumer { fn consume_read_attr( &self, - req: &ReadReq, + // TODO: This handling is different from the other APIs here, identify + // consistent options for this trait + req: &[u8], trans: &mut Transaction, tw: &mut TLVWriter, ) -> Result<(), Error>; @@ -55,6 +60,20 @@ pub trait InteractionConsumer { trans: &mut Transaction, tw: &mut TLVWriter, ) -> Result<(), Error>; + + fn consume_status_report( + &self, + _req: &StatusResp, + _trans: &mut Transaction, + _tw: &mut TLVWriter, + ) -> Result<(OpCode, ResponseRequired), Error>; + + fn consume_subscribe( + &self, + _req: &[u8], + _trans: &mut Transaction, + _tw: &mut TLVWriter, + ) -> Result<(OpCode, ResponseRequired), Error>; } pub struct InteractionModel { @@ -64,5 +83,4 @@ pub mod command; pub mod core; pub mod messages; pub mod read; -pub mod subscribe; pub mod write; diff --git a/matter/src/interaction_model/read.rs b/matter/src/interaction_model/read.rs index 58f336e..0985eea 100644 --- a/matter/src/interaction_model/read.rs +++ b/matter/src/interaction_model/read.rs @@ -18,14 +18,11 @@ use crate::{ error::Error, interaction_model::core::OpCode, - tlv::{get_root_node_struct, FromTLV, TLVWriter, TagType}, + tlv::TLVWriter, transport::{packet::Packet, proto_demux::ResponseRequired}, }; -use super::{ - messages::msg::{self, ReadReq}, - InteractionModel, Transaction, -}; +use super::{InteractionModel, Transaction}; impl InteractionModel { pub fn handle_read_req( @@ -35,21 +32,11 @@ impl InteractionModel { proto_tx: &mut Packet, ) -> Result { proto_tx.set_proto_opcode(OpCode::ReportData as u8); + let proto_tx_wb = proto_tx.get_writebuf()?; + let mut tw = TLVWriter::new(proto_tx_wb); - let mut tw = TLVWriter::new(proto_tx.get_writebuf()?); - let root = get_root_node_struct(rx_buf)?; - let read_req = ReadReq::from_tlv(&root)?; + self.consumer.consume_read_attr(rx_buf, trans, &mut tw)?; - tw.start_struct(TagType::Anonymous)?; - self.consumer.consume_read_attr(&read_req, trans, &mut tw)?; - // Supress response always true for read interaction - tw.bool( - TagType::Context(msg::ReportDataTag::SupressResponse as u8), - true, - )?; - tw.end_container()?; - - trans.complete(); Ok(ResponseRequired::Yes) } } diff --git a/matter/src/interaction_model/subscribe.rs b/matter/src/interaction_model/subscribe.rs deleted file mode 100644 index 330ab7f..0000000 --- a/matter/src/interaction_model/subscribe.rs +++ /dev/null @@ -1,115 +0,0 @@ -/* - * - * Copyright (c) 2023 Project CHIP Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::sync::atomic::{AtomicU32, Ordering}; - -use crate::{ - error::Error, - interaction_model::core::OpCode, - tlv::{get_root_node_struct, FromTLV, TLVWriter, TagType, ToTLV}, - transport::{packet::Packet, proto_demux::ResponseRequired}, -}; - -use log::error; - -use super::{ - messages::msg::{self, SubscribeReq, SubscribeResp}, - InteractionModel, Transaction, -}; - -static SUBS_ID: AtomicU32 = AtomicU32::new(1); - -impl InteractionModel { - pub fn handle_subscribe_req( - &mut self, - trans: &mut Transaction, - rx_buf: &[u8], - proto_tx: &mut Packet, - ) -> Result { - proto_tx.set_proto_opcode(OpCode::ReportData as u8); - - let mut tw = TLVWriter::new(proto_tx.get_writebuf()?); - let root = get_root_node_struct(rx_buf)?; - let req = SubscribeReq::from_tlv(&root)?; - - let ctx = Box::new(SubsCtx { - state: SubsState::Confirming, - // TODO - id: SUBS_ID.fetch_add(1, Ordering::SeqCst), - }); - - let read_req = req.to_read_req(); - tw.start_struct(TagType::Anonymous)?; - tw.u32( - TagType::Context(msg::ReportDataTag::SubscriptionId as u8), - ctx.id, - )?; - self.consumer.consume_read_attr(&read_req, trans, &mut tw)?; - tw.bool( - TagType::Context(msg::ReportDataTag::SupressResponse as u8), - false, - )?; - tw.end_container()?; - - if !trans.exch.is_data_none() { - error!("Exchange data already set!"); - return Err(Error::InvalidState); - } - trans.exch.set_data_boxed(ctx); - - Ok(ResponseRequired::Yes) - } - - pub fn handle_subscription_confirm( - &mut self, - trans: &mut Transaction, - proto_tx: &mut Packet, - request_handled: &mut bool, - ) -> Result { - *request_handled = false; - if let Some(ctx) = trans.exch.get_data_boxed::() { - if ctx.state != SubsState::Confirming { - // Not relevant for us - return Err(Error::Invalid); - } - *request_handled = true; - ctx.state = SubsState::Confirmed; - proto_tx.set_proto_opcode(OpCode::SubscriptResponse as u8); - - // TODO - let resp = SubscribeResp::new(ctx.id, 40); - let mut tw = TLVWriter::new(proto_tx.get_writebuf()?); - resp.to_tlv(&mut tw, TagType::Anonymous)?; - trans.complete(); - Ok(ResponseRequired::Yes) - } else { - trans.complete(); - Err(Error::Invalid) - } - } -} - -#[derive(PartialEq)] -enum SubsState { - Confirming, - Confirmed, -} - -struct SubsCtx { - state: SubsState, - id: u32, -} diff --git a/matter/src/secure_channel/pake.rs b/matter/src/secure_channel/pake.rs index b096cf6..27aaeb0 100644 --- a/matter/src/secure_channel/pake.rs +++ b/matter/src/secure_channel/pake.rs @@ -115,6 +115,12 @@ impl PaseMgr { } } +impl Default for PaseMgr { + fn default() -> Self { + Self::new() + } +} + // This file basically deals with the handlers for the PASE secure channel protocol // TLV extraction and encoding is done in this file. // We create a Spake2p object and set it up in the exchange-data. This object then diff --git a/matter/src/tlv/traits.rs b/matter/src/tlv/traits.rs index 60bf73a..2d3cedd 100644 --- a/matter/src/tlv/traits.rs +++ b/matter/src/tlv/traits.rs @@ -383,7 +383,7 @@ impl<'a, T: ToTLV> ToTLV for TLVArray<'a, T> { } tw.end_container() } - Self::Ptr(_) => Err(Error::Invalid), + Self::Ptr(t) => t.to_tlv(tw, tag_type), } } } @@ -404,6 +404,31 @@ impl<'a, T: Debug + ToTLV + FromTLV<'a> + Copy> Debug for TLVArray<'a, T> { } } +impl<'a> ToTLV for TLVElement<'a> { + fn to_tlv(&self, tw: &mut TLVWriter, _tag_type: TagType) -> Result<(), Error> { + match self.get_element_type() { + ElementType::S8(v) => v.to_tlv(tw, self.get_tag()), + ElementType::U8(v) => v.to_tlv(tw, self.get_tag()), + ElementType::U16(v) => v.to_tlv(tw, self.get_tag()), + ElementType::U32(v) => v.to_tlv(tw, self.get_tag()), + ElementType::U64(v) => v.to_tlv(tw, self.get_tag()), + ElementType::False => tw.bool(self.get_tag(), false), + ElementType::True => tw.bool(self.get_tag(), true), + ElementType::Utf8l(v) | ElementType::Utf16l(v) => tw.utf16(self.get_tag(), v), + ElementType::Str8l(v) | ElementType::Str16l(v) => tw.str16(self.get_tag(), v), + ElementType::Null => tw.null(self.get_tag()), + ElementType::Struct(_) => tw.start_struct(self.get_tag()), + ElementType::Array(_) => tw.start_array(self.get_tag()), + ElementType::List(_) => tw.start_list(self.get_tag()), + ElementType::EndCnt => tw.end_container(), + _ => { + error!("ToTLV Not supported"); + Err(Error::Invalid) + } + } + } +} + #[cfg(test)] mod tests { use super::{FromTLV, OctetStr, TLVElement, TLVWriter, TagType, ToTLV}; diff --git a/matter/src/tlv/writer.rs b/matter/src/tlv/writer.rs index 7459aa3..cdf914a 100644 --- a/matter/src/tlv/writer.rs +++ b/matter/src/tlv/writer.rs @@ -264,6 +264,10 @@ impl<'a, 'b> TLVWriter<'a, 'b> { pub fn rewind_to(&mut self, anchor: usize) { self.buf.rewind_tail_to(anchor); } + + pub fn get_buf<'c>(&'c mut self) -> &'c mut WriteBuf<'a> { + self.buf + } } #[cfg(test)] diff --git a/matter/src/transport/proto_demux.rs b/matter/src/transport/proto_demux.rs index c2d5500..263ffc9 100644 --- a/matter/src/transport/proto_demux.rs +++ b/matter/src/transport/proto_demux.rs @@ -24,7 +24,7 @@ use super::packet::PacketPool; const MAX_PROTOCOLS: usize = 4; -#[derive(PartialEq)] +#[derive(PartialEq, Debug)] pub enum ResponseRequired { Yes, No, diff --git a/matter/src/utils/writebuf.rs b/matter/src/utils/writebuf.rs index 00c5e88..cf28888 100644 --- a/matter/src/utils/writebuf.rs +++ b/matter/src/utils/writebuf.rs @@ -18,6 +18,33 @@ use crate::error::*; use byteorder::{ByteOrder, LittleEndian}; +/// Shrink WriteBuf +/// +/// This Macro creates a new (child) WriteBuf which has a truncated slice end. +/// - It accepts a WriteBuf, and the size to reserve (truncate) towards the end. +/// - It returns the new (child) WriteBuf +#[macro_export] +macro_rules! wb_shrink { + ($orig_wb:ident, $reserve:ident) => {{ + let m_data = $orig_wb.empty_as_mut_slice(); + let m_wb = WriteBuf::new(m_data, m_data.len() - $reserve); + (m_wb) + }}; +} + +/// Unshrink WriteBuf +/// +/// This macro unshrinks the WriteBuf +/// - It accepts the original WriteBuf and the child WriteBuf (that was the result of wb_shrink) +/// After this call, the child WriteBuf shouldn't be used +#[macro_export] +macro_rules! wb_unshrink { + ($orig_wb:ident, $new_wb:ident) => {{ + let m_data_len = $new_wb.as_slice().len(); + $orig_wb.forward_tail_by(m_data_len); + }}; +} + #[derive(Debug)] pub struct WriteBuf<'a> { buf: &'a mut [u8], diff --git a/matter/tests/interaction_model.rs b/matter/tests/interaction_model.rs index 3f831c4..8a99fa3 100644 --- a/matter/tests/interaction_model.rs +++ b/matter/tests/interaction_model.rs @@ -19,7 +19,6 @@ use boxslab::Slab; use matter::error::Error; use matter::interaction_model::core::OpCode; use matter::interaction_model::messages::msg::InvReq; -use matter::interaction_model::messages::msg::ReadReq; use matter::interaction_model::messages::msg::WriteReq; use matter::interaction_model::InteractionConsumer; use matter::interaction_model::InteractionModel; @@ -32,6 +31,7 @@ use matter::transport::packet::Packet; use matter::transport::packet::PacketPool; use matter::transport::proto_demux::HandleProto; use matter::transport::proto_demux::ProtoCtx; +use matter::transport::proto_demux::ResponseRequired; use matter::transport::session::SessionMgr; use std::net::Ipv4Addr; use std::net::SocketAddr; @@ -93,7 +93,7 @@ impl InteractionConsumer for DataModel { fn consume_read_attr( &self, - _req: &ReadReq, + _req: &[u8], _trans: &mut Transaction, _tlvwriter: &mut TLVWriter, ) -> Result<(), Error> { @@ -108,6 +108,24 @@ impl InteractionConsumer for DataModel { ) -> Result<(), Error> { Ok(()) } + + fn consume_status_report( + &self, + _req: &matter::interaction_model::messages::msg::StatusResp, + _trans: &mut Transaction, + _tw: &mut TLVWriter, + ) -> Result<(OpCode, ResponseRequired), Error> { + Ok((OpCode::StatusResponse, ResponseRequired::No)) + } + + fn consume_subscribe( + &self, + _req: &[u8], + _trans: &mut Transaction, + _tw: &mut TLVWriter, + ) -> Result<(OpCode, matter::transport::proto_demux::ResponseRequired), Error> { + Ok((OpCode::StatusResponse, ResponseRequired::No)) + } } fn handle_data(action: OpCode, data_in: &[u8], data_out: &mut [u8]) -> (DataModel, usize) {