diff --git a/matter/src/data_model/core/mod.rs b/matter/src/data_model/core/mod.rs index e4cb187..1b257ff 100644 --- a/matter/src/data_model/core/mod.rs +++ b/matter/src/data_model/core/mod.rs @@ -15,6 +15,8 @@ * limitations under the License. */ +use self::subscribe::SubsCtx; + use super::{ cluster_basic_information::BasicInfoConfig, device_types::device_type_add_root_node, @@ -31,7 +33,7 @@ use crate::{ core::{IMStatusCode, OpCode}, messages::{ ib::{self, AttrData, DataVersionFilter}, - msg::{self, InvReq, ReadReq, SubscribeReq, WriteReq}, + msg::{self, InvReq, ReadReq, WriteReq}, GenericPath, }, InteractionConsumer, Transaction, @@ -320,9 +322,7 @@ impl InteractionConsumer for DataModel { let result = match *resume { ResumeReq::Read(ref mut read) => self.handle_resume_read(read, trans, tw)?, - ResumeReq::Subscribe(mut ctx) => { - self.handle_subscription_confirm(trans, tw, &mut ctx)? - } + ResumeReq::Subscribe(ref mut ctx) => ctx.handle_status_report(trans, tw, self)?, }; trans.exch.set_data_boxed(resume); Ok(result) @@ -336,7 +336,7 @@ impl InteractionConsumer for DataModel { fn consume_subscribe( &self, - req: &SubscribeReq, + rx_buf: &[u8], trans: &mut Transaction, tw: &mut TLVWriter, ) -> Result<(OpCode, ResponseRequired), Error> { @@ -344,7 +344,7 @@ impl InteractionConsumer for DataModel { error!("Exchange data already set!"); return Err(Error::InvalidState); } - let ctx = self.handle_subscribe_req(req, trans, tw)?; + let ctx = SubsCtx::new(rx_buf, trans, tw, self)?; trans .exch .set_data_boxed(Box::new(ResumeReq::Subscribe(ctx))); diff --git a/matter/src/data_model/core/read.rs b/matter/src/data_model/core/read.rs index 9c1c72b..eb12a5b 100644 --- a/matter/src/data_model/core/read.rs +++ b/matter/src/data_model/core/read.rs @@ -34,6 +34,7 @@ use crate::{ wb_shrink, wb_unshrink, }; use log::error; + /// Encoder for generating a response to a read request pub struct AttrReadEncoder<'a, 'b, 'c> { tw: &'a mut TLVWriter<'b, 'c>, @@ -107,13 +108,13 @@ impl<'a, 'b, 'c> Encoder for AttrReadEncoder<'a, 'b, 'c> { pub struct ResumeReadReq { /// The Read Request Attribute Path that caused chunking, and this is the path /// that needs to be resumed. - pending_req: Option>, + pub pending_req: Option>, /// The Attribute that couldn't be encoded because our buffer got full. The next chunk /// will start encoding from this attribute onwards. /// Note that given wildcard reads, one PendingPath in the member above can generated /// multiple encode paths. Hence this has to be maintained separately. - resume_from: Option, + pub resume_from: Option, } impl ResumeReadReq { pub fn new(rx_buf: &[u8], resume_from: &Option) -> Result { diff --git a/matter/src/data_model/core/subscribe.rs b/matter/src/data_model/core/subscribe.rs index b15d41d..a65ee1f 100644 --- a/matter/src/data_model/core/subscribe.rs +++ b/matter/src/data_model/core/subscribe.rs @@ -21,71 +21,122 @@ use crate::{ error::Error, interaction_model::{ core::OpCode, - messages::msg::{self, SubscribeReq, SubscribeResp}, + messages::{ + msg::{self, SubscribeReq, SubscribeResp}, + GenericPath, + }, }, - tlv::{TLVWriter, TagType, ToTLV}, + tlv::{self, get_root_node_struct, FromTLV, TLVWriter, TagType, ToTLV}, transport::proto_demux::ResponseRequired, }; -use super::{DataModel, Transaction}; +use super::{read::ResumeReadReq, DataModel, Transaction}; static SUBS_ID: AtomicU32 = AtomicU32::new(1); -impl DataModel { - pub fn handle_subscribe_req( - &self, - req: &SubscribeReq, - trans: &mut Transaction, - tw: &mut TLVWriter, - ) -> Result { - let ctx = SubsCtx { - state: SubsState::Confirming, - // TODO - id: SUBS_ID.fetch_add(1, Ordering::SeqCst), - }; - - let read_req = req.to_read_req(); - tw.start_struct(TagType::Anonymous)?; - tw.u32( - TagType::Context(msg::ReportDataTag::SubscriptionId as u8), - ctx.id, - )?; - let mut resume_from = None; - self.handle_read_attr_array(&read_req, trans, tw, &mut resume_from)?; - tw.end_container()?; - - Ok(ctx) - } - - pub fn handle_subscription_confirm( - &self, - trans: &mut Transaction, - tw: &mut TLVWriter, - ctx: &mut SubsCtx, - ) -> Result<(OpCode, ResponseRequired), Error> { - if ctx.state != SubsState::Confirming { - // Not relevant for us - trans.complete(); - return Err(Error::Invalid); - } - ctx.state = SubsState::Confirmed; - - // TODO - let resp = SubscribeResp::new(ctx.id, 40); - resp.to_tlv(tw, TagType::Anonymous)?; - trans.complete(); - Ok((OpCode::SubscriptResponse, ResponseRequired::Yes)) - } -} - -#[derive(PartialEq, Clone, Copy)] +#[derive(PartialEq)] enum SubsState { Confirming, Confirmed, } -#[derive(Clone, Copy)] pub struct SubsCtx { state: SubsState, id: u32, + resume_read_req: Option, +} + +impl SubsCtx { + pub fn new( + rx_buf: &[u8], + trans: &mut Transaction, + tw: &mut TLVWriter, + dm: &DataModel, + ) -> Result { + let root = get_root_node_struct(rx_buf)?; + let req = SubscribeReq::from_tlv(&root)?; + + let mut ctx = SubsCtx { + state: SubsState::Confirming, + // TODO + id: SUBS_ID.fetch_add(1, Ordering::SeqCst), + resume_read_req: None, + }; + + let mut resume_from = None; + ctx.do_read(&req, trans, tw, dm, &mut resume_from)?; + if resume_from.is_some() { + // This is a multi-hop read transaction, remember this read request + ctx.resume_read_req = Some(ResumeReadReq::new(rx_buf, &resume_from)?); + } + Ok(ctx) + } + + pub fn handle_status_report( + &mut self, + trans: &mut Transaction, + tw: &mut TLVWriter, + dm: &DataModel, + ) -> Result<(OpCode, ResponseRequired), Error> { + if self.state != SubsState::Confirming { + // Not relevant for us + trans.complete(); + return Err(Error::Invalid); + } + + // Is there a previous resume read pending + if self.resume_read_req.is_some() { + let mut resume_read_req = self.resume_read_req.take().unwrap(); + if let Some(packet) = resume_read_req.pending_req.as_mut() { + let rx_buf = packet.get_parsebuf()?.as_borrow_slice(); + let root = tlv::get_root_node(rx_buf)?; + let req = SubscribeReq::from_tlv(&root)?; + + self.do_read(&req, trans, tw, dm, &mut resume_read_req.resume_from)?; + if resume_read_req.resume_from.is_some() { + // More chunks are pending, setup resume_read_req again + self.resume_read_req = Some(resume_read_req); + } + + return Ok((OpCode::ReportData, ResponseRequired::Yes)); + } + } + + // We are here implies that the read is now complete + self.confirm_subscription(trans, tw) + } + + fn confirm_subscription( + &mut self, + trans: &mut Transaction, + tw: &mut TLVWriter, + ) -> Result<(OpCode, ResponseRequired), Error> { + self.state = SubsState::Confirmed; + + // TODO + let resp = SubscribeResp::new(self.id, 40); + resp.to_tlv(tw, TagType::Anonymous)?; + trans.complete(); + Ok((OpCode::SubscriptResponse, ResponseRequired::Yes)) + } + + fn do_read( + &mut self, + req: &SubscribeReq, + trans: &mut Transaction, + tw: &mut TLVWriter, + dm: &DataModel, + resume_from: &mut Option, + ) -> Result<(), Error> { + let read_req = req.to_read_req(); + tw.start_struct(TagType::Anonymous)?; + tw.u32( + TagType::Context(msg::ReportDataTag::SubscriptionId as u8), + self.id, + )?; + dm.handle_read_attr_array(&read_req, trans, tw, resume_from)?; + tw.end_container()?; + + Ok(()) + } } diff --git a/matter/src/interaction_model/core.rs b/matter/src/interaction_model/core.rs index 193ebe4..4ae4e52 100644 --- a/matter/src/interaction_model/core.rs +++ b/matter/src/interaction_model/core.rs @@ -33,9 +33,9 @@ use log::{error, info}; use num; use num_derive::FromPrimitive; +use super::InteractionModel; use super::Transaction; use super::TransactionState; -use super::{messages::msg::SubscribeReq, InteractionModel}; use super::{messages::msg::TimedReq, InteractionConsumer}; /* Handle messages related to the Interation Model @@ -107,10 +107,7 @@ impl InteractionModel { proto_tx: &mut Packet, ) -> Result { let mut tw = TLVWriter::new(proto_tx.get_writebuf()?); - let root = get_root_node_struct(rx_buf)?; - let req = SubscribeReq::from_tlv(&root)?; - - let (opcode, resp) = self.consumer.consume_subscribe(&req, trans, &mut tw)?; + let (opcode, resp) = self.consumer.consume_subscribe(rx_buf, trans, &mut tw)?; proto_tx.set_proto_opcode(opcode as u8); Ok(resp) } diff --git a/matter/src/interaction_model/mod.rs b/matter/src/interaction_model/mod.rs index 036d876..c4d401b 100644 --- a/matter/src/interaction_model/mod.rs +++ b/matter/src/interaction_model/mod.rs @@ -23,7 +23,7 @@ use crate::{ use self::{ core::OpCode, - messages::msg::{InvReq, StatusResp, SubscribeReq, WriteReq}, + messages::msg::{InvReq, StatusResp, WriteReq}, }; #[derive(PartialEq)] @@ -70,7 +70,7 @@ pub trait InteractionConsumer { fn consume_subscribe( &self, - _req: &SubscribeReq, + _req: &[u8], _trans: &mut Transaction, _tw: &mut TLVWriter, ) -> Result<(OpCode, ResponseRequired), Error>;