ReadRequest: Create structure for recording ResumeReadRequest
This commit is contained in:
parent
e41f6ac99a
commit
88bcbb1b02
1 changed files with 34 additions and 17 deletions
|
@ -164,7 +164,7 @@ impl DataModel {
|
||||||
attr_encoder: &mut AttrReadEncoder,
|
attr_encoder: &mut AttrReadEncoder,
|
||||||
attr_details: &mut AttrDetails,
|
attr_details: &mut AttrDetails,
|
||||||
resume_from: &mut Option<GenericPath>,
|
resume_from: &mut Option<GenericPath>,
|
||||||
) -> Result<(), GenericPath> {
|
) -> Result<(), Error> {
|
||||||
let mut status = Ok(());
|
let mut status = Ok(());
|
||||||
let path = attr_encoder.path;
|
let path = attr_encoder.path;
|
||||||
|
|
||||||
|
@ -181,6 +181,8 @@ impl DataModel {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The resume_from indicates that this is the next chunk of a previous Read Request. In such cases, we
|
||||||
|
// need to skip until we hit this path.
|
||||||
if let Some(r) = resume_from {
|
if let Some(r) = resume_from {
|
||||||
// If resume_from is valid, and we haven't hit the resume_from yet, skip encoding
|
// If resume_from is valid, and we haven't hit the resume_from yet, skip encoding
|
||||||
if r != path {
|
if r != path {
|
||||||
|
@ -200,7 +202,8 @@ impl DataModel {
|
||||||
Cluster::read_attribute(c, &mut access_req, attr_encoder, attr_details);
|
Cluster::read_attribute(c, &mut access_req, attr_encoder, attr_details);
|
||||||
if attr_encoder.is_buffer_full() {
|
if attr_encoder.is_buffer_full() {
|
||||||
// Buffer is full, next time resume from this attribute
|
// Buffer is full, next time resume from this attribute
|
||||||
status = Err(*path);
|
*resume_from = Some(*path);
|
||||||
|
status = Err(Error::NoSpace);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
});
|
});
|
||||||
|
@ -295,6 +298,26 @@ impl objects::ChangeConsumer for DataModel {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// State to maintain when a Read Request needs to be resumed
|
||||||
|
/// resumed - the next chunk of the read needs to be returned
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct ResumeReadReq {
|
||||||
|
/// The Read Request Attribute Path that caused chunking, and this is the path
|
||||||
|
/// that needs to be resumed.
|
||||||
|
///
|
||||||
|
/// TODO: Ideally, the entire ReadRequest (with any subsequent AttrPaths) should also
|
||||||
|
/// be maintained. But for now, we just store the AttrPath that caused the overflow
|
||||||
|
/// and chunking. Hopefully, the other end requests any pending paths when it sees no
|
||||||
|
/// more chunks.
|
||||||
|
pending_path: GenericPath,
|
||||||
|
|
||||||
|
/// The Attribute that couldn't be encoded because our buffer got full. The next chunk
|
||||||
|
/// will start encoding from this attribute onwards.
|
||||||
|
/// Note that given wildcard reads, one PendingPath in the member above can generated
|
||||||
|
/// multiple encode paths. Hence this has to be maintained separately.
|
||||||
|
resume_encode: Option<GenericPath>,
|
||||||
|
}
|
||||||
|
|
||||||
impl InteractionConsumer for DataModel {
|
impl InteractionConsumer for DataModel {
|
||||||
fn consume_write_attr(
|
fn consume_write_attr(
|
||||||
&self,
|
&self,
|
||||||
|
@ -320,49 +343,42 @@ impl InteractionConsumer for DataModel {
|
||||||
trans: &mut Transaction,
|
trans: &mut Transaction,
|
||||||
tw: &mut TLVWriter,
|
tw: &mut TLVWriter,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
|
let mut resume_read_req: ResumeReadReq = Default::default();
|
||||||
|
|
||||||
let mut attr_encoder = AttrReadEncoder::new(tw);
|
let mut attr_encoder = AttrReadEncoder::new(tw);
|
||||||
if let Some(filters) = &read_req.dataver_filters {
|
if let Some(filters) = &read_req.dataver_filters {
|
||||||
attr_encoder.set_data_ver_filters(filters);
|
attr_encoder.set_data_ver_filters(filters);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut attr_details = AttrDetails {
|
|
||||||
// This will be updated internally
|
|
||||||
attr_id: 0,
|
|
||||||
// This will be updated internally
|
|
||||||
list_index: None,
|
|
||||||
// This will be updated internally
|
|
||||||
fab_idx: 0,
|
|
||||||
fab_filter: read_req.fabric_filtered,
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(attr_requests) = &read_req.attr_requests {
|
if let Some(attr_requests) = &read_req.attr_requests {
|
||||||
let accessor = self.sess_to_accessor(trans.session);
|
let accessor = self.sess_to_accessor(trans.session);
|
||||||
|
let mut attr_details = AttrDetails::new(accessor.fab_idx, read_req.fabric_filtered);
|
||||||
let node = self.node.read().unwrap();
|
let node = self.node.read().unwrap();
|
||||||
attr_encoder
|
attr_encoder
|
||||||
.tw
|
.tw
|
||||||
.start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?;
|
.start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?;
|
||||||
|
|
||||||
let mut result = Ok(());
|
let mut result = Ok(());
|
||||||
let mut resume_from = None;
|
|
||||||
for attr_path in attr_requests.iter() {
|
for attr_path in attr_requests.iter() {
|
||||||
attr_encoder.set_path(attr_path.to_gp());
|
attr_encoder.set_path(attr_path.to_gp());
|
||||||
// Extract the attr_path fields into various structures
|
// Extract the attr_path fields into various structures
|
||||||
attr_details.list_index = attr_path.list_index;
|
attr_details.list_index = attr_path.list_index;
|
||||||
attr_details.fab_idx = accessor.fab_idx;
|
|
||||||
result = DataModel::handle_read_attr_path(
|
result = DataModel::handle_read_attr_path(
|
||||||
&node,
|
&node,
|
||||||
&accessor,
|
&accessor,
|
||||||
&mut attr_encoder,
|
&mut attr_encoder,
|
||||||
&mut attr_details,
|
&mut attr_details,
|
||||||
&mut resume_from,
|
&mut resume_read_req.resume_encode,
|
||||||
);
|
);
|
||||||
if result.is_err() {
|
if result.is_err() {
|
||||||
|
resume_read_req.pending_path = attr_path.to_gp();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tw.end_container()?;
|
tw.end_container()?;
|
||||||
if let Err(path) = result {
|
if result.is_err() {
|
||||||
// If there was an error, make a note of this 'path' to resume for the next chunk
|
// If there was an error, indicate chunking. The resume_read_req would have been
|
||||||
|
// already populated from in the loop above.
|
||||||
tw.bool(
|
tw.bool(
|
||||||
TagType::Context(msg::ReportDataTag::MoreChunkedMsgs as u8),
|
TagType::Context(msg::ReportDataTag::MoreChunkedMsgs as u8),
|
||||||
true,
|
true,
|
||||||
|
@ -371,6 +387,7 @@ impl InteractionConsumer for DataModel {
|
||||||
TagType::Context(msg::ReportDataTag::SupressResponse as u8),
|
TagType::Context(msg::ReportDataTag::SupressResponse as u8),
|
||||||
false,
|
false,
|
||||||
)?;
|
)?;
|
||||||
|
// Don't complete the transaction
|
||||||
} else {
|
} else {
|
||||||
tw.bool(
|
tw.bool(
|
||||||
TagType::Context(msg::ReportDataTag::SupressResponse as u8),
|
TagType::Context(msg::ReportDataTag::SupressResponse as u8),
|
||||||
|
|
Loading…
Add table
Reference in a new issue