Merge pull request #28 from kedars/feature/large_reads
Support multi-leg reads
This commit is contained in:
commit
9b5a2530f8
16 changed files with 671 additions and 290 deletions
|
@ -15,6 +15,8 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
use self::subscribe::SubsCtx;
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
cluster_basic_information::BasicInfoConfig,
|
cluster_basic_information::BasicInfoConfig,
|
||||||
device_types::device_type_add_root_node,
|
device_types::device_type_add_root_node,
|
||||||
|
@ -28,7 +30,7 @@ use crate::{
|
||||||
fabric::FabricMgr,
|
fabric::FabricMgr,
|
||||||
interaction_model::{
|
interaction_model::{
|
||||||
command::CommandReq,
|
command::CommandReq,
|
||||||
core::IMStatusCode,
|
core::{IMStatusCode, OpCode},
|
||||||
messages::{
|
messages::{
|
||||||
ib::{self, AttrData, DataVersionFilter},
|
ib::{self, AttrData, DataVersionFilter},
|
||||||
msg::{self, InvReq, ReadReq, WriteReq},
|
msg::{self, InvReq, ReadReq, WriteReq},
|
||||||
|
@ -37,8 +39,11 @@ use crate::{
|
||||||
InteractionConsumer, Transaction,
|
InteractionConsumer, Transaction,
|
||||||
},
|
},
|
||||||
secure_channel::pake::PaseMgr,
|
secure_channel::pake::PaseMgr,
|
||||||
tlv::{TLVArray, TLVWriter, TagType, ToTLV},
|
tlv::{self, FromTLV, TLVArray, TLVWriter, TagType, ToTLV},
|
||||||
transport::session::{Session, SessionMode},
|
transport::{
|
||||||
|
proto_demux::ResponseRequired,
|
||||||
|
session::{Session, SessionMode},
|
||||||
|
},
|
||||||
};
|
};
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
|
@ -76,17 +81,6 @@ impl DataModel {
|
||||||
Ok(dm)
|
Ok(dm)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn read_attribute_raw(
|
|
||||||
&self,
|
|
||||||
endpoint: u16,
|
|
||||||
cluster: u32,
|
|
||||||
attr: u16,
|
|
||||||
) -> Result<AttrValue, IMStatusCode> {
|
|
||||||
let node = self.node.read().unwrap();
|
|
||||||
let cluster = node.get_cluster(endpoint, cluster)?;
|
|
||||||
cluster.base().read_attribute_raw(attr).map(|a| a.clone())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Encode a write attribute from a path that may or may not be wildcard
|
// Encode a write attribute from a path that may or may not be wildcard
|
||||||
fn handle_write_attr_path(
|
fn handle_write_attr_path(
|
||||||
node: &mut Node,
|
node: &mut Node,
|
||||||
|
@ -153,42 +147,6 @@ impl DataModel {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Encode a read attribute from a path that may or may not be wildcard
|
|
||||||
fn handle_read_attr_path(
|
|
||||||
node: &Node,
|
|
||||||
accessor: &Accessor,
|
|
||||||
attr_encoder: &mut AttrReadEncoder,
|
|
||||||
attr_details: &mut AttrDetails,
|
|
||||||
) {
|
|
||||||
let path = attr_encoder.path;
|
|
||||||
// Skip error reporting for wildcard paths, don't for concrete paths
|
|
||||||
attr_encoder.skip_error(path.is_wildcard());
|
|
||||||
|
|
||||||
let result = node.for_each_attribute(&path, |path, c| {
|
|
||||||
// Ignore processing if data filter matches.
|
|
||||||
// For a wildcard attribute, this may end happening unnecessarily for all attributes, although
|
|
||||||
// a single skip for the cluster is sufficient. That requires us to replace this for_each with a
|
|
||||||
// for_each_cluster
|
|
||||||
let cluster_data_ver = c.base().get_dataver();
|
|
||||||
if Self::data_filter_matches(&attr_encoder.data_ver_filters, path, cluster_data_ver) {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
attr_details.attr_id = path.leaf.unwrap_or_default() as u16;
|
|
||||||
// Overwrite the previous path with the concrete path
|
|
||||||
attr_encoder.set_path(*path);
|
|
||||||
// Set the cluster's data version
|
|
||||||
attr_encoder.set_data_ver(cluster_data_ver);
|
|
||||||
let mut access_req = AccessReq::new(accessor, path, Access::READ);
|
|
||||||
Cluster::read_attribute(c, &mut access_req, attr_encoder, attr_details);
|
|
||||||
Ok(())
|
|
||||||
});
|
|
||||||
if let Err(e) = result {
|
|
||||||
// We hit this only if this is a non-wildcard path
|
|
||||||
attr_encoder.encode_status(e, 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Handle command from a path that may or may not be wildcard
|
// Handle command from a path that may or may not be wildcard
|
||||||
fn handle_command_path(node: &mut Node, cmd_req: &mut CommandReq) {
|
fn handle_command_path(node: &mut Node, cmd_req: &mut CommandReq) {
|
||||||
let wildcard = cmd_req.cmd.path.is_wildcard();
|
let wildcard = cmd_req.cmd.path.is_wildcard();
|
||||||
|
@ -266,6 +224,15 @@ impl DataModel {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub mod read;
|
||||||
|
pub mod subscribe;
|
||||||
|
|
||||||
|
/// Type of Resume Request
|
||||||
|
enum ResumeReq {
|
||||||
|
Subscribe(subscribe::SubsCtx),
|
||||||
|
Read(read::ResumeReadReq),
|
||||||
|
}
|
||||||
|
|
||||||
impl objects::ChangeConsumer for DataModel {
|
impl objects::ChangeConsumer for DataModel {
|
||||||
fn endpoint_added(&self, id: u16, endpoint: &mut Endpoint) -> Result<(), Error> {
|
fn endpoint_added(&self, id: u16, endpoint: &mut Endpoint) -> Result<(), Error> {
|
||||||
endpoint.add_cluster(DescriptorCluster::new(id, self.clone())?)?;
|
endpoint.add_cluster(DescriptorCluster::new(id, self.clone())?)?;
|
||||||
|
@ -294,45 +261,22 @@ impl InteractionConsumer for DataModel {
|
||||||
|
|
||||||
fn consume_read_attr(
|
fn consume_read_attr(
|
||||||
&self,
|
&self,
|
||||||
read_req: &ReadReq,
|
rx_buf: &[u8],
|
||||||
trans: &mut Transaction,
|
trans: &mut Transaction,
|
||||||
tw: &mut TLVWriter,
|
tw: &mut TLVWriter,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let mut attr_encoder = AttrReadEncoder::new(tw);
|
let mut resume_from = None;
|
||||||
if let Some(filters) = &read_req.dataver_filters {
|
let root = tlv::get_root_node(rx_buf)?;
|
||||||
attr_encoder.set_data_ver_filters(filters);
|
let req = ReadReq::from_tlv(&root)?;
|
||||||
|
self.handle_read_req(&req, trans, tw, &mut resume_from)?;
|
||||||
|
if resume_from.is_some() {
|
||||||
|
// This is a multi-hop read transaction, remember this read request
|
||||||
|
let resume = read::ResumeReadReq::new(rx_buf, &resume_from)?;
|
||||||
|
if !trans.exch.is_data_none() {
|
||||||
|
error!("Exchange data already set, and multi-hop read");
|
||||||
|
return Err(Error::InvalidState);
|
||||||
}
|
}
|
||||||
|
trans.exch.set_data_boxed(Box::new(ResumeReq::Read(resume)));
|
||||||
let mut attr_details = AttrDetails {
|
|
||||||
// This will be updated internally
|
|
||||||
attr_id: 0,
|
|
||||||
// This will be updated internally
|
|
||||||
list_index: None,
|
|
||||||
// This will be updated internally
|
|
||||||
fab_idx: 0,
|
|
||||||
fab_filter: read_req.fabric_filtered,
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(attr_requests) = &read_req.attr_requests {
|
|
||||||
let accessor = self.sess_to_accessor(trans.session);
|
|
||||||
let node = self.node.read().unwrap();
|
|
||||||
attr_encoder
|
|
||||||
.tw
|
|
||||||
.start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?;
|
|
||||||
|
|
||||||
for attr_path in attr_requests.iter() {
|
|
||||||
attr_encoder.set_path(attr_path.to_gp());
|
|
||||||
// Extract the attr_path fields into various structures
|
|
||||||
attr_details.list_index = attr_path.list_index;
|
|
||||||
attr_details.fab_idx = accessor.fab_idx;
|
|
||||||
DataModel::handle_read_attr_path(
|
|
||||||
&node,
|
|
||||||
&accessor,
|
|
||||||
&mut attr_encoder,
|
|
||||||
&mut attr_details,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
tw.end_container()?;
|
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -367,61 +311,44 @@ impl InteractionConsumer for DataModel {
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/// Encoder for generating a response to a read request
|
fn consume_status_report(
|
||||||
pub struct AttrReadEncoder<'a, 'b, 'c> {
|
&self,
|
||||||
tw: &'a mut TLVWriter<'b, 'c>,
|
req: &msg::StatusResp,
|
||||||
data_ver: u32,
|
trans: &mut Transaction,
|
||||||
path: GenericPath,
|
tw: &mut TLVWriter,
|
||||||
skip_error: bool,
|
) -> Result<(OpCode, ResponseRequired), Error> {
|
||||||
data_ver_filters: Option<&'a TLVArray<'a, DataVersionFilter>>,
|
if let Some(mut resume) = trans.exch.take_data_boxed::<ResumeReq>() {
|
||||||
}
|
let result = match *resume {
|
||||||
|
ResumeReq::Read(ref mut read) => self.handle_resume_read(read, trans, tw)?,
|
||||||
|
|
||||||
impl<'a, 'b, 'c> AttrReadEncoder<'a, 'b, 'c> {
|
ResumeReq::Subscribe(ref mut ctx) => ctx.handle_status_report(trans, tw, self)?,
|
||||||
pub fn new(tw: &'a mut TLVWriter<'b, 'c>) -> Self {
|
};
|
||||||
Self {
|
trans.exch.set_data_boxed(resume);
|
||||||
tw,
|
Ok(result)
|
||||||
data_ver: 0,
|
} else {
|
||||||
skip_error: false,
|
// Nothing to do for now
|
||||||
path: Default::default(),
|
trans.complete();
|
||||||
data_ver_filters: None,
|
info!("Received status report with status {:?}", req.status);
|
||||||
|
Ok((OpCode::Reserved, ResponseRequired::No))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn skip_error(&mut self, skip: bool) {
|
fn consume_subscribe(
|
||||||
self.skip_error = skip;
|
&self,
|
||||||
}
|
rx_buf: &[u8],
|
||||||
|
trans: &mut Transaction,
|
||||||
pub fn set_data_ver(&mut self, data_ver: u32) {
|
tw: &mut TLVWriter,
|
||||||
self.data_ver = data_ver;
|
) -> Result<(OpCode, ResponseRequired), Error> {
|
||||||
}
|
if !trans.exch.is_data_none() {
|
||||||
|
error!("Exchange data already set!");
|
||||||
pub fn set_data_ver_filters(&mut self, filters: &'a TLVArray<'a, DataVersionFilter>) {
|
return Err(Error::InvalidState);
|
||||||
self.data_ver_filters = Some(filters);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_path(&mut self, path: GenericPath) {
|
|
||||||
self.path = path;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a, 'b, 'c> Encoder for AttrReadEncoder<'a, 'b, 'c> {
|
|
||||||
fn encode(&mut self, value: EncodeValue) {
|
|
||||||
let resp = ib::AttrResp::Data(ib::AttrData::new(
|
|
||||||
Some(self.data_ver),
|
|
||||||
ib::AttrPath::new(&self.path),
|
|
||||||
value,
|
|
||||||
));
|
|
||||||
let _ = resp.to_tlv(self.tw, TagType::Anonymous);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn encode_status(&mut self, status: IMStatusCode, cluster_status: u16) {
|
|
||||||
if !self.skip_error {
|
|
||||||
let resp =
|
|
||||||
ib::AttrResp::Status(ib::AttrStatus::new(&self.path, status, cluster_status));
|
|
||||||
let _ = resp.to_tlv(self.tw, TagType::Anonymous);
|
|
||||||
}
|
}
|
||||||
|
let ctx = SubsCtx::new(rx_buf, trans, tw, self)?;
|
||||||
|
trans
|
||||||
|
.exch
|
||||||
|
.set_data_boxed(Box::new(ResumeReq::Subscribe(ctx)));
|
||||||
|
Ok((OpCode::ReportData, ResponseRequired::Yes))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
319
matter/src/data_model/core/read.rs
Normal file
319
matter/src/data_model/core/read.rs
Normal file
|
@ -0,0 +1,319 @@
|
||||||
|
/*
|
||||||
|
*
|
||||||
|
* Copyright (c) 2020-2022 Project CHIP Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
acl::{AccessReq, Accessor},
|
||||||
|
data_model::{core::DataModel, objects::*},
|
||||||
|
error::*,
|
||||||
|
interaction_model::{
|
||||||
|
core::{IMStatusCode, OpCode},
|
||||||
|
messages::{
|
||||||
|
ib::{self, DataVersionFilter},
|
||||||
|
msg::{self, ReadReq, ReportDataTag::MoreChunkedMsgs, ReportDataTag::SupressResponse},
|
||||||
|
GenericPath,
|
||||||
|
},
|
||||||
|
Transaction,
|
||||||
|
},
|
||||||
|
tlv::{self, FromTLV, TLVArray, TLVWriter, TagType, ToTLV},
|
||||||
|
transport::{packet::Packet, proto_demux::ResponseRequired},
|
||||||
|
utils::writebuf::WriteBuf,
|
||||||
|
wb_shrink, wb_unshrink,
|
||||||
|
};
|
||||||
|
use log::error;
|
||||||
|
|
||||||
|
/// Encoder for generating a response to a read request
|
||||||
|
pub struct AttrReadEncoder<'a, 'b, 'c> {
|
||||||
|
tw: &'a mut TLVWriter<'b, 'c>,
|
||||||
|
data_ver: u32,
|
||||||
|
path: GenericPath,
|
||||||
|
skip_error: bool,
|
||||||
|
data_ver_filters: Option<&'a TLVArray<'a, DataVersionFilter>>,
|
||||||
|
is_buffer_full: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'b, 'c> AttrReadEncoder<'a, 'b, 'c> {
|
||||||
|
pub fn new(tw: &'a mut TLVWriter<'b, 'c>) -> Self {
|
||||||
|
Self {
|
||||||
|
tw,
|
||||||
|
data_ver: 0,
|
||||||
|
skip_error: false,
|
||||||
|
path: Default::default(),
|
||||||
|
data_ver_filters: None,
|
||||||
|
is_buffer_full: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn skip_error(&mut self, skip: bool) {
|
||||||
|
self.skip_error = skip;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_data_ver(&mut self, data_ver: u32) {
|
||||||
|
self.data_ver = data_ver;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_data_ver_filters(&mut self, filters: &'a TLVArray<'a, DataVersionFilter>) {
|
||||||
|
self.data_ver_filters = Some(filters);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_path(&mut self, path: GenericPath) {
|
||||||
|
self.path = path;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_buffer_full(&self) -> bool {
|
||||||
|
self.is_buffer_full
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'b, 'c> Encoder for AttrReadEncoder<'a, 'b, 'c> {
|
||||||
|
fn encode(&mut self, value: EncodeValue) {
|
||||||
|
let resp = ib::AttrResp::Data(ib::AttrData::new(
|
||||||
|
Some(self.data_ver),
|
||||||
|
ib::AttrPath::new(&self.path),
|
||||||
|
value,
|
||||||
|
));
|
||||||
|
|
||||||
|
let anchor = self.tw.get_tail();
|
||||||
|
if resp.to_tlv(self.tw, TagType::Anonymous).is_err() {
|
||||||
|
self.is_buffer_full = true;
|
||||||
|
self.tw.rewind_to(anchor);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn encode_status(&mut self, status: IMStatusCode, cluster_status: u16) {
|
||||||
|
if !self.skip_error {
|
||||||
|
let resp =
|
||||||
|
ib::AttrResp::Status(ib::AttrStatus::new(&self.path, status, cluster_status));
|
||||||
|
let _ = resp.to_tlv(self.tw, TagType::Anonymous);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// State to maintain when a Read Request needs to be resumed
|
||||||
|
/// resumed - the next chunk of the read needs to be returned
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct ResumeReadReq {
|
||||||
|
/// The Read Request Attribute Path that caused chunking, and this is the path
|
||||||
|
/// that needs to be resumed.
|
||||||
|
pub pending_req: Option<Packet<'static>>,
|
||||||
|
|
||||||
|
/// The Attribute that couldn't be encoded because our buffer got full. The next chunk
|
||||||
|
/// will start encoding from this attribute onwards.
|
||||||
|
/// Note that given wildcard reads, one PendingPath in the member above can generated
|
||||||
|
/// multiple encode paths. Hence this has to be maintained separately.
|
||||||
|
pub resume_from: Option<GenericPath>,
|
||||||
|
}
|
||||||
|
impl ResumeReadReq {
|
||||||
|
pub fn new(rx_buf: &[u8], resume_from: &Option<GenericPath>) -> Result<Self, Error> {
|
||||||
|
let mut packet = Packet::new_rx()?;
|
||||||
|
let dst = packet.as_borrow_slice();
|
||||||
|
|
||||||
|
let src_len = rx_buf.len();
|
||||||
|
dst[..src_len].copy_from_slice(rx_buf);
|
||||||
|
packet.get_parsebuf()?.set_len(src_len);
|
||||||
|
Ok(ResumeReadReq {
|
||||||
|
pending_req: Some(packet),
|
||||||
|
resume_from: *resume_from,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DataModel {
|
||||||
|
pub fn read_attribute_raw(
|
||||||
|
&self,
|
||||||
|
endpoint: u16,
|
||||||
|
cluster: u32,
|
||||||
|
attr: u16,
|
||||||
|
) -> Result<AttrValue, IMStatusCode> {
|
||||||
|
let node = self.node.read().unwrap();
|
||||||
|
let cluster = node.get_cluster(endpoint, cluster)?;
|
||||||
|
cluster.base().read_attribute_raw(attr).map(|a| a.clone())
|
||||||
|
}
|
||||||
|
/// Encode a read attribute from a path that may or may not be wildcard
|
||||||
|
///
|
||||||
|
/// If the buffer gets full while generating the read response, we will return
|
||||||
|
/// an Err(path), where the path is the path that we should resume from, for the next chunk.
|
||||||
|
/// This facilitates chunk management
|
||||||
|
fn handle_read_attr_path(
|
||||||
|
node: &Node,
|
||||||
|
accessor: &Accessor,
|
||||||
|
attr_encoder: &mut AttrReadEncoder,
|
||||||
|
attr_details: &mut AttrDetails,
|
||||||
|
resume_from: &mut Option<GenericPath>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let mut status = Ok(());
|
||||||
|
let path = attr_encoder.path;
|
||||||
|
|
||||||
|
// Skip error reporting for wildcard paths, don't for concrete paths
|
||||||
|
attr_encoder.skip_error(path.is_wildcard());
|
||||||
|
|
||||||
|
let result = node.for_each_attribute(&path, |path, c| {
|
||||||
|
// Ignore processing if data filter matches.
|
||||||
|
// For a wildcard attribute, this may end happening unnecessarily for all attributes, although
|
||||||
|
// a single skip for the cluster is sufficient. That requires us to replace this for_each with a
|
||||||
|
// for_each_cluster
|
||||||
|
let cluster_data_ver = c.base().get_dataver();
|
||||||
|
if Self::data_filter_matches(&attr_encoder.data_ver_filters, path, cluster_data_ver) {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// The resume_from indicates that this is the next chunk of a previous Read Request. In such cases, we
|
||||||
|
// need to skip until we hit this path.
|
||||||
|
if let Some(r) = resume_from {
|
||||||
|
// If resume_from is valid, and we haven't hit the resume_from yet, skip encoding
|
||||||
|
if r != path {
|
||||||
|
return Ok(());
|
||||||
|
} else {
|
||||||
|
// Else, wipe out the resume_from so subsequent paths can be encoded
|
||||||
|
*resume_from = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
attr_details.attr_id = path.leaf.unwrap_or_default() as u16;
|
||||||
|
// Overwrite the previous path with the concrete path
|
||||||
|
attr_encoder.set_path(*path);
|
||||||
|
// Set the cluster's data version
|
||||||
|
attr_encoder.set_data_ver(cluster_data_ver);
|
||||||
|
let mut access_req = AccessReq::new(accessor, path, Access::READ);
|
||||||
|
Cluster::read_attribute(c, &mut access_req, attr_encoder, attr_details);
|
||||||
|
if attr_encoder.is_buffer_full() {
|
||||||
|
// Buffer is full, next time resume from this attribute
|
||||||
|
*resume_from = Some(*path);
|
||||||
|
status = Err(Error::NoSpace);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
if let Err(e) = result {
|
||||||
|
// We hit this only if this is a non-wildcard path
|
||||||
|
attr_encoder.encode_status(e, 0);
|
||||||
|
}
|
||||||
|
status
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process an array of Attribute Read Requests
|
||||||
|
///
|
||||||
|
/// When the API returns the chunked read is on, if *resume_from is Some(x) otherwise
|
||||||
|
/// the read is complete
|
||||||
|
pub(super) fn handle_read_attr_array(
|
||||||
|
&self,
|
||||||
|
read_req: &ReadReq,
|
||||||
|
trans: &mut Transaction,
|
||||||
|
old_tw: &mut TLVWriter,
|
||||||
|
resume_from: &mut Option<GenericPath>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let old_wb = old_tw.get_buf();
|
||||||
|
// Note, this function may be called from multiple places: a) an actual read
|
||||||
|
// request, a b) resumed read request, c) subscribe request or d) resumed subscribe
|
||||||
|
// request. Hopefully 18 is sufficient to address all those scenarios.
|
||||||
|
//
|
||||||
|
// This is the amount of space we reserve for other things to be attached towards
|
||||||
|
// the end
|
||||||
|
const RESERVE_SIZE: usize = 18;
|
||||||
|
let mut new_wb = wb_shrink!(old_wb, RESERVE_SIZE);
|
||||||
|
let mut tw = TLVWriter::new(&mut new_wb);
|
||||||
|
|
||||||
|
let mut attr_encoder = AttrReadEncoder::new(&mut tw);
|
||||||
|
if let Some(filters) = &read_req.dataver_filters {
|
||||||
|
attr_encoder.set_data_ver_filters(filters);
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(attr_requests) = &read_req.attr_requests {
|
||||||
|
let accessor = self.sess_to_accessor(trans.session);
|
||||||
|
let mut attr_details = AttrDetails::new(accessor.fab_idx, read_req.fabric_filtered);
|
||||||
|
let node = self.node.read().unwrap();
|
||||||
|
attr_encoder
|
||||||
|
.tw
|
||||||
|
.start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?;
|
||||||
|
|
||||||
|
let mut result = Ok(());
|
||||||
|
for attr_path in attr_requests.iter() {
|
||||||
|
attr_encoder.set_path(attr_path.to_gp());
|
||||||
|
// Extract the attr_path fields into various structures
|
||||||
|
attr_details.list_index = attr_path.list_index;
|
||||||
|
result = DataModel::handle_read_attr_path(
|
||||||
|
&node,
|
||||||
|
&accessor,
|
||||||
|
&mut attr_encoder,
|
||||||
|
&mut attr_details,
|
||||||
|
resume_from,
|
||||||
|
);
|
||||||
|
if result.is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Now that all the read reports are captured, let's use the old_tw that is
|
||||||
|
// the full writebuf, and hopefully as all the necessary space to store this
|
||||||
|
wb_unshrink!(old_wb, new_wb);
|
||||||
|
old_tw.end_container()?; // Finish the AttrReports
|
||||||
|
|
||||||
|
if result.is_err() {
|
||||||
|
// If there was an error, indicate chunking. The resume_read_req would have been
|
||||||
|
// already populated in the loop above.
|
||||||
|
old_tw.bool(TagType::Context(MoreChunkedMsgs as u8), true)?;
|
||||||
|
} else {
|
||||||
|
// A None resume_from indicates no chunking
|
||||||
|
*resume_from = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle a read request
|
||||||
|
///
|
||||||
|
/// This could be called from an actual read request or a resumed read request. Subscription
|
||||||
|
/// requests do not come to this function.
|
||||||
|
/// When the API returns the chunked read is on, if *resume_from is Some(x) otherwise
|
||||||
|
/// the read is complete
|
||||||
|
pub fn handle_read_req(
|
||||||
|
&self,
|
||||||
|
read_req: &ReadReq,
|
||||||
|
trans: &mut Transaction,
|
||||||
|
tw: &mut TLVWriter,
|
||||||
|
resume_from: &mut Option<GenericPath>,
|
||||||
|
) -> Result<(OpCode, ResponseRequired), Error> {
|
||||||
|
tw.start_struct(TagType::Anonymous)?;
|
||||||
|
|
||||||
|
self.handle_read_attr_array(read_req, trans, tw, resume_from)?;
|
||||||
|
|
||||||
|
if resume_from.is_none() {
|
||||||
|
tw.bool(TagType::Context(SupressResponse as u8), true)?;
|
||||||
|
// Mark transaction complete, if not chunked
|
||||||
|
trans.complete();
|
||||||
|
}
|
||||||
|
tw.end_container()?;
|
||||||
|
Ok((OpCode::ReportData, ResponseRequired::Yes))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle a resumed read request
|
||||||
|
pub fn handle_resume_read(
|
||||||
|
&self,
|
||||||
|
resume_read_req: &mut ResumeReadReq,
|
||||||
|
trans: &mut Transaction,
|
||||||
|
tw: &mut TLVWriter,
|
||||||
|
) -> Result<(OpCode, ResponseRequired), Error> {
|
||||||
|
if let Some(packet) = resume_read_req.pending_req.as_mut() {
|
||||||
|
let rx_buf = packet.get_parsebuf()?.as_borrow_slice();
|
||||||
|
let root = tlv::get_root_node(rx_buf)?;
|
||||||
|
let req = ReadReq::from_tlv(&root)?;
|
||||||
|
|
||||||
|
self.handle_read_req(&req, trans, tw, &mut resume_read_req.resume_from)
|
||||||
|
} else {
|
||||||
|
// No pending req, is that even possible?
|
||||||
|
error!("This shouldn't have happened");
|
||||||
|
Ok((OpCode::Reserved, ResponseRequired::No))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
142
matter/src/data_model/core/subscribe.rs
Normal file
142
matter/src/data_model/core/subscribe.rs
Normal file
|
@ -0,0 +1,142 @@
|
||||||
|
/*
|
||||||
|
*
|
||||||
|
* Copyright (c) 2023 Project CHIP Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
use std::sync::atomic::{AtomicU32, Ordering};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
error::Error,
|
||||||
|
interaction_model::{
|
||||||
|
core::OpCode,
|
||||||
|
messages::{
|
||||||
|
msg::{self, SubscribeReq, SubscribeResp},
|
||||||
|
GenericPath,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
tlv::{self, get_root_node_struct, FromTLV, TLVWriter, TagType, ToTLV},
|
||||||
|
transport::proto_demux::ResponseRequired,
|
||||||
|
};
|
||||||
|
|
||||||
|
use super::{read::ResumeReadReq, DataModel, Transaction};
|
||||||
|
|
||||||
|
static SUBS_ID: AtomicU32 = AtomicU32::new(1);
|
||||||
|
|
||||||
|
#[derive(PartialEq)]
|
||||||
|
enum SubsState {
|
||||||
|
Confirming,
|
||||||
|
Confirmed,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct SubsCtx {
|
||||||
|
state: SubsState,
|
||||||
|
id: u32,
|
||||||
|
resume_read_req: Option<ResumeReadReq>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SubsCtx {
|
||||||
|
pub fn new(
|
||||||
|
rx_buf: &[u8],
|
||||||
|
trans: &mut Transaction,
|
||||||
|
tw: &mut TLVWriter,
|
||||||
|
dm: &DataModel,
|
||||||
|
) -> Result<Self, Error> {
|
||||||
|
let root = get_root_node_struct(rx_buf)?;
|
||||||
|
let req = SubscribeReq::from_tlv(&root)?;
|
||||||
|
|
||||||
|
let mut ctx = SubsCtx {
|
||||||
|
state: SubsState::Confirming,
|
||||||
|
// TODO
|
||||||
|
id: SUBS_ID.fetch_add(1, Ordering::SeqCst),
|
||||||
|
resume_read_req: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut resume_from = None;
|
||||||
|
ctx.do_read(&req, trans, tw, dm, &mut resume_from)?;
|
||||||
|
if resume_from.is_some() {
|
||||||
|
// This is a multi-hop read transaction, remember this read request
|
||||||
|
ctx.resume_read_req = Some(ResumeReadReq::new(rx_buf, &resume_from)?);
|
||||||
|
}
|
||||||
|
Ok(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn handle_status_report(
|
||||||
|
&mut self,
|
||||||
|
trans: &mut Transaction,
|
||||||
|
tw: &mut TLVWriter,
|
||||||
|
dm: &DataModel,
|
||||||
|
) -> Result<(OpCode, ResponseRequired), Error> {
|
||||||
|
if self.state != SubsState::Confirming {
|
||||||
|
// Not relevant for us
|
||||||
|
trans.complete();
|
||||||
|
return Err(Error::Invalid);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Is there a previous resume read pending
|
||||||
|
if self.resume_read_req.is_some() {
|
||||||
|
let mut resume_read_req = self.resume_read_req.take().unwrap();
|
||||||
|
if let Some(packet) = resume_read_req.pending_req.as_mut() {
|
||||||
|
let rx_buf = packet.get_parsebuf()?.as_borrow_slice();
|
||||||
|
let root = tlv::get_root_node(rx_buf)?;
|
||||||
|
let req = SubscribeReq::from_tlv(&root)?;
|
||||||
|
|
||||||
|
self.do_read(&req, trans, tw, dm, &mut resume_read_req.resume_from)?;
|
||||||
|
if resume_read_req.resume_from.is_some() {
|
||||||
|
// More chunks are pending, setup resume_read_req again
|
||||||
|
self.resume_read_req = Some(resume_read_req);
|
||||||
|
}
|
||||||
|
|
||||||
|
return Ok((OpCode::ReportData, ResponseRequired::Yes));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We are here implies that the read is now complete
|
||||||
|
self.confirm_subscription(trans, tw)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn confirm_subscription(
|
||||||
|
&mut self,
|
||||||
|
trans: &mut Transaction,
|
||||||
|
tw: &mut TLVWriter,
|
||||||
|
) -> Result<(OpCode, ResponseRequired), Error> {
|
||||||
|
self.state = SubsState::Confirmed;
|
||||||
|
|
||||||
|
// TODO
|
||||||
|
let resp = SubscribeResp::new(self.id, 40);
|
||||||
|
resp.to_tlv(tw, TagType::Anonymous)?;
|
||||||
|
trans.complete();
|
||||||
|
Ok((OpCode::SubscriptResponse, ResponseRequired::Yes))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn do_read(
|
||||||
|
&mut self,
|
||||||
|
req: &SubscribeReq,
|
||||||
|
trans: &mut Transaction,
|
||||||
|
tw: &mut TLVWriter,
|
||||||
|
dm: &DataModel,
|
||||||
|
resume_from: &mut Option<GenericPath>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let read_req = req.to_read_req();
|
||||||
|
tw.start_struct(TagType::Anonymous)?;
|
||||||
|
tw.u32(
|
||||||
|
TagType::Context(msg::ReportDataTag::SubscriptionId as u8),
|
||||||
|
self.id,
|
||||||
|
)?;
|
||||||
|
dm.handle_read_attr_array(&read_req, trans, tw, resume_from)?;
|
||||||
|
tw.end_container()?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
|
@ -46,11 +46,28 @@ pub enum GlobalElements {
|
||||||
|
|
||||||
// TODO: What if we instead of creating this, we just pass the AttrData/AttrPath to the read/write
|
// TODO: What if we instead of creating this, we just pass the AttrData/AttrPath to the read/write
|
||||||
// methods?
|
// methods?
|
||||||
|
/// The Attribute Details structure records the details about the attribute under consideration.
|
||||||
|
/// Typically this structure is progressively built as we proceed through the request processing.
|
||||||
pub struct AttrDetails {
|
pub struct AttrDetails {
|
||||||
pub attr_id: u16,
|
/// Fabric Filtering Activated
|
||||||
pub list_index: Option<Nullable<u16>>,
|
|
||||||
pub fab_idx: u8,
|
|
||||||
pub fab_filter: bool,
|
pub fab_filter: bool,
|
||||||
|
/// The current Fabric Index
|
||||||
|
pub fab_idx: u8,
|
||||||
|
/// List Index, if any
|
||||||
|
pub list_index: Option<Nullable<u16>>,
|
||||||
|
/// The actual attribute ID
|
||||||
|
pub attr_id: u16,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AttrDetails {
|
||||||
|
pub fn new(fab_idx: u8, fab_filter: bool) -> Self {
|
||||||
|
Self {
|
||||||
|
fab_filter,
|
||||||
|
fab_idx,
|
||||||
|
list_index: None,
|
||||||
|
attr_id: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait ClusterType {
|
pub trait ClusterType {
|
||||||
|
|
|
@ -195,7 +195,7 @@ mod tests {
|
||||||
use crate::{
|
use crate::{
|
||||||
acl::{AclEntry, AclMgr, AuthMode},
|
acl::{AclEntry, AclMgr, AuthMode},
|
||||||
data_model::{
|
data_model::{
|
||||||
core::AttrReadEncoder,
|
core::read::AttrReadEncoder,
|
||||||
objects::{AttrDetails, ClusterType, Privilege},
|
objects::{AttrDetails, ClusterType, Privilege},
|
||||||
},
|
},
|
||||||
interaction_model::messages::ib::ListOperation,
|
interaction_model::messages::ib::ListOperation,
|
||||||
|
|
|
@ -100,24 +100,30 @@ impl InteractionModel {
|
||||||
InteractionModel { consumer }
|
InteractionModel { consumer }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn handle_subscribe_req(
|
||||||
|
&mut self,
|
||||||
|
trans: &mut Transaction,
|
||||||
|
rx_buf: &[u8],
|
||||||
|
proto_tx: &mut Packet,
|
||||||
|
) -> Result<ResponseRequired, Error> {
|
||||||
|
let mut tw = TLVWriter::new(proto_tx.get_writebuf()?);
|
||||||
|
let (opcode, resp) = self.consumer.consume_subscribe(rx_buf, trans, &mut tw)?;
|
||||||
|
proto_tx.set_proto_opcode(opcode as u8);
|
||||||
|
Ok(resp)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn handle_status_resp(
|
pub fn handle_status_resp(
|
||||||
&mut self,
|
&mut self,
|
||||||
trans: &mut Transaction,
|
trans: &mut Transaction,
|
||||||
rx_buf: &[u8],
|
rx_buf: &[u8],
|
||||||
proto_tx: &mut Packet,
|
proto_tx: &mut Packet,
|
||||||
) -> Result<ResponseRequired, Error> {
|
) -> Result<ResponseRequired, Error> {
|
||||||
|
let mut tw = TLVWriter::new(proto_tx.get_writebuf()?);
|
||||||
let root = get_root_node_struct(rx_buf)?;
|
let root = get_root_node_struct(rx_buf)?;
|
||||||
let req = StatusResp::from_tlv(&root)?;
|
let req = StatusResp::from_tlv(&root)?;
|
||||||
|
let (opcode, resp) = self.consumer.consume_status_report(&req, trans, &mut tw)?;
|
||||||
let mut handled = false;
|
proto_tx.set_proto_opcode(opcode as u8);
|
||||||
let result = self.handle_subscription_confirm(trans, proto_tx, &mut handled);
|
Ok(resp)
|
||||||
if handled {
|
|
||||||
result
|
|
||||||
} else {
|
|
||||||
// Nothing to do for now
|
|
||||||
info!("Received status report with status {:?}", req.status);
|
|
||||||
Ok(ResponseRequired::No)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn handle_timed_req(
|
pub fn handle_timed_req(
|
||||||
|
|
|
@ -223,7 +223,7 @@ pub mod msg {
|
||||||
SubscriptionId = 0,
|
SubscriptionId = 0,
|
||||||
AttributeReports = 1,
|
AttributeReports = 1,
|
||||||
_EventReport = 2,
|
_EventReport = 2,
|
||||||
_MoreChunkedMsgs = 3,
|
MoreChunkedMsgs = 3,
|
||||||
SupressResponse = 4,
|
SupressResponse = 4,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,10 +18,13 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
error::Error,
|
error::Error,
|
||||||
tlv::TLVWriter,
|
tlv::TLVWriter,
|
||||||
transport::{exchange::Exchange, session::Session},
|
transport::{exchange::Exchange, proto_demux::ResponseRequired, session::Session},
|
||||||
};
|
};
|
||||||
|
|
||||||
use self::messages::msg::{InvReq, ReadReq, WriteReq};
|
use self::{
|
||||||
|
core::OpCode,
|
||||||
|
messages::msg::{InvReq, StatusResp, WriteReq},
|
||||||
|
};
|
||||||
|
|
||||||
#[derive(PartialEq)]
|
#[derive(PartialEq)]
|
||||||
pub enum TransactionState {
|
pub enum TransactionState {
|
||||||
|
@ -44,7 +47,9 @@ pub trait InteractionConsumer {
|
||||||
|
|
||||||
fn consume_read_attr(
|
fn consume_read_attr(
|
||||||
&self,
|
&self,
|
||||||
req: &ReadReq,
|
// TODO: This handling is different from the other APIs here, identify
|
||||||
|
// consistent options for this trait
|
||||||
|
req: &[u8],
|
||||||
trans: &mut Transaction,
|
trans: &mut Transaction,
|
||||||
tw: &mut TLVWriter,
|
tw: &mut TLVWriter,
|
||||||
) -> Result<(), Error>;
|
) -> Result<(), Error>;
|
||||||
|
@ -55,6 +60,20 @@ pub trait InteractionConsumer {
|
||||||
trans: &mut Transaction,
|
trans: &mut Transaction,
|
||||||
tw: &mut TLVWriter,
|
tw: &mut TLVWriter,
|
||||||
) -> Result<(), Error>;
|
) -> Result<(), Error>;
|
||||||
|
|
||||||
|
fn consume_status_report(
|
||||||
|
&self,
|
||||||
|
_req: &StatusResp,
|
||||||
|
_trans: &mut Transaction,
|
||||||
|
_tw: &mut TLVWriter,
|
||||||
|
) -> Result<(OpCode, ResponseRequired), Error>;
|
||||||
|
|
||||||
|
fn consume_subscribe(
|
||||||
|
&self,
|
||||||
|
_req: &[u8],
|
||||||
|
_trans: &mut Transaction,
|
||||||
|
_tw: &mut TLVWriter,
|
||||||
|
) -> Result<(OpCode, ResponseRequired), Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct InteractionModel {
|
pub struct InteractionModel {
|
||||||
|
@ -64,5 +83,4 @@ pub mod command;
|
||||||
pub mod core;
|
pub mod core;
|
||||||
pub mod messages;
|
pub mod messages;
|
||||||
pub mod read;
|
pub mod read;
|
||||||
pub mod subscribe;
|
|
||||||
pub mod write;
|
pub mod write;
|
||||||
|
|
|
@ -18,14 +18,11 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
error::Error,
|
error::Error,
|
||||||
interaction_model::core::OpCode,
|
interaction_model::core::OpCode,
|
||||||
tlv::{get_root_node_struct, FromTLV, TLVWriter, TagType},
|
tlv::TLVWriter,
|
||||||
transport::{packet::Packet, proto_demux::ResponseRequired},
|
transport::{packet::Packet, proto_demux::ResponseRequired},
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::{
|
use super::{InteractionModel, Transaction};
|
||||||
messages::msg::{self, ReadReq},
|
|
||||||
InteractionModel, Transaction,
|
|
||||||
};
|
|
||||||
|
|
||||||
impl InteractionModel {
|
impl InteractionModel {
|
||||||
pub fn handle_read_req(
|
pub fn handle_read_req(
|
||||||
|
@ -35,21 +32,11 @@ impl InteractionModel {
|
||||||
proto_tx: &mut Packet,
|
proto_tx: &mut Packet,
|
||||||
) -> Result<ResponseRequired, Error> {
|
) -> Result<ResponseRequired, Error> {
|
||||||
proto_tx.set_proto_opcode(OpCode::ReportData as u8);
|
proto_tx.set_proto_opcode(OpCode::ReportData as u8);
|
||||||
|
let proto_tx_wb = proto_tx.get_writebuf()?;
|
||||||
|
let mut tw = TLVWriter::new(proto_tx_wb);
|
||||||
|
|
||||||
let mut tw = TLVWriter::new(proto_tx.get_writebuf()?);
|
self.consumer.consume_read_attr(rx_buf, trans, &mut tw)?;
|
||||||
let root = get_root_node_struct(rx_buf)?;
|
|
||||||
let read_req = ReadReq::from_tlv(&root)?;
|
|
||||||
|
|
||||||
tw.start_struct(TagType::Anonymous)?;
|
|
||||||
self.consumer.consume_read_attr(&read_req, trans, &mut tw)?;
|
|
||||||
// Supress response always true for read interaction
|
|
||||||
tw.bool(
|
|
||||||
TagType::Context(msg::ReportDataTag::SupressResponse as u8),
|
|
||||||
true,
|
|
||||||
)?;
|
|
||||||
tw.end_container()?;
|
|
||||||
|
|
||||||
trans.complete();
|
|
||||||
Ok(ResponseRequired::Yes)
|
Ok(ResponseRequired::Yes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,115 +0,0 @@
|
||||||
/*
|
|
||||||
*
|
|
||||||
* Copyright (c) 2023 Project CHIP Authors
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
use std::sync::atomic::{AtomicU32, Ordering};
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
error::Error,
|
|
||||||
interaction_model::core::OpCode,
|
|
||||||
tlv::{get_root_node_struct, FromTLV, TLVWriter, TagType, ToTLV},
|
|
||||||
transport::{packet::Packet, proto_demux::ResponseRequired},
|
|
||||||
};
|
|
||||||
|
|
||||||
use log::error;
|
|
||||||
|
|
||||||
use super::{
|
|
||||||
messages::msg::{self, SubscribeReq, SubscribeResp},
|
|
||||||
InteractionModel, Transaction,
|
|
||||||
};
|
|
||||||
|
|
||||||
static SUBS_ID: AtomicU32 = AtomicU32::new(1);
|
|
||||||
|
|
||||||
impl InteractionModel {
|
|
||||||
pub fn handle_subscribe_req(
|
|
||||||
&mut self,
|
|
||||||
trans: &mut Transaction,
|
|
||||||
rx_buf: &[u8],
|
|
||||||
proto_tx: &mut Packet,
|
|
||||||
) -> Result<ResponseRequired, Error> {
|
|
||||||
proto_tx.set_proto_opcode(OpCode::ReportData as u8);
|
|
||||||
|
|
||||||
let mut tw = TLVWriter::new(proto_tx.get_writebuf()?);
|
|
||||||
let root = get_root_node_struct(rx_buf)?;
|
|
||||||
let req = SubscribeReq::from_tlv(&root)?;
|
|
||||||
|
|
||||||
let ctx = Box::new(SubsCtx {
|
|
||||||
state: SubsState::Confirming,
|
|
||||||
// TODO
|
|
||||||
id: SUBS_ID.fetch_add(1, Ordering::SeqCst),
|
|
||||||
});
|
|
||||||
|
|
||||||
let read_req = req.to_read_req();
|
|
||||||
tw.start_struct(TagType::Anonymous)?;
|
|
||||||
tw.u32(
|
|
||||||
TagType::Context(msg::ReportDataTag::SubscriptionId as u8),
|
|
||||||
ctx.id,
|
|
||||||
)?;
|
|
||||||
self.consumer.consume_read_attr(&read_req, trans, &mut tw)?;
|
|
||||||
tw.bool(
|
|
||||||
TagType::Context(msg::ReportDataTag::SupressResponse as u8),
|
|
||||||
false,
|
|
||||||
)?;
|
|
||||||
tw.end_container()?;
|
|
||||||
|
|
||||||
if !trans.exch.is_data_none() {
|
|
||||||
error!("Exchange data already set!");
|
|
||||||
return Err(Error::InvalidState);
|
|
||||||
}
|
|
||||||
trans.exch.set_data_boxed(ctx);
|
|
||||||
|
|
||||||
Ok(ResponseRequired::Yes)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn handle_subscription_confirm(
|
|
||||||
&mut self,
|
|
||||||
trans: &mut Transaction,
|
|
||||||
proto_tx: &mut Packet,
|
|
||||||
request_handled: &mut bool,
|
|
||||||
) -> Result<ResponseRequired, Error> {
|
|
||||||
*request_handled = false;
|
|
||||||
if let Some(ctx) = trans.exch.get_data_boxed::<SubsCtx>() {
|
|
||||||
if ctx.state != SubsState::Confirming {
|
|
||||||
// Not relevant for us
|
|
||||||
return Err(Error::Invalid);
|
|
||||||
}
|
|
||||||
*request_handled = true;
|
|
||||||
ctx.state = SubsState::Confirmed;
|
|
||||||
proto_tx.set_proto_opcode(OpCode::SubscriptResponse as u8);
|
|
||||||
|
|
||||||
// TODO
|
|
||||||
let resp = SubscribeResp::new(ctx.id, 40);
|
|
||||||
let mut tw = TLVWriter::new(proto_tx.get_writebuf()?);
|
|
||||||
resp.to_tlv(&mut tw, TagType::Anonymous)?;
|
|
||||||
trans.complete();
|
|
||||||
Ok(ResponseRequired::Yes)
|
|
||||||
} else {
|
|
||||||
trans.complete();
|
|
||||||
Err(Error::Invalid)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(PartialEq)]
|
|
||||||
enum SubsState {
|
|
||||||
Confirming,
|
|
||||||
Confirmed,
|
|
||||||
}
|
|
||||||
|
|
||||||
struct SubsCtx {
|
|
||||||
state: SubsState,
|
|
||||||
id: u32,
|
|
||||||
}
|
|
|
@ -115,6 +115,12 @@ impl PaseMgr {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Default for PaseMgr {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// This file basically deals with the handlers for the PASE secure channel protocol
|
// This file basically deals with the handlers for the PASE secure channel protocol
|
||||||
// TLV extraction and encoding is done in this file.
|
// TLV extraction and encoding is done in this file.
|
||||||
// We create a Spake2p object and set it up in the exchange-data. This object then
|
// We create a Spake2p object and set it up in the exchange-data. This object then
|
||||||
|
|
|
@ -383,7 +383,7 @@ impl<'a, T: ToTLV> ToTLV for TLVArray<'a, T> {
|
||||||
}
|
}
|
||||||
tw.end_container()
|
tw.end_container()
|
||||||
}
|
}
|
||||||
Self::Ptr(_) => Err(Error::Invalid),
|
Self::Ptr(t) => t.to_tlv(tw, tag_type),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -404,6 +404,31 @@ impl<'a, T: Debug + ToTLV + FromTLV<'a> + Copy> Debug for TLVArray<'a, T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<'a> ToTLV for TLVElement<'a> {
|
||||||
|
fn to_tlv(&self, tw: &mut TLVWriter, _tag_type: TagType) -> Result<(), Error> {
|
||||||
|
match self.get_element_type() {
|
||||||
|
ElementType::S8(v) => v.to_tlv(tw, self.get_tag()),
|
||||||
|
ElementType::U8(v) => v.to_tlv(tw, self.get_tag()),
|
||||||
|
ElementType::U16(v) => v.to_tlv(tw, self.get_tag()),
|
||||||
|
ElementType::U32(v) => v.to_tlv(tw, self.get_tag()),
|
||||||
|
ElementType::U64(v) => v.to_tlv(tw, self.get_tag()),
|
||||||
|
ElementType::False => tw.bool(self.get_tag(), false),
|
||||||
|
ElementType::True => tw.bool(self.get_tag(), true),
|
||||||
|
ElementType::Utf8l(v) | ElementType::Utf16l(v) => tw.utf16(self.get_tag(), v),
|
||||||
|
ElementType::Str8l(v) | ElementType::Str16l(v) => tw.str16(self.get_tag(), v),
|
||||||
|
ElementType::Null => tw.null(self.get_tag()),
|
||||||
|
ElementType::Struct(_) => tw.start_struct(self.get_tag()),
|
||||||
|
ElementType::Array(_) => tw.start_array(self.get_tag()),
|
||||||
|
ElementType::List(_) => tw.start_list(self.get_tag()),
|
||||||
|
ElementType::EndCnt => tw.end_container(),
|
||||||
|
_ => {
|
||||||
|
error!("ToTLV Not supported");
|
||||||
|
Err(Error::Invalid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::{FromTLV, OctetStr, TLVElement, TLVWriter, TagType, ToTLV};
|
use super::{FromTLV, OctetStr, TLVElement, TLVWriter, TagType, ToTLV};
|
||||||
|
|
|
@ -264,6 +264,10 @@ impl<'a, 'b> TLVWriter<'a, 'b> {
|
||||||
pub fn rewind_to(&mut self, anchor: usize) {
|
pub fn rewind_to(&mut self, anchor: usize) {
|
||||||
self.buf.rewind_tail_to(anchor);
|
self.buf.rewind_tail_to(anchor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_buf<'c>(&'c mut self) -> &'c mut WriteBuf<'a> {
|
||||||
|
self.buf
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
|
@ -24,7 +24,7 @@ use super::packet::PacketPool;
|
||||||
|
|
||||||
const MAX_PROTOCOLS: usize = 4;
|
const MAX_PROTOCOLS: usize = 4;
|
||||||
|
|
||||||
#[derive(PartialEq)]
|
#[derive(PartialEq, Debug)]
|
||||||
pub enum ResponseRequired {
|
pub enum ResponseRequired {
|
||||||
Yes,
|
Yes,
|
||||||
No,
|
No,
|
||||||
|
|
|
@ -18,6 +18,33 @@
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
use byteorder::{ByteOrder, LittleEndian};
|
use byteorder::{ByteOrder, LittleEndian};
|
||||||
|
|
||||||
|
/// Shrink WriteBuf
|
||||||
|
///
|
||||||
|
/// This Macro creates a new (child) WriteBuf which has a truncated slice end.
|
||||||
|
/// - It accepts a WriteBuf, and the size to reserve (truncate) towards the end.
|
||||||
|
/// - It returns the new (child) WriteBuf
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! wb_shrink {
|
||||||
|
($orig_wb:ident, $reserve:ident) => {{
|
||||||
|
let m_data = $orig_wb.empty_as_mut_slice();
|
||||||
|
let m_wb = WriteBuf::new(m_data, m_data.len() - $reserve);
|
||||||
|
(m_wb)
|
||||||
|
}};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Unshrink WriteBuf
|
||||||
|
///
|
||||||
|
/// This macro unshrinks the WriteBuf
|
||||||
|
/// - It accepts the original WriteBuf and the child WriteBuf (that was the result of wb_shrink)
|
||||||
|
/// After this call, the child WriteBuf shouldn't be used
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! wb_unshrink {
|
||||||
|
($orig_wb:ident, $new_wb:ident) => {{
|
||||||
|
let m_data_len = $new_wb.as_slice().len();
|
||||||
|
$orig_wb.forward_tail_by(m_data_len);
|
||||||
|
}};
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct WriteBuf<'a> {
|
pub struct WriteBuf<'a> {
|
||||||
buf: &'a mut [u8],
|
buf: &'a mut [u8],
|
||||||
|
|
|
@ -19,7 +19,6 @@ use boxslab::Slab;
|
||||||
use matter::error::Error;
|
use matter::error::Error;
|
||||||
use matter::interaction_model::core::OpCode;
|
use matter::interaction_model::core::OpCode;
|
||||||
use matter::interaction_model::messages::msg::InvReq;
|
use matter::interaction_model::messages::msg::InvReq;
|
||||||
use matter::interaction_model::messages::msg::ReadReq;
|
|
||||||
use matter::interaction_model::messages::msg::WriteReq;
|
use matter::interaction_model::messages::msg::WriteReq;
|
||||||
use matter::interaction_model::InteractionConsumer;
|
use matter::interaction_model::InteractionConsumer;
|
||||||
use matter::interaction_model::InteractionModel;
|
use matter::interaction_model::InteractionModel;
|
||||||
|
@ -32,6 +31,7 @@ use matter::transport::packet::Packet;
|
||||||
use matter::transport::packet::PacketPool;
|
use matter::transport::packet::PacketPool;
|
||||||
use matter::transport::proto_demux::HandleProto;
|
use matter::transport::proto_demux::HandleProto;
|
||||||
use matter::transport::proto_demux::ProtoCtx;
|
use matter::transport::proto_demux::ProtoCtx;
|
||||||
|
use matter::transport::proto_demux::ResponseRequired;
|
||||||
use matter::transport::session::SessionMgr;
|
use matter::transport::session::SessionMgr;
|
||||||
use std::net::Ipv4Addr;
|
use std::net::Ipv4Addr;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
@ -93,7 +93,7 @@ impl InteractionConsumer for DataModel {
|
||||||
|
|
||||||
fn consume_read_attr(
|
fn consume_read_attr(
|
||||||
&self,
|
&self,
|
||||||
_req: &ReadReq,
|
_req: &[u8],
|
||||||
_trans: &mut Transaction,
|
_trans: &mut Transaction,
|
||||||
_tlvwriter: &mut TLVWriter,
|
_tlvwriter: &mut TLVWriter,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
|
@ -108,6 +108,24 @@ impl InteractionConsumer for DataModel {
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn consume_status_report(
|
||||||
|
&self,
|
||||||
|
_req: &matter::interaction_model::messages::msg::StatusResp,
|
||||||
|
_trans: &mut Transaction,
|
||||||
|
_tw: &mut TLVWriter,
|
||||||
|
) -> Result<(OpCode, ResponseRequired), Error> {
|
||||||
|
Ok((OpCode::StatusResponse, ResponseRequired::No))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn consume_subscribe(
|
||||||
|
&self,
|
||||||
|
_req: &[u8],
|
||||||
|
_trans: &mut Transaction,
|
||||||
|
_tw: &mut TLVWriter,
|
||||||
|
) -> Result<(OpCode, matter::transport::proto_demux::ResponseRequired), Error> {
|
||||||
|
Ok((OpCode::StatusResponse, ResponseRequired::No))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_data(action: OpCode, data_in: &[u8], data_out: &mut [u8]) -> (DataModel, usize) {
|
fn handle_data(action: OpCode, data_in: &[u8], data_out: &mut [u8]) -> (DataModel, usize) {
|
||||||
|
|
Loading…
Add table
Reference in a new issue