Bugfix: subscription_id was not sent
This commit is contained in:
parent
b21f257c47
commit
bb275cd50a
5 changed files with 95 additions and 73 deletions
|
@ -15,26 +15,17 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use core::{
|
||||
cell::RefCell,
|
||||
sync::atomic::{AtomicU32, Ordering},
|
||||
};
|
||||
use core::cell::RefCell;
|
||||
|
||||
use super::objects::*;
|
||||
use crate::{
|
||||
acl::{Accessor, AclMgr},
|
||||
error::*,
|
||||
interaction_model::{
|
||||
core::{Interaction, Transaction},
|
||||
messages::msg::SubscribeResp,
|
||||
},
|
||||
tlv::{TLVWriter, TagType, ToTLV},
|
||||
interaction_model::core::{Interaction, Transaction},
|
||||
tlv::TLVWriter,
|
||||
transport::packet::Packet,
|
||||
};
|
||||
|
||||
// TODO: For now...
|
||||
static SUBS_ID: AtomicU32 = AtomicU32::new(1);
|
||||
|
||||
pub struct DataModel<'a, T> {
|
||||
pub acl_mgr: &'a RefCell<AclMgr>,
|
||||
pub node: &'a Node<'a>,
|
||||
|
@ -120,19 +111,12 @@ impl<'a, T> DataModel<'a, T> {
|
|||
Interaction::ResumeSubscribe(req) => {
|
||||
let mut resume_path = None;
|
||||
|
||||
if req.resume_path.is_some() {
|
||||
for item in self.node.resume_subscribing_read(&req, &accessor) {
|
||||
if let Some(path) =
|
||||
AttrDataEncoder::handle_read(item, &self.handler, &mut tw)?
|
||||
{
|
||||
resume_path = Some(path);
|
||||
break;
|
||||
}
|
||||
for item in self.node.resume_subscribing_read(&req, &accessor) {
|
||||
if let Some(path) = AttrDataEncoder::handle_read(item, &self.handler, &mut tw)?
|
||||
{
|
||||
resume_path = Some(path);
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// TODO
|
||||
let resp = SubscribeResp::new(SUBS_ID.fetch_add(1, Ordering::SeqCst), 40);
|
||||
resp.to_tlv(&mut tw, TagType::Anonymous)?;
|
||||
}
|
||||
|
||||
req.complete(tx, transaction, resume_path)
|
||||
|
@ -215,19 +199,13 @@ impl<'a, T> DataModel<'a, T> {
|
|||
Interaction::ResumeSubscribe(req) => {
|
||||
let mut resume_path = None;
|
||||
|
||||
if req.resume_path.is_some() {
|
||||
for item in self.node.resume_subscribing_read(&req, &accessor) {
|
||||
if let Some(path) =
|
||||
AttrDataEncoder::handle_read_async(item, &self.handler, &mut tw).await?
|
||||
{
|
||||
resume_path = Some(path);
|
||||
break;
|
||||
}
|
||||
for item in self.node.resume_subscribing_read(&req, &accessor) {
|
||||
if let Some(path) =
|
||||
AttrDataEncoder::handle_read_async(item, &self.handler, &mut tw).await?
|
||||
{
|
||||
resume_path = Some(path);
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// TODO
|
||||
let resp = SubscribeResp::new(SUBS_ID.fetch_add(1, Ordering::SeqCst), 40);
|
||||
resp.to_tlv(&mut tw, TagType::Anonymous)?;
|
||||
}
|
||||
|
||||
req.complete(tx, transaction, resume_path)
|
||||
|
|
|
@ -388,7 +388,6 @@ impl<'a> NocCluster<'a> {
|
|||
|
||||
self.failsafe.borrow_mut().record_add_noc(fab_idx)?;
|
||||
|
||||
transaction.complete();
|
||||
Ok(fab_idx)
|
||||
}
|
||||
|
||||
|
@ -479,7 +478,10 @@ impl<'a> NocCluster<'a> {
|
|||
Err(NocError::Error(error)) => Err(error)?,
|
||||
};
|
||||
|
||||
Self::create_nocresponse(encoder, status, fab_idx, "")
|
||||
Self::create_nocresponse(encoder, status, fab_idx, "")?;
|
||||
transaction.complete();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_command_attrequest(
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use core::sync::atomic::{AtomicU32, Ordering};
|
||||
use core::time::Duration;
|
||||
|
||||
use crate::{
|
||||
|
@ -35,7 +36,7 @@ use num_derive::FromPrimitive;
|
|||
|
||||
use super::messages::{
|
||||
ib::{AttrPath, DataVersionFilter},
|
||||
msg::{self, InvReq, ReadReq, StatusResp, SubscribeReq, TimedReq, WriteReq},
|
||||
msg::{self, InvReq, ReadReq, StatusResp, SubscribeReq, SubscribeResp, TimedReq, WriteReq},
|
||||
GenericPath,
|
||||
};
|
||||
|
||||
|
@ -206,6 +207,9 @@ const MAX_RESUME_DATAVER_FILTERS: usize = 128;
|
|||
// the end of long reads.
|
||||
const LONG_READS_TLV_RESERVE_SIZE: usize = 24;
|
||||
|
||||
// TODO: For now...
|
||||
static SUBS_ID: AtomicU32 = AtomicU32::new(1);
|
||||
|
||||
pub enum Interaction<'a> {
|
||||
Read(ReadReq<'a>),
|
||||
Write(WriteReq<'a>),
|
||||
|
@ -511,8 +515,13 @@ impl TimedReq {
|
|||
}
|
||||
|
||||
impl<'a> SubscribeReq<'a> {
|
||||
fn suspend(&self, resume_path: Option<GenericPath>) -> ResumeSubscribeReq {
|
||||
fn suspend(
|
||||
&self,
|
||||
resume_path: Option<GenericPath>,
|
||||
subscription_id: u32,
|
||||
) -> ResumeSubscribeReq {
|
||||
ResumeSubscribeReq {
|
||||
subscription_id,
|
||||
paths: self
|
||||
.attr_requests
|
||||
.iter()
|
||||
|
@ -531,7 +540,7 @@ impl<'a> SubscribeReq<'a> {
|
|||
}
|
||||
}
|
||||
|
||||
fn initiate(&self, tx: &mut Packet, _transaction: &mut Transaction) -> Result<bool, Error> {
|
||||
fn initiate(&self, tx: &mut Packet, transaction: &mut Transaction) -> Result<bool, Error> {
|
||||
tx.set_proto_id(PROTO_ID_INTERACTION_MODEL as u16);
|
||||
tx.set_proto_opcode(OpCode::ReportData as u8);
|
||||
|
||||
|
@ -539,6 +548,14 @@ impl<'a> SubscribeReq<'a> {
|
|||
|
||||
tw.start_struct(TagType::Anonymous)?;
|
||||
|
||||
let subscription_id = SUBS_ID.fetch_add(1, Ordering::SeqCst);
|
||||
transaction.exch_mut().set_subscription_id(subscription_id);
|
||||
|
||||
tw.u32(
|
||||
TagType::Context(msg::ReportDataTag::SubscriptionId as u8),
|
||||
subscription_id,
|
||||
)?;
|
||||
|
||||
if self.attr_requests.is_some() {
|
||||
tw.start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?;
|
||||
}
|
||||
|
@ -565,9 +582,11 @@ impl<'a> SubscribeReq<'a> {
|
|||
)?;
|
||||
}
|
||||
|
||||
let subscription_id = transaction.exch_mut().take_subscription_id().unwrap();
|
||||
|
||||
transaction
|
||||
.exch_mut()
|
||||
.set_suspended_subscribe_req(self.suspend(resume_path));
|
||||
.set_suspended_subscribe_req(self.suspend(resume_path, subscription_id));
|
||||
|
||||
tw.bool(
|
||||
TagType::Context(msg::ReportDataTag::SupressResponse as u8),
|
||||
|
@ -640,6 +659,7 @@ impl ResumeReadReq {
|
|||
}
|
||||
|
||||
pub struct ResumeSubscribeReq {
|
||||
pub subscription_id: u32,
|
||||
pub paths: heapless::Vec<AttrPath, MAX_RESUME_PATHS>,
|
||||
pub filters: heapless::Vec<DataVersionFilter, MAX_RESUME_DATAVER_FILTERS>,
|
||||
pub fabric_filtered: bool,
|
||||
|
@ -660,15 +680,28 @@ impl ResumeSubscribeReq {
|
|||
|
||||
tw.start_struct(TagType::Anonymous)?;
|
||||
|
||||
tw.u32(
|
||||
TagType::Context(msg::ReportDataTag::SubscriptionId as u8),
|
||||
self.subscription_id,
|
||||
)?;
|
||||
|
||||
tw.start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?;
|
||||
|
||||
Ok(true)
|
||||
} else {
|
||||
tx.set_proto_opcode(OpCode::SubscribeResponse as u8);
|
||||
|
||||
// let mut tw = TLVWriter::new(tx.get_writebuf()?);
|
||||
// tw.start_struct(TagType::Anonymous)?;
|
||||
}
|
||||
let mut tw = TLVWriter::new(tx.get_writebuf()?);
|
||||
|
||||
Ok(true)
|
||||
tw.start_struct(TagType::Anonymous)?;
|
||||
|
||||
let resp = SubscribeResp::new(self.subscription_id, 40);
|
||||
resp.to_tlv(&mut tw, TagType::Anonymous)?;
|
||||
|
||||
tw.end_container()?;
|
||||
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn complete(
|
||||
|
@ -677,41 +710,34 @@ impl ResumeSubscribeReq {
|
|||
transaction: &mut Transaction,
|
||||
resume_path: Option<GenericPath>,
|
||||
) -> Result<bool, Error> {
|
||||
if self.resume_path.is_none() && resume_path.is_some() {
|
||||
panic!("Cannot resume subscribe");
|
||||
if self.resume_path.is_none() {
|
||||
// Should not get here as initiate() should've sent the subscribe response already
|
||||
panic!("Subscription was already processed");
|
||||
}
|
||||
|
||||
if self.resume_path.is_some() {
|
||||
// Completing a ReportData message
|
||||
let mut tw = ReadReq::restore_long_read_space(tx)?;
|
||||
// Completing a ReportData message
|
||||
|
||||
tw.end_container()?;
|
||||
let mut tw = ReadReq::restore_long_read_space(tx)?;
|
||||
|
||||
if resume_path.is_some() {
|
||||
tw.bool(
|
||||
TagType::Context(msg::ReportDataTag::MoreChunkedMsgs as u8),
|
||||
true,
|
||||
)?;
|
||||
}
|
||||
tw.end_container()?;
|
||||
|
||||
if resume_path.is_some() {
|
||||
tw.bool(
|
||||
TagType::Context(msg::ReportDataTag::SupressResponse as u8),
|
||||
false,
|
||||
TagType::Context(msg::ReportDataTag::MoreChunkedMsgs as u8),
|
||||
true,
|
||||
)?;
|
||||
|
||||
tw.end_container()?;
|
||||
|
||||
self.resume_path = resume_path;
|
||||
transaction.exch_mut().set_suspended_subscribe_req(self);
|
||||
} else {
|
||||
// Completing a SubscribeResponse message
|
||||
|
||||
// let mut tw = TLVWriter::new(tx.get_writebuf()?);
|
||||
// tw.end_container()?;
|
||||
|
||||
transaction.complete();
|
||||
}
|
||||
|
||||
tw.bool(
|
||||
TagType::Context(msg::ReportDataTag::SupressResponse as u8),
|
||||
false,
|
||||
)?;
|
||||
|
||||
tw.end_container()?;
|
||||
|
||||
self.resume_path = resume_path;
|
||||
transaction.exch_mut().set_suspended_subscribe_req(self);
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -71,6 +71,7 @@ pub enum DataOption {
|
|||
CaseSession(CaseSession),
|
||||
Time(Duration),
|
||||
SuspendedReadReq(ResumeReadReq),
|
||||
SubscriptionId(u32),
|
||||
SuspendedSubscibeReq(ResumeSubscribeReq),
|
||||
#[default]
|
||||
None,
|
||||
|
@ -168,6 +169,20 @@ impl Exchange {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn set_subscription_id(&mut self, id: u32) {
|
||||
self.data = DataOption::SubscriptionId(id);
|
||||
}
|
||||
|
||||
pub fn take_subscription_id(&mut self) -> Option<u32> {
|
||||
let old = core::mem::replace(&mut self.data, DataOption::None);
|
||||
if let DataOption::SubscriptionId(id) = old {
|
||||
Some(id)
|
||||
} else {
|
||||
self.data = old;
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_suspended_subscribe_req(&mut self, req: ResumeSubscribeReq) {
|
||||
self.data = DataOption::SuspendedSubscibeReq(req);
|
||||
}
|
||||
|
|
|
@ -70,6 +70,7 @@ impl<'r, 'a, 'p> RecvCompletion<'r, 'a, 'p> {
|
|||
|
||||
fn maybe_next_action(&mut self) -> Result<Option<Option<RecvAction<'_, 'p>>>, Error> {
|
||||
self.mgr.exch_mgr.purge();
|
||||
self.tx.reset();
|
||||
|
||||
let (state, next) = match core::mem::replace(&mut self.state, RecvState::New) {
|
||||
RecvState::New => {
|
||||
|
@ -108,7 +109,7 @@ impl<'r, 'a, 'p> RecvCompletion<'r, 'a, 'p> {
|
|||
}
|
||||
}
|
||||
Ok(None) => (RecvState::Ack, None),
|
||||
Err(Error::Duplicate) => (RecvState::Ack, Some(None)),
|
||||
Err(Error::Duplicate) => (RecvState::Ack, None),
|
||||
Err(Error::NoSpace) => (RecvState::EvictSession, None),
|
||||
Err(err) => Err(err)?,
|
||||
},
|
||||
|
|
Loading…
Add table
Reference in a new issue