IM: Include baseline support for Subscription Transaction

Currently the entire transaction goes through successfully and the other end
is notified of a successful subscription. Actual subscription part is yet to be
implemented.
This commit is contained in:
Kedar Sovani 2023-01-13 20:01:43 +05:30
parent 1a0a41812d
commit cc96c9d125
4 changed files with 206 additions and 4 deletions

View file

@ -100,6 +100,26 @@ impl InteractionModel {
InteractionModel { consumer } InteractionModel { consumer }
} }
pub fn handle_status_resp(
&mut self,
trans: &mut Transaction,
rx_buf: &[u8],
proto_tx: &mut Packet,
) -> Result<ResponseRequired, Error> {
let root = get_root_node_struct(rx_buf)?;
let req = StatusResp::from_tlv(&root)?;
let mut handled = false;
let result = self.handle_subscription_confirm(trans, proto_tx, &mut handled);
if handled {
result
} else {
// Nothing to do for now
info!("Received status report with status {:?}", req.status);
Ok(ResponseRequired::No)
}
}
pub fn handle_timed_req( pub fn handle_timed_req(
&mut self, &mut self,
trans: &mut Transaction, trans: &mut Transaction,
@ -162,6 +182,8 @@ impl proto_demux::HandleProto for InteractionModel {
OpCode::ReadRequest => self.handle_read_req(&mut trans, buf, &mut ctx.tx)?, OpCode::ReadRequest => self.handle_read_req(&mut trans, buf, &mut ctx.tx)?,
OpCode::WriteRequest => self.handle_write_req(&mut trans, buf, &mut ctx.tx)?, OpCode::WriteRequest => self.handle_write_req(&mut trans, buf, &mut ctx.tx)?,
OpCode::TimedRequest => self.handle_timed_req(&mut trans, buf, &mut ctx.tx)?, OpCode::TimedRequest => self.handle_timed_req(&mut trans, buf, &mut ctx.tx)?,
OpCode::SubscribeRequest => self.handle_subscribe_req(&mut trans, buf, &mut ctx.tx)?,
OpCode::StatusResponse => self.handle_status_resp(&mut trans, buf, &mut ctx.tx)?,
_ => { _ => {
error!("Opcode Not Handled: {:?}", proto_opcode); error!("Opcode Not Handled: {:?}", proto_opcode);
return Err(Error::InvalidOpcode); return Err(Error::InvalidOpcode);

View file

@ -71,7 +71,55 @@ pub mod msg {
tlv::{FromTLV, TLVArray, TLVElement, TLVWriter, TagType, ToTLV}, tlv::{FromTLV, TLVArray, TLVElement, TLVWriter, TagType, ToTLV},
}; };
use super::ib::{self, AttrData, AttrPath, AttrResp, AttrStatus, CmdData, DataVersionFilter}; use super::ib::{
self, AttrData, AttrPath, AttrResp, AttrStatus, CmdData, DataVersionFilter, EventFilter,
EventPath,
};
#[derive(FromTLV)]
#[tlvargs(lifetime = "'a")]
pub struct SubscribeReq<'a> {
pub keep_subs: bool,
pub min_int_floor: u16,
pub max_int_ceil: u16,
pub attr_requests: Option<TLVArray<'a, AttrPath>>,
event_requests: Option<TLVArray<'a, EventPath>>,
event_filters: Option<TLVArray<'a, EventFilter>>,
// The Context Tags are discontiguous for some reason
_dummy: Option<bool>,
pub fabric_filtered: bool,
pub dataver_filters: Option<TLVArray<'a, DataVersionFilter>>,
}
impl<'a> SubscribeReq<'a> {
pub fn to_read_req(&self) -> ReadReq<'a> {
ReadReq {
attr_requests: self.attr_requests,
event_requests: self.event_requests,
event_filters: self.event_filters,
fabric_filtered: self.fabric_filtered,
dataver_filters: self.dataver_filters,
}
}
}
#[derive(ToTLV)]
pub struct SubscribeResp {
pub subs_id: u32,
// The Context Tags are discontiguous for some reason
_dummy: Option<u32>,
pub max_int: u16,
}
impl SubscribeResp {
pub fn new(subs_id: u32, max_int: u16) -> Self {
Self {
subs_id,
_dummy: None,
max_int,
}
}
}
#[derive(FromTLV, ToTLV)] #[derive(FromTLV, ToTLV)]
pub struct TimedReq { pub struct TimedReq {
@ -115,8 +163,8 @@ pub mod msg {
#[tlvargs(lifetime = "'a")] #[tlvargs(lifetime = "'a")]
pub struct ReadReq<'a> { pub struct ReadReq<'a> {
pub attr_requests: Option<TLVArray<'a, AttrPath>>, pub attr_requests: Option<TLVArray<'a, AttrPath>>,
event_requests: Option<bool>, event_requests: Option<TLVArray<'a, EventPath>>,
event_filters: Option<bool>, event_filters: Option<TLVArray<'a, EventFilter>>,
pub fabric_filtered: bool, pub fabric_filtered: bool,
pub dataver_filters: Option<TLVArray<'a, DataVersionFilter>>, pub dataver_filters: Option<TLVArray<'a, DataVersionFilter>>,
} }
@ -172,7 +220,7 @@ pub mod msg {
} }
pub enum ReportDataTag { pub enum ReportDataTag {
_SubscriptionId = 0, SubscriptionId = 0,
AttributeReports = 1, AttributeReports = 1,
_EventReport = 2, _EventReport = 2,
_MoreChunkedMsgs = 3, _MoreChunkedMsgs = 3,
@ -472,4 +520,20 @@ pub mod ib {
pub path: ClusterPath, pub path: ClusterPath,
pub data_ver: u32, pub data_ver: u32,
} }
#[derive(FromTLV, ToTLV, Copy, Clone)]
#[tlvargs(datatype = "list")]
pub struct EventPath {
pub node: Option<u64>,
pub endpoint: Option<u16>,
pub cluster: Option<u32>,
pub event: Option<u32>,
pub is_urgent: Option<bool>,
}
#[derive(FromTLV, ToTLV, Copy, Clone)]
pub struct EventFilter {
pub node: Option<u64>,
pub event_min: Option<u64>,
}
} }

View file

@ -64,4 +64,5 @@ pub mod command;
pub mod core; pub mod core;
pub mod messages; pub mod messages;
pub mod read; pub mod read;
pub mod subscribe;
pub mod write; pub mod write;

View file

@ -0,0 +1,115 @@
/*
*
* Copyright (c) 2023 Project CHIP Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::sync::atomic::{AtomicU32, Ordering};
use crate::{
error::Error,
interaction_model::core::OpCode,
tlv::{get_root_node_struct, FromTLV, TLVWriter, TagType, ToTLV},
transport::{packet::Packet, proto_demux::ResponseRequired},
};
use log::error;
use super::{
messages::msg::{self, SubscribeReq, SubscribeResp},
InteractionModel, Transaction,
};
static SUBS_ID: AtomicU32 = AtomicU32::new(1);
impl InteractionModel {
pub fn handle_subscribe_req(
&mut self,
trans: &mut Transaction,
rx_buf: &[u8],
proto_tx: &mut Packet,
) -> Result<ResponseRequired, Error> {
proto_tx.set_proto_opcode(OpCode::ReportData as u8);
let mut tw = TLVWriter::new(proto_tx.get_writebuf()?);
let root = get_root_node_struct(rx_buf)?;
let req = SubscribeReq::from_tlv(&root)?;
let ctx = Box::new(SubsCtx {
state: SubsState::Confirming,
// TODO
id: SUBS_ID.fetch_add(1, Ordering::SeqCst),
});
let read_req = req.to_read_req();
tw.start_struct(TagType::Anonymous)?;
tw.u32(
TagType::Context(msg::ReportDataTag::SubscriptionId as u8),
ctx.id,
)?;
self.consumer.consume_read_attr(&read_req, trans, &mut tw)?;
tw.bool(
TagType::Context(msg::ReportDataTag::SupressResponse as u8),
false,
)?;
tw.end_container()?;
if !trans.exch.is_data_none() {
error!("Exchange data already set!");
return Err(Error::InvalidState);
}
trans.exch.set_data_boxed(ctx);
Ok(ResponseRequired::Yes)
}
pub fn handle_subscription_confirm(
&mut self,
trans: &mut Transaction,
proto_tx: &mut Packet,
request_handled: &mut bool,
) -> Result<ResponseRequired, Error> {
*request_handled = false;
if let Some(ctx) = trans.exch.get_data_boxed::<SubsCtx>() {
if ctx.state != SubsState::Confirming {
// Not relevant for us
return Err(Error::Invalid);
}
*request_handled = true;
ctx.state = SubsState::Confirmed;
proto_tx.set_proto_opcode(OpCode::SubscriptResponse as u8);
// TODO
let resp = SubscribeResp::new(ctx.id, 40);
let mut tw = TLVWriter::new(proto_tx.get_writebuf()?);
resp.to_tlv(&mut tw, TagType::Anonymous)?;
trans.complete();
Ok(ResponseRequired::Yes)
} else {
trans.complete();
Err(Error::Invalid)
}
}
}
#[derive(PartialEq)]
enum SubsState {
Confirming,
Confirmed,
}
struct SubsCtx {
state: SubsState,
id: u32,
}