Bugfix: subscription_id was not sent

This commit is contained in:
ivmarkov 2023-04-28 10:42:55 +00:00
parent 076ba06e07
commit e8e847cea6
5 changed files with 95 additions and 73 deletions

View file

@ -15,26 +15,17 @@
* limitations under the License. * limitations under the License.
*/ */
use core::{ use core::cell::RefCell;
cell::RefCell,
sync::atomic::{AtomicU32, Ordering},
};
use super::objects::*; use super::objects::*;
use crate::{ use crate::{
acl::{Accessor, AclMgr}, acl::{Accessor, AclMgr},
error::*, error::*,
interaction_model::{ interaction_model::core::{Interaction, Transaction},
core::{Interaction, Transaction}, tlv::TLVWriter,
messages::msg::SubscribeResp,
},
tlv::{TLVWriter, TagType, ToTLV},
transport::packet::Packet, transport::packet::Packet,
}; };
// TODO: For now...
static SUBS_ID: AtomicU32 = AtomicU32::new(1);
pub struct DataModel<'a, T> { pub struct DataModel<'a, T> {
pub acl_mgr: &'a RefCell<AclMgr>, pub acl_mgr: &'a RefCell<AclMgr>,
pub node: &'a Node<'a>, pub node: &'a Node<'a>,
@ -120,19 +111,12 @@ impl<'a, T> DataModel<'a, T> {
Interaction::ResumeSubscribe(req) => { Interaction::ResumeSubscribe(req) => {
let mut resume_path = None; let mut resume_path = None;
if req.resume_path.is_some() { for item in self.node.resume_subscribing_read(&req, &accessor) {
for item in self.node.resume_subscribing_read(&req, &accessor) { if let Some(path) = AttrDataEncoder::handle_read(item, &self.handler, &mut tw)?
if let Some(path) = {
AttrDataEncoder::handle_read(item, &self.handler, &mut tw)? resume_path = Some(path);
{ break;
resume_path = Some(path);
break;
}
} }
} else {
// TODO
let resp = SubscribeResp::new(SUBS_ID.fetch_add(1, Ordering::SeqCst), 40);
resp.to_tlv(&mut tw, TagType::Anonymous)?;
} }
req.complete(tx, transaction, resume_path) req.complete(tx, transaction, resume_path)
@ -215,19 +199,13 @@ impl<'a, T> DataModel<'a, T> {
Interaction::ResumeSubscribe(req) => { Interaction::ResumeSubscribe(req) => {
let mut resume_path = None; let mut resume_path = None;
if req.resume_path.is_some() { for item in self.node.resume_subscribing_read(&req, &accessor) {
for item in self.node.resume_subscribing_read(&req, &accessor) { if let Some(path) =
if let Some(path) = AttrDataEncoder::handle_read_async(item, &self.handler, &mut tw).await?
AttrDataEncoder::handle_read_async(item, &self.handler, &mut tw).await? {
{ resume_path = Some(path);
resume_path = Some(path); break;
break;
}
} }
} else {
// TODO
let resp = SubscribeResp::new(SUBS_ID.fetch_add(1, Ordering::SeqCst), 40);
resp.to_tlv(&mut tw, TagType::Anonymous)?;
} }
req.complete(tx, transaction, resume_path) req.complete(tx, transaction, resume_path)

View file

@ -388,7 +388,6 @@ impl<'a> NocCluster<'a> {
self.failsafe.borrow_mut().record_add_noc(fab_idx)?; self.failsafe.borrow_mut().record_add_noc(fab_idx)?;
transaction.complete();
Ok(fab_idx) Ok(fab_idx)
} }
@ -479,7 +478,10 @@ impl<'a> NocCluster<'a> {
Err(NocError::Error(error)) => Err(error)?, Err(NocError::Error(error)) => Err(error)?,
}; };
Self::create_nocresponse(encoder, status, fab_idx, "") Self::create_nocresponse(encoder, status, fab_idx, "")?;
transaction.complete();
Ok(())
} }
fn handle_command_attrequest( fn handle_command_attrequest(

View file

@ -15,6 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
use core::sync::atomic::{AtomicU32, Ordering};
use core::time::Duration; use core::time::Duration;
use crate::{ use crate::{
@ -35,7 +36,7 @@ use num_derive::FromPrimitive;
use super::messages::{ use super::messages::{
ib::{AttrPath, DataVersionFilter}, ib::{AttrPath, DataVersionFilter},
msg::{self, InvReq, ReadReq, StatusResp, SubscribeReq, TimedReq, WriteReq}, msg::{self, InvReq, ReadReq, StatusResp, SubscribeReq, SubscribeResp, TimedReq, WriteReq},
GenericPath, GenericPath,
}; };
@ -206,6 +207,9 @@ const MAX_RESUME_DATAVER_FILTERS: usize = 128;
// the end of long reads. // the end of long reads.
const LONG_READS_TLV_RESERVE_SIZE: usize = 24; const LONG_READS_TLV_RESERVE_SIZE: usize = 24;
// TODO: For now...
static SUBS_ID: AtomicU32 = AtomicU32::new(1);
pub enum Interaction<'a> { pub enum Interaction<'a> {
Read(ReadReq<'a>), Read(ReadReq<'a>),
Write(WriteReq<'a>), Write(WriteReq<'a>),
@ -511,8 +515,13 @@ impl TimedReq {
} }
impl<'a> SubscribeReq<'a> { impl<'a> SubscribeReq<'a> {
fn suspend(&self, resume_path: Option<GenericPath>) -> ResumeSubscribeReq { fn suspend(
&self,
resume_path: Option<GenericPath>,
subscription_id: u32,
) -> ResumeSubscribeReq {
ResumeSubscribeReq { ResumeSubscribeReq {
subscription_id,
paths: self paths: self
.attr_requests .attr_requests
.iter() .iter()
@ -531,7 +540,7 @@ impl<'a> SubscribeReq<'a> {
} }
} }
fn initiate(&self, tx: &mut Packet, _transaction: &mut Transaction) -> Result<bool, Error> { fn initiate(&self, tx: &mut Packet, transaction: &mut Transaction) -> Result<bool, Error> {
tx.set_proto_id(PROTO_ID_INTERACTION_MODEL as u16); tx.set_proto_id(PROTO_ID_INTERACTION_MODEL as u16);
tx.set_proto_opcode(OpCode::ReportData as u8); tx.set_proto_opcode(OpCode::ReportData as u8);
@ -539,6 +548,14 @@ impl<'a> SubscribeReq<'a> {
tw.start_struct(TagType::Anonymous)?; tw.start_struct(TagType::Anonymous)?;
let subscription_id = SUBS_ID.fetch_add(1, Ordering::SeqCst);
transaction.exch_mut().set_subscription_id(subscription_id);
tw.u32(
TagType::Context(msg::ReportDataTag::SubscriptionId as u8),
subscription_id,
)?;
if self.attr_requests.is_some() { if self.attr_requests.is_some() {
tw.start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?; tw.start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?;
} }
@ -565,9 +582,11 @@ impl<'a> SubscribeReq<'a> {
)?; )?;
} }
let subscription_id = transaction.exch_mut().take_subscription_id().unwrap();
transaction transaction
.exch_mut() .exch_mut()
.set_suspended_subscribe_req(self.suspend(resume_path)); .set_suspended_subscribe_req(self.suspend(resume_path, subscription_id));
tw.bool( tw.bool(
TagType::Context(msg::ReportDataTag::SupressResponse as u8), TagType::Context(msg::ReportDataTag::SupressResponse as u8),
@ -640,6 +659,7 @@ impl ResumeReadReq {
} }
pub struct ResumeSubscribeReq { pub struct ResumeSubscribeReq {
pub subscription_id: u32,
pub paths: heapless::Vec<AttrPath, MAX_RESUME_PATHS>, pub paths: heapless::Vec<AttrPath, MAX_RESUME_PATHS>,
pub filters: heapless::Vec<DataVersionFilter, MAX_RESUME_DATAVER_FILTERS>, pub filters: heapless::Vec<DataVersionFilter, MAX_RESUME_DATAVER_FILTERS>,
pub fabric_filtered: bool, pub fabric_filtered: bool,
@ -660,15 +680,28 @@ impl ResumeSubscribeReq {
tw.start_struct(TagType::Anonymous)?; tw.start_struct(TagType::Anonymous)?;
tw.u32(
TagType::Context(msg::ReportDataTag::SubscriptionId as u8),
self.subscription_id,
)?;
tw.start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?; tw.start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?;
Ok(true)
} else { } else {
tx.set_proto_opcode(OpCode::SubscribeResponse as u8); tx.set_proto_opcode(OpCode::SubscribeResponse as u8);
// let mut tw = TLVWriter::new(tx.get_writebuf()?); let mut tw = TLVWriter::new(tx.get_writebuf()?);
// tw.start_struct(TagType::Anonymous)?;
}
Ok(true) tw.start_struct(TagType::Anonymous)?;
let resp = SubscribeResp::new(self.subscription_id, 40);
resp.to_tlv(&mut tw, TagType::Anonymous)?;
tw.end_container()?;
Ok(false)
}
} }
pub fn complete( pub fn complete(
@ -677,41 +710,34 @@ impl ResumeSubscribeReq {
transaction: &mut Transaction, transaction: &mut Transaction,
resume_path: Option<GenericPath>, resume_path: Option<GenericPath>,
) -> Result<bool, Error> { ) -> Result<bool, Error> {
if self.resume_path.is_none() && resume_path.is_some() { if self.resume_path.is_none() {
panic!("Cannot resume subscribe"); // Should not get here as initiate() should've sent the subscribe response already
panic!("Subscription was already processed");
} }
if self.resume_path.is_some() { // Completing a ReportData message
// Completing a ReportData message
let mut tw = ReadReq::restore_long_read_space(tx)?;
tw.end_container()?; let mut tw = ReadReq::restore_long_read_space(tx)?;
if resume_path.is_some() { tw.end_container()?;
tw.bool(
TagType::Context(msg::ReportDataTag::MoreChunkedMsgs as u8),
true,
)?;
}
if resume_path.is_some() {
tw.bool( tw.bool(
TagType::Context(msg::ReportDataTag::SupressResponse as u8), TagType::Context(msg::ReportDataTag::MoreChunkedMsgs as u8),
false, true,
)?; )?;
tw.end_container()?;
self.resume_path = resume_path;
transaction.exch_mut().set_suspended_subscribe_req(self);
} else {
// Completing a SubscribeResponse message
// let mut tw = TLVWriter::new(tx.get_writebuf()?);
// tw.end_container()?;
transaction.complete();
} }
tw.bool(
TagType::Context(msg::ReportDataTag::SupressResponse as u8),
false,
)?;
tw.end_container()?;
self.resume_path = resume_path;
transaction.exch_mut().set_suspended_subscribe_req(self);
Ok(true) Ok(true)
} }
} }

View file

@ -71,6 +71,7 @@ pub enum DataOption {
CaseSession(CaseSession), CaseSession(CaseSession),
Time(Duration), Time(Duration),
SuspendedReadReq(ResumeReadReq), SuspendedReadReq(ResumeReadReq),
SubscriptionId(u32),
SuspendedSubscibeReq(ResumeSubscribeReq), SuspendedSubscibeReq(ResumeSubscribeReq),
#[default] #[default]
None, None,
@ -168,6 +169,20 @@ impl Exchange {
} }
} }
pub fn set_subscription_id(&mut self, id: u32) {
self.data = DataOption::SubscriptionId(id);
}
pub fn take_subscription_id(&mut self) -> Option<u32> {
let old = core::mem::replace(&mut self.data, DataOption::None);
if let DataOption::SubscriptionId(id) = old {
Some(id)
} else {
self.data = old;
None
}
}
pub fn set_suspended_subscribe_req(&mut self, req: ResumeSubscribeReq) { pub fn set_suspended_subscribe_req(&mut self, req: ResumeSubscribeReq) {
self.data = DataOption::SuspendedSubscibeReq(req); self.data = DataOption::SuspendedSubscibeReq(req);
} }

View file

@ -70,6 +70,7 @@ impl<'r, 'a, 'p> RecvCompletion<'r, 'a, 'p> {
fn maybe_next_action(&mut self) -> Result<Option<Option<RecvAction<'_, 'p>>>, Error> { fn maybe_next_action(&mut self) -> Result<Option<Option<RecvAction<'_, 'p>>>, Error> {
self.mgr.exch_mgr.purge(); self.mgr.exch_mgr.purge();
self.tx.reset();
let (state, next) = match core::mem::replace(&mut self.state, RecvState::New) { let (state, next) = match core::mem::replace(&mut self.state, RecvState::New) {
RecvState::New => { RecvState::New => {
@ -108,7 +109,7 @@ impl<'r, 'a, 'p> RecvCompletion<'r, 'a, 'p> {
} }
} }
Ok(None) => (RecvState::Ack, None), Ok(None) => (RecvState::Ack, None),
Err(Error::Duplicate) => (RecvState::Ack, Some(None)), Err(Error::Duplicate) => (RecvState::Ack, None),
Err(Error::NoSpace) => (RecvState::EvictSession, None), Err(Error::NoSpace) => (RecvState::EvictSession, None),
Err(err) => Err(err)?, Err(err) => Err(err)?,
}, },