DataModel: Mov subscribe/status-report into Data Model

There are peculiar ways status-report should be interpreted in various
contexts. Easier to manage this within the Data Model.
This commit is contained in:
Kedar Sovani 2023-02-21 16:44:17 +05:30
parent 0a42c974d0
commit c5345e3034
7 changed files with 124 additions and 58 deletions

View file

@ -28,17 +28,20 @@ use crate::{
fabric::FabricMgr, fabric::FabricMgr,
interaction_model::{ interaction_model::{
command::CommandReq, command::CommandReq,
core::IMStatusCode, core::{IMStatusCode, OpCode},
messages::{ messages::{
ib::{self, AttrData, DataVersionFilter}, ib::{self, AttrData, DataVersionFilter},
msg::{self, InvReq, ReadReq, WriteReq}, msg::{self, InvReq, ReadReq, ReportDataTag::SupressResponse, SubscribeReq, WriteReq},
GenericPath, GenericPath,
}, },
InteractionConsumer, Transaction, InteractionConsumer, Transaction,
}, },
secure_channel::pake::PaseMgr, secure_channel::pake::PaseMgr,
tlv::{TLVArray, TLVWriter, TagType, ToTLV}, tlv::{TLVArray, TLVWriter, TagType, ToTLV},
transport::session::{Session, SessionMode}, transport::{
proto_demux::ResponseRequired,
session::{Session, SessionMode},
},
}; };
use log::{error, info}; use log::{error, info};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
@ -220,6 +223,7 @@ impl DataModel {
} }
pub mod read; pub mod read;
pub mod subscribe;
impl objects::ChangeConsumer for DataModel { impl objects::ChangeConsumer for DataModel {
fn endpoint_added(&self, id: u16, endpoint: &mut Endpoint) -> Result<(), Error> { fn endpoint_added(&self, id: u16, endpoint: &mut Endpoint) -> Result<(), Error> {
@ -253,7 +257,13 @@ impl InteractionConsumer for DataModel {
trans: &mut Transaction, trans: &mut Transaction,
tw: &mut TLVWriter, tw: &mut TLVWriter,
) -> Result<(), Error> { ) -> Result<(), Error> {
self.handle_read_attr_array(req, trans, tw) let is_chunked = self.handle_read_attr_array(req, trans, tw)?;
if !is_chunked {
tw.bool(TagType::Context(SupressResponse as u8), true)?;
// Mark transaction complete, if not chunked
trans.complete();
}
Ok(())
} }
fn consume_invoke_cmd( fn consume_invoke_cmd(
@ -286,6 +296,32 @@ impl InteractionConsumer for DataModel {
Ok(()) Ok(())
} }
fn consume_status_report(
&self,
req: &msg::StatusResp,
trans: &mut Transaction,
tw: &mut TLVWriter,
) -> Result<(OpCode, ResponseRequired), Error> {
let mut handled = false;
let result = self.handle_subscription_confirm(trans, tw, &mut handled);
if handled {
result
} else {
// Nothing to do for now
info!("Received status report with status {:?}", req.status);
Ok((OpCode::StatusResponse, ResponseRequired::No))
}
}
fn consume_subscribe(
&self,
req: &SubscribeReq,
trans: &mut Transaction,
tw: &mut TLVWriter,
) -> Result<(OpCode, ResponseRequired), Error> {
self.handle_subscribe_req(req, trans, tw)
}
} }
/// Encoder for generating a response to a write request /// Encoder for generating a response to a write request

View file

@ -23,7 +23,7 @@ use crate::{
core::IMStatusCode, core::IMStatusCode,
messages::{ messages::{
ib::{self, DataVersionFilter}, ib::{self, DataVersionFilter},
msg::{self, ReadReq, ReportDataTag::MoreChunkedMsgs, ReportDataTag::SupressResponse}, msg::{self, ReadReq, ReportDataTag::MoreChunkedMsgs},
GenericPath, GenericPath,
}, },
Transaction, Transaction,
@ -191,12 +191,14 @@ impl DataModel {
} }
/// Process an array of Attribute Read Requests /// Process an array of Attribute Read Requests
///
/// This API returns whether the read response is chunked or not
pub(super) fn handle_read_attr_array( pub(super) fn handle_read_attr_array(
&self, &self,
read_req: &ReadReq, read_req: &ReadReq,
trans: &mut Transaction, trans: &mut Transaction,
tw: &mut TLVWriter, tw: &mut TLVWriter,
) -> Result<(), Error> { ) -> Result<bool, Error> {
let mut resume_read_req: ResumeReadReq = Default::default(); let mut resume_read_req: ResumeReadReq = Default::default();
let mut attr_encoder = AttrReadEncoder::new(tw); let mut attr_encoder = AttrReadEncoder::new(tw);
@ -234,13 +236,9 @@ impl DataModel {
// If there was an error, indicate chunking. The resume_read_req would have been // If there was an error, indicate chunking. The resume_read_req would have been
// already populated from in the loop above. // already populated from in the loop above.
tw.bool(TagType::Context(MoreChunkedMsgs as u8), true)?; tw.bool(TagType::Context(MoreChunkedMsgs as u8), true)?;
tw.bool(TagType::Context(SupressResponse as u8), false)?; return Ok(true);
// Don't complete the transaction
} else {
tw.bool(TagType::Context(SupressResponse as u8), true)?;
trans.complete();
} }
} }
Ok(()) Ok(false)
} }
} }

View file

@ -19,33 +19,27 @@ use std::sync::atomic::{AtomicU32, Ordering};
use crate::{ use crate::{
error::Error, error::Error,
interaction_model::core::OpCode, interaction_model::{
tlv::{get_root_node_struct, FromTLV, TLVWriter, TagType, ToTLV}, core::OpCode,
transport::{packet::Packet, proto_demux::ResponseRequired}, messages::msg::{self, SubscribeReq, SubscribeResp},
},
tlv::{TLVWriter, TagType, ToTLV},
transport::proto_demux::ResponseRequired,
}; };
use log::error; use log::error;
use super::{ use super::{DataModel, Transaction};
messages::msg::{self, SubscribeReq, SubscribeResp},
InteractionModel, Transaction,
};
static SUBS_ID: AtomicU32 = AtomicU32::new(1); static SUBS_ID: AtomicU32 = AtomicU32::new(1);
impl InteractionModel { impl DataModel {
pub fn handle_subscribe_req( pub fn handle_subscribe_req(
&mut self, &self,
req: &SubscribeReq,
trans: &mut Transaction, trans: &mut Transaction,
rx_buf: &[u8], tw: &mut TLVWriter,
proto_tx: &mut Packet, ) -> Result<(OpCode, ResponseRequired), Error> {
) -> Result<ResponseRequired, Error> {
proto_tx.set_proto_opcode(OpCode::ReportData as u8);
let mut tw = TLVWriter::new(proto_tx.get_writebuf()?);
let root = get_root_node_struct(rx_buf)?;
let req = SubscribeReq::from_tlv(&root)?;
let ctx = Box::new(SubsCtx { let ctx = Box::new(SubsCtx {
state: SubsState::Confirming, state: SubsState::Confirming,
// TODO // TODO
@ -58,11 +52,7 @@ impl InteractionModel {
TagType::Context(msg::ReportDataTag::SubscriptionId as u8), TagType::Context(msg::ReportDataTag::SubscriptionId as u8),
ctx.id, ctx.id,
)?; )?;
self.consumer.consume_read_attr(&read_req, trans, &mut tw)?; self.handle_read_attr_array(&read_req, trans, tw)?;
tw.bool(
TagType::Context(msg::ReportDataTag::SupressResponse as u8),
false,
)?;
tw.end_container()?; tw.end_container()?;
if !trans.exch.is_data_none() { if !trans.exch.is_data_none() {
@ -71,15 +61,15 @@ impl InteractionModel {
} }
trans.exch.set_data_boxed(ctx); trans.exch.set_data_boxed(ctx);
Ok(ResponseRequired::Yes) Ok((OpCode::ReportData, ResponseRequired::Yes))
} }
pub fn handle_subscription_confirm( pub fn handle_subscription_confirm(
&mut self, &self,
trans: &mut Transaction, trans: &mut Transaction,
proto_tx: &mut Packet, tw: &mut TLVWriter,
request_handled: &mut bool, request_handled: &mut bool,
) -> Result<ResponseRequired, Error> { ) -> Result<(OpCode, ResponseRequired), Error> {
*request_handled = false; *request_handled = false;
if let Some(ctx) = trans.exch.get_data_boxed::<SubsCtx>() { if let Some(ctx) = trans.exch.get_data_boxed::<SubsCtx>() {
if ctx.state != SubsState::Confirming { if ctx.state != SubsState::Confirming {
@ -88,14 +78,12 @@ impl InteractionModel {
} }
*request_handled = true; *request_handled = true;
ctx.state = SubsState::Confirmed; ctx.state = SubsState::Confirmed;
proto_tx.set_proto_opcode(OpCode::SubscriptResponse as u8);
// TODO // TODO
let resp = SubscribeResp::new(ctx.id, 40); let resp = SubscribeResp::new(ctx.id, 40);
let mut tw = TLVWriter::new(proto_tx.get_writebuf()?); resp.to_tlv(tw, TagType::Anonymous)?;
resp.to_tlv(&mut tw, TagType::Anonymous)?;
trans.complete(); trans.complete();
Ok(ResponseRequired::Yes) Ok((OpCode::SubscriptResponse, ResponseRequired::Yes))
} else { } else {
trans.complete(); trans.complete();
Err(Error::Invalid) Err(Error::Invalid)

View file

@ -33,9 +33,9 @@ use log::{error, info};
use num; use num;
use num_derive::FromPrimitive; use num_derive::FromPrimitive;
use super::InteractionModel;
use super::Transaction; use super::Transaction;
use super::TransactionState; use super::TransactionState;
use super::{messages::msg::SubscribeReq, InteractionModel};
use super::{messages::msg::TimedReq, InteractionConsumer}; use super::{messages::msg::TimedReq, InteractionConsumer};
/* Handle messages related to the Interation Model /* Handle messages related to the Interation Model
@ -100,24 +100,33 @@ impl InteractionModel {
InteractionModel { consumer } InteractionModel { consumer }
} }
pub fn handle_subscribe_req(
&mut self,
trans: &mut Transaction,
rx_buf: &[u8],
proto_tx: &mut Packet,
) -> Result<ResponseRequired, Error> {
let mut tw = TLVWriter::new(proto_tx.get_writebuf()?);
let root = get_root_node_struct(rx_buf)?;
let req = SubscribeReq::from_tlv(&root)?;
let (opcode, resp) = self.consumer.consume_subscribe(&req, trans, &mut tw)?;
proto_tx.set_proto_opcode(opcode as u8);
Ok(resp)
}
pub fn handle_status_resp( pub fn handle_status_resp(
&mut self, &mut self,
trans: &mut Transaction, trans: &mut Transaction,
rx_buf: &[u8], rx_buf: &[u8],
proto_tx: &mut Packet, proto_tx: &mut Packet,
) -> Result<ResponseRequired, Error> { ) -> Result<ResponseRequired, Error> {
let mut tw = TLVWriter::new(proto_tx.get_writebuf()?);
let root = get_root_node_struct(rx_buf)?; let root = get_root_node_struct(rx_buf)?;
let req = StatusResp::from_tlv(&root)?; let req = StatusResp::from_tlv(&root)?;
let (opcode, resp) = self.consumer.consume_status_report(&req, trans, &mut tw)?;
let mut handled = false; proto_tx.set_proto_opcode(opcode as u8);
let result = self.handle_subscription_confirm(trans, proto_tx, &mut handled); Ok(resp)
if handled {
result
} else {
// Nothing to do for now
info!("Received status report with status {:?}", req.status);
Ok(ResponseRequired::No)
}
} }
pub fn handle_timed_req( pub fn handle_timed_req(

View file

@ -18,10 +18,13 @@
use crate::{ use crate::{
error::Error, error::Error,
tlv::TLVWriter, tlv::TLVWriter,
transport::{exchange::Exchange, session::Session}, transport::{exchange::Exchange, proto_demux::ResponseRequired, session::Session},
}; };
use self::messages::msg::{InvReq, ReadReq, WriteReq}; use self::{
core::OpCode,
messages::msg::{InvReq, ReadReq, StatusResp, SubscribeReq, WriteReq},
};
#[derive(PartialEq)] #[derive(PartialEq)]
pub enum TransactionState { pub enum TransactionState {
@ -55,6 +58,20 @@ pub trait InteractionConsumer {
trans: &mut Transaction, trans: &mut Transaction,
tw: &mut TLVWriter, tw: &mut TLVWriter,
) -> Result<(), Error>; ) -> Result<(), Error>;
fn consume_status_report(
&self,
_req: &StatusResp,
_trans: &mut Transaction,
_tw: &mut TLVWriter,
) -> Result<(OpCode, ResponseRequired), Error>;
fn consume_subscribe(
&self,
_req: &SubscribeReq,
_trans: &mut Transaction,
_tw: &mut TLVWriter,
) -> Result<(OpCode, ResponseRequired), Error>;
} }
pub struct InteractionModel { pub struct InteractionModel {
@ -64,5 +81,4 @@ pub mod command;
pub mod core; pub mod core;
pub mod messages; pub mod messages;
pub mod read; pub mod read;
pub mod subscribe;
pub mod write; pub mod write;

View file

@ -24,7 +24,7 @@ use super::packet::PacketPool;
const MAX_PROTOCOLS: usize = 4; const MAX_PROTOCOLS: usize = 4;
#[derive(PartialEq)] #[derive(PartialEq, Debug)]
pub enum ResponseRequired { pub enum ResponseRequired {
Yes, Yes,
No, No,

View file

@ -32,6 +32,7 @@ use matter::transport::packet::Packet;
use matter::transport::packet::PacketPool; use matter::transport::packet::PacketPool;
use matter::transport::proto_demux::HandleProto; use matter::transport::proto_demux::HandleProto;
use matter::transport::proto_demux::ProtoCtx; use matter::transport::proto_demux::ProtoCtx;
use matter::transport::proto_demux::ResponseRequired;
use matter::transport::session::SessionMgr; use matter::transport::session::SessionMgr;
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
use std::net::SocketAddr; use std::net::SocketAddr;
@ -108,6 +109,24 @@ impl InteractionConsumer for DataModel {
) -> Result<(), Error> { ) -> Result<(), Error> {
Ok(()) Ok(())
} }
fn consume_status_report(
&self,
_req: &matter::interaction_model::messages::msg::StatusResp,
_trans: &mut Transaction,
_tw: &mut TLVWriter,
) -> Result<(OpCode, ResponseRequired), Error> {
Ok((OpCode::StatusResponse, ResponseRequired::No))
}
fn consume_subscribe(
&self,
_req: &matter::interaction_model::messages::msg::SubscribeReq,
_trans: &mut Transaction,
_tw: &mut TLVWriter,
) -> Result<(OpCode, matter::transport::proto_demux::ResponseRequired), Error> {
Ok((OpCode::StatusResponse, ResponseRequired::No))
}
} }
fn handle_data(action: OpCode, data_in: &[u8], data_out: &mut [u8]) -> (DataModel, usize) { fn handle_data(action: OpCode, data_in: &[u8], data_out: &mut [u8]) -> (DataModel, usize) {