Async flow: Synchronizing and filtering Streams#
ICEY allows to express asynchronous data-flow very easily by applying transformations on Streams, leading to easy-to-read declarative code. In the following we look at various transformations that we can apply on Streams like synchronizing with other streams or removing unwanted values.
Publish#
You can publish the values of every Stream using .publish(<topic>, <qos>={}, <options>={})
icey_context_->create_timer(100ms)
.then([](size_t ticks) {
/// This function gets called each time the timer ticks
std_msgs::msg::Float32 msg;
msg.data = std::sin(0.1 * ticks * 2 * M_PI);
return msg;
})
.publish("sine_signal");
See also the signal generator example:
For this to work, a Stream must hold a ROS-message type.
Filter#
You can filter a Stream by calling .filter()
using a function that returns false if the message should be filtered and true if the message should be passed through:
icey_context_->create_subscription<geometry_msgs::PoseStamped>("ego_pose")
/// Filter (i.e. remove) messages that contain NaNs:
.filter([](geometry_msgs::PoseStamped::SharedPtr pose_msg) -> bool {
return !(std::isnan(pose_msg->pose.x)
||std::isnan(pose_msg->pose.y)
|| std::isnan(pose_msg->pose.z));
})
.then([](geometry_msgs::PoseStamped::SharedPtr pose_msg) {
/// Here we receive only NaN-free messages for further processing
});
Synchronization#
ICEY provides an easy way to use the synchronizers from the message_filters
package: For example, synchronizing a camera and a LiDAR point cloud is as simple as:
auto camera_image = icey_context_->create_subscription<sensor_msgs::msg::Image>("camera");
auto point_cloud = icey_context_->create_subscription<sensor_msgs::msg::PointCloud2>("point_cloud");
/// Synchronize by approximately matching the header time stamps (queue_size=100):
icey::synchronize_approx_time(100, camera_image, point_cloud)
.then([](sensor_msgs::msg::Image::SharedPtr, sensor_msgs::msg::PointCloud2::SharedPtr) {
});
See also the synchronization example
This method approximately matches the header timestamps using the message_filters::Synchronizer
with the ApproxTime
policy.
Error handling: unwrap_or
#
Streams can yield not only values but also errors.
For some transformations, it is mandatory to handle an error first. We can handle possible errors by calling unwrap_or
and providing a function that receives the potential error. This results in an error-free stream.
icey::Stream<size_t, std::string> fallible_stream = icey_context_->create_timer(100ms)
.then([](size_t ticks) -> icey::Result<size_t, std::string> {
if(ticks % 10) {
return icey::Result<size_t, std::string>::Ok(ticks);
} else {
return icey::Result<size_t, std::string>::Err("An error occurred");
}
});
icey::Stream<size_t> error_free_stream = fallible_stream
.unwrap_or([](std::string error) {
});
One example of a transformation that requires error handling is the icey::synchronize_approx_time
transformation that we used earlier.
Error handling is mandatory here because such transformations may yield new errors.
However, streams are statically typed on the error. This means a stream can only store one type of error.
Timeout#
You can check whether a message is too old (by it’s header stamp) and register a callback when a timeout occurs using .timeout(<duration>)
:
icey_context_->create_subscription<geometry_msgs::PoseStamped>("ego_pose")
/// Expect that every pose message is at most 200ms old
.timeout(100ms)
.unwrap_or([&](auto current_time, auto msg_time, auto max_age) {
auto msg_dt = (current_time - message_timestamp).seconds();
RCLCPP_ERROR_STREAM_THROTTLE(this->get_logger(), *this->get_clock(), 1000, fmt::format(
"Timeout occured, message is old: {} seconds old, maximum allowed is {} seconds",
msg_dt, max_age));
})
.then([](geometry_msgs::PoseStamped::SharedPtr pose_msg) {
/// Here we receive only NaN-free messages for further processing
});
Buffer#
You can buffer N
values of a Stream, obtaining a new Stream that yields an array of exactly N
elements using .buffer(<N>)
:
icey_context_->create_subscription<std_msgs::msg::String>("/messages")
.buffer(5)
.then([&](std::shared_ptr<std::vector<std_msgs::msg::String>> messages) {
RCLCPP_INFO_STREAM(node->get_logger(), "Received 5 messages: " << fmt::format());
});
Control flow: Multiple inputs and multiple outputs#
Single input, multiple output#
You can return multiple values from a handler, this will yield a Stream with value of type tuple.
Such tuple-Streams can be .unpack()
ed, the result is a tuple of multiple Streams:
auto [output1, output2] = icey_context_->create_subscription<Msg>("topic", 1)
.then([](Msg::SharedPtr input) {
auto output_msg1 = do_computation(input);
auto output_msg2 = do_another_computation(input);
return std::make_tuple(output_msg1, output_msg2);
}).unpack();
output1.publish("output_topic1");
output2.publish("output_topic2");