diff --git a/matter/src/data_model/core/mod.rs b/matter/src/data_model/core/mod.rs index 2975b11..be7f1f0 100644 --- a/matter/src/data_model/core/mod.rs +++ b/matter/src/data_model/core/mod.rs @@ -28,17 +28,20 @@ use crate::{ fabric::FabricMgr, interaction_model::{ command::CommandReq, - core::IMStatusCode, + core::{IMStatusCode, OpCode}, messages::{ ib::{self, AttrData, DataVersionFilter}, - msg::{self, InvReq, ReadReq, WriteReq}, + msg::{self, InvReq, ReadReq, ReportDataTag::SupressResponse, SubscribeReq, WriteReq}, GenericPath, }, InteractionConsumer, Transaction, }, secure_channel::pake::PaseMgr, tlv::{TLVArray, TLVWriter, TagType, ToTLV}, - transport::session::{Session, SessionMode}, + transport::{ + proto_demux::ResponseRequired, + session::{Session, SessionMode}, + }, }; use log::{error, info}; use std::sync::{Arc, RwLock}; @@ -220,6 +223,7 @@ impl DataModel { } pub mod read; +pub mod subscribe; impl objects::ChangeConsumer for DataModel { fn endpoint_added(&self, id: u16, endpoint: &mut Endpoint) -> Result<(), Error> { @@ -253,7 +257,13 @@ impl InteractionConsumer for DataModel { trans: &mut Transaction, tw: &mut TLVWriter, ) -> Result<(), Error> { - self.handle_read_attr_array(req, trans, tw) + let is_chunked = self.handle_read_attr_array(req, trans, tw)?; + if !is_chunked { + tw.bool(TagType::Context(SupressResponse as u8), true)?; + // Mark transaction complete, if not chunked + trans.complete(); + } + Ok(()) } fn consume_invoke_cmd( @@ -286,6 +296,32 @@ impl InteractionConsumer for DataModel { Ok(()) } + + fn consume_status_report( + &self, + req: &msg::StatusResp, + trans: &mut Transaction, + tw: &mut TLVWriter, + ) -> Result<(OpCode, ResponseRequired), Error> { + let mut handled = false; + let result = self.handle_subscription_confirm(trans, tw, &mut handled); + if handled { + result + } else { + // Nothing to do for now + info!("Received status report with status {:?}", req.status); + Ok((OpCode::StatusResponse, ResponseRequired::No)) + } + } + + fn consume_subscribe( + &self, + req: &SubscribeReq, + trans: &mut Transaction, + tw: &mut TLVWriter, + ) -> Result<(OpCode, ResponseRequired), Error> { + self.handle_subscribe_req(req, trans, tw) + } } /// Encoder for generating a response to a write request diff --git a/matter/src/data_model/core/read.rs b/matter/src/data_model/core/read.rs index 475c83e..2a12d6b 100644 --- a/matter/src/data_model/core/read.rs +++ b/matter/src/data_model/core/read.rs @@ -23,7 +23,7 @@ use crate::{ core::IMStatusCode, messages::{ ib::{self, DataVersionFilter}, - msg::{self, ReadReq, ReportDataTag::MoreChunkedMsgs, ReportDataTag::SupressResponse}, + msg::{self, ReadReq, ReportDataTag::MoreChunkedMsgs}, GenericPath, }, Transaction, @@ -191,12 +191,14 @@ impl DataModel { } /// Process an array of Attribute Read Requests + /// + /// This API returns whether the read response is chunked or not pub(super) fn handle_read_attr_array( &self, read_req: &ReadReq, trans: &mut Transaction, tw: &mut TLVWriter, - ) -> Result<(), Error> { + ) -> Result { let mut resume_read_req: ResumeReadReq = Default::default(); let mut attr_encoder = AttrReadEncoder::new(tw); @@ -234,13 +236,9 @@ impl DataModel { // If there was an error, indicate chunking. The resume_read_req would have been // already populated from in the loop above. tw.bool(TagType::Context(MoreChunkedMsgs as u8), true)?; - tw.bool(TagType::Context(SupressResponse as u8), false)?; - // Don't complete the transaction - } else { - tw.bool(TagType::Context(SupressResponse as u8), true)?; - trans.complete(); + return Ok(true); } } - Ok(()) + Ok(false) } } diff --git a/matter/src/interaction_model/subscribe.rs b/matter/src/data_model/core/subscribe.rs similarity index 65% rename from matter/src/interaction_model/subscribe.rs rename to matter/src/data_model/core/subscribe.rs index 330ab7f..3fc601a 100644 --- a/matter/src/interaction_model/subscribe.rs +++ b/matter/src/data_model/core/subscribe.rs @@ -19,33 +19,27 @@ use std::sync::atomic::{AtomicU32, Ordering}; use crate::{ error::Error, - interaction_model::core::OpCode, - tlv::{get_root_node_struct, FromTLV, TLVWriter, TagType, ToTLV}, - transport::{packet::Packet, proto_demux::ResponseRequired}, + interaction_model::{ + core::OpCode, + messages::msg::{self, SubscribeReq, SubscribeResp}, + }, + tlv::{TLVWriter, TagType, ToTLV}, + transport::proto_demux::ResponseRequired, }; use log::error; -use super::{ - messages::msg::{self, SubscribeReq, SubscribeResp}, - InteractionModel, Transaction, -}; +use super::{DataModel, Transaction}; static SUBS_ID: AtomicU32 = AtomicU32::new(1); -impl InteractionModel { +impl DataModel { pub fn handle_subscribe_req( - &mut self, + &self, + req: &SubscribeReq, trans: &mut Transaction, - rx_buf: &[u8], - proto_tx: &mut Packet, - ) -> Result { - proto_tx.set_proto_opcode(OpCode::ReportData as u8); - - let mut tw = TLVWriter::new(proto_tx.get_writebuf()?); - let root = get_root_node_struct(rx_buf)?; - let req = SubscribeReq::from_tlv(&root)?; - + tw: &mut TLVWriter, + ) -> Result<(OpCode, ResponseRequired), Error> { let ctx = Box::new(SubsCtx { state: SubsState::Confirming, // TODO @@ -58,11 +52,7 @@ impl InteractionModel { TagType::Context(msg::ReportDataTag::SubscriptionId as u8), ctx.id, )?; - self.consumer.consume_read_attr(&read_req, trans, &mut tw)?; - tw.bool( - TagType::Context(msg::ReportDataTag::SupressResponse as u8), - false, - )?; + self.handle_read_attr_array(&read_req, trans, tw)?; tw.end_container()?; if !trans.exch.is_data_none() { @@ -71,15 +61,15 @@ impl InteractionModel { } trans.exch.set_data_boxed(ctx); - Ok(ResponseRequired::Yes) + Ok((OpCode::ReportData, ResponseRequired::Yes)) } pub fn handle_subscription_confirm( - &mut self, + &self, trans: &mut Transaction, - proto_tx: &mut Packet, + tw: &mut TLVWriter, request_handled: &mut bool, - ) -> Result { + ) -> Result<(OpCode, ResponseRequired), Error> { *request_handled = false; if let Some(ctx) = trans.exch.get_data_boxed::() { if ctx.state != SubsState::Confirming { @@ -88,14 +78,12 @@ impl InteractionModel { } *request_handled = true; ctx.state = SubsState::Confirmed; - proto_tx.set_proto_opcode(OpCode::SubscriptResponse as u8); // TODO let resp = SubscribeResp::new(ctx.id, 40); - let mut tw = TLVWriter::new(proto_tx.get_writebuf()?); - resp.to_tlv(&mut tw, TagType::Anonymous)?; + resp.to_tlv(tw, TagType::Anonymous)?; trans.complete(); - Ok(ResponseRequired::Yes) + Ok((OpCode::SubscriptResponse, ResponseRequired::Yes)) } else { trans.complete(); Err(Error::Invalid) diff --git a/matter/src/interaction_model/core.rs b/matter/src/interaction_model/core.rs index 114e86a..193ebe4 100644 --- a/matter/src/interaction_model/core.rs +++ b/matter/src/interaction_model/core.rs @@ -33,9 +33,9 @@ use log::{error, info}; use num; use num_derive::FromPrimitive; -use super::InteractionModel; use super::Transaction; use super::TransactionState; +use super::{messages::msg::SubscribeReq, InteractionModel}; use super::{messages::msg::TimedReq, InteractionConsumer}; /* Handle messages related to the Interation Model @@ -100,24 +100,33 @@ impl InteractionModel { InteractionModel { consumer } } + pub fn handle_subscribe_req( + &mut self, + trans: &mut Transaction, + rx_buf: &[u8], + proto_tx: &mut Packet, + ) -> Result { + let mut tw = TLVWriter::new(proto_tx.get_writebuf()?); + let root = get_root_node_struct(rx_buf)?; + let req = SubscribeReq::from_tlv(&root)?; + + let (opcode, resp) = self.consumer.consume_subscribe(&req, trans, &mut tw)?; + proto_tx.set_proto_opcode(opcode as u8); + Ok(resp) + } + pub fn handle_status_resp( &mut self, trans: &mut Transaction, rx_buf: &[u8], proto_tx: &mut Packet, ) -> Result { + let mut tw = TLVWriter::new(proto_tx.get_writebuf()?); let root = get_root_node_struct(rx_buf)?; let req = StatusResp::from_tlv(&root)?; - - let mut handled = false; - let result = self.handle_subscription_confirm(trans, proto_tx, &mut handled); - if handled { - result - } else { - // Nothing to do for now - info!("Received status report with status {:?}", req.status); - Ok(ResponseRequired::No) - } + let (opcode, resp) = self.consumer.consume_status_report(&req, trans, &mut tw)?; + proto_tx.set_proto_opcode(opcode as u8); + Ok(resp) } pub fn handle_timed_req( diff --git a/matter/src/interaction_model/mod.rs b/matter/src/interaction_model/mod.rs index 5b4996f..3a08176 100644 --- a/matter/src/interaction_model/mod.rs +++ b/matter/src/interaction_model/mod.rs @@ -18,10 +18,13 @@ use crate::{ error::Error, tlv::TLVWriter, - transport::{exchange::Exchange, session::Session}, + transport::{exchange::Exchange, proto_demux::ResponseRequired, session::Session}, }; -use self::messages::msg::{InvReq, ReadReq, WriteReq}; +use self::{ + core::OpCode, + messages::msg::{InvReq, ReadReq, StatusResp, SubscribeReq, WriteReq}, +}; #[derive(PartialEq)] pub enum TransactionState { @@ -55,6 +58,20 @@ pub trait InteractionConsumer { trans: &mut Transaction, tw: &mut TLVWriter, ) -> Result<(), Error>; + + fn consume_status_report( + &self, + _req: &StatusResp, + _trans: &mut Transaction, + _tw: &mut TLVWriter, + ) -> Result<(OpCode, ResponseRequired), Error>; + + fn consume_subscribe( + &self, + _req: &SubscribeReq, + _trans: &mut Transaction, + _tw: &mut TLVWriter, + ) -> Result<(OpCode, ResponseRequired), Error>; } pub struct InteractionModel { @@ -64,5 +81,4 @@ pub mod command; pub mod core; pub mod messages; pub mod read; -pub mod subscribe; pub mod write; diff --git a/matter/src/transport/proto_demux.rs b/matter/src/transport/proto_demux.rs index c2d5500..263ffc9 100644 --- a/matter/src/transport/proto_demux.rs +++ b/matter/src/transport/proto_demux.rs @@ -24,7 +24,7 @@ use super::packet::PacketPool; const MAX_PROTOCOLS: usize = 4; -#[derive(PartialEq)] +#[derive(PartialEq, Debug)] pub enum ResponseRequired { Yes, No, diff --git a/matter/tests/interaction_model.rs b/matter/tests/interaction_model.rs index 3f831c4..39052b3 100644 --- a/matter/tests/interaction_model.rs +++ b/matter/tests/interaction_model.rs @@ -32,6 +32,7 @@ use matter::transport::packet::Packet; use matter::transport::packet::PacketPool; use matter::transport::proto_demux::HandleProto; use matter::transport::proto_demux::ProtoCtx; +use matter::transport::proto_demux::ResponseRequired; use matter::transport::session::SessionMgr; use std::net::Ipv4Addr; use std::net::SocketAddr; @@ -108,6 +109,24 @@ impl InteractionConsumer for DataModel { ) -> Result<(), Error> { Ok(()) } + + fn consume_status_report( + &self, + _req: &matter::interaction_model::messages::msg::StatusResp, + _trans: &mut Transaction, + _tw: &mut TLVWriter, + ) -> Result<(OpCode, ResponseRequired), Error> { + Ok((OpCode::StatusResponse, ResponseRequired::No)) + } + + fn consume_subscribe( + &self, + _req: &matter::interaction_model::messages::msg::SubscribeReq, + _trans: &mut Transaction, + _tw: &mut TLVWriter, + ) -> Result<(OpCode, matter::transport::proto_demux::ResponseRequired), Error> { + Ok((OpCode::StatusResponse, ResponseRequired::No)) + } } fn handle_data(action: OpCode, data_in: &[u8], data_out: &mut [u8]) -> (DataModel, usize) {