diff --git a/matter/src/interaction_model/core.rs b/matter/src/interaction_model/core.rs index 326fbcc..114e86a 100644 --- a/matter/src/interaction_model/core.rs +++ b/matter/src/interaction_model/core.rs @@ -100,6 +100,26 @@ impl InteractionModel { InteractionModel { consumer } } + pub fn handle_status_resp( + &mut self, + trans: &mut Transaction, + rx_buf: &[u8], + proto_tx: &mut Packet, + ) -> Result { + let root = get_root_node_struct(rx_buf)?; + let req = StatusResp::from_tlv(&root)?; + + let mut handled = false; + let result = self.handle_subscription_confirm(trans, proto_tx, &mut handled); + if handled { + result + } else { + // Nothing to do for now + info!("Received status report with status {:?}", req.status); + Ok(ResponseRequired::No) + } + } + pub fn handle_timed_req( &mut self, trans: &mut Transaction, @@ -162,6 +182,8 @@ impl proto_demux::HandleProto for InteractionModel { OpCode::ReadRequest => self.handle_read_req(&mut trans, buf, &mut ctx.tx)?, OpCode::WriteRequest => self.handle_write_req(&mut trans, buf, &mut ctx.tx)?, OpCode::TimedRequest => self.handle_timed_req(&mut trans, buf, &mut ctx.tx)?, + OpCode::SubscribeRequest => self.handle_subscribe_req(&mut trans, buf, &mut ctx.tx)?, + OpCode::StatusResponse => self.handle_status_resp(&mut trans, buf, &mut ctx.tx)?, _ => { error!("Opcode Not Handled: {:?}", proto_opcode); return Err(Error::InvalidOpcode); diff --git a/matter/src/interaction_model/messages.rs b/matter/src/interaction_model/messages.rs index 2faa9d3..e5c3ba8 100644 --- a/matter/src/interaction_model/messages.rs +++ b/matter/src/interaction_model/messages.rs @@ -71,7 +71,55 @@ pub mod msg { tlv::{FromTLV, TLVArray, TLVElement, TLVWriter, TagType, ToTLV}, }; - use super::ib::{self, AttrData, AttrPath, AttrResp, AttrStatus, CmdData, DataVersionFilter}; + use super::ib::{ + self, AttrData, AttrPath, AttrResp, AttrStatus, CmdData, DataVersionFilter, EventFilter, + EventPath, + }; + + #[derive(FromTLV)] + #[tlvargs(lifetime = "'a")] + pub struct SubscribeReq<'a> { + pub keep_subs: bool, + pub min_int_floor: u16, + pub max_int_ceil: u16, + pub attr_requests: Option>, + event_requests: Option>, + event_filters: Option>, + // The Context Tags are discontiguous for some reason + _dummy: Option, + pub fabric_filtered: bool, + pub dataver_filters: Option>, + } + + impl<'a> SubscribeReq<'a> { + pub fn to_read_req(&self) -> ReadReq<'a> { + ReadReq { + attr_requests: self.attr_requests, + event_requests: self.event_requests, + event_filters: self.event_filters, + fabric_filtered: self.fabric_filtered, + dataver_filters: self.dataver_filters, + } + } + } + + #[derive(ToTLV)] + pub struct SubscribeResp { + pub subs_id: u32, + // The Context Tags are discontiguous for some reason + _dummy: Option, + pub max_int: u16, + } + + impl SubscribeResp { + pub fn new(subs_id: u32, max_int: u16) -> Self { + Self { + subs_id, + _dummy: None, + max_int, + } + } + } #[derive(FromTLV, ToTLV)] pub struct TimedReq { @@ -115,8 +163,8 @@ pub mod msg { #[tlvargs(lifetime = "'a")] pub struct ReadReq<'a> { pub attr_requests: Option>, - event_requests: Option, - event_filters: Option, + event_requests: Option>, + event_filters: Option>, pub fabric_filtered: bool, pub dataver_filters: Option>, } @@ -172,7 +220,7 @@ pub mod msg { } pub enum ReportDataTag { - _SubscriptionId = 0, + SubscriptionId = 0, AttributeReports = 1, _EventReport = 2, _MoreChunkedMsgs = 3, @@ -472,4 +520,20 @@ pub mod ib { pub path: ClusterPath, pub data_ver: u32, } + + #[derive(FromTLV, ToTLV, Copy, Clone)] + #[tlvargs(datatype = "list")] + pub struct EventPath { + pub node: Option, + pub endpoint: Option, + pub cluster: Option, + pub event: Option, + pub is_urgent: Option, + } + + #[derive(FromTLV, ToTLV, Copy, Clone)] + pub struct EventFilter { + pub node: Option, + pub event_min: Option, + } } diff --git a/matter/src/interaction_model/mod.rs b/matter/src/interaction_model/mod.rs index 7d5f757..5b4996f 100644 --- a/matter/src/interaction_model/mod.rs +++ b/matter/src/interaction_model/mod.rs @@ -64,4 +64,5 @@ pub mod command; pub mod core; pub mod messages; pub mod read; +pub mod subscribe; pub mod write; diff --git a/matter/src/interaction_model/subscribe.rs b/matter/src/interaction_model/subscribe.rs new file mode 100644 index 0000000..330ab7f --- /dev/null +++ b/matter/src/interaction_model/subscribe.rs @@ -0,0 +1,115 @@ +/* + * + * Copyright (c) 2023 Project CHIP Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::sync::atomic::{AtomicU32, Ordering}; + +use crate::{ + error::Error, + interaction_model::core::OpCode, + tlv::{get_root_node_struct, FromTLV, TLVWriter, TagType, ToTLV}, + transport::{packet::Packet, proto_demux::ResponseRequired}, +}; + +use log::error; + +use super::{ + messages::msg::{self, SubscribeReq, SubscribeResp}, + InteractionModel, Transaction, +}; + +static SUBS_ID: AtomicU32 = AtomicU32::new(1); + +impl InteractionModel { + pub fn handle_subscribe_req( + &mut self, + trans: &mut Transaction, + rx_buf: &[u8], + proto_tx: &mut Packet, + ) -> Result { + proto_tx.set_proto_opcode(OpCode::ReportData as u8); + + let mut tw = TLVWriter::new(proto_tx.get_writebuf()?); + let root = get_root_node_struct(rx_buf)?; + let req = SubscribeReq::from_tlv(&root)?; + + let ctx = Box::new(SubsCtx { + state: SubsState::Confirming, + // TODO + id: SUBS_ID.fetch_add(1, Ordering::SeqCst), + }); + + let read_req = req.to_read_req(); + tw.start_struct(TagType::Anonymous)?; + tw.u32( + TagType::Context(msg::ReportDataTag::SubscriptionId as u8), + ctx.id, + )?; + self.consumer.consume_read_attr(&read_req, trans, &mut tw)?; + tw.bool( + TagType::Context(msg::ReportDataTag::SupressResponse as u8), + false, + )?; + tw.end_container()?; + + if !trans.exch.is_data_none() { + error!("Exchange data already set!"); + return Err(Error::InvalidState); + } + trans.exch.set_data_boxed(ctx); + + Ok(ResponseRequired::Yes) + } + + pub fn handle_subscription_confirm( + &mut self, + trans: &mut Transaction, + proto_tx: &mut Packet, + request_handled: &mut bool, + ) -> Result { + *request_handled = false; + if let Some(ctx) = trans.exch.get_data_boxed::() { + if ctx.state != SubsState::Confirming { + // Not relevant for us + return Err(Error::Invalid); + } + *request_handled = true; + ctx.state = SubsState::Confirmed; + proto_tx.set_proto_opcode(OpCode::SubscriptResponse as u8); + + // TODO + let resp = SubscribeResp::new(ctx.id, 40); + let mut tw = TLVWriter::new(proto_tx.get_writebuf()?); + resp.to_tlv(&mut tw, TagType::Anonymous)?; + trans.complete(); + Ok(ResponseRequired::Yes) + } else { + trans.complete(); + Err(Error::Invalid) + } + } +} + +#[derive(PartialEq)] +enum SubsState { + Confirming, + Confirmed, +} + +struct SubsCtx { + state: SubsState, + id: u32, +}