Start reintroducing long reads and subscriptions from mainline

This commit is contained in:
ivmarkov 2023-04-21 12:00:17 +00:00
parent 40f353c92e
commit 817d55aecc
4 changed files with 107 additions and 16 deletions

View file

@ -69,6 +69,17 @@ impl<'a, T> DataModel<'a, T> {
CmdDataEncoder::handle(item, &mut self.handler, transaction, &mut tw)?;
}
}
Interaction::Subscribe(req) => {
for item in self.node.subscribing_read(req, &accessor) {
AttrDataEncoder::handle_read(item, &self.handler, &mut tw)?;
}
}
Interaction::Status(_resp) => {
todo!()
// for item in self.node.subscribing_read(req, &accessor) {
// AttrDataEncoder::handle_read(item, &self.handler, &mut tw)?;
// }
}
Interaction::Timed(_) => (),
}

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
use crate::utils::rand::Rand;
use crate::utils::rand::Rand;
pub struct Dataver {
ver: u32,

View file

@ -21,13 +21,13 @@ use crate::{
interaction_model::{
core::IMStatusCode,
messages::{
ib::{AttrStatus, CmdStatus},
msg::{InvReq, ReadReq, WriteReq},
ib::{AttrPath, AttrStatus, CmdStatus, DataVersionFilter},
msg::{InvReq, ReadReq, SubscribeReq, WriteReq},
GenericPath,
},
},
// TODO: This layer shouldn't really depend on the TLV layer, should create an abstraction layer
tlv::TLVElement,
tlv::{TLVArray, TLVElement},
};
use core::{
fmt,
@ -72,15 +72,48 @@ impl<'a> Node<'a> {
where
's: 'm,
{
if let Some(attr_requests) = req.attr_requests.as_ref() {
self.read_attr_requests(
req.attr_requests.as_ref(),
req.dataver_filters.as_ref(),
req.fabric_filtered,
accessor,
)
}
pub fn subscribing_read<'s, 'm>(
&'s self,
req: &'m SubscribeReq,
accessor: &'m Accessor<'m>,
) -> impl Iterator<Item = Result<AttrDetails, AttrStatus>> + 'm
where
's: 'm,
{
self.read_attr_requests(
req.attr_requests.as_ref(),
req.dataver_filters.as_ref(),
req.fabric_filtered,
accessor,
)
}
fn read_attr_requests<'s, 'm>(
&'s self,
attr_requests: Option<&'m TLVArray<AttrPath>>,
dataver_filters: Option<&'m TLVArray<DataVersionFilter>>,
fabric_filtered: bool,
accessor: &'m Accessor<'m>,
) -> impl Iterator<Item = Result<AttrDetails, AttrStatus>> + 'm
where
's: 'm,
{
if let Some(attr_requests) = attr_requests.as_ref() {
WildcardIter::Wildcard(attr_requests.iter().flat_map(
move |path| match self.expand_attr(accessor, path.to_gp(), false) {
Ok(iter) => {
let wildcard = matches!(iter, WildcardIter::Wildcard(_));
WildcardIter::Wildcard(iter.map(move |(ep, cl, attr)| {
let dataver_filter = req
.dataver_filters
let dataver_filter = dataver_filters
.as_ref()
.iter()
.flat_map(|array| array.iter())
@ -96,7 +129,7 @@ impl<'a> Node<'a> {
attr_id: attr,
list_index: path.list_index,
fab_idx: accessor.fab_idx,
fab_filter: req.fabric_filtered,
fab_filter: fabric_filtered,
dataver: dataver_filter,
wildcard,
})

View file

@ -28,7 +28,7 @@ use log::{error, info};
use num;
use num_derive::FromPrimitive;
use super::messages::msg::{self, InvReq, ReadReq, StatusResp, TimedReq, WriteReq};
use super::messages::msg::{self, InvReq, ReadReq, StatusResp, SubscribeReq, TimedReq, WriteReq};
#[macro_export]
macro_rules! cmd_enter {
@ -104,7 +104,7 @@ pub enum OpCode {
StatusResponse = 1,
ReadRequest = 2,
SubscribeRequest = 3,
SubscriptResponse = 4,
SubscribeResponse = 4,
ReportData = 5,
WriteRequest = 6,
WriteResponse = 7,
@ -186,6 +186,8 @@ pub enum Interaction<'a> {
Read(ReadReq<'a>),
Write(WriteReq<'a>),
Invoke(InvReq<'a>),
Subscribe(SubscribeReq<'a>),
Status(StatusResp),
Timed(TimedReq),
}
@ -209,12 +211,15 @@ impl<'a> Interaction<'a> {
OpCode::InvokeRequest => Ok(Self::Invoke(InvReq::from_tlv(&get_root_node_struct(
rx_data,
)?)?)),
OpCode::SubscribeRequest => Ok(Self::Subscribe(SubscribeReq::from_tlv(
&get_root_node_struct(rx_data)?,
)?)),
OpCode::StatusResponse => Ok(Self::Status(StatusResp::from_tlv(
&get_root_node_struct(rx_data)?,
)?)),
OpCode::TimedRequest => Ok(Self::Timed(TimedReq::from_tlv(&get_root_node_struct(
rx_data,
)?)?)),
// TODO
// OpCode::SubscribeRequest => self.handle_subscribe_req(&mut trans, buf, &mut ctx.tx)?,
// OpCode::StatusResponse => self.handle_status_resp(&mut trans, buf, &mut ctx.tx)?,
_ => {
error!("Opcode Not Handled: {:?}", opcode);
Err(Error::InvalidOpcode)
@ -242,7 +247,7 @@ impl<'a> Interaction<'a> {
false
}
Interaction::Write(_) => {
Self::Write(_) => {
if transaction.has_timed_out() {
Self::create_status_response(tx, IMStatusCode::Timeout)?;
@ -262,7 +267,7 @@ impl<'a> Interaction<'a> {
false
}
}
Interaction::Invoke(request) => {
Self::Invoke(request) => {
if transaction.has_timed_out() {
Self::create_status_response(tx, IMStatusCode::Timeout)?;
@ -303,7 +308,31 @@ impl<'a> Interaction<'a> {
}
}
}
Interaction::Timed(request) => {
Self::Subscribe(request) => {
tx.set_proto_id(PROTO_ID_INTERACTION_MODEL as u16);
tx.set_proto_opcode(OpCode::ReportData as u8);
let mut tw = TLVWriter::new(tx.get_writebuf()?);
tw.start_struct(TagType::Anonymous)?;
if request.attr_requests.is_some() {
tw.start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?;
}
true
}
Self::Status(_) => {
tx.set_proto_id(PROTO_ID_INTERACTION_MODEL as u16);
tx.set_proto_opcode(OpCode::SubscribeResponse as u8);
let mut tw = TLVWriter::new(tx.get_writebuf()?);
tw.start_struct(TagType::Anonymous)?;
true
}
Self::Timed(request) => {
tx.set_proto_id(PROTO_ID_INTERACTION_MODEL as u16);
tx.set_proto_opcode(OpCode::StatusResponse as u8);
@ -377,6 +406,24 @@ impl<'a> Interaction<'a> {
true
}
Self::Subscribe(request) => {
let mut tw = TLVWriter::new(tx.get_writebuf()?);
if request.attr_requests.is_some() {
tw.end_container()?;
}
tw.end_container()?;
true
}
Self::Status(_) => {
let mut tw = TLVWriter::new(tx.get_writebuf()?);
tw.end_container()?;
true
}
Self::Timed(_) => false,
};