diff --git a/matter/src/data_model/core.rs b/matter/src/data_model/core.rs index b53f955..20efeb7 100644 --- a/matter/src/data_model/core.rs +++ b/matter/src/data_model/core.rs @@ -15,26 +15,17 @@ * limitations under the License. */ -use core::{ - cell::RefCell, - sync::atomic::{AtomicU32, Ordering}, -}; +use core::cell::RefCell; use super::objects::*; use crate::{ acl::{Accessor, AclMgr}, error::*, - interaction_model::{ - core::{Interaction, Transaction}, - messages::msg::SubscribeResp, - }, - tlv::{TLVWriter, TagType, ToTLV}, + interaction_model::core::{Interaction, Transaction}, + tlv::TLVWriter, transport::packet::Packet, }; -// TODO: For now... -static SUBS_ID: AtomicU32 = AtomicU32::new(1); - pub struct DataModel<'a, T> { pub acl_mgr: &'a RefCell, pub node: &'a Node<'a>, @@ -120,19 +111,12 @@ impl<'a, T> DataModel<'a, T> { Interaction::ResumeSubscribe(req) => { let mut resume_path = None; - if req.resume_path.is_some() { - for item in self.node.resume_subscribing_read(&req, &accessor) { - if let Some(path) = - AttrDataEncoder::handle_read(item, &self.handler, &mut tw)? - { - resume_path = Some(path); - break; - } + for item in self.node.resume_subscribing_read(&req, &accessor) { + if let Some(path) = AttrDataEncoder::handle_read(item, &self.handler, &mut tw)? + { + resume_path = Some(path); + break; } - } else { - // TODO - let resp = SubscribeResp::new(SUBS_ID.fetch_add(1, Ordering::SeqCst), 40); - resp.to_tlv(&mut tw, TagType::Anonymous)?; } req.complete(tx, transaction, resume_path) @@ -215,19 +199,13 @@ impl<'a, T> DataModel<'a, T> { Interaction::ResumeSubscribe(req) => { let mut resume_path = None; - if req.resume_path.is_some() { - for item in self.node.resume_subscribing_read(&req, &accessor) { - if let Some(path) = - AttrDataEncoder::handle_read_async(item, &self.handler, &mut tw).await? - { - resume_path = Some(path); - break; - } + for item in self.node.resume_subscribing_read(&req, &accessor) { + if let Some(path) = + AttrDataEncoder::handle_read_async(item, &self.handler, &mut tw).await? + { + resume_path = Some(path); + break; } - } else { - // TODO - let resp = SubscribeResp::new(SUBS_ID.fetch_add(1, Ordering::SeqCst), 40); - resp.to_tlv(&mut tw, TagType::Anonymous)?; } req.complete(tx, transaction, resume_path) diff --git a/matter/src/data_model/sdm/noc.rs b/matter/src/data_model/sdm/noc.rs index 3bf6d85..634ba85 100644 --- a/matter/src/data_model/sdm/noc.rs +++ b/matter/src/data_model/sdm/noc.rs @@ -388,7 +388,6 @@ impl<'a> NocCluster<'a> { self.failsafe.borrow_mut().record_add_noc(fab_idx)?; - transaction.complete(); Ok(fab_idx) } @@ -479,7 +478,10 @@ impl<'a> NocCluster<'a> { Err(NocError::Error(error)) => Err(error)?, }; - Self::create_nocresponse(encoder, status, fab_idx, "") + Self::create_nocresponse(encoder, status, fab_idx, "")?; + transaction.complete(); + + Ok(()) } fn handle_command_attrequest( diff --git a/matter/src/interaction_model/core.rs b/matter/src/interaction_model/core.rs index abf76a4..67e9181 100644 --- a/matter/src/interaction_model/core.rs +++ b/matter/src/interaction_model/core.rs @@ -15,6 +15,7 @@ * limitations under the License. */ +use core::sync::atomic::{AtomicU32, Ordering}; use core::time::Duration; use crate::{ @@ -35,7 +36,7 @@ use num_derive::FromPrimitive; use super::messages::{ ib::{AttrPath, DataVersionFilter}, - msg::{self, InvReq, ReadReq, StatusResp, SubscribeReq, TimedReq, WriteReq}, + msg::{self, InvReq, ReadReq, StatusResp, SubscribeReq, SubscribeResp, TimedReq, WriteReq}, GenericPath, }; @@ -206,6 +207,9 @@ const MAX_RESUME_DATAVER_FILTERS: usize = 128; // the end of long reads. const LONG_READS_TLV_RESERVE_SIZE: usize = 24; +// TODO: For now... +static SUBS_ID: AtomicU32 = AtomicU32::new(1); + pub enum Interaction<'a> { Read(ReadReq<'a>), Write(WriteReq<'a>), @@ -511,8 +515,13 @@ impl TimedReq { } impl<'a> SubscribeReq<'a> { - fn suspend(&self, resume_path: Option) -> ResumeSubscribeReq { + fn suspend( + &self, + resume_path: Option, + subscription_id: u32, + ) -> ResumeSubscribeReq { ResumeSubscribeReq { + subscription_id, paths: self .attr_requests .iter() @@ -531,7 +540,7 @@ impl<'a> SubscribeReq<'a> { } } - fn initiate(&self, tx: &mut Packet, _transaction: &mut Transaction) -> Result { + fn initiate(&self, tx: &mut Packet, transaction: &mut Transaction) -> Result { tx.set_proto_id(PROTO_ID_INTERACTION_MODEL as u16); tx.set_proto_opcode(OpCode::ReportData as u8); @@ -539,6 +548,14 @@ impl<'a> SubscribeReq<'a> { tw.start_struct(TagType::Anonymous)?; + let subscription_id = SUBS_ID.fetch_add(1, Ordering::SeqCst); + transaction.exch_mut().set_subscription_id(subscription_id); + + tw.u32( + TagType::Context(msg::ReportDataTag::SubscriptionId as u8), + subscription_id, + )?; + if self.attr_requests.is_some() { tw.start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?; } @@ -565,9 +582,11 @@ impl<'a> SubscribeReq<'a> { )?; } + let subscription_id = transaction.exch_mut().take_subscription_id().unwrap(); + transaction .exch_mut() - .set_suspended_subscribe_req(self.suspend(resume_path)); + .set_suspended_subscribe_req(self.suspend(resume_path, subscription_id)); tw.bool( TagType::Context(msg::ReportDataTag::SupressResponse as u8), @@ -640,6 +659,7 @@ impl ResumeReadReq { } pub struct ResumeSubscribeReq { + pub subscription_id: u32, pub paths: heapless::Vec, pub filters: heapless::Vec, pub fabric_filtered: bool, @@ -660,15 +680,28 @@ impl ResumeSubscribeReq { tw.start_struct(TagType::Anonymous)?; + tw.u32( + TagType::Context(msg::ReportDataTag::SubscriptionId as u8), + self.subscription_id, + )?; + tw.start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?; + + Ok(true) } else { tx.set_proto_opcode(OpCode::SubscribeResponse as u8); - // let mut tw = TLVWriter::new(tx.get_writebuf()?); - // tw.start_struct(TagType::Anonymous)?; - } + let mut tw = TLVWriter::new(tx.get_writebuf()?); - Ok(true) + tw.start_struct(TagType::Anonymous)?; + + let resp = SubscribeResp::new(self.subscription_id, 40); + resp.to_tlv(&mut tw, TagType::Anonymous)?; + + tw.end_container()?; + + Ok(false) + } } pub fn complete( @@ -677,41 +710,34 @@ impl ResumeSubscribeReq { transaction: &mut Transaction, resume_path: Option, ) -> Result { - if self.resume_path.is_none() && resume_path.is_some() { - panic!("Cannot resume subscribe"); + if self.resume_path.is_none() { + // Should not get here as initiate() should've sent the subscribe response already + panic!("Subscription was already processed"); } - if self.resume_path.is_some() { - // Completing a ReportData message - let mut tw = ReadReq::restore_long_read_space(tx)?; + // Completing a ReportData message - tw.end_container()?; + let mut tw = ReadReq::restore_long_read_space(tx)?; - if resume_path.is_some() { - tw.bool( - TagType::Context(msg::ReportDataTag::MoreChunkedMsgs as u8), - true, - )?; - } + tw.end_container()?; + if resume_path.is_some() { tw.bool( - TagType::Context(msg::ReportDataTag::SupressResponse as u8), - false, + TagType::Context(msg::ReportDataTag::MoreChunkedMsgs as u8), + true, )?; - - tw.end_container()?; - - self.resume_path = resume_path; - transaction.exch_mut().set_suspended_subscribe_req(self); - } else { - // Completing a SubscribeResponse message - - // let mut tw = TLVWriter::new(tx.get_writebuf()?); - // tw.end_container()?; - - transaction.complete(); } + tw.bool( + TagType::Context(msg::ReportDataTag::SupressResponse as u8), + false, + )?; + + tw.end_container()?; + + self.resume_path = resume_path; + transaction.exch_mut().set_suspended_subscribe_req(self); + Ok(true) } } diff --git a/matter/src/transport/exchange.rs b/matter/src/transport/exchange.rs index a25baba..5d7a79c 100644 --- a/matter/src/transport/exchange.rs +++ b/matter/src/transport/exchange.rs @@ -71,6 +71,7 @@ pub enum DataOption { CaseSession(CaseSession), Time(Duration), SuspendedReadReq(ResumeReadReq), + SubscriptionId(u32), SuspendedSubscibeReq(ResumeSubscribeReq), #[default] None, @@ -168,6 +169,20 @@ impl Exchange { } } + pub fn set_subscription_id(&mut self, id: u32) { + self.data = DataOption::SubscriptionId(id); + } + + pub fn take_subscription_id(&mut self) -> Option { + let old = core::mem::replace(&mut self.data, DataOption::None); + if let DataOption::SubscriptionId(id) = old { + Some(id) + } else { + self.data = old; + None + } + } + pub fn set_suspended_subscribe_req(&mut self, req: ResumeSubscribeReq) { self.data = DataOption::SuspendedSubscibeReq(req); } diff --git a/matter/src/transport/mgr.rs b/matter/src/transport/mgr.rs index e33e30e..0db6390 100644 --- a/matter/src/transport/mgr.rs +++ b/matter/src/transport/mgr.rs @@ -70,6 +70,7 @@ impl<'r, 'a, 'p> RecvCompletion<'r, 'a, 'p> { fn maybe_next_action(&mut self) -> Result>>, Error> { self.mgr.exch_mgr.purge(); + self.tx.reset(); let (state, next) = match core::mem::replace(&mut self.state, RecvState::New) { RecvState::New => { @@ -108,7 +109,7 @@ impl<'r, 'a, 'p> RecvCompletion<'r, 'a, 'p> { } } Ok(None) => (RecvState::Ack, None), - Err(Error::Duplicate) => (RecvState::Ack, Some(None)), + Err(Error::Duplicate) => (RecvState::Ack, None), Err(Error::NoSpace) => (RecvState::EvictSession, None), Err(err) => Err(err)?, },