ReadReq: Multi-hop read is fully functional

This commit is contained in:
Kedar Sovani 2023-02-26 17:12:09 +05:30
parent dd47aa9a69
commit 78d14629a8
4 changed files with 75 additions and 46 deletions

View file

@ -31,7 +31,7 @@ use crate::{
core::{IMStatusCode, OpCode}, core::{IMStatusCode, OpCode},
messages::{ messages::{
ib::{self, AttrData, DataVersionFilter}, ib::{self, AttrData, DataVersionFilter},
msg::{self, InvReq, ReadReq, ReportDataTag::SupressResponse, SubscribeReq, WriteReq}, msg::{self, InvReq, ReadReq, SubscribeReq, WriteReq},
GenericPath, GenericPath,
}, },
InteractionConsumer, Transaction, InteractionConsumer, Transaction,
@ -266,7 +266,7 @@ impl InteractionConsumer for DataModel {
let mut resume_from = None; let mut resume_from = None;
let root = tlv::get_root_node(rx_buf)?; let root = tlv::get_root_node(rx_buf)?;
let req = ReadReq::from_tlv(&root)?; let req = ReadReq::from_tlv(&root)?;
self.handle_read_attr_array(&req, trans, tw, &mut resume_from)?; self.handle_read_req(&req, trans, tw, &mut resume_from)?;
if resume_from.is_some() { if resume_from.is_some() {
// This is a multi-hop read transaction, remember this read request // This is a multi-hop read transaction, remember this read request
let resume = read::ResumeReadReq::new(rx_buf, &resume_from)?; let resume = read::ResumeReadReq::new(rx_buf, &resume_from)?;
@ -275,10 +275,6 @@ impl InteractionConsumer for DataModel {
return Err(Error::InvalidState); return Err(Error::InvalidState);
} }
trans.exch.set_data_boxed(Box::new(ResumeReq::Read(resume))); trans.exch.set_data_boxed(Box::new(ResumeReq::Read(resume)));
} else {
tw.bool(TagType::Context(SupressResponse as u8), true)?;
// Mark transaction complete, if not chunked
trans.complete();
} }
Ok(()) Ok(())
} }

View file

@ -15,16 +15,12 @@
* limitations under the License. * limitations under the License.
*/ */
use crate::data_model::{core::DataModel, objects::*};
use crate::interaction_model::core::OpCode;
use crate::tlv::FromTLV;
use crate::transport::packet::Packet;
use crate::transport::proto_demux::ResponseRequired;
use crate::{ use crate::{
acl::{AccessReq, Accessor}, acl::{AccessReq, Accessor},
data_model::{core::DataModel, objects::*},
error::*, error::*,
interaction_model::{ interaction_model::{
core::IMStatusCode, core::{IMStatusCode, OpCode},
messages::{ messages::{
ib::{self, DataVersionFilter}, ib::{self, DataVersionFilter},
msg::{self, ReadReq, ReportDataTag::MoreChunkedMsgs, ReportDataTag::SupressResponse}, msg::{self, ReadReq, ReportDataTag::MoreChunkedMsgs, ReportDataTag::SupressResponse},
@ -32,9 +28,12 @@ use crate::{
}, },
Transaction, Transaction,
}, },
tlv::{self, TLVArray, TLVWriter, TagType, ToTLV}, tlv::{self, FromTLV, TLVArray, TLVWriter, TagType, ToTLV},
transport::{packet::Packet, proto_demux::ResponseRequired},
utils::writebuf::WriteBuf,
wb_shrink, wb_unshrink,
}; };
use log::error;
/// Encoder for generating a response to a read request /// Encoder for generating a response to a read request
pub struct AttrReadEncoder<'a, 'b, 'c> { pub struct AttrReadEncoder<'a, 'b, 'c> {
tw: &'a mut TLVWriter<'b, 'c>, tw: &'a mut TLVWriter<'b, 'c>,
@ -205,15 +204,27 @@ impl DataModel {
/// Process an array of Attribute Read Requests /// Process an array of Attribute Read Requests
/// ///
/// This API returns whether the read response is chunked or not /// When the API returns the chunked read is on, if *resume_from is Some(x) otherwise
/// the read is complete
pub(super) fn handle_read_attr_array( pub(super) fn handle_read_attr_array(
&self, &self,
read_req: &ReadReq, read_req: &ReadReq,
trans: &mut Transaction, trans: &mut Transaction,
tw: &mut TLVWriter, old_tw: &mut TLVWriter,
resume_from: &mut Option<GenericPath>, resume_from: &mut Option<GenericPath>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut attr_encoder = AttrReadEncoder::new(tw); let old_wb = old_tw.get_buf();
// Note, this function may be called from multiple places: a) an actual read
// request, a b) resumed read request, c) subscribe request or d) resumed subscribe
// request. Hopefully 18 is sufficient to address all those scenarios.
//
// This is the amount of space we reserve for other things to be attached towards
// the end
const RESERVE_SIZE: usize = 18;
let mut new_wb = wb_shrink!(old_wb, RESERVE_SIZE);
let mut tw = TLVWriter::new(&mut new_wb);
let mut attr_encoder = AttrReadEncoder::new(&mut tw);
if let Some(filters) = &read_req.dataver_filters { if let Some(filters) = &read_req.dataver_filters {
attr_encoder.set_data_ver_filters(filters); attr_encoder.set_data_ver_filters(filters);
} }
@ -238,20 +249,54 @@ impl DataModel {
&mut attr_details, &mut attr_details,
resume_from, resume_from,
); );
if result.is_err() {
break;
}
} }
tw.end_container()?; // Now that all the read reports are captured, let's use the old_tw that is
// the full writebuf, and hopefully as all the necessary space to store this
wb_unshrink!(old_wb, new_wb);
old_tw.end_container()?; // Finish the AttrReports
if result.is_err() { if result.is_err() {
// If there was an error, indicate chunking. The resume_read_req would have been // If there was an error, indicate chunking. The resume_read_req would have been
// already populated from in the loop above. // already populated in the loop above.
tw.bool(TagType::Context(MoreChunkedMsgs as u8), true)?; old_tw.bool(TagType::Context(MoreChunkedMsgs as u8), true)?;
return Ok(()); } else {
// A None resume_from indicates no chunking
*resume_from = None;
} }
} }
// A None resume_from indicates no chunking
*resume_from = None;
Ok(()) Ok(())
} }
/// Handle a read request
///
/// This could be called from an actual read request or a resumed read request. Subscription
/// requests do not come to this function.
/// When the API returns the chunked read is on, if *resume_from is Some(x) otherwise
/// the read is complete
pub fn handle_read_req(
&self,
read_req: &ReadReq,
trans: &mut Transaction,
tw: &mut TLVWriter,
resume_from: &mut Option<GenericPath>,
) -> Result<(OpCode, ResponseRequired), Error> {
tw.start_struct(TagType::Anonymous)?;
self.handle_read_attr_array(read_req, trans, tw, resume_from)?;
if resume_from.is_none() {
tw.bool(TagType::Context(SupressResponse as u8), true)?;
// Mark transaction complete, if not chunked
trans.complete();
}
tw.end_container()?;
Ok((OpCode::ReportData, ResponseRequired::Yes))
}
/// Handle a resumed read request
pub fn handle_resume_read( pub fn handle_resume_read(
&self, &self,
resume_read_req: &mut ResumeReadReq, resume_read_req: &mut ResumeReadReq,
@ -263,16 +308,11 @@ impl DataModel {
let root = tlv::get_root_node(rx_buf)?; let root = tlv::get_root_node(rx_buf)?;
let req = ReadReq::from_tlv(&root)?; let req = ReadReq::from_tlv(&root)?;
tw.start_struct(TagType::Anonymous)?; self.handle_read_req(&req, trans, tw, &mut resume_read_req.resume_from)
self.handle_read_attr_array(&req, trans, tw, &mut resume_read_req.resume_from)?; } else {
// No pending req, is that even possible?
if resume_read_req.resume_from.is_none() { error!("This shouldn't have happened");
tw.bool(TagType::Context(SupressResponse as u8), true)?; Ok((OpCode::Reserved, ResponseRequired::No))
// Mark transaction complete, if not chunked
trans.complete();
}
tw.end_container()?;
} }
Ok((OpCode::ReportData, ResponseRequired::Yes))
} }
} }

View file

@ -18,10 +18,8 @@
use crate::{ use crate::{
error::Error, error::Error,
interaction_model::core::OpCode, interaction_model::core::OpCode,
tlv::{TLVWriter, TagType}, tlv::TLVWriter,
transport::{packet::Packet, proto_demux::ResponseRequired}, transport::{packet::Packet, proto_demux::ResponseRequired},
utils::writebuf::WriteBuf,
wb_shrink, wb_unshrink,
}; };
use super::{InteractionModel, Transaction}; use super::{InteractionModel, Transaction};
@ -34,20 +32,11 @@ impl InteractionModel {
proto_tx: &mut Packet, proto_tx: &mut Packet,
) -> Result<ResponseRequired, Error> { ) -> Result<ResponseRequired, Error> {
proto_tx.set_proto_opcode(OpCode::ReportData as u8); proto_tx.set_proto_opcode(OpCode::ReportData as u8);
// We have to do these gymnastics because we have to reserve some bytes towards the
// end of the slice for adding our terminating TLVs
const RESERVE_SIZE: usize = 8;
let proto_tx_wb = proto_tx.get_writebuf()?; let proto_tx_wb = proto_tx.get_writebuf()?;
let mut child_wb = wb_shrink!(proto_tx_wb, RESERVE_SIZE); let mut tw = TLVWriter::new(proto_tx_wb);
let mut tw = TLVWriter::new(&mut child_wb);
tw.start_struct(TagType::Anonymous)?;
self.consumer.consume_read_attr(rx_buf, trans, &mut tw)?; self.consumer.consume_read_attr(rx_buf, trans, &mut tw)?;
//Now that we have everything, start using the proto_tx_wb, by unshrinking it
wb_unshrink!(proto_tx_wb, child_wb);
let mut tw = TLVWriter::new(proto_tx_wb);
tw.end_container()?;
Ok(ResponseRequired::Yes) Ok(ResponseRequired::Yes)
} }
} }

View file

@ -264,6 +264,10 @@ impl<'a, 'b> TLVWriter<'a, 'b> {
pub fn rewind_to(&mut self, anchor: usize) { pub fn rewind_to(&mut self, anchor: usize) {
self.buf.rewind_tail_to(anchor); self.buf.rewind_tail_to(anchor);
} }
pub fn get_buf<'c>(&'c mut self) -> &'c mut WriteBuf<'a> {
self.buf
}
} }
#[cfg(test)] #[cfg(test)]