DataModel: Demultiplex multi-leg transactions in core/mod.rs

In following commits chunked read will be added
This commit is contained in:
Kedar Sovani 2023-02-22 14:38:42 +05:30
parent c5345e3034
commit e05f40c506
2 changed files with 44 additions and 37 deletions

View file

@ -225,6 +225,12 @@ impl DataModel {
pub mod read;
pub mod subscribe;
/// Type of Resume Request
enum ResumeReq {
Subscribe(subscribe::SubsCtx),
Read,
}
impl objects::ChangeConsumer for DataModel {
fn endpoint_added(&self, id: u16, endpoint: &mut Endpoint) -> Result<(), Error> {
endpoint.add_cluster(DescriptorCluster::new(id, self.clone())?)?;
@ -303,14 +309,20 @@ impl InteractionConsumer for DataModel {
trans: &mut Transaction,
tw: &mut TLVWriter,
) -> Result<(OpCode, ResponseRequired), Error> {
let mut handled = false;
let result = self.handle_subscription_confirm(trans, tw, &mut handled);
if handled {
result
if let Some(resume) = trans.exch.take_data_boxed::<ResumeReq>() {
match *resume {
ResumeReq::Read => Ok((OpCode::Reserved, ResponseRequired::No)),
ResumeReq::Subscribe(mut ctx) => {
let result = self.handle_subscription_confirm(trans, tw, &mut ctx)?;
trans.exch.set_data_boxed(resume);
Ok(result)
}
}
} else {
// Nothing to do for now
trans.complete();
info!("Received status report with status {:?}", req.status);
Ok((OpCode::StatusResponse, ResponseRequired::No))
Ok((OpCode::Reserved, ResponseRequired::No))
}
}
@ -320,7 +332,15 @@ impl InteractionConsumer for DataModel {
trans: &mut Transaction,
tw: &mut TLVWriter,
) -> Result<(OpCode, ResponseRequired), Error> {
self.handle_subscribe_req(req, trans, tw)
if !trans.exch.is_data_none() {
error!("Exchange data already set!");
return Err(Error::InvalidState);
}
let ctx = self.handle_subscribe_req(req, trans, tw)?;
trans
.exch
.set_data_boxed(Box::new(ResumeReq::Subscribe(ctx)));
Ok((OpCode::ReportData, ResponseRequired::Yes))
}
}

View file

@ -27,8 +27,6 @@ use crate::{
transport::proto_demux::ResponseRequired,
};
use log::error;
use super::{DataModel, Transaction};
static SUBS_ID: AtomicU32 = AtomicU32::new(1);
@ -39,12 +37,12 @@ impl DataModel {
req: &SubscribeReq,
trans: &mut Transaction,
tw: &mut TLVWriter,
) -> Result<(OpCode, ResponseRequired), Error> {
let ctx = Box::new(SubsCtx {
) -> Result<SubsCtx, Error> {
let ctx = SubsCtx {
state: SubsState::Confirming,
// TODO
id: SUBS_ID.fetch_add(1, Ordering::SeqCst),
});
};
let read_req = req.to_read_req();
tw.start_struct(TagType::Anonymous)?;
@ -55,28 +53,20 @@ impl DataModel {
self.handle_read_attr_array(&read_req, trans, tw)?;
tw.end_container()?;
if !trans.exch.is_data_none() {
error!("Exchange data already set!");
return Err(Error::InvalidState);
}
trans.exch.set_data_boxed(ctx);
Ok((OpCode::ReportData, ResponseRequired::Yes))
Ok(ctx)
}
pub fn handle_subscription_confirm(
&self,
trans: &mut Transaction,
tw: &mut TLVWriter,
request_handled: &mut bool,
ctx: &mut SubsCtx,
) -> Result<(OpCode, ResponseRequired), Error> {
*request_handled = false;
if let Some(ctx) = trans.exch.get_data_boxed::<SubsCtx>() {
if ctx.state != SubsState::Confirming {
// Not relevant for us
trans.complete();
return Err(Error::Invalid);
}
*request_handled = true;
ctx.state = SubsState::Confirmed;
// TODO
@ -84,20 +74,17 @@ impl DataModel {
resp.to_tlv(tw, TagType::Anonymous)?;
trans.complete();
Ok((OpCode::SubscriptResponse, ResponseRequired::Yes))
} else {
trans.complete();
Err(Error::Invalid)
}
}
}
#[derive(PartialEq)]
#[derive(PartialEq, Clone, Copy)]
enum SubsState {
Confirming,
Confirmed,
}
struct SubsCtx {
#[derive(Clone, Copy)]
pub struct SubsCtx {
state: SubsState,
id: u32,
}