From d67e134e26de29493e4fee6abb666a05f1e107c4 Mon Sep 17 00:00:00 2001 From: Kedar Sovani Date: Tue, 3 Jan 2023 15:25:44 +0530 Subject: [PATCH 1/2] Baseline timed request handling --- matter/src/interaction_model/command.rs | 14 ++++- matter/src/interaction_model/core.rs | 80 ++++++++++++++++++++++-- matter/src/interaction_model/messages.rs | 11 ++++ matter/src/interaction_model/mod.rs | 10 +-- matter/src/interaction_model/write.rs | 3 + matter/src/transport/exchange.rs | 63 ++++++++++++++++--- 6 files changed, 163 insertions(+), 18 deletions(-) diff --git a/matter/src/interaction_model/command.rs b/matter/src/interaction_model/command.rs index a566e17..f6e298d 100644 --- a/matter/src/interaction_model/command.rs +++ b/matter/src/interaction_model/command.rs @@ -15,6 +15,7 @@ * limitations under the License. */ +use super::core::IMStatusCode; use super::core::OpCode; use super::messages::ib; use super::messages::msg; @@ -50,12 +51,23 @@ impl InteractionModel { rx_buf: &[u8], proto_tx: &mut Packet, ) -> Result { - proto_tx.set_proto_opcode(OpCode::InvokeResponse as u8); + if InteractionModel::req_timeout_handled(trans, proto_tx)? == true { + return Ok(ResponseRequired::Yes); + } + proto_tx.set_proto_opcode(OpCode::InvokeResponse as u8); let mut tw = TLVWriter::new(proto_tx.get_writebuf()?); let root = get_root_node_struct(rx_buf)?; let inv_req = InvReq::from_tlv(&root)?; + let timed_tx = trans.exch.get_expiry_ts().map(|_| true); + let timed_request = inv_req.timed_request.filter(|a| *a == true); + // Either both should be None, or both should be Some(true) + if timed_tx != timed_request { + InteractionModel::create_status_response(proto_tx, IMStatusCode::TimedRequestMisMatch)?; + return Ok(ResponseRequired::Yes); + } + tw.start_struct(TagType::Anonymous)?; // Suppress Response -> TODO: Need to revisit this for cases where we send a command back tw.bool( diff --git a/matter/src/interaction_model/core.rs b/matter/src/interaction_model/core.rs index 86e0bef..bb13588 100644 --- a/matter/src/interaction_model/core.rs +++ b/matter/src/interaction_model/core.rs @@ -15,10 +15,15 @@ * limitations under the License. */ +use std::time::{Duration, SystemTime}; + use crate::{ error::*, - tlv::{self, FromTLV, TLVElement, TLVWriter, TagType, ToTLV}, + interaction_model::messages::msg::StatusResp, + tlv::{self, get_root_node_struct, FromTLV, TLVElement, TLVWriter, TagType, ToTLV}, transport::{ + exchange::Exchange, + packet::Packet, proto_demux::{self, ProtoCtx, ResponseRequired}, session::Session, }, @@ -28,10 +33,10 @@ use log::{error, info}; use num; use num_derive::FromPrimitive; -use super::InteractionConsumer; use super::InteractionModel; use super::Transaction; use super::TransactionState; +use super::{messages::msg::TimedReq, InteractionConsumer}; /* Handle messages related to the Interation Model */ @@ -55,11 +60,11 @@ pub enum OpCode { } impl<'a> Transaction<'a> { - pub fn new(session: &'a mut Session) -> Self { + pub fn new(session: &'a mut Session, exch: &'a mut Exchange) -> Self { Self { state: TransactionState::Ongoing, - data: None, session, + exch, } } @@ -70,17 +75,77 @@ impl<'a> Transaction<'a> { pub fn is_complete(&self) -> bool { self.state == TransactionState::Complete } + + pub fn set_timeout(&mut self, timeout: u64) { + self.exch + .set_expiry_ts(SystemTime::now().checked_add(Duration::from_millis(timeout))); + } + + pub fn has_timed_out(&self) -> bool { + if let Some(timeout) = self.exch.get_expiry_ts() { + if SystemTime::now() > timeout { + return true; + } + } + false + } } impl InteractionModel { pub fn new(consumer: Box) -> InteractionModel { InteractionModel { consumer } } + + pub fn handle_timed_req( + &mut self, + trans: &mut Transaction, + rx_buf: &[u8], + proto_tx: &mut Packet, + ) -> Result { + proto_tx.set_proto_opcode(OpCode::StatusResponse as u8); + + let root = get_root_node_struct(rx_buf)?; + let req = TimedReq::from_tlv(&root)?; + trans.set_timeout(req.timeout.into()); + + let status = StatusResp { + status: IMStatusCode::Sucess, + }; + let mut tw = TLVWriter::new(proto_tx.get_writebuf()?); + let _ = status.to_tlv(&mut tw, TagType::Anonymous); + Ok(ResponseRequired::Yes) + } + + /// Handle Request Timeouts + /// This API checks if a request was a timed request, and if so, and if the timeout has + /// expired, it will generate the appropriate response as expected + pub(super) fn req_timeout_handled( + trans: &mut Transaction, + proto_tx: &mut Packet, + ) -> Result { + if trans.has_timed_out() { + trans.complete(); + InteractionModel::create_status_response(proto_tx, IMStatusCode::Timeout)?; + Ok(true) + } else { + Ok(false) + } + } + + pub(super) fn create_status_response( + proto_tx: &mut Packet, + status: IMStatusCode, + ) -> Result<(), Error> { + proto_tx.set_proto_opcode(OpCode::StatusResponse as u8); + let mut tw = TLVWriter::new(proto_tx.get_writebuf()?); + let status = StatusResp { status }; + status.to_tlv(&mut tw, TagType::Anonymous) + } } impl proto_demux::HandleProto for InteractionModel { fn handle_proto_id(&mut self, ctx: &mut ProtoCtx) -> Result { - let mut trans = Transaction::new(&mut ctx.exch_ctx.sess); + let mut trans = Transaction::new(&mut ctx.exch_ctx.sess, ctx.exch_ctx.exch); let proto_opcode: OpCode = num::FromPrimitive::from_u8(ctx.rx.get_proto_opcode()).ok_or(Error::Invalid)?; ctx.tx.set_proto_id(PROTO_ID_INTERACTION_MODEL as u16); @@ -92,6 +157,7 @@ impl proto_demux::HandleProto for InteractionModel { OpCode::InvokeRequest => self.handle_invoke_req(&mut trans, buf, &mut ctx.tx)?, OpCode::ReadRequest => self.handle_read_req(&mut trans, buf, &mut ctx.tx)?, OpCode::WriteRequest => self.handle_write_req(&mut trans, buf, &mut ctx.tx)?, + OpCode::TimedRequest => self.handle_timed_req(&mut trans, buf, &mut ctx.tx)?, _ => { error!("Opcode Not Handled: {:?}", proto_opcode); return Err(Error::InvalidOpcode); @@ -137,6 +203,10 @@ pub enum IMStatusCode { UnsupportedCluster = 0xc3, NoUpstreamSubscription = 0xc5, NeedsTimedInteraction = 0xc6, + UnsupportedEvent = 0xc7, + PathsExhausted = 0xc8, + TimedRequestMisMatch = 0xc9, + FailSafeRequired = 0xca, } impl From for IMStatusCode { diff --git a/matter/src/interaction_model/messages.rs b/matter/src/interaction_model/messages.rs index 974e9af..e54bad0 100644 --- a/matter/src/interaction_model/messages.rs +++ b/matter/src/interaction_model/messages.rs @@ -67,11 +67,22 @@ pub mod msg { use crate::{ error::Error, + interaction_model::core::IMStatusCode, tlv::{FromTLV, TLVArray, TLVElement, TLVWriter, TagType, ToTLV}, }; use super::ib::{AttrData, AttrPath, AttrResp, CmdData, DataVersionFilter}; + #[derive(FromTLV)] + pub struct TimedReq { + pub timeout: u16, + } + + #[derive(ToTLV)] + pub struct StatusResp { + pub status: IMStatusCode, + } + #[derive(FromTLV)] #[tlvargs(lifetime = "'a")] pub struct InvReq<'a> { diff --git a/matter/src/interaction_model/mod.rs b/matter/src/interaction_model/mod.rs index 3bd45ca..7d5f757 100644 --- a/matter/src/interaction_model/mod.rs +++ b/matter/src/interaction_model/mod.rs @@ -15,9 +15,11 @@ * limitations under the License. */ -use std::any::Any; - -use crate::{error::Error, tlv::TLVWriter, transport::session::Session}; +use crate::{ + error::Error, + tlv::TLVWriter, + transport::{exchange::Exchange, session::Session}, +}; use self::messages::msg::{InvReq, ReadReq, WriteReq}; @@ -28,8 +30,8 @@ pub enum TransactionState { } pub struct Transaction<'a> { pub state: TransactionState, - pub data: Option>, pub session: &'a mut Session, + pub exch: &'a mut Exchange, } pub trait InteractionConsumer { diff --git a/matter/src/interaction_model/write.rs b/matter/src/interaction_model/write.rs index 2a78e50..cad15eb 100644 --- a/matter/src/interaction_model/write.rs +++ b/matter/src/interaction_model/write.rs @@ -32,6 +32,9 @@ impl InteractionModel { rx_buf: &[u8], proto_tx: &mut Packet, ) -> Result { + if InteractionModel::req_timeout_handled(trans, proto_tx)? == true { + return Ok(ResponseRequired::Yes); + } proto_tx.set_proto_opcode(OpCode::WriteResponse as u8); let mut tw = TLVWriter::new(proto_tx.get_writebuf()?); diff --git a/matter/src/transport/exchange.rs b/matter/src/transport/exchange.rs index 1c181c5..ea8690c 100644 --- a/matter/src/transport/exchange.rs +++ b/matter/src/transport/exchange.rs @@ -20,6 +20,7 @@ use colored::*; use log::{error, info, trace}; use std::any::Any; use std::fmt; +use std::time::SystemTime; use crate::error::Error; use crate::secure_channel; @@ -59,17 +60,32 @@ impl Default for State { } } +// Instead of just doing an Option<>, we create some special handling +// where the commonly used higher layer data store does't have to do a Box +#[derive(Debug)] +pub enum DataOption { + Boxed(Box), + Time(SystemTime), + None, +} + +impl Default for DataOption { + fn default() -> Self { + DataOption::None + } +} + #[derive(Debug, Default)] pub struct Exchange { id: u16, sess_idx: usize, role: Role, state: State, + mrp: ReliableMessage, // Currently I see this primarily used in PASE and CASE. If that is the limited use // of this, we might move this into a separate data structure, so as not to burden // all 'exchanges'. - data: Option>, - mrp: ReliableMessage, + data: DataOption, } impl Exchange { @@ -79,13 +95,13 @@ impl Exchange { sess_idx, role, state: State::Open, - data: None, mrp: ReliableMessage::new(), + ..Default::default() } } pub fn close(&mut self) { - self.data = None; + self.data = DataOption::None; self.state = State::Close; } @@ -106,20 +122,51 @@ impl Exchange { self.role } + pub fn is_data_none(&self) -> bool { + match self.data { + DataOption::None => true, + _ => false, + } + } + pub fn set_exchange_data(&mut self, data: Box) { - self.data = Some(data); + self.data = DataOption::Boxed(data); } pub fn clear_exchange_data(&mut self) { - self.data = None; + self.data = DataOption::None; } pub fn get_exchange_data(&mut self) -> Option<&mut T> { - self.data.as_mut()?.downcast_mut::() + if let DataOption::Boxed(a) = &mut self.data { + a.downcast_mut::() + } else { + None + } } pub fn take_exchange_data(&mut self) -> Option> { - self.data.take()?.downcast::().ok() + let old = std::mem::replace(&mut self.data, DataOption::None); + match old { + DataOption::Boxed(d) => d.downcast::().ok(), + _ => { + self.data = old; + None + } + } + } + + pub fn set_expiry_ts(&mut self, expiry_ts: Option) { + if let Some(t) = expiry_ts { + self.data = DataOption::Time(t); + } + } + + pub fn get_expiry_ts(&self) -> Option { + match self.data { + DataOption::Time(t) => Some(t), + _ => None, + } } fn send( From e398fbfe000b4eced1e0e46e69ae1493cd17a09f Mon Sep 17 00:00:00 2001 From: Kedar Sovani Date: Wed, 4 Jan 2023 08:38:18 +0530 Subject: [PATCH 2/2] Transaction: Meaningful method names --- matter/src/interaction_model/command.rs | 2 +- matter/src/interaction_model/core.rs | 8 ++++++-- matter/src/secure_channel/case.rs | 6 +++--- matter/src/transport/exchange.rs | 12 ++++++------ 4 files changed, 16 insertions(+), 12 deletions(-) diff --git a/matter/src/interaction_model/command.rs b/matter/src/interaction_model/command.rs index f6e298d..4867ac9 100644 --- a/matter/src/interaction_model/command.rs +++ b/matter/src/interaction_model/command.rs @@ -60,7 +60,7 @@ impl InteractionModel { let root = get_root_node_struct(rx_buf)?; let inv_req = InvReq::from_tlv(&root)?; - let timed_tx = trans.exch.get_expiry_ts().map(|_| true); + let timed_tx = trans.get_timeout().map(|_| true); let timed_request = inv_req.timed_request.filter(|a| *a == true); // Either both should be None, or both should be Some(true) if timed_tx != timed_request { diff --git a/matter/src/interaction_model/core.rs b/matter/src/interaction_model/core.rs index bb13588..bc22385 100644 --- a/matter/src/interaction_model/core.rs +++ b/matter/src/interaction_model/core.rs @@ -78,11 +78,15 @@ impl<'a> Transaction<'a> { pub fn set_timeout(&mut self, timeout: u64) { self.exch - .set_expiry_ts(SystemTime::now().checked_add(Duration::from_millis(timeout))); + .set_data_time(SystemTime::now().checked_add(Duration::from_millis(timeout))); + } + + pub fn get_timeout(&mut self) -> Option { + self.exch.get_data_time() } pub fn has_timed_out(&self) -> bool { - if let Some(timeout) = self.exch.get_expiry_ts() { + if let Some(timeout) = self.exch.get_data_time() { if SystemTime::now() > timeout { return true; } diff --git a/matter/src/secure_channel/case.rs b/matter/src/secure_channel/case.rs index 309b414..75d1fc9 100644 --- a/matter/src/secure_channel/case.rs +++ b/matter/src/secure_channel/case.rs @@ -82,7 +82,7 @@ impl Case { let mut case_session = ctx .exch_ctx .exch - .take_exchange_data::() + .take_data_boxed::() .ok_or(Error::InvalidState)?; if case_session.state != State::Sigma1Rx { return Err(Error::Invalid); @@ -171,7 +171,7 @@ impl Case { SCStatusCodes::SessionEstablishmentSuccess, None, )?; - ctx.exch_ctx.exch.clear_exchange_data(); + ctx.exch_ctx.exch.clear_data_boxed(); ctx.exch_ctx.exch.close(); Ok(()) @@ -269,7 +269,7 @@ impl Case { tw.str16(TagType::Context(4), encrypted)?; tw.end_container()?; case_session.tt_hash.update(ctx.tx.as_borrow_slice())?; - ctx.exch_ctx.exch.set_exchange_data(case_session); + ctx.exch_ctx.exch.set_data_boxed(case_session); Ok(()) } diff --git a/matter/src/transport/exchange.rs b/matter/src/transport/exchange.rs index ea8690c..191c3e4 100644 --- a/matter/src/transport/exchange.rs +++ b/matter/src/transport/exchange.rs @@ -129,15 +129,15 @@ impl Exchange { } } - pub fn set_exchange_data(&mut self, data: Box) { + pub fn set_data_boxed(&mut self, data: Box) { self.data = DataOption::Boxed(data); } - pub fn clear_exchange_data(&mut self) { + pub fn clear_data_boxed(&mut self) { self.data = DataOption::None; } - pub fn get_exchange_data(&mut self) -> Option<&mut T> { + pub fn get_data_boxed(&mut self) -> Option<&mut T> { if let DataOption::Boxed(a) = &mut self.data { a.downcast_mut::() } else { @@ -145,7 +145,7 @@ impl Exchange { } } - pub fn take_exchange_data(&mut self) -> Option> { + pub fn take_data_boxed(&mut self) -> Option> { let old = std::mem::replace(&mut self.data, DataOption::None); match old { DataOption::Boxed(d) => d.downcast::().ok(), @@ -156,13 +156,13 @@ impl Exchange { } } - pub fn set_expiry_ts(&mut self, expiry_ts: Option) { + pub fn set_data_time(&mut self, expiry_ts: Option) { if let Some(t) = expiry_ts { self.data = DataOption::Time(t); } } - pub fn get_expiry_ts(&self) -> Option { + pub fn get_data_time(&self) -> Option { match self.data { DataOption::Time(t) => Some(t), _ => None,