Asynchronous primitives#
The asynchronous primitives like Promise
and Stream
are the building blocks of ICEY.
-
template<class _Value, class _Error>
struct Result : private std::variant<std::monostate, _Value, _Error>, public icey::ResultTag# A Result-type is a sum type that can either hold Value or Error, or, different to Rust, none. It is used as the state for the Stream. Note that this Result type is API-compatible with
std::expected
(C++23), so a change is easily possible once we target C++23.
-
struct Nothing#
A special type that indicates that there is no value. (Using
void
for this would cause many problems, so defining an extra struct is easier.)
-
template<class _Value, class _Error = Nothing, class ImplBase = Nothing>
class Stream : public icey::StreamTag# A stream, an abstraction over an asynchronous sequence of values. It has a state of type Result and a list of callbacks that get notified when this state changes. It is conceptually very similar to a promise in JavaScript except that state transitions are not final. This is the base class for all the other streams.
Note
This class does not have any fields except a weak pointer to the actual implementation (i.e. it uses the PIMPL idiom). You should not add any fields when inheriting form this class. Instead, put the additional fields that you need in a separate struct and pass it as the
ImplBase
template parameter. These fields become available throughimpl().<my_field>
, (i.e. the Impl-class will derive from ImplBase).- Template Parameters:
_Value – the type of the value
_Error – the type of the error. It can also be an exception.
ImplBase – a class from which the implementation (impl::Stream) derives, used as an extention point.
Subclassed by icey::ParameterStream< _Value >
Public Types
Public Functions
-
Stream() = default#
-
std::suspend_never initial_suspend()#
[Coroutine support]
-
std::suspend_never final_suspend() const noexcept#
[Coroutine support]
-
auto return_value()#
[Coroutine support] return_value returns the value of the Steam.
-
void unhandled_exception()#
[Coroutine support] Store the unhandled exception in case it occurs: We will re-throw it when it’s time. (The compiler can’t do this for us because of reasons)
-
Awaiter<Self> operator co_await()#
[Coroutine support] Allow this stream to be awaited with
co_await stream
in C++20 coroutines.
-
void return_value(const Value &x)#
[Coroutine support] Implementation of the operator co_return(x): It sets the value of the Stream object that is about to get returned (the compiler creates it beforehand)
-
template<class F>
auto then(F &&f)# Calls the given function (synchronous or asynchronous) f every time this stream receives a value. /// It returns a new stream that receives the values that this function f returns. The returned Stream also passes though the errors of this stream so that chaining
then
s with anexcept
works.- Template Parameters:
F – Must be (X) -> Y, where X is:
V_1, …, V_n if Value is std::tuple<V_1, …, V_n>
Value otherwise
- Returns:
A new Stream that changes it’s value to y every time this stream receives a value x, where y = f(x). The type of the returned stream is:
Stream<Nothing, _Error> if F is (X) -> void
Stream<NewValue, NewError> if F is (X) -> Result<NewValue, NewError>
Stream<NewValue, _Error> if F is (X) -> std::optional<NewValue>
Stream<Y, _Error> otherwise
-
template<class F>
auto except(F &&f)# Calls the given function (synchronous or asynchronous) f every time this Stream receives an error. It returns a new Stream that receives the values that this function f returns.
- Template Parameters:
F – Must be (X) -> Y, where X is:
V_1, …, V_n if Value is std::tuple<V_1, …, V_n>
Value otherwise
- Returns:
A new Stream that changes it’s value to y every time this stream receives an error x, where y = f(x). The type of the returned stream is:
Stream<Nothing, Nothing> if F is (X) -> void
Stream<NewValue, NewError> if F is (X) -> Result<NewValue, NewError>
Stream<NewValue, Nothing> if F is (X) -> std::optional<NewValue>
Stream<Y, Nothing> otherwise
-
template<AnyStream Output>
void connect_values(Output output)# Connect this Stream to the given output stream so that the output stream receives all the values.
- Todo:
remove this, use unwrap
-
void publish(const std::string &topic_name, const rclcpp::QoS &qos = rclcpp::SystemDefaultsQoS(), const rclcpp::PublisherOptions publisher_options = {})#
Creates a publisher so that every new value of this Stream will get published.
See also
-
template<AnyStream PublisherType, class ...Args>
void publish(Args&&... args)# Creates a custom publisher so that every new value of this Stream will get published.
See also
-
void publish_transform()#
Creates a transform publisher so that every new value of this Stream, which must be of type
geometry_msgs::msg::TransformStamped
, will get published.See also
-
auto unpack()#
Unpacks an Stream holding a tuple as value to multiple Streams for each tuple element. Given that
Value
is of typestd::tuple<Value1, Value2, ..., ValueN>
, it returnsstd::tuple< Stream<Value1>, Stream<Value2>, ..., Stream<ValueN>>
-
template<class F>
Stream<Value, Nothing> unwrap_or(F &&f)# Unwraps, i.e. creates an ErrorFreeStream by handling the error with the given function f. The returned Stream will receive only the values of this stream.
- Template Parameters:
F – Function receiving the Error of this Stream (it is unpacked if it’s a tuple) and returning void.
-
template<class F>
Stream<Value, Error> filter(F f)# Outputs the Value only if the given predicate f returns true.
- Template Parameters:
F – Function receiving the Value of this Stream (it is unpacked if it’s a tuple) and returning bool
-
Buffer<Value> buffer(std::size_t N) const#
Buffers N elements. Each time N elements were accumulated, the returned Stream will yield an array of exactly N elements.
-
TimeoutFilter<Value> timeout(const Duration &max_age, bool create_extra_timer = true)#
- Parameters:
max_age – the maximum age a message is allowed to have before the timeout occurs
create_extra_timer – If set to false, the timeout will only be detected after at least one message was received. If set to true, an extra timer is created so that timeouts can be detected even if no message is received.
- Returns:
A new Stream that errors on a timeout, i.e. when this stream has not received any value for some time
max_age
.
-
TransformSynchronizer<Value> synchronize_with_transform(const std::string &target_frame, const Duration &lookup_timeout)#
Synchronizes a topic with a transform using the
tf2_ros::MessageFilter
.Example:
/// Synchronize with a transform: This will yield the message and the transform from the child_frame_id of the header message /// and the given target_frame ("map") at the time of the header stamp. It will wait up to 200ms for the transform. node->icey().create_subscription<sensor_msgs::msg::Image>("camera") .synchronize_with_transform("map", 200ms) .unwrap_or([&](std::string error) { RCLCPP_INFO_STREAM(node->get_logger(), "Transform lookup error: " << error);}) .then([](sensor_msgs::msg::Image::SharedPtr image, const geometry_msgs::msg::TransformStamped &transform_to_map) { });
- Parameters:
target_frame – the transform on which we wait is specified by source_frame and target_frame, where source_frame is the frame in the header of the message.
lookup_timeout – The maximum time to wait until the transform gets available for a message
-
template<AnyStream S, class ...Args>
S create_stream(Args&&... args) const# Creates a new stream of type S using the Context. See Context::create_stream.
Warning
doxygenclass: Cannot find class “icey::PromiseBase” in doxygen xml output for project “icey” from directory: .build/doxygenxml/
-
template<class _Value, class _Error = Nothing>
class Promise# This is the type that users writing coroutines use as the return type. It is what is returned when calling promise_type::get_return_value(). It is necessary because of the C++20 coroutines spec that apparently tries to optimize the copying of the promise inside the coroutine state to the caller. To not confuse the users with C++ coroutine spec’s intricacies, we just call this “Promise”. Note that this is not a “Task”: this term is used seemingly exclusively for the structured programming approach of lazily started coroutines. I.e. it is not a Task as implemented in in Lewis Bakers’s cppcoro library and described in https://www.open-std.org/JTC1/SC22/WG21/docs/papers/2018/p1056r1.html. We cannot use the structured programming approach because it requires a custom executor but we want to use the existing ROS executor.
Public Types
Actual implementation of the stream (impl Stream)#
-
template<class _Value, class _Error, class Base, class DefaultBase>
class Stream : public icey::impl::StreamImplBase, public Base# A stream, an abstraction over an asynchronous sequence of values. It has a state of type Result and a list of callbacks that get notified when this state changes. It is conceptually very similar to a promise in JavaScript but the state transitions are not final.
- Template Parameters:
_Value – the type of the value
_Error – the type of the error. It can also be an exception.
Base – a class from which this class derives, used as an extention point.
DefaultBase – When new
Stream
s get created usingthen
andexcept
, this is used as a template parameter forBase
so that a default extention does not get lost when we callthen
orexcept
.
Public Types
Public Functions
-
Stream() = default#
-
Stream(const Self&) = delete#
A Stream is non-copyable since it has members that reference it and therefore it should change it’s address.
-
Stream(Self&&) = delete#
A Stream is non-movable since it has members that reference it and therefore it should change it’s address.
-
Stream &operator=(const Self&) = delete#
A Stream is non-copyable since it has members that reference it and therefore it should change it’s address.
-
Stream &operator=(Self&&) = delete#
A Stream is non-movable since it has members that reference it and therefore it should change it’s address.
-
~Stream() = default#
-
bool has_none() const#
-
bool has_value() const#
-
bool has_error() const#
-
void register_handler(Handler &&cb)#
Register a handler (i.e. a callback) that gets called when the state changes. It receives the new state as an argument. TODO If there is any value to take, we should immediately call the handler
-
void set_none()#
Sets the state to hold none, but does not notify about this state change.
-
void set_value(const Value &x)#
Sets the state to hold a value, but does not notify about this state change.
-
void set_error(const Error &x)#
Sets the state to hold an error, but does not notify about this state change.
-
MaybeResult take()#
Returns the current state and sets it to none. If no error is possible (Error is not Nothing), it just the Value to not force the user to write unnecessary error handling/unwraping code.
-
void notify()#
It takes (calls take) the current state and notifies the callbacks. It notifies only in case we have an error or value. If the state is none, it does not notify. If the state is an error and the
Error
is an exception type (a subclass ofstd::runtime_error
) and also no handlers were registered, the exception is re-thrown. TODO We should take the value of this stream after notifying
Traits#
-
template<class T>
using icey::ErrorOf = remove_shared_ptr_t<T>::Error# The value type of the given Stream type.
C++20 coroutines support#
-
template<class S>
struct Awaiter# An awaiter required to implement the operator
co_await
for Streams. It it needed for supporting C++ coroutines.Public Functions
-
void await_suspend(std::coroutine_handle<> continuation) noexcept#
Registers the continuation (that’s the code that follows the
co_await
statement, in form of a function pointer) as a callback of the stream. This callback then get’s called by the ROS executor.
-
auto await_resume() const#
Returns the current value of the stream. If an exception occurred (but was not handled) previously, here it is re-thrown.
-
void await_suspend(std::coroutine_handle<> continuation) noexcept#
-
using icey::Clock = std::chrono::system_clock#