ReadRequest: Chunked Messages - Part 1
- Create the structure so that when the WriteBuf gets full, we return with a MoreChunkedMessages note.
- When the WriteBuf gets full, we also make a note of the current attribute, so that we can resume from this attribute onwards when sending the next chunk.
parent 1807c606a0
commit 60ded6d5fb

3 changed files with 62 additions and 17 deletions
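For orientation, here is a minimal, self-contained sketch of the resume-from pattern described above. The `Path`, `Encoder`, and `encode_attrs` names are illustrative stand-ins, not this crate's actual types; the real change threads a `resume_from: &mut Option<GenericPath>` through `DataModel::handle_read_attr_path`, as the diff below shows.

```rust
// Illustrative sketch only: `Path`, `Encoder` and `encode_attrs` are made-up
// stand-ins, not the actual types used in this crate.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct Path {
    endpoint: u16,
    cluster: u32,
    attr: u32,
}

struct Encoder {
    buf: Vec<u8>,
    capacity: usize,
}

impl Encoder {
    /// Encode one attribute report if it still fits; report whether it did.
    fn try_encode(&mut self, path: &Path) -> bool {
        // Stand-in for the real TLV encoding of one attribute report.
        let bytes = path.attr.to_le_bytes();
        if self.buf.len() + bytes.len() > self.capacity {
            return false;
        }
        self.buf.extend_from_slice(&bytes);
        true
    }
}

/// Encode attribute reports, skipping everything before `resume_from` (if set).
/// When the buffer fills up, return Err(path) so the caller can set the
/// MoreChunkedMessages flag and resume from that path in the next chunk.
fn encode_attrs(
    paths: &[Path],
    enc: &mut Encoder,
    resume_from: &mut Option<Path>,
) -> Result<(), Path> {
    for path in paths {
        if let Some(r) = resume_from {
            // Haven't reached the attribute we stopped at last time: skip it.
            if *r != *path {
                continue;
            }
            // Reached it: clear resume_from so subsequent paths get encoded.
            *resume_from = None;
        }
        if !enc.try_encode(path) {
            // Buffer is full: the next chunk resumes from this attribute.
            return Err(*path);
        }
    }
    Ok(())
}

fn main() {
    let paths: Vec<Path> = (0u32..10)
        .map(|attr| Path { endpoint: 1, cluster: 6, attr })
        .collect();

    let mut resume_from = None;
    loop {
        // Each iteration models one ReportData chunk with a fresh write buffer.
        let mut enc = Encoder { buf: Vec::new(), capacity: 16 };
        match encode_attrs(&paths, &mut enc, &mut resume_from) {
            Ok(()) => {
                println!("final chunk ({} bytes), MoreChunkedMessages = false", enc.buf.len());
                break;
            }
            Err(path) => {
                println!("chunk full ({} bytes), resume from {:?}", enc.buf.len(), path);
                resume_from = Some(path);
            }
        }
    }
}
```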
@@ -153,14 +153,21 @@ impl DataModel {
         }
     }
 
-    // Encode a read attribute from a path that may or may not be wildcard
+    /// Encode a read attribute from a path that may or may not be wildcard
+    ///
+    /// If the buffer gets full while generating the read response, we will return
+    /// an Err(path), where the path is the path that we should resume from, for the next chunk.
+    /// This facilitates chunk management
     fn handle_read_attr_path(
         node: &Node,
         accessor: &Accessor,
         attr_encoder: &mut AttrReadEncoder,
         attr_details: &mut AttrDetails,
-    ) {
+        resume_from: &mut Option<GenericPath>,
+    ) -> Result<(), GenericPath> {
+        let mut status = Ok(());
         let path = attr_encoder.path;
+
         // Skip error reporting for wildcard paths, don't for concrete paths
         attr_encoder.skip_error(path.is_wildcard());
@@ -174,6 +181,16 @@ impl DataModel {
                 return Ok(());
             }
 
+            if let Some(r) = resume_from {
+                // If resume_from is valid, and we haven't hit the resume_from yet, skip encoding
+                if r != path {
+                    return Ok(());
+                } else {
+                    // Else, wipe out the resume_from so subsequent paths can be encoded
+                    *resume_from = None;
+                }
+            }
+
             attr_details.attr_id = path.leaf.unwrap_or_default() as u16;
             // Overwrite the previous path with the concrete path
             attr_encoder.set_path(*path);
@@ -181,12 +198,17 @@ impl DataModel {
             attr_encoder.set_data_ver(cluster_data_ver);
             let mut access_req = AccessReq::new(accessor, path, Access::READ);
             Cluster::read_attribute(c, &mut access_req, attr_encoder, attr_details);
+            if attr_encoder.is_buffer_full() {
+                // Buffer is full, next time resume from this attribute
+                status = Err(*path);
+            }
             Ok(())
         });
         if let Err(e) = result {
             // We hit this only if this is a non-wildcard path
             attr_encoder.encode_status(e, 0);
         }
+        status
     }
 
     // Handle command from a path that may or may not be wildcard
@@ -320,19 +342,42 @@ impl InteractionConsumer for DataModel {
                 .tw
                 .start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?;
 
+            let mut result = Ok(());
+            let mut resume_from = None;
             for attr_path in attr_requests.iter() {
                 attr_encoder.set_path(attr_path.to_gp());
                 // Extract the attr_path fields into various structures
                 attr_details.list_index = attr_path.list_index;
                 attr_details.fab_idx = accessor.fab_idx;
-                DataModel::handle_read_attr_path(
+                result = DataModel::handle_read_attr_path(
                     &node,
                     &accessor,
                     &mut attr_encoder,
                     &mut attr_details,
+                    &mut resume_from,
                 );
+                if result.is_err() {
+                    break;
+                }
             }
             tw.end_container()?;
+            if let Err(path) = result {
+                // If there was an error, make a note of this 'path' to resume for the next chunk
+                tw.bool(
+                    TagType::Context(msg::ReportDataTag::MoreChunkedMsgs as u8),
+                    true,
+                )?;
+                tw.bool(
+                    TagType::Context(msg::ReportDataTag::SupressResponse as u8),
+                    false,
+                )?;
+            } else {
+                tw.bool(
+                    TagType::Context(msg::ReportDataTag::SupressResponse as u8),
+                    true,
+                )?;
+                trans.complete();
+            }
         }
         Ok(())
     }

@@ -223,7 +223,7 @@ pub mod msg {
         SubscriptionId = 0,
         AttributeReports = 1,
         _EventReport = 2,
-        _MoreChunkedMsgs = 3,
+        MoreChunkedMsgs = 3,
         SupressResponse = 4,
     }

@@ -20,12 +20,11 @@ use crate::{
     interaction_model::core::OpCode,
     tlv::{get_root_node_struct, FromTLV, TLVWriter, TagType},
     transport::{packet::Packet, proto_demux::ResponseRequired},
+    utils::writebuf::WriteBuf,
+    wb_shrink, wb_unshrink,
 };
 
-use super::{
-    messages::msg::{self, ReadReq},
-    InteractionModel, Transaction,
-};
+use super::{messages::msg::ReadReq, InteractionModel, Transaction};
 
 impl InteractionModel {
     pub fn handle_read_req(
@@ -35,21 +34,22 @@ impl InteractionModel {
         proto_tx: &mut Packet,
     ) -> Result<ResponseRequired, Error> {
         proto_tx.set_proto_opcode(OpCode::ReportData as u8);
 
-        let mut tw = TLVWriter::new(proto_tx.get_writebuf()?);
+        // We have to do these gymnastics because we have to reserve some bytes towards the
+        // end of the slice for adding our terminating TLVs
+        const RESERVE_SIZE: usize = 8;
+        let proto_tx_wb = proto_tx.get_writebuf()?;
+        let mut child_wb = wb_shrink!(proto_tx_wb, RESERVE_SIZE);
+        let mut tw = TLVWriter::new(&mut child_wb);
         let root = get_root_node_struct(rx_buf)?;
         let read_req = ReadReq::from_tlv(&root)?;
 
         tw.start_struct(TagType::Anonymous)?;
         self.consumer.consume_read_attr(&read_req, trans, &mut tw)?;
-        // Supress response always true for read interaction
-        tw.bool(
-            TagType::Context(msg::ReportDataTag::SupressResponse as u8),
-            true,
-        )?;
-        tw.end_container()?;
-
-        trans.complete();
+        // Now that we have everything, start using the proto_tx_wb, by unshrinking it
+        wb_unshrink!(proto_tx_wb, child_wb);
+        let mut tw = TLVWriter::new(proto_tx_wb);
+        tw.end_container()?;
         Ok(ResponseRequired::Yes)
     }
 }
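The `wb_shrink!` / `wb_unshrink!` gymnastics in the last hunk exist so that the terminating TLVs written after the attribute reports always have room, even when the reports fill the buffer. A rough sketch of that reservation idea, using a plain `Vec`-backed stand-in (`ChunkBuf` and its methods are illustrative, not the crate's `WriteBuf` API):

```rust
// Illustrative only: `ChunkBuf` is a made-up stand-in; the real code shrinks and
// unshrinks the crate's WriteBuf with the wb_shrink!/wb_unshrink! macros instead.
struct ChunkBuf {
    data: Vec<u8>,
    capacity: usize,
}

impl ChunkBuf {
    fn new(capacity: usize) -> Self {
        Self { data: Vec::new(), capacity }
    }

    /// Append only if the bytes fit within the currently visible capacity.
    fn append(&mut self, bytes: &[u8]) -> bool {
        if self.data.len() + bytes.len() > self.capacity {
            return false;
        }
        self.data.extend_from_slice(bytes);
        true
    }
}

// Same idea as RESERVE_SIZE in the diff: keep a few bytes back for the trailer.
const RESERVE_SIZE: usize = 8;

fn main() {
    let mut buf = ChunkBuf::new(32);

    // "Shrink": hide RESERVE_SIZE bytes from the body writer so the terminating
    // TLVs can never be squeezed out by attribute reports.
    buf.capacity -= RESERVE_SIZE;

    // Fill the body until it stops fitting (stand-in for attribute reports).
    let mut reports = 0;
    while buf.append(&[0xAA; 5]) {
        reports += 1;
    }

    // "Unshrink": give the reserved bytes back and emit the terminating TLVs
    // (stand-in for the closing container written after unshrinking).
    buf.capacity += RESERVE_SIZE;
    let ok = buf.append(&[0x18; 3]);
    assert!(ok, "the trailer always fits in the reserved space");

    println!("wrote {} reports, {} bytes total", reports, buf.data.len());
}
```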