Warning
This post was published 32 days ago. The information described in this article may have changed.
Hi!
I'm building a relay controller module that listens for commands over MQTT. I'm trying to separate out the command implementation from the MQTT event loop handling. I created an interface struct for the MQTT communication that handles the async event loop, and would let me register command handlers in the form of async function. Each command is decoded into a type from bytes and the responses are encoded into bytes from task specific types as well. I used typetag to dynamically decode/encode inside the eventloop. I just can't figure out how to register async functions that take the decoded objects and return the data.
Here's a summary of what I have so far:
#[typetag::deserialize]
pub trait Request {}
#[typetag::serialize(tag = "success")]
pub trait ResponseSuccess {}
#[typetag::serialize(tag = "failure")]
pub trait ResponseFailure {}
#[derive(Serialize)]
pub enum Response {
Success(Box<dyn ResponseSuccess>),
Failure(Box<dyn ResponseFailure>),
}
type RequestHandler = Box<
dyn Fn(
Box<dyn Request>,
) -> Pin<
Box<
dyn Future<
Output = Result<
Box<dyn ResponseSuccess + Send + Sync>,
Box<dyn ResponseFailure + Send + Sync>,
>,
> + Send
+ Sync,
>,
> + Send
+ Sync,
>;
type RequestHandlerRegistry = Arc<Mutex<HashMap<String, RequestHandler>>>;
pub struct Interface {
handlers: RequestHandlerRegistry
}
impl Interface {
pub async fn add_handler(&self, topic: &str, handle: RequestHandle) -> Result<(), ClientError> {
...
Ok(())
}
pub async fn accept() -> Result<(), InterfaceError>{
let handles = self.handles.clone().lock().await;
let Some(request_handle) = handles.get(&topic) else {
return Ok(());
};
tokio::spawn(async move {
let request = rmp_serde::decode::from_slice(&packet.payload);
let response = request_handle(request).await
let response = rmp_serde::encode::to_vec(&response)
// Send response
...
}
}
I'd like to be able to do something like
#[derive(Deserialize)]
struct GetRelayStatus(u8)
#[typetag::deserialize]
impl Request for GetRelayStatus {}
async fn get_status(cmd: GetRelayStatus) -> Result<bool, RelayInterfaceError>{
// Toggle logic
...
Ok(status)
}
...
interface.add_handler("relay/set", get_status);
Of course these types are wrong. How would something like this be possible?
I already made the app work by storing channels in the handler registry that take and recieve bytes. I just don't like that I have to implement serialization and deserialization inside the tasks.
1 post - 1 participant
🏷️ rust_feed