Start reintroducing long reads and subscriptions from mainline

This commit is contained in:
ivmarkov 2023-04-21 12:00:17 +00:00
parent d446007f6b
commit c11a1a1372
4 changed files with 107 additions and 16 deletions

View file

@ -69,6 +69,17 @@ impl<'a, T> DataModel<'a, T> {
CmdDataEncoder::handle(item, &mut self.handler, transaction, &mut tw)?; CmdDataEncoder::handle(item, &mut self.handler, transaction, &mut tw)?;
} }
} }
Interaction::Subscribe(req) => {
for item in self.node.subscribing_read(req, &accessor) {
AttrDataEncoder::handle_read(item, &self.handler, &mut tw)?;
}
}
Interaction::Status(_resp) => {
todo!()
// for item in self.node.subscribing_read(req, &accessor) {
// AttrDataEncoder::handle_read(item, &self.handler, &mut tw)?;
// }
}
Interaction::Timed(_) => (), Interaction::Timed(_) => (),
} }

View file

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
use crate::utils::rand::Rand; use crate::utils::rand::Rand;
pub struct Dataver { pub struct Dataver {
ver: u32, ver: u32,

View file

@ -21,13 +21,13 @@ use crate::{
interaction_model::{ interaction_model::{
core::IMStatusCode, core::IMStatusCode,
messages::{ messages::{
ib::{AttrStatus, CmdStatus}, ib::{AttrPath, AttrStatus, CmdStatus, DataVersionFilter},
msg::{InvReq, ReadReq, WriteReq}, msg::{InvReq, ReadReq, SubscribeReq, WriteReq},
GenericPath, GenericPath,
}, },
}, },
// TODO: This layer shouldn't really depend on the TLV layer, should create an abstraction layer // TODO: This layer shouldn't really depend on the TLV layer, should create an abstraction layer
tlv::TLVElement, tlv::{TLVArray, TLVElement},
}; };
use core::{ use core::{
fmt, fmt,
@ -72,15 +72,48 @@ impl<'a> Node<'a> {
where where
's: 'm, 's: 'm,
{ {
if let Some(attr_requests) = req.attr_requests.as_ref() { self.read_attr_requests(
req.attr_requests.as_ref(),
req.dataver_filters.as_ref(),
req.fabric_filtered,
accessor,
)
}
pub fn subscribing_read<'s, 'm>(
&'s self,
req: &'m SubscribeReq,
accessor: &'m Accessor<'m>,
) -> impl Iterator<Item = Result<AttrDetails, AttrStatus>> + 'm
where
's: 'm,
{
self.read_attr_requests(
req.attr_requests.as_ref(),
req.dataver_filters.as_ref(),
req.fabric_filtered,
accessor,
)
}
fn read_attr_requests<'s, 'm>(
&'s self,
attr_requests: Option<&'m TLVArray<AttrPath>>,
dataver_filters: Option<&'m TLVArray<DataVersionFilter>>,
fabric_filtered: bool,
accessor: &'m Accessor<'m>,
) -> impl Iterator<Item = Result<AttrDetails, AttrStatus>> + 'm
where
's: 'm,
{
if let Some(attr_requests) = attr_requests.as_ref() {
WildcardIter::Wildcard(attr_requests.iter().flat_map( WildcardIter::Wildcard(attr_requests.iter().flat_map(
move |path| match self.expand_attr(accessor, path.to_gp(), false) { move |path| match self.expand_attr(accessor, path.to_gp(), false) {
Ok(iter) => { Ok(iter) => {
let wildcard = matches!(iter, WildcardIter::Wildcard(_)); let wildcard = matches!(iter, WildcardIter::Wildcard(_));
WildcardIter::Wildcard(iter.map(move |(ep, cl, attr)| { WildcardIter::Wildcard(iter.map(move |(ep, cl, attr)| {
let dataver_filter = req let dataver_filter = dataver_filters
.dataver_filters
.as_ref() .as_ref()
.iter() .iter()
.flat_map(|array| array.iter()) .flat_map(|array| array.iter())
@ -96,7 +129,7 @@ impl<'a> Node<'a> {
attr_id: attr, attr_id: attr,
list_index: path.list_index, list_index: path.list_index,
fab_idx: accessor.fab_idx, fab_idx: accessor.fab_idx,
fab_filter: req.fabric_filtered, fab_filter: fabric_filtered,
dataver: dataver_filter, dataver: dataver_filter,
wildcard, wildcard,
}) })

View file

@ -28,7 +28,7 @@ use log::{error, info};
use num; use num;
use num_derive::FromPrimitive; use num_derive::FromPrimitive;
use super::messages::msg::{self, InvReq, ReadReq, StatusResp, TimedReq, WriteReq}; use super::messages::msg::{self, InvReq, ReadReq, StatusResp, SubscribeReq, TimedReq, WriteReq};
#[macro_export] #[macro_export]
macro_rules! cmd_enter { macro_rules! cmd_enter {
@ -104,7 +104,7 @@ pub enum OpCode {
StatusResponse = 1, StatusResponse = 1,
ReadRequest = 2, ReadRequest = 2,
SubscribeRequest = 3, SubscribeRequest = 3,
SubscriptResponse = 4, SubscribeResponse = 4,
ReportData = 5, ReportData = 5,
WriteRequest = 6, WriteRequest = 6,
WriteResponse = 7, WriteResponse = 7,
@ -186,6 +186,8 @@ pub enum Interaction<'a> {
Read(ReadReq<'a>), Read(ReadReq<'a>),
Write(WriteReq<'a>), Write(WriteReq<'a>),
Invoke(InvReq<'a>), Invoke(InvReq<'a>),
Subscribe(SubscribeReq<'a>),
Status(StatusResp),
Timed(TimedReq), Timed(TimedReq),
} }
@ -209,12 +211,15 @@ impl<'a> Interaction<'a> {
OpCode::InvokeRequest => Ok(Self::Invoke(InvReq::from_tlv(&get_root_node_struct( OpCode::InvokeRequest => Ok(Self::Invoke(InvReq::from_tlv(&get_root_node_struct(
rx_data, rx_data,
)?)?)), )?)?)),
OpCode::SubscribeRequest => Ok(Self::Subscribe(SubscribeReq::from_tlv(
&get_root_node_struct(rx_data)?,
)?)),
OpCode::StatusResponse => Ok(Self::Status(StatusResp::from_tlv(
&get_root_node_struct(rx_data)?,
)?)),
OpCode::TimedRequest => Ok(Self::Timed(TimedReq::from_tlv(&get_root_node_struct( OpCode::TimedRequest => Ok(Self::Timed(TimedReq::from_tlv(&get_root_node_struct(
rx_data, rx_data,
)?)?)), )?)?)),
// TODO
// OpCode::SubscribeRequest => self.handle_subscribe_req(&mut trans, buf, &mut ctx.tx)?,
// OpCode::StatusResponse => self.handle_status_resp(&mut trans, buf, &mut ctx.tx)?,
_ => { _ => {
error!("Opcode Not Handled: {:?}", opcode); error!("Opcode Not Handled: {:?}", opcode);
Err(Error::InvalidOpcode) Err(Error::InvalidOpcode)
@ -242,7 +247,7 @@ impl<'a> Interaction<'a> {
false false
} }
Interaction::Write(_) => { Self::Write(_) => {
if transaction.has_timed_out() { if transaction.has_timed_out() {
Self::create_status_response(tx, IMStatusCode::Timeout)?; Self::create_status_response(tx, IMStatusCode::Timeout)?;
@ -262,7 +267,7 @@ impl<'a> Interaction<'a> {
false false
} }
} }
Interaction::Invoke(request) => { Self::Invoke(request) => {
if transaction.has_timed_out() { if transaction.has_timed_out() {
Self::create_status_response(tx, IMStatusCode::Timeout)?; Self::create_status_response(tx, IMStatusCode::Timeout)?;
@ -303,7 +308,31 @@ impl<'a> Interaction<'a> {
} }
} }
} }
Interaction::Timed(request) => { Self::Subscribe(request) => {
tx.set_proto_id(PROTO_ID_INTERACTION_MODEL as u16);
tx.set_proto_opcode(OpCode::ReportData as u8);
let mut tw = TLVWriter::new(tx.get_writebuf()?);
tw.start_struct(TagType::Anonymous)?;
if request.attr_requests.is_some() {
tw.start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?;
}
true
}
Self::Status(_) => {
tx.set_proto_id(PROTO_ID_INTERACTION_MODEL as u16);
tx.set_proto_opcode(OpCode::SubscribeResponse as u8);
let mut tw = TLVWriter::new(tx.get_writebuf()?);
tw.start_struct(TagType::Anonymous)?;
true
}
Self::Timed(request) => {
tx.set_proto_id(PROTO_ID_INTERACTION_MODEL as u16); tx.set_proto_id(PROTO_ID_INTERACTION_MODEL as u16);
tx.set_proto_opcode(OpCode::StatusResponse as u8); tx.set_proto_opcode(OpCode::StatusResponse as u8);
@ -377,6 +406,24 @@ impl<'a> Interaction<'a> {
true true
} }
Self::Subscribe(request) => {
let mut tw = TLVWriter::new(tx.get_writebuf()?);
if request.attr_requests.is_some() {
tw.end_container()?;
}
tw.end_container()?;
true
}
Self::Status(_) => {
let mut tw = TLVWriter::new(tx.get_writebuf()?);
tw.end_container()?;
true
}
Self::Timed(_) => false, Self::Timed(_) => false,
}; };