DataModel: Capture pending request for chunked read messages

Kedar Sovani 2023-02-24 19:28:42 +05:30
parent 79d6169c48
commit 487124c9dc
2 changed files with 33 additions and 17 deletions
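
In short: handle_read_attr_array used to return a bool ("was this read chunked?"), and on chunking only the overflowing attribute path was remembered. It now returns Option<ResumeReadReq>, and the chunked case parks a copy of the entire ReadRequest on the exchange so the read can be resumed on the next hop. A condensed restatement of the new consume-side flow (the is_data_none guard from the diff below is elided here):

    match self.handle_read_attr_array(req, trans, tw)? {
        Some(resume) => {
            // Chunked: remember the full pending request across messages
            trans.exch.set_data_boxed(Box::new(ResumeReq::Read(resume)));
        }
        None => {
            // Fully answered in a single message
            tw.bool(TagType::Context(SupressResponse as u8), true)?;
            trans.complete();
        }
    }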

File 1 of 2 (file path not shown in this view):

@@ -228,7 +228,7 @@ pub mod subscribe;
 /// Type of Resume Request
 enum ResumeReq {
     Subscribe(subscribe::SubsCtx),
-    Read,
+    Read(read::ResumeReadReq),
 }
 
 impl objects::ChangeConsumer for DataModel {
@@ -263,8 +263,15 @@ impl InteractionConsumer for DataModel {
         trans: &mut Transaction,
         tw: &mut TLVWriter,
     ) -> Result<(), Error> {
-        let is_chunked = self.handle_read_attr_array(req, trans, tw)?;
-        if !is_chunked {
+        let resume = self.handle_read_attr_array(req, trans, tw)?;
+        if let Some(resume) = resume {
+            // This is a multi-hop read transaction; remember this read request
+            if !trans.exch.is_data_none() {
+                error!("Exchange data already set, and multi-hop read");
+                return Err(Error::InvalidState);
+            }
+            trans.exch.set_data_boxed(Box::new(ResumeReq::Read(resume)));
+        } else {
             tw.bool(TagType::Context(SupressResponse as u8), true)?;
             // Mark transaction complete, if not chunked
             trans.complete();
@@ -311,7 +318,7 @@
     ) -> Result<(OpCode, ResponseRequired), Error> {
         if let Some(resume) = trans.exch.take_data_boxed::<ResumeReq>() {
             match *resume {
-                ResumeReq::Read => Ok((OpCode::Reserved, ResponseRequired::No)),
+                ResumeReq::Read(_) => Ok((OpCode::Reserved, ResponseRequired::No)),
                 ResumeReq::Subscribe(mut ctx) => {
                     let result = self.handle_subscription_confirm(trans, tw, &mut ctx)?;
                     trans.exch.set_data_boxed(resume);
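
Note that the ResumeReq::Read arm above still just reports OpCode::Reserved with no response required; this commit stores the pending request but does not yet include the code that consumes it. A hedged sketch of what that consumer might look like (the function name is invented for illustration; take_data_boxed and the field names are the ones visible in these diffs):

    // Hypothetical, not part of this commit:
    fn resume_chunked_read(trans: &mut Transaction) -> Result<(), Error> {
        if let Some(resume) = trans.exch.take_data_boxed::<ResumeReq>() {
            if let ResumeReq::Read(rreq) = *resume {
                // rreq.pending_req: owned copy of the original ReadRequest TLV
                // rreq.resume_encode: where the previous chunk stopped encoding
                debug_assert!(rreq.pending_req.is_some());
                // ... re-enter handle_read_attr_array from that attribute ...
            }
        }
        Ok(())
    }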

File 2 of 2 (file path not shown in this view):

@@ -16,6 +16,8 @@
  */
 use crate::data_model::{core::DataModel, objects::*};
+use crate::transport::packet::Packet;
+use crate::utils::writebuf::WriteBuf;
 use crate::{
     acl::{AccessReq, Accessor},
     error::*,
@@ -104,12 +106,7 @@ impl<'a, 'b, 'c> Encoder for AttrReadEncoder<'a, 'b, 'c> {
 pub struct ResumeReadReq {
     /// The Read Request Attribute Path that caused chunking, and this is the path
     /// that needs to be resumed.
-    ///
-    /// TODO: Ideally, the entire ReadRequest (with any subsequent AttrPaths) should also
-    /// be maintained. But for now, we just store the AttrPath that caused the overflow
-    /// and chunking. Hopefully, the other end requests any pending paths when it sees no
-    /// more chunks.
-    pending_path: GenericPath,
+    pending_req: Option<Packet<'static>>,
 
     /// The Attribute that couldn't be encoded because our buffer got full. The next chunk
     /// will start encoding from this attribute onwards.
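
The switch from GenericPath to Option<Packet<'static>> is an ownership decision as much as a data one: the incoming request borrows the exchange's receive buffer, which is reused before the next chunk is produced, so anything stored across messages must own its bytes. A standalone illustration of the same constraint in plain Rust (no crate types):

    struct Exchange {
        rx: Vec<u8>,              // reused for every incoming message
        pending: Option<Vec<u8>>, // an owned copy survives buffer reuse
    }

    impl Exchange {
        fn stash_request(&mut self) {
            // The clone plays the role of copy_read_req_to_packet() below:
            // pay one copy so `pending` holds no borrow into `rx`.
            self.pending = Some(self.rx.clone());
        }
    }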
@@ -198,7 +195,7 @@ impl DataModel {
         read_req: &ReadReq,
         trans: &mut Transaction,
         tw: &mut TLVWriter,
-    ) -> Result<bool, Error> {
+    ) -> Result<Option<ResumeReadReq>, Error> {
         let mut resume_read_req: ResumeReadReq = Default::default();
         let mut attr_encoder = AttrReadEncoder::new(tw);
@@ -226,19 +223,31 @@ impl DataModel {
                 &mut attr_details,
                 &mut resume_read_req.resume_encode,
             );
             if result.is_err() {
-                resume_read_req.pending_path = attr_path.to_gp();
                 break;
             }
         }
         tw.end_container()?;
 
         if result.is_err() {
             // If there was an error, indicate chunking. The resume_read_req would
             // have already been populated in the loop above.
             tw.bool(TagType::Context(MoreChunkedMsgs as u8), true)?;
-            return Ok(true);
+            // Retain the entire request, because we need the data-filters, and any
+            // subsequent attr-reads, when we resume this read in the next hop
+            resume_read_req.pending_req = Some(copy_read_req_to_packet(read_req)?);
+            return Ok(Some(resume_read_req));
         }
-        Ok(false)
+        Ok(None)
     }
 }
+
+fn copy_read_req_to_packet(read_req: &ReadReq) -> Result<Packet<'static>, Error> {
+    let mut packet = Packet::new_rx()?;
+    let backup = packet.as_borrow_slice();
+    let backup_len = backup.len();
+    let mut wb = WriteBuf::new(backup, backup_len);
+    let mut tw = TLVWriter::new(&mut wb);
+    // TODO: This is unnecessarily wasteful; we could copy the &[u8] directly if it were accessible
+    read_req.to_tlv(&mut tw, TagType::Anonymous)?;
+    let data_len = wb.as_borrow_slice().len();
+    packet.get_parsebuf()?.set_len(data_len);
+    Ok(packet)
+}
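
copy_read_req_to_packet serializes the request back to TLV into a fresh RX-sized packet and then sets the parse buffer's length, so the stored packet looks exactly like a freshly received message. When the resume path lands, it presumably parses that packet back into a ReadReq; a hedged sketch, assuming the crate's usual TLV helpers (get_root_node_struct and the FromTLV-derived ReadReq::from_tlv) keep their current shape:

    // Hypothetical, not part of this commit:
    fn pending_read_req<'a>(packet: &'a mut Packet<'static>) -> Result<ReadReq<'a>, Error> {
        let root = get_root_node_struct(packet.as_borrow_slice())?;
        ReadReq::from_tlv(&root)
    }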