Subscribe: Support for long-read based subscription

The following command works with this: chip-tool  any subscribe-by-id 0xFFFFFFFF 0xFFFFFFFF 1 20 12344321 0xFFFF
This commit is contained in:
Kedar Sovani 2023-02-27 17:44:51 +05:30
parent 78d14629a8
commit d0d853d3c4
5 changed files with 116 additions and 67 deletions

View file

@ -15,6 +15,8 @@
* limitations under the License. * limitations under the License.
*/ */
use self::subscribe::SubsCtx;
use super::{ use super::{
cluster_basic_information::BasicInfoConfig, cluster_basic_information::BasicInfoConfig,
device_types::device_type_add_root_node, device_types::device_type_add_root_node,
@ -31,7 +33,7 @@ use crate::{
core::{IMStatusCode, OpCode}, core::{IMStatusCode, OpCode},
messages::{ messages::{
ib::{self, AttrData, DataVersionFilter}, ib::{self, AttrData, DataVersionFilter},
msg::{self, InvReq, ReadReq, SubscribeReq, WriteReq}, msg::{self, InvReq, ReadReq, WriteReq},
GenericPath, GenericPath,
}, },
InteractionConsumer, Transaction, InteractionConsumer, Transaction,
@ -320,9 +322,7 @@ impl InteractionConsumer for DataModel {
let result = match *resume { let result = match *resume {
ResumeReq::Read(ref mut read) => self.handle_resume_read(read, trans, tw)?, ResumeReq::Read(ref mut read) => self.handle_resume_read(read, trans, tw)?,
ResumeReq::Subscribe(mut ctx) => { ResumeReq::Subscribe(ref mut ctx) => ctx.handle_status_report(trans, tw, self)?,
self.handle_subscription_confirm(trans, tw, &mut ctx)?
}
}; };
trans.exch.set_data_boxed(resume); trans.exch.set_data_boxed(resume);
Ok(result) Ok(result)
@ -336,7 +336,7 @@ impl InteractionConsumer for DataModel {
fn consume_subscribe( fn consume_subscribe(
&self, &self,
req: &SubscribeReq, rx_buf: &[u8],
trans: &mut Transaction, trans: &mut Transaction,
tw: &mut TLVWriter, tw: &mut TLVWriter,
) -> Result<(OpCode, ResponseRequired), Error> { ) -> Result<(OpCode, ResponseRequired), Error> {
@ -344,7 +344,7 @@ impl InteractionConsumer for DataModel {
error!("Exchange data already set!"); error!("Exchange data already set!");
return Err(Error::InvalidState); return Err(Error::InvalidState);
} }
let ctx = self.handle_subscribe_req(req, trans, tw)?; let ctx = SubsCtx::new(rx_buf, trans, tw, self)?;
trans trans
.exch .exch
.set_data_boxed(Box::new(ResumeReq::Subscribe(ctx))); .set_data_boxed(Box::new(ResumeReq::Subscribe(ctx)));

View file

@ -34,6 +34,7 @@ use crate::{
wb_shrink, wb_unshrink, wb_shrink, wb_unshrink,
}; };
use log::error; use log::error;
/// Encoder for generating a response to a read request /// Encoder for generating a response to a read request
pub struct AttrReadEncoder<'a, 'b, 'c> { pub struct AttrReadEncoder<'a, 'b, 'c> {
tw: &'a mut TLVWriter<'b, 'c>, tw: &'a mut TLVWriter<'b, 'c>,
@ -107,13 +108,13 @@ impl<'a, 'b, 'c> Encoder for AttrReadEncoder<'a, 'b, 'c> {
pub struct ResumeReadReq { pub struct ResumeReadReq {
/// The Read Request Attribute Path that caused chunking, and this is the path /// The Read Request Attribute Path that caused chunking, and this is the path
/// that needs to be resumed. /// that needs to be resumed.
pending_req: Option<Packet<'static>>, pub pending_req: Option<Packet<'static>>,
/// The Attribute that couldn't be encoded because our buffer got full. The next chunk /// The Attribute that couldn't be encoded because our buffer got full. The next chunk
/// will start encoding from this attribute onwards. /// will start encoding from this attribute onwards.
/// Note that given wildcard reads, one PendingPath in the member above can generated /// Note that given wildcard reads, one PendingPath in the member above can generated
/// multiple encode paths. Hence this has to be maintained separately. /// multiple encode paths. Hence this has to be maintained separately.
resume_from: Option<GenericPath>, pub resume_from: Option<GenericPath>,
} }
impl ResumeReadReq { impl ResumeReadReq {
pub fn new(rx_buf: &[u8], resume_from: &Option<GenericPath>) -> Result<Self, Error> { pub fn new(rx_buf: &[u8], resume_from: &Option<GenericPath>) -> Result<Self, Error> {

View file

@ -21,71 +21,122 @@ use crate::{
error::Error, error::Error,
interaction_model::{ interaction_model::{
core::OpCode, core::OpCode,
messages::msg::{self, SubscribeReq, SubscribeResp}, messages::{
msg::{self, SubscribeReq, SubscribeResp},
GenericPath,
},
}, },
tlv::{TLVWriter, TagType, ToTLV}, tlv::{self, get_root_node_struct, FromTLV, TLVWriter, TagType, ToTLV},
transport::proto_demux::ResponseRequired, transport::proto_demux::ResponseRequired,
}; };
use super::{DataModel, Transaction}; use super::{read::ResumeReadReq, DataModel, Transaction};
static SUBS_ID: AtomicU32 = AtomicU32::new(1); static SUBS_ID: AtomicU32 = AtomicU32::new(1);
impl DataModel { #[derive(PartialEq)]
pub fn handle_subscribe_req(
&self,
req: &SubscribeReq,
trans: &mut Transaction,
tw: &mut TLVWriter,
) -> Result<SubsCtx, Error> {
let ctx = SubsCtx {
state: SubsState::Confirming,
// TODO
id: SUBS_ID.fetch_add(1, Ordering::SeqCst),
};
let read_req = req.to_read_req();
tw.start_struct(TagType::Anonymous)?;
tw.u32(
TagType::Context(msg::ReportDataTag::SubscriptionId as u8),
ctx.id,
)?;
let mut resume_from = None;
self.handle_read_attr_array(&read_req, trans, tw, &mut resume_from)?;
tw.end_container()?;
Ok(ctx)
}
pub fn handle_subscription_confirm(
&self,
trans: &mut Transaction,
tw: &mut TLVWriter,
ctx: &mut SubsCtx,
) -> Result<(OpCode, ResponseRequired), Error> {
if ctx.state != SubsState::Confirming {
// Not relevant for us
trans.complete();
return Err(Error::Invalid);
}
ctx.state = SubsState::Confirmed;
// TODO
let resp = SubscribeResp::new(ctx.id, 40);
resp.to_tlv(tw, TagType::Anonymous)?;
trans.complete();
Ok((OpCode::SubscriptResponse, ResponseRequired::Yes))
}
}
#[derive(PartialEq, Clone, Copy)]
enum SubsState { enum SubsState {
Confirming, Confirming,
Confirmed, Confirmed,
} }
#[derive(Clone, Copy)]
pub struct SubsCtx { pub struct SubsCtx {
state: SubsState, state: SubsState,
id: u32, id: u32,
resume_read_req: Option<ResumeReadReq>,
}
impl SubsCtx {
pub fn new(
rx_buf: &[u8],
trans: &mut Transaction,
tw: &mut TLVWriter,
dm: &DataModel,
) -> Result<Self, Error> {
let root = get_root_node_struct(rx_buf)?;
let req = SubscribeReq::from_tlv(&root)?;
let mut ctx = SubsCtx {
state: SubsState::Confirming,
// TODO
id: SUBS_ID.fetch_add(1, Ordering::SeqCst),
resume_read_req: None,
};
let mut resume_from = None;
ctx.do_read(&req, trans, tw, dm, &mut resume_from)?;
if resume_from.is_some() {
// This is a multi-hop read transaction, remember this read request
ctx.resume_read_req = Some(ResumeReadReq::new(rx_buf, &resume_from)?);
}
Ok(ctx)
}
pub fn handle_status_report(
&mut self,
trans: &mut Transaction,
tw: &mut TLVWriter,
dm: &DataModel,
) -> Result<(OpCode, ResponseRequired), Error> {
if self.state != SubsState::Confirming {
// Not relevant for us
trans.complete();
return Err(Error::Invalid);
}
// Is there a previous resume read pending
if self.resume_read_req.is_some() {
let mut resume_read_req = self.resume_read_req.take().unwrap();
if let Some(packet) = resume_read_req.pending_req.as_mut() {
let rx_buf = packet.get_parsebuf()?.as_borrow_slice();
let root = tlv::get_root_node(rx_buf)?;
let req = SubscribeReq::from_tlv(&root)?;
self.do_read(&req, trans, tw, dm, &mut resume_read_req.resume_from)?;
if resume_read_req.resume_from.is_some() {
// More chunks are pending, setup resume_read_req again
self.resume_read_req = Some(resume_read_req);
}
return Ok((OpCode::ReportData, ResponseRequired::Yes));
}
}
// We are here implies that the read is now complete
self.confirm_subscription(trans, tw)
}
fn confirm_subscription(
&mut self,
trans: &mut Transaction,
tw: &mut TLVWriter,
) -> Result<(OpCode, ResponseRequired), Error> {
self.state = SubsState::Confirmed;
// TODO
let resp = SubscribeResp::new(self.id, 40);
resp.to_tlv(tw, TagType::Anonymous)?;
trans.complete();
Ok((OpCode::SubscriptResponse, ResponseRequired::Yes))
}
fn do_read(
&mut self,
req: &SubscribeReq,
trans: &mut Transaction,
tw: &mut TLVWriter,
dm: &DataModel,
resume_from: &mut Option<GenericPath>,
) -> Result<(), Error> {
let read_req = req.to_read_req();
tw.start_struct(TagType::Anonymous)?;
tw.u32(
TagType::Context(msg::ReportDataTag::SubscriptionId as u8),
self.id,
)?;
dm.handle_read_attr_array(&read_req, trans, tw, resume_from)?;
tw.end_container()?;
Ok(())
}
} }

View file

@ -33,9 +33,9 @@ use log::{error, info};
use num; use num;
use num_derive::FromPrimitive; use num_derive::FromPrimitive;
use super::InteractionModel;
use super::Transaction; use super::Transaction;
use super::TransactionState; use super::TransactionState;
use super::{messages::msg::SubscribeReq, InteractionModel};
use super::{messages::msg::TimedReq, InteractionConsumer}; use super::{messages::msg::TimedReq, InteractionConsumer};
/* Handle messages related to the Interation Model /* Handle messages related to the Interation Model
@ -107,10 +107,7 @@ impl InteractionModel {
proto_tx: &mut Packet, proto_tx: &mut Packet,
) -> Result<ResponseRequired, Error> { ) -> Result<ResponseRequired, Error> {
let mut tw = TLVWriter::new(proto_tx.get_writebuf()?); let mut tw = TLVWriter::new(proto_tx.get_writebuf()?);
let root = get_root_node_struct(rx_buf)?; let (opcode, resp) = self.consumer.consume_subscribe(rx_buf, trans, &mut tw)?;
let req = SubscribeReq::from_tlv(&root)?;
let (opcode, resp) = self.consumer.consume_subscribe(&req, trans, &mut tw)?;
proto_tx.set_proto_opcode(opcode as u8); proto_tx.set_proto_opcode(opcode as u8);
Ok(resp) Ok(resp)
} }

View file

@ -23,7 +23,7 @@ use crate::{
use self::{ use self::{
core::OpCode, core::OpCode,
messages::msg::{InvReq, StatusResp, SubscribeReq, WriteReq}, messages::msg::{InvReq, StatusResp, WriteReq},
}; };
#[derive(PartialEq)] #[derive(PartialEq)]
@ -70,7 +70,7 @@ pub trait InteractionConsumer {
fn consume_subscribe( fn consume_subscribe(
&self, &self,
_req: &SubscribeReq, _req: &[u8],
_trans: &mut Transaction, _trans: &mut Transaction,
_tw: &mut TLVWriter, _tw: &mut TLVWriter,
) -> Result<(OpCode, ResponseRequired), Error>; ) -> Result<(OpCode, ResponseRequired), Error>;