Baseline timed request handling

This commit is contained in:
Kedar Sovani 2023-01-03 15:25:44 +05:30
parent ed5949253b
commit d67e134e26
6 changed files with 163 additions and 18 deletions

View file

@ -15,6 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
use super::core::IMStatusCode;
use super::core::OpCode; use super::core::OpCode;
use super::messages::ib; use super::messages::ib;
use super::messages::msg; use super::messages::msg;
@ -50,12 +51,23 @@ impl InteractionModel {
rx_buf: &[u8], rx_buf: &[u8],
proto_tx: &mut Packet, proto_tx: &mut Packet,
) -> Result<ResponseRequired, Error> { ) -> Result<ResponseRequired, Error> {
proto_tx.set_proto_opcode(OpCode::InvokeResponse as u8); if InteractionModel::req_timeout_handled(trans, proto_tx)? == true {
return Ok(ResponseRequired::Yes);
}
proto_tx.set_proto_opcode(OpCode::InvokeResponse as u8);
let mut tw = TLVWriter::new(proto_tx.get_writebuf()?); let mut tw = TLVWriter::new(proto_tx.get_writebuf()?);
let root = get_root_node_struct(rx_buf)?; let root = get_root_node_struct(rx_buf)?;
let inv_req = InvReq::from_tlv(&root)?; let inv_req = InvReq::from_tlv(&root)?;
let timed_tx = trans.exch.get_expiry_ts().map(|_| true);
let timed_request = inv_req.timed_request.filter(|a| *a == true);
// Either both should be None, or both should be Some(true)
if timed_tx != timed_request {
InteractionModel::create_status_response(proto_tx, IMStatusCode::TimedRequestMisMatch)?;
return Ok(ResponseRequired::Yes);
}
tw.start_struct(TagType::Anonymous)?; tw.start_struct(TagType::Anonymous)?;
// Suppress Response -> TODO: Need to revisit this for cases where we send a command back // Suppress Response -> TODO: Need to revisit this for cases where we send a command back
tw.bool( tw.bool(

View file

@ -15,10 +15,15 @@
* limitations under the License. * limitations under the License.
*/ */
use std::time::{Duration, SystemTime};
use crate::{ use crate::{
error::*, error::*,
tlv::{self, FromTLV, TLVElement, TLVWriter, TagType, ToTLV}, interaction_model::messages::msg::StatusResp,
tlv::{self, get_root_node_struct, FromTLV, TLVElement, TLVWriter, TagType, ToTLV},
transport::{ transport::{
exchange::Exchange,
packet::Packet,
proto_demux::{self, ProtoCtx, ResponseRequired}, proto_demux::{self, ProtoCtx, ResponseRequired},
session::Session, session::Session,
}, },
@ -28,10 +33,10 @@ use log::{error, info};
use num; use num;
use num_derive::FromPrimitive; use num_derive::FromPrimitive;
use super::InteractionConsumer;
use super::InteractionModel; use super::InteractionModel;
use super::Transaction; use super::Transaction;
use super::TransactionState; use super::TransactionState;
use super::{messages::msg::TimedReq, InteractionConsumer};
/* Handle messages related to the Interation Model /* Handle messages related to the Interation Model
*/ */
@ -55,11 +60,11 @@ pub enum OpCode {
} }
impl<'a> Transaction<'a> { impl<'a> Transaction<'a> {
pub fn new(session: &'a mut Session) -> Self { pub fn new(session: &'a mut Session, exch: &'a mut Exchange) -> Self {
Self { Self {
state: TransactionState::Ongoing, state: TransactionState::Ongoing,
data: None,
session, session,
exch,
} }
} }
@ -70,17 +75,77 @@ impl<'a> Transaction<'a> {
pub fn is_complete(&self) -> bool { pub fn is_complete(&self) -> bool {
self.state == TransactionState::Complete self.state == TransactionState::Complete
} }
pub fn set_timeout(&mut self, timeout: u64) {
self.exch
.set_expiry_ts(SystemTime::now().checked_add(Duration::from_millis(timeout)));
}
pub fn has_timed_out(&self) -> bool {
if let Some(timeout) = self.exch.get_expiry_ts() {
if SystemTime::now() > timeout {
return true;
}
}
false
}
} }
impl InteractionModel { impl InteractionModel {
pub fn new(consumer: Box<dyn InteractionConsumer>) -> InteractionModel { pub fn new(consumer: Box<dyn InteractionConsumer>) -> InteractionModel {
InteractionModel { consumer } InteractionModel { consumer }
} }
pub fn handle_timed_req(
&mut self,
trans: &mut Transaction,
rx_buf: &[u8],
proto_tx: &mut Packet,
) -> Result<ResponseRequired, Error> {
proto_tx.set_proto_opcode(OpCode::StatusResponse as u8);
let root = get_root_node_struct(rx_buf)?;
let req = TimedReq::from_tlv(&root)?;
trans.set_timeout(req.timeout.into());
let status = StatusResp {
status: IMStatusCode::Sucess,
};
let mut tw = TLVWriter::new(proto_tx.get_writebuf()?);
let _ = status.to_tlv(&mut tw, TagType::Anonymous);
Ok(ResponseRequired::Yes)
}
/// Handle Request Timeouts
/// This API checks if a request was a timed request, and if so, and if the timeout has
/// expired, it will generate the appropriate response as expected
pub(super) fn req_timeout_handled(
trans: &mut Transaction,
proto_tx: &mut Packet,
) -> Result<bool, Error> {
if trans.has_timed_out() {
trans.complete();
InteractionModel::create_status_response(proto_tx, IMStatusCode::Timeout)?;
Ok(true)
} else {
Ok(false)
}
}
pub(super) fn create_status_response(
proto_tx: &mut Packet,
status: IMStatusCode,
) -> Result<(), Error> {
proto_tx.set_proto_opcode(OpCode::StatusResponse as u8);
let mut tw = TLVWriter::new(proto_tx.get_writebuf()?);
let status = StatusResp { status };
status.to_tlv(&mut tw, TagType::Anonymous)
}
} }
impl proto_demux::HandleProto for InteractionModel { impl proto_demux::HandleProto for InteractionModel {
fn handle_proto_id(&mut self, ctx: &mut ProtoCtx) -> Result<ResponseRequired, Error> { fn handle_proto_id(&mut self, ctx: &mut ProtoCtx) -> Result<ResponseRequired, Error> {
let mut trans = Transaction::new(&mut ctx.exch_ctx.sess); let mut trans = Transaction::new(&mut ctx.exch_ctx.sess, ctx.exch_ctx.exch);
let proto_opcode: OpCode = let proto_opcode: OpCode =
num::FromPrimitive::from_u8(ctx.rx.get_proto_opcode()).ok_or(Error::Invalid)?; num::FromPrimitive::from_u8(ctx.rx.get_proto_opcode()).ok_or(Error::Invalid)?;
ctx.tx.set_proto_id(PROTO_ID_INTERACTION_MODEL as u16); ctx.tx.set_proto_id(PROTO_ID_INTERACTION_MODEL as u16);
@ -92,6 +157,7 @@ impl proto_demux::HandleProto for InteractionModel {
OpCode::InvokeRequest => self.handle_invoke_req(&mut trans, buf, &mut ctx.tx)?, OpCode::InvokeRequest => self.handle_invoke_req(&mut trans, buf, &mut ctx.tx)?,
OpCode::ReadRequest => self.handle_read_req(&mut trans, buf, &mut ctx.tx)?, OpCode::ReadRequest => self.handle_read_req(&mut trans, buf, &mut ctx.tx)?,
OpCode::WriteRequest => self.handle_write_req(&mut trans, buf, &mut ctx.tx)?, OpCode::WriteRequest => self.handle_write_req(&mut trans, buf, &mut ctx.tx)?,
OpCode::TimedRequest => self.handle_timed_req(&mut trans, buf, &mut ctx.tx)?,
_ => { _ => {
error!("Opcode Not Handled: {:?}", proto_opcode); error!("Opcode Not Handled: {:?}", proto_opcode);
return Err(Error::InvalidOpcode); return Err(Error::InvalidOpcode);
@ -137,6 +203,10 @@ pub enum IMStatusCode {
UnsupportedCluster = 0xc3, UnsupportedCluster = 0xc3,
NoUpstreamSubscription = 0xc5, NoUpstreamSubscription = 0xc5,
NeedsTimedInteraction = 0xc6, NeedsTimedInteraction = 0xc6,
UnsupportedEvent = 0xc7,
PathsExhausted = 0xc8,
TimedRequestMisMatch = 0xc9,
FailSafeRequired = 0xca,
} }
impl From<Error> for IMStatusCode { impl From<Error> for IMStatusCode {

View file

@ -67,11 +67,22 @@ pub mod msg {
use crate::{ use crate::{
error::Error, error::Error,
interaction_model::core::IMStatusCode,
tlv::{FromTLV, TLVArray, TLVElement, TLVWriter, TagType, ToTLV}, tlv::{FromTLV, TLVArray, TLVElement, TLVWriter, TagType, ToTLV},
}; };
use super::ib::{AttrData, AttrPath, AttrResp, CmdData, DataVersionFilter}; use super::ib::{AttrData, AttrPath, AttrResp, CmdData, DataVersionFilter};
#[derive(FromTLV)]
pub struct TimedReq {
pub timeout: u16,
}
#[derive(ToTLV)]
pub struct StatusResp {
pub status: IMStatusCode,
}
#[derive(FromTLV)] #[derive(FromTLV)]
#[tlvargs(lifetime = "'a")] #[tlvargs(lifetime = "'a")]
pub struct InvReq<'a> { pub struct InvReq<'a> {

View file

@ -15,9 +15,11 @@
* limitations under the License. * limitations under the License.
*/ */
use std::any::Any; use crate::{
error::Error,
use crate::{error::Error, tlv::TLVWriter, transport::session::Session}; tlv::TLVWriter,
transport::{exchange::Exchange, session::Session},
};
use self::messages::msg::{InvReq, ReadReq, WriteReq}; use self::messages::msg::{InvReq, ReadReq, WriteReq};
@ -28,8 +30,8 @@ pub enum TransactionState {
} }
pub struct Transaction<'a> { pub struct Transaction<'a> {
pub state: TransactionState, pub state: TransactionState,
pub data: Option<Box<dyn Any>>,
pub session: &'a mut Session, pub session: &'a mut Session,
pub exch: &'a mut Exchange,
} }
pub trait InteractionConsumer { pub trait InteractionConsumer {

View file

@ -32,6 +32,9 @@ impl InteractionModel {
rx_buf: &[u8], rx_buf: &[u8],
proto_tx: &mut Packet, proto_tx: &mut Packet,
) -> Result<ResponseRequired, Error> { ) -> Result<ResponseRequired, Error> {
if InteractionModel::req_timeout_handled(trans, proto_tx)? == true {
return Ok(ResponseRequired::Yes);
}
proto_tx.set_proto_opcode(OpCode::WriteResponse as u8); proto_tx.set_proto_opcode(OpCode::WriteResponse as u8);
let mut tw = TLVWriter::new(proto_tx.get_writebuf()?); let mut tw = TLVWriter::new(proto_tx.get_writebuf()?);

View file

@ -20,6 +20,7 @@ use colored::*;
use log::{error, info, trace}; use log::{error, info, trace};
use std::any::Any; use std::any::Any;
use std::fmt; use std::fmt;
use std::time::SystemTime;
use crate::error::Error; use crate::error::Error;
use crate::secure_channel; use crate::secure_channel;
@ -59,17 +60,32 @@ impl Default for State {
} }
} }
// Instead of just doing an Option<>, we create some special handling
// where the commonly used higher layer data store does't have to do a Box
#[derive(Debug)]
pub enum DataOption {
Boxed(Box<dyn Any>),
Time(SystemTime),
None,
}
impl Default for DataOption {
fn default() -> Self {
DataOption::None
}
}
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct Exchange { pub struct Exchange {
id: u16, id: u16,
sess_idx: usize, sess_idx: usize,
role: Role, role: Role,
state: State, state: State,
mrp: ReliableMessage,
// Currently I see this primarily used in PASE and CASE. If that is the limited use // Currently I see this primarily used in PASE and CASE. If that is the limited use
// of this, we might move this into a separate data structure, so as not to burden // of this, we might move this into a separate data structure, so as not to burden
// all 'exchanges'. // all 'exchanges'.
data: Option<Box<dyn Any>>, data: DataOption,
mrp: ReliableMessage,
} }
impl Exchange { impl Exchange {
@ -79,13 +95,13 @@ impl Exchange {
sess_idx, sess_idx,
role, role,
state: State::Open, state: State::Open,
data: None,
mrp: ReliableMessage::new(), mrp: ReliableMessage::new(),
..Default::default()
} }
} }
pub fn close(&mut self) { pub fn close(&mut self) {
self.data = None; self.data = DataOption::None;
self.state = State::Close; self.state = State::Close;
} }
@ -106,20 +122,51 @@ impl Exchange {
self.role self.role
} }
pub fn is_data_none(&self) -> bool {
match self.data {
DataOption::None => true,
_ => false,
}
}
pub fn set_exchange_data(&mut self, data: Box<dyn Any>) { pub fn set_exchange_data(&mut self, data: Box<dyn Any>) {
self.data = Some(data); self.data = DataOption::Boxed(data);
} }
pub fn clear_exchange_data(&mut self) { pub fn clear_exchange_data(&mut self) {
self.data = None; self.data = DataOption::None;
} }
pub fn get_exchange_data<T: Any>(&mut self) -> Option<&mut T> { pub fn get_exchange_data<T: Any>(&mut self) -> Option<&mut T> {
self.data.as_mut()?.downcast_mut::<T>() if let DataOption::Boxed(a) = &mut self.data {
a.downcast_mut::<T>()
} else {
None
}
} }
pub fn take_exchange_data<T: Any>(&mut self) -> Option<Box<T>> { pub fn take_exchange_data<T: Any>(&mut self) -> Option<Box<T>> {
self.data.take()?.downcast::<T>().ok() let old = std::mem::replace(&mut self.data, DataOption::None);
match old {
DataOption::Boxed(d) => d.downcast::<T>().ok(),
_ => {
self.data = old;
None
}
}
}
pub fn set_expiry_ts(&mut self, expiry_ts: Option<SystemTime>) {
if let Some(t) = expiry_ts {
self.data = DataOption::Time(t);
}
}
pub fn get_expiry_ts(&self) -> Option<SystemTime> {
match self.data {
DataOption::Time(t) => Some(t),
_ => None,
}
} }
fn send( fn send(