From 0a42c974d0bed597db3016174661f710d15581e6 Mon Sep 17 00:00:00 2001
From: Kedar Sovani
Date: Mon, 20 Feb 2023 21:51:00 +0530
Subject: [PATCH] ReadReq: Move stuff into common file

Read request handling is getting fairly large
---
 .../src/data_model/{core.rs => core/mod.rs}   | 218 +---------------
 matter/src/data_model/core/read.rs            | 246 ++++++++++++++++++
 .../data_model/system_model/access_control.rs |   2 +-
 3 files changed, 251 insertions(+), 215 deletions(-)
 rename matter/src/data_model/{core.rs => core/mod.rs} (56%)
 create mode 100644 matter/src/data_model/core/read.rs

diff --git a/matter/src/data_model/core.rs b/matter/src/data_model/core/mod.rs
similarity index 56%
rename from matter/src/data_model/core.rs
rename to matter/src/data_model/core/mod.rs
index 1a9cc2f..2975b11 100644
--- a/matter/src/data_model/core.rs
+++ b/matter/src/data_model/core/mod.rs
@@ -76,17 +76,6 @@ impl DataModel {
         Ok(dm)
     }
 
-    pub fn read_attribute_raw(
-        &self,
-        endpoint: u16,
-        cluster: u32,
-        attr: u16,
-    ) -> Result<AttrValue, IMStatusCode> {
-        let node = self.node.read().unwrap();
-        let cluster = node.get_cluster(endpoint, cluster)?;
-        cluster.base().read_attribute_raw(attr).map(|a| a.clone())
-    }
-
     // Encode a write attribute from a path that may or may not be wildcard
     fn handle_write_attr_path(
         node: &mut Node,
@@ -153,67 +142,6 @@ impl DataModel {
         }
     }
 
-    /// Encode a read attribute from a path that may or may not be wildcard
-    ///
-    /// If the buffer gets full while generating the read response, we will return
-    /// an Err(path), where the path is the path that we should resume from, for the next chunk.
-    /// This facilitates chunk management
-    fn handle_read_attr_path(
-        node: &Node,
-        accessor: &Accessor,
-        attr_encoder: &mut AttrReadEncoder,
-        attr_details: &mut AttrDetails,
-        resume_from: &mut Option<GenericPath>,
-    ) -> Result<(), Error> {
-        let mut status = Ok(());
-        let path = attr_encoder.path;
-
-        // Skip error reporting for wildcard paths, don't for concrete paths
-        attr_encoder.skip_error(path.is_wildcard());
-
-        let result = node.for_each_attribute(&path, |path, c| {
-            // Ignore processing if data filter matches.
-            // For a wildcard attribute, this may end happening unnecessarily for all attributes, although
-            // a single skip for the cluster is sufficient. That requires us to replace this for_each with a
-            // for_each_cluster
-            let cluster_data_ver = c.base().get_dataver();
-            if Self::data_filter_matches(&attr_encoder.data_ver_filters, path, cluster_data_ver) {
-                return Ok(());
-            }
-
-            // The resume_from indicates that this is the next chunk of a previous Read Request. In such cases, we
-            // need to skip until we hit this path.
-            if let Some(r) = resume_from {
-                // If resume_from is valid, and we haven't hit the resume_from yet, skip encoding
-                if r != path {
-                    return Ok(());
-                } else {
-                    // Else, wipe out the resume_from so subsequent paths can be encoded
-                    *resume_from = None;
-                }
-            }
-
-            attr_details.attr_id = path.leaf.unwrap_or_default() as u16;
-            // Overwrite the previous path with the concrete path
-            attr_encoder.set_path(*path);
-            // Set the cluster's data version
-            attr_encoder.set_data_ver(cluster_data_ver);
-            let mut access_req = AccessReq::new(accessor, path, Access::READ);
-            Cluster::read_attribute(c, &mut access_req, attr_encoder, attr_details);
-            if attr_encoder.is_buffer_full() {
-                // Buffer is full, next time resume from this attribute
-                *resume_from = Some(*path);
-                status = Err(Error::NoSpace);
-            }
-            Ok(())
-        });
-        if let Err(e) = result {
-            // We hit this only if this is a non-wildcard path
-            attr_encoder.encode_status(e, 0);
-        }
-        status
-    }
-
     // Handle command from a path that may or may not be wildcard
     fn handle_command_path(node: &mut Node, cmd_req: &mut CommandReq) {
         let wildcard = cmd_req.cmd.path.is_wildcard();
@@ -291,6 +219,8 @@ impl DataModel {
     }
 }
 
+pub mod read;
+
 impl objects::ChangeConsumer for DataModel {
     fn endpoint_added(&self, id: u16, endpoint: &mut Endpoint) -> Result<(), Error> {
         endpoint.add_cluster(DescriptorCluster::new(id, self.clone())?)?;
@@ -298,26 +228,6 @@ impl objects::ChangeConsumer for DataModel {
     }
 }
 
-/// State to maintain when a Read Request needs to be resumed
-/// resumed - the next chunk of the read needs to be returned
-#[derive(Default)]
-pub struct ResumeReadReq {
-    /// The Read Request Attribute Path that caused chunking, and this is the path
-    /// that needs to be resumed.
-    ///
-    /// TODO: Ideally, the entire ReadRequest (with any subsequent AttrPaths) should also
-    /// be maintained. But for now, we just store the AttrPath that caused the overflow
-    /// and chunking. Hopefully, the other end requests any pending paths when it sees no
-    /// more chunks.
-    pending_path: GenericPath,
-
-    /// The Attribute that couldn't be encoded because our buffer got full. The next chunk
-    /// will start encoding from this attribute onwards.
-    /// Note that given wildcard reads, one PendingPath in the member above can generated
-    /// multiple encode paths. Hence this has to be maintained separately.
-    resume_encode: Option<GenericPath>,
-}
-
 impl InteractionConsumer for DataModel {
     fn consume_write_attr(
         &self,
@@ -339,64 +249,11 @@ impl InteractionConsumer for DataModel {
 
     fn consume_read_attr(
         &self,
-        read_req: &ReadReq,
+        req: &ReadReq,
         trans: &mut Transaction,
         tw: &mut TLVWriter,
     ) -> Result<(), Error> {
-        let mut resume_read_req: ResumeReadReq = Default::default();
-
-        let mut attr_encoder = AttrReadEncoder::new(tw);
-        if let Some(filters) = &read_req.dataver_filters {
-            attr_encoder.set_data_ver_filters(filters);
-        }
-
-        if let Some(attr_requests) = &read_req.attr_requests {
-            let accessor = self.sess_to_accessor(trans.session);
-            let mut attr_details = AttrDetails::new(accessor.fab_idx, read_req.fabric_filtered);
-            let node = self.node.read().unwrap();
-            attr_encoder
-                .tw
-                .start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?;
-
-            let mut result = Ok(());
-            for attr_path in attr_requests.iter() {
-                attr_encoder.set_path(attr_path.to_gp());
-                // Extract the attr_path fields into various structures
-                attr_details.list_index = attr_path.list_index;
-                result = DataModel::handle_read_attr_path(
-                    &node,
-                    &accessor,
-                    &mut attr_encoder,
-                    &mut attr_details,
-                    &mut resume_read_req.resume_encode,
-                );
-                if result.is_err() {
-                    resume_read_req.pending_path = attr_path.to_gp();
-                    break;
-                }
-            }
-            tw.end_container()?;
-            if result.is_err() {
-                // If there was an error, indicate chunking. The resume_read_req would have been
-                // already populated from in the loop above.
-                tw.bool(
-                    TagType::Context(msg::ReportDataTag::MoreChunkedMsgs as u8),
-                    true,
-                )?;
-                tw.bool(
-                    TagType::Context(msg::ReportDataTag::SupressResponse as u8),
-                    false,
-                )?;
-                // Don't complete the transaction
-            } else {
-                tw.bool(
-                    TagType::Context(msg::ReportDataTag::SupressResponse as u8),
-                    true,
-                )?;
-                trans.complete();
-            }
-        }
-        Ok(())
+        self.handle_read_attr_array(req, trans, tw)
     }
 
     fn consume_invoke_cmd(
         &self,
@@ -431,73 +288,6 @@ impl InteractionConsumer for DataModel {
     }
 }
 
-/// Encoder for generating a response to a read request
-pub struct AttrReadEncoder<'a, 'b, 'c> {
-    tw: &'a mut TLVWriter<'b, 'c>,
-    data_ver: u32,
-    path: GenericPath,
-    skip_error: bool,
-    data_ver_filters: Option<&'a TLVArray<'a, DataVersionFilter>>,
-    is_buffer_full: bool,
-}
-
-impl<'a, 'b, 'c> AttrReadEncoder<'a, 'b, 'c> {
-    pub fn new(tw: &'a mut TLVWriter<'b, 'c>) -> Self {
-        Self {
-            tw,
-            data_ver: 0,
-            skip_error: false,
-            path: Default::default(),
-            data_ver_filters: None,
-            is_buffer_full: false,
-        }
-    }
-
-    pub fn skip_error(&mut self, skip: bool) {
-        self.skip_error = skip;
-    }
-
-    pub fn set_data_ver(&mut self, data_ver: u32) {
-        self.data_ver = data_ver;
-    }
-
-    pub fn set_data_ver_filters(&mut self, filters: &'a TLVArray<'a, DataVersionFilter>) {
-        self.data_ver_filters = Some(filters);
-    }
-
-    pub fn set_path(&mut self, path: GenericPath) {
-        self.path = path;
-    }
-
-    pub fn is_buffer_full(&self) -> bool {
-        self.is_buffer_full
-    }
-}
-
-impl<'a, 'b, 'c> Encoder for AttrReadEncoder<'a, 'b, 'c> {
-    fn encode(&mut self, value: EncodeValue) {
-        let resp = ib::AttrResp::Data(ib::AttrData::new(
-            Some(self.data_ver),
-            ib::AttrPath::new(&self.path),
-            value,
-        ));
-
-        let anchor = self.tw.get_tail();
-        if resp.to_tlv(self.tw, TagType::Anonymous).is_err() {
-            self.is_buffer_full = true;
-            self.tw.rewind_to(anchor);
-        }
-    }
-
-    fn encode_status(&mut self, status: IMStatusCode, cluster_status: u16) {
-        if !self.skip_error {
-            let resp =
-                ib::AttrResp::Status(ib::AttrStatus::new(&self.path, status, cluster_status));
-            let _ = resp.to_tlv(self.tw, TagType::Anonymous);
-        }
-    }
-}
-
 /// Encoder for generating a response to a write request
 pub struct AttrWriteEncoder<'a, 'b, 'c> {
     tw: &'a mut TLVWriter<'b, 'c>,
diff --git a/matter/src/data_model/core/read.rs b/matter/src/data_model/core/read.rs
new file mode 100644
index 0000000..475c83e
--- /dev/null
+++ b/matter/src/data_model/core/read.rs
@@ -0,0 +1,246 @@
+/*
+ *
+ *    Copyright (c) 2020-2022 Project CHIP Authors
+ *
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+use crate::data_model::{core::DataModel, objects::*};
+use crate::{
+    acl::{AccessReq, Accessor},
+    error::*,
+    interaction_model::{
+        core::IMStatusCode,
+        messages::{
+            ib::{self, DataVersionFilter},
+            msg::{self, ReadReq, ReportDataTag::MoreChunkedMsgs, ReportDataTag::SupressResponse},
+            GenericPath,
+        },
+        Transaction,
+    },
+    tlv::{TLVArray, TLVWriter, TagType, ToTLV},
+};
+
+/// Encoder for generating a response to a read request
+pub struct AttrReadEncoder<'a, 'b, 'c> {
+    tw: &'a mut TLVWriter<'b, 'c>,
+    data_ver: u32,
+    path: GenericPath,
+    skip_error: bool,
+    data_ver_filters: Option<&'a TLVArray<'a, DataVersionFilter>>,
+    is_buffer_full: bool,
+}
+
+impl<'a, 'b, 'c> AttrReadEncoder<'a, 'b, 'c> {
+    pub fn new(tw: &'a mut TLVWriter<'b, 'c>) -> Self {
+        Self {
+            tw,
+            data_ver: 0,
+            skip_error: false,
+            path: Default::default(),
+            data_ver_filters: None,
+            is_buffer_full: false,
+        }
+    }
+
+    pub fn skip_error(&mut self, skip: bool) {
+        self.skip_error = skip;
+    }
+
+    pub fn set_data_ver(&mut self, data_ver: u32) {
+        self.data_ver = data_ver;
+    }
+
+    pub fn set_data_ver_filters(&mut self, filters: &'a TLVArray<'a, DataVersionFilter>) {
+        self.data_ver_filters = Some(filters);
+    }
+
+    pub fn set_path(&mut self, path: GenericPath) {
+        self.path = path;
+    }
+
+    pub fn is_buffer_full(&self) -> bool {
+        self.is_buffer_full
+    }
+}
+
+impl<'a, 'b, 'c> Encoder for AttrReadEncoder<'a, 'b, 'c> {
+    fn encode(&mut self, value: EncodeValue) {
+        let resp = ib::AttrResp::Data(ib::AttrData::new(
+            Some(self.data_ver),
+            ib::AttrPath::new(&self.path),
+            value,
+        ));
+
+        let anchor = self.tw.get_tail();
+        if resp.to_tlv(self.tw, TagType::Anonymous).is_err() {
+            self.is_buffer_full = true;
+            self.tw.rewind_to(anchor);
+        }
+    }
+
+    fn encode_status(&mut self, status: IMStatusCode, cluster_status: u16) {
+        if !self.skip_error {
+            let resp =
+                ib::AttrResp::Status(ib::AttrStatus::new(&self.path, status, cluster_status));
+            let _ = resp.to_tlv(self.tw, TagType::Anonymous);
+        }
+    }
+}
+
+/// State to maintain when a Read Request needs to be resumed
+/// resumed - the next chunk of the read needs to be returned
+#[derive(Default)]
+pub struct ResumeReadReq {
+    /// The Read Request Attribute Path that caused chunking, and this is the path
+    /// that needs to be resumed.
+    ///
+    /// TODO: Ideally, the entire ReadRequest (with any subsequent AttrPaths) should also
+    /// be maintained. But for now, we just store the AttrPath that caused the overflow
+    /// and chunking. Hopefully, the other end requests any pending paths when it sees no
+    /// more chunks.
+    pending_path: GenericPath,
+
+    /// The Attribute that couldn't be encoded because our buffer got full. The next chunk
+    /// will start encoding from this attribute onwards.
+    /// Note that given wildcard reads, one PendingPath in the member above can generated
+    /// multiple encode paths. Hence this has to be maintained separately.
+    resume_encode: Option<GenericPath>,
+}
+
+impl DataModel {
+    pub fn read_attribute_raw(
+        &self,
+        endpoint: u16,
+        cluster: u32,
+        attr: u16,
+    ) -> Result<AttrValue, IMStatusCode> {
+        let node = self.node.read().unwrap();
+        let cluster = node.get_cluster(endpoint, cluster)?;
+        cluster.base().read_attribute_raw(attr).map(|a| a.clone())
+    }
+    /// Encode a read attribute from a path that may or may not be wildcard
+    ///
+    /// If the buffer gets full while generating the read response, we will return
+    /// an Err(path), where the path is the path that we should resume from, for the next chunk.
+    /// This facilitates chunk management
+    fn handle_read_attr_path(
+        node: &Node,
+        accessor: &Accessor,
+        attr_encoder: &mut AttrReadEncoder,
+        attr_details: &mut AttrDetails,
+        resume_from: &mut Option<GenericPath>,
+    ) -> Result<(), Error> {
+        let mut status = Ok(());
+        let path = attr_encoder.path;
+
+        // Skip error reporting for wildcard paths, don't for concrete paths
+        attr_encoder.skip_error(path.is_wildcard());
+
+        let result = node.for_each_attribute(&path, |path, c| {
+            // Ignore processing if data filter matches.
+            // For a wildcard attribute, this may end happening unnecessarily for all attributes, although
+            // a single skip for the cluster is sufficient. That requires us to replace this for_each with a
+            // for_each_cluster
+            let cluster_data_ver = c.base().get_dataver();
+            if Self::data_filter_matches(&attr_encoder.data_ver_filters, path, cluster_data_ver) {
+                return Ok(());
+            }
+
+            // The resume_from indicates that this is the next chunk of a previous Read Request. In such cases, we
+            // need to skip until we hit this path.
+            if let Some(r) = resume_from {
+                // If resume_from is valid, and we haven't hit the resume_from yet, skip encoding
+                if r != path {
+                    return Ok(());
+                } else {
+                    // Else, wipe out the resume_from so subsequent paths can be encoded
+                    *resume_from = None;
+                }
+            }
+
+            attr_details.attr_id = path.leaf.unwrap_or_default() as u16;
+            // Overwrite the previous path with the concrete path
+            attr_encoder.set_path(*path);
+            // Set the cluster's data version
+            attr_encoder.set_data_ver(cluster_data_ver);
+            let mut access_req = AccessReq::new(accessor, path, Access::READ);
+            Cluster::read_attribute(c, &mut access_req, attr_encoder, attr_details);
+            if attr_encoder.is_buffer_full() {
+                // Buffer is full, next time resume from this attribute
+                *resume_from = Some(*path);
+                status = Err(Error::NoSpace);
+            }
+            Ok(())
+        });
+        if let Err(e) = result {
+            // We hit this only if this is a non-wildcard path
+            attr_encoder.encode_status(e, 0);
+        }
+        status
+    }
+
+    /// Process an array of Attribute Read Requests
+    pub(super) fn handle_read_attr_array(
+        &self,
+        read_req: &ReadReq,
+        trans: &mut Transaction,
+        tw: &mut TLVWriter,
+    ) -> Result<(), Error> {
+        let mut resume_read_req: ResumeReadReq = Default::default();
+
+        let mut attr_encoder = AttrReadEncoder::new(tw);
+        if let Some(filters) = &read_req.dataver_filters {
+            attr_encoder.set_data_ver_filters(filters);
+        }
+
+        if let Some(attr_requests) = &read_req.attr_requests {
+            let accessor = self.sess_to_accessor(trans.session);
+            let mut attr_details = AttrDetails::new(accessor.fab_idx, read_req.fabric_filtered);
+            let node = self.node.read().unwrap();
+            attr_encoder
+                .tw
+                .start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?;
+
+            let mut result = Ok(());
+            for attr_path in attr_requests.iter() {
+                attr_encoder.set_path(attr_path.to_gp());
+                // Extract the attr_path fields into various structures
+                attr_details.list_index = attr_path.list_index;
+                result = DataModel::handle_read_attr_path(
+                    &node,
+                    &accessor,
+                    &mut attr_encoder,
+                    &mut attr_details,
+                    &mut resume_read_req.resume_encode,
+                );
+                if result.is_err() {
+                    resume_read_req.pending_path = attr_path.to_gp();
+                    break;
+                }
+            }
+            tw.end_container()?;
+            if result.is_err() {
+                // If there was an error, indicate chunking. The resume_read_req would have been
+                // already populated from in the loop above.
+                tw.bool(TagType::Context(MoreChunkedMsgs as u8), true)?;
+                tw.bool(TagType::Context(SupressResponse as u8), false)?;
+                // Don't complete the transaction
+            } else {
+                tw.bool(TagType::Context(SupressResponse as u8), true)?;
+                trans.complete();
+            }
+        }
+        Ok(())
+    }
+}
diff --git a/matter/src/data_model/system_model/access_control.rs b/matter/src/data_model/system_model/access_control.rs
index 2e5a992..b9b8f78 100644
--- a/matter/src/data_model/system_model/access_control.rs
+++ b/matter/src/data_model/system_model/access_control.rs
@@ -195,7 +195,7 @@ mod tests {
     use crate::{
         acl::{AclEntry, AclMgr, AuthMode},
         data_model::{
-            core::AttrReadEncoder,
+            core::read::AttrReadEncoder,
             objects::{AttrDetails, ClusterType, Privilege},
         },
         interaction_model::messages::ib::ListOperation,
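Note for reviewers (not part of the patch): the chunking logic being moved into read.rs is easier to see in isolation than in the diff. Below is a minimal, standalone Rust sketch of the resume-from-path idea; `Path`, `encode_chunk`, and the `budget` parameter are invented for illustration and are not the crate's API.

// Standalone illustration only: these names are made up and are not the crate's API.
#[derive(Clone, Copy, PartialEq, Debug)]
struct Path {
    endpoint: u16,
    cluster: u32,
    attr: u16,
}

/// Encode as many concrete paths as fit within `budget` entries.
/// On overflow, remember where to resume and report an error, mirroring how
/// handle_read_attr_path() records `resume_from` when the TLV buffer fills up.
fn encode_chunk(
    expanded: &[Path],
    resume_from: &mut Option<Path>,
    budget: usize,
    out: &mut Vec<Path>,
) -> Result<(), ()> {
    for p in expanded {
        if let Some(r) = *resume_from {
            if r != *p {
                // The previous chunk already covered this path; skip it.
                continue;
            }
            // Found the resume point; encode from here onwards.
            *resume_from = None;
        }
        if out.len() == budget {
            // "Buffer" is full: the next chunk must start at this path.
            *resume_from = Some(*p);
            return Err(());
        }
        out.push(*p);
    }
    Ok(())
}

fn main() {
    // A wildcard read expanded to four concrete attribute paths.
    let expanded = [
        Path { endpoint: 0, cluster: 0x001d, attr: 0 },
        Path { endpoint: 0, cluster: 0x001d, attr: 1 },
        Path { endpoint: 0, cluster: 0x0028, attr: 0 },
        Path { endpoint: 0, cluster: 0x0028, attr: 1 },
    ];

    let mut resume_from = None;
    let mut chunk1 = Vec::new();
    // The first chunk only has room for two entries, so it overflows...
    assert!(encode_chunk(&expanded, &mut resume_from, 2, &mut chunk1).is_err());
    assert_eq!(resume_from, Some(expanded[2]));

    // ...and the second chunk skips what was already sent and resumes exactly there.
    let mut chunk2 = Vec::new();
    assert!(encode_chunk(&expanded, &mut resume_from, 2, &mut chunk2).is_ok());
    assert_eq!(chunk2, vec![expanded[2], expanded[3]]);
}

In the real code the resume point is keyed off AttrReadEncoder::is_buffer_full() and stored in ResumeReadReq::resume_encode, but the skip-until-match loop has the same shape.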