Asynchronous primitives#
The asynchronous primitives like Promise and Stream are the building blocks of ICEY.
-
template<class T>
struct Ok#
-
template<class T>
struct Err#
-
template<class _Value, class _Error>
struct Result : public std::variant<_Value, _Error># A result type is a sum type that holds either a value or an error. It is like Rust’s Result and C++23’s std::expected, but for C++20.
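The idea of a variant-based result type can be illustrated with a minimal, self-contained sketch. The names MiniResult, Ok, Err and parse_digit_string below are hypothetical stand-ins written for this example, not the ICEY definitions, and wrapping the payloads in Ok/Err so that value and error may share a type is an assumption about their purpose.

```cpp
#include <cassert>
#include <string>
#include <variant>

// Hypothetical stand-ins (not the ICEY types): thin wrappers that tag a
// payload as success or failure.
template <class T>
struct Ok { T value; };

template <class T>
struct Err { T error; };

// A minimal result type: a variant that holds either Ok<V> or Err<E>.
template <class V, class E>
struct MiniResult : std::variant<Ok<V>, Err<E>> {
  using Base = std::variant<Ok<V>, Err<E>>;
  using Base::Base;  // inherit the converting constructors of the variant

  bool has_value() const { return this->index() == 0; }
  bool has_error() const { return this->index() == 1; }
  // Precondition: has_value() / has_error() respectively, else std::get throws.
  const V &value() const { return std::get<0>(static_cast<const Base &>(*this)).value; }
  const E &error() const { return std::get<1>(static_cast<const Base &>(*this)).error; }
};

// Example producer: parse a string of decimal digits or report why it failed.
inline MiniResult<int, std::string> parse_digit_string(const std::string &text) {
  if (text.empty()) return Err<std::string>{"empty input"};
  int parsed = 0;
  for (char c : text) {
    if (c < '0' || c > '9') return Err<std::string>{"not a digit: " + std::string(1, c)};
    parsed = parsed * 10 + (c - '0');
  }
  return Ok<int>{parsed};
}
```

A caller checks has_value() before reading value(), much as one would with std::expected.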
-
template<class _Value, class _Error = Nothing, class ImplBase = Nothing>
class Stream : public icey::StreamTag# A stream, an abstraction over an asynchronous sequence of values. It has a state of type Result and a list of callbacks that get notified when this state changes. It is conceptually very similar to a promise in JavaScript except that state transitions are not final. This is the base class for all the other streams.
Note
This class does not have any fields except a weak pointer to the actual implementation (i.e. it uses the PIMPL idiom). You should not add any fields when inheriting from this class. Instead, put the additional fields that you need in a separate struct and pass it as the
ImplBase template parameter. These fields then become available through impl().<my_field> (i.e. the Impl class will derive from ImplBase).
- Template Parameters:
_Value – the type of the value
_Error – the type of the error. It can also be an exception.
ImplBase – a class from which the implementation (impl::Stream) derives, used as an extension point.
Subclassed by icey::ParameterStream< _Value >
Public Types
Public Functions
-
Stream() = default#
-
std::suspend_never initial_suspend()#
[Coroutine support]
-
std::suspend_never final_suspend() const noexcept#
[Coroutine support]
-
auto return_value()#
[Coroutine support] return_value returns the value of the Stream.
-
void unhandled_exception()#
[Coroutine support] Store the unhandled exception in case it occurs: We will re-throw it when it’s time. (The compiler can’t do this for us because of reasons)
-
Awaiter<Self> operator co_await()#
[Coroutine support] Allow this stream to be awaited with
co_await stream in C++20 coroutines.
-
void return_value(const Value &x)#
[Coroutine support] Implementation of the operator co_return(x): It sets the value of the Stream object that is about to get returned (the compiler creates it beforehand)
-
template<class F>
auto then(F &&f)# Calls the given function f (synchronous or asynchronous) every time this stream receives a value. It returns a new stream that receives the values that f returns. The returned Stream also passes through the errors of this stream so that chaining
thens with an except works.
- Template Parameters:
F – Must be (X) -> Y, where X is:
V_1, …, V_n if Value is std::tuple<V_1, …, V_n>
Value otherwise
- Returns:
A new Stream that changes its value to y every time this stream receives a value x, where y = f(x). The type of the returned stream is:
Stream<Nothing, _Error> if F is (X) -> void
Stream<NewValue, NewError> if F is (X) -> Result<NewValue, NewError>
Stream<NewValue, _Error> if F is (X) -> std::optional<NewValue>
Stream<Y, _Error> otherwise
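The chaining behaviour described above (values are transformed, errors pass through until an except handles them) can be sketched with a toy stream. ToyStream and run_then_demo are hypothetical names for this illustration; the sketch is synchronous, only covers functions that return a plain non-void value, and is not how icey::Stream is implemented.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <type_traits>
#include <vector>

// Toy stand-in for a stream (NOT the ICEY class). It forwards every value or
// error synchronously to the handlers registered on it.
template <class Value, class Error>
class ToyStream {
public:
  void put_value(const Value &v) {
    for (auto &handler : value_handlers_) handler(v);
  }
  void put_error(const Error &e) {
    for (auto &handler : error_handlers_) handler(e);
  }

  // then: feed f(x) into a new stream for every value x; errors pass through.
  // This sketch only supports functions that return a non-void value.
  template <class F>
  auto then(F f) {
    using NewValue = std::invoke_result_t<F, const Value &>;
    auto output = std::make_shared<ToyStream<NewValue, Error>>();
    value_handlers_.push_back([output, f](const Value &v) { output->put_value(f(v)); });
    error_handlers_.push_back([output](const Error &e) { output->put_error(e); });
    return output;
  }

  // except: call f for every error (terminal here; it returns no new stream).
  template <class F>
  void except(F f) {
    error_handlers_.push_back(f);
  }

private:
  std::vector<std::function<void(const Value &)>> value_handlers_;
  std::vector<std::function<void(const Error &)>> error_handlers_;
};

// Builds a two-stage chain and records what arrives at its end.
inline std::vector<std::string> run_then_demo() {
  std::vector<std::string> log;
  ToyStream<int, std::string> source;
  auto doubled = source.then([](const int &x) { return x * 2; });
  auto logged = doubled->then([&log](const int &x) {
    log.push_back("value " + std::to_string(x));
    return x;
  });
  logged->except([&log](const std::string &e) { log.push_back("error " + e); });
  source.put_value(21);                 // travels through both then stages
  source.put_error("lost connection");  // skips the then stages, reaches except
  return log;
}
```

The error put into the source reaches the except handler at the end of the chain even though no intermediate stage handles it.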
-
template<class F>
auto except(F &&f)# Calls the given function (synchronous or asynchronous) f every time this Stream receives an error. It returns a new Stream that receives the values that this function f returns.
- Template Parameters:
F – Must be (X) -> Y, where X is:
E_1, …, E_n if Error is std::tuple<E_1, …, E_n>
Error otherwise
- Returns:
A new Stream that changes its value to y every time this stream receives an error x, where y = f(x). The type of the returned stream is:
Stream<Nothing, Nothing> if F is (X) -> void
Stream<NewValue, NewError> if F is (X) -> Result<NewValue, NewError>
Stream<NewValue, Nothing> if F is (X) -> std::optional<NewValue>
Stream<Y, Nothing> otherwise
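Both then and except are documented as unpacking a tuple into separate arguments before calling f. One way to write such a dispatch is sketched below with std::apply and if constexpr; is_tuple and call_unpacked are hypothetical helper names for this example, not ICEY functions.

```cpp
#include <cassert>
#include <tuple>
#include <type_traits>
#include <utility>

// Detects whether T is a std::tuple.
template <class T>
struct is_tuple : std::false_type {};
template <class... Ts>
struct is_tuple<std::tuple<Ts...>> : std::true_type {};

// Calls f(v_1, ..., v_n) if x is a std::tuple<V_1, ..., V_n>, and f(x) otherwise.
template <class F, class X>
decltype(auto) call_unpacked(F &&f, const X &x) {
  if constexpr (is_tuple<X>::value) {
    return std::apply(std::forward<F>(f), x);
  } else {
    return std::forward<F>(f)(x);
  }
}
```

With this helper, a callback for a stream of std::tuple<int, int> can be written as a two-argument function instead of taking the tuple itself.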
-
template<AnyStream Output>
void connect_values(Output output)# Connect this Stream to the given output stream so that the output stream receives all the values.
- Todo:
remove this, use unwrap
-
void publish(const std::string &topic_name, const rclcpp::QoS &qos = rclcpp::SystemDefaultsQoS(), const rclcpp::PublisherOptions publisher_options = {})#
Creates a publisher so that every new value of this Stream will get published.
-
template<AnyStream PublisherType, class ...Args>
void publish(Args&&... args)# Creates a custom publisher so that every new value of this Stream will get published.
-
void publish_transform()#
Creates a transform publisher so that every new value of this Stream, which must be of type
geometry_msgs::msg::TransformStamped, will get published.
-
auto unpack()#
Unpacks a Stream holding a tuple as its value into multiple Streams, one for each tuple element. Given that
Value is of type std::tuple<Value1, Value2, ..., ValueN>, it returns std::tuple<Stream<Value1>, Stream<Value2>, ..., Stream<ValueN>>
-
template<class F>
Stream<Value, Nothing> unwrap_or(F &&f)# Unwraps, i.e. creates an ErrorFreeStream by handling the error with the given function f. The returned Stream will receive only the values of this stream.
- Template Parameters:
F – Function receiving the Error of this Stream (it is unpacked if it’s a tuple) and returning void.
-
template<class F>
Stream<Value, Error> filter(F f)# Outputs the Value only if the given predicate f returns true.
- Template Parameters:
F – Function receiving the Value of this Stream (it is unpacked if it’s a tuple) and returning bool
-
Buffer<Value> buffer(std::size_t N) const#
Buffers N elements. Each time N elements have been accumulated, the returned Stream yields an array of exactly N elements.
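The batching rule can be sketched independently of ICEY. ToyBuffer and collect_batches are hypothetical names, and std::vector stands in for whatever array type the real Buffer yields.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Toy batching helper (not icey::Buffer): collects values and hands out a
// batch each time exactly n values have accumulated.
template <class Value>
class ToyBuffer {
public:
  using Batch = std::vector<Value>;

  ToyBuffer(std::size_t n, std::function<void(const Batch &)> on_full)
      : n_(n), on_full_(std::move(on_full)) {}

  void push(const Value &v) {
    pending_.push_back(v);
    if (pending_.size() == n_) {
      Batch full;
      full.swap(pending_);  // pending_ is empty again before the callback runs
      on_full_(full);
    }
  }

private:
  std::size_t n_;
  std::function<void(const Batch &)> on_full_;
  Batch pending_;
};

// Pushes 1..count into a buffer of size n and returns every emitted batch.
inline std::vector<std::vector<int>> collect_batches(int count, std::size_t n) {
  std::vector<std::vector<int>> batches;
  ToyBuffer<int> buffer(n, [&batches](const std::vector<int> &b) { batches.push_back(b); });
  for (int i = 1; i <= count; ++i) buffer.push(i);
  return batches;
}
```

Pushing seven values with N = 3 emits two full batches; the seventh value stays pending until two more arrive.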
-
TimeoutFilter<Value> timeout(const Duration &max_age, bool create_extra_timer = true)#
- Parameters:
max_age – the maximum age a message is allowed to have before the timeout occurs
create_extra_timer – If set to false, the timeout will only be detected after at least one message was received. If set to true, an extra timer is created so that timeouts can be detected even if no message is received.
- Returns:
A new Stream that errors on a timeout, i.e. when this stream has not received any value for longer than max_age.
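The age check behind such a timeout can be sketched as a pure function. This is one plausible reading of max_age, not the TimeoutFilter implementation; ToyClock and exceeds_max_age are hypothetical names, and the sketch does not model the extra timer.

```cpp
#include <cassert>
#include <chrono>

using ToyClock = std::chrono::system_clock;

// Returns true if a message stamped at `stamp` is older than `max_age`
// when observed at time `now`.
inline bool exceeds_max_age(ToyClock::time_point stamp, ToyClock::time_point now,
                            ToyClock::duration max_age) {
  return (now - stamp) > max_age;
}
```

A check like this can only run when something triggers it, which is why detecting a timeout while no messages arrive needs a separate timer, as the create_extra_timer parameter describes.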
-
TransformSynchronizer<Value> synchronize_with_transform(const std::string &target_frame, const Duration &lookup_timeout)#
Synchronizes a topic with a transform using the
tf2_ros::MessageFilter.
Example:
    /// Synchronize with a transform: This will yield the message and the transform from the child_frame_id of the header message
    /// and the given target_frame ("map") at the time of the header stamp. It will wait up to 200ms for the transform.
    node->icey().create_subscription<sensor_msgs::msg::Image>("camera")
      .synchronize_with_transform("map", 200ms)
      .unwrap_or([&](std::string error) { RCLCPP_INFO_STREAM(node->get_logger(), "Transform lookup error: " << error); })
      .then([](sensor_msgs::msg::Image::SharedPtr image,
               const geometry_msgs::msg::TransformStamped &transform_to_map) {
      });
- Parameters:
target_frame – the transform on which we wait is specified by source_frame and target_frame, where source_frame is the frame in the header of the message.
lookup_timeout – The maximum time to wait until the transform becomes available for a message
-
template<AnyStream S, class ...Args>
S create_stream(Args&&... args) const# Creates a new stream of type S using the Context. See Context::create_stream.
-
template<class _Value = Nothing, class _Error = Nothing>
class PromiseBase : public icey::impl::PromiseTag# A Promise is an asynchronous abstraction that yields a value or an error. It can be used with async/await syntax coroutines in C++20. It also allows for wrapping an existing callback-based API. It does not use dynamic memory allocation to store the value.
Subclassed by icey::impl::Promise< _Value, _Error >
Public Types
-
using Self = PromiseBase<Value, Error>#
Public Functions
-
PromiseBase(LaunchAsync l)#
-
PromiseBase()#
-
PromiseBase(const PromiseBase&) = delete#
-
PromiseBase(PromiseBase&&) = delete#
-
PromiseBase &operator=(const Self&) = delete#
-
PromiseBase &operator=(Self&&) = delete#
-
~PromiseBase()#
Calls the cancel function if it was set.
-
void unhandled_exception()#
Store the unhandled exception in case it occurs: We will re-throw it when it’s time. (The compiler can’t do this for us because of reasons)
-
void set_value(const Value &x)#
Sets the state to hold a value, but does not notify about this state change. Thread-safety: not synchronized; use resolve/reject/put_state for concurrent producers.
-
void set_error(const Error &x)#
Sets the state to hold an error, but does not notify about this state change. Thread-safety: not synchronized; use resolve/reject/put_state for concurrent producers.
-
void set_state(const State &x)#
Sets the state but does not notify. Thread-safety: not synchronized.
-
void resolve(const Value &value)#
Tries to complete the promise with a value and notifies once. Thread-safety: concurrent resolve/reject/put_state calls are synchronized via is_done_ CAS. After the first successful completion, all subsequent completion attempts are ignored.
-
void reject(const Error &error)#
Tries to complete the promise with an error and notifies once. Thread-safety: concurrent resolve/reject/put_state calls are synchronized via is_done_ CAS. After the first successful completion, all subsequent completion attempts are ignored.
-
void put_state(const State &error)#
Tries to complete the promise with an explicit state and notifies once. Thread-safety: concurrent resolve/reject/put_state calls are synchronized via is_done_ CAS. After the first successful completion, all subsequent completion attempts are ignored.
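The complete-at-most-once behaviour of resolve, reject and put_state can be sketched with a compare-and-swap on an atomic flag. OnceCompleter is a hypothetical class for this illustration and is fixed to int values and std::string errors. Only the flag name is_done_ is taken from the text above; the rest is an assumption about how such a guard could look.

```cpp
#include <atomic>
#include <cassert>
#include <optional>
#include <string>
#include <utility>

// Toy completion guard (not icey::PromiseBase): the first resolve or reject
// wins, every later attempt is ignored.
class OnceCompleter {
public:
  // Returns true if this call completed the object, false if it was too late.
  bool resolve(int value) {
    if (!try_claim()) return false;
    // Only the winning caller reaches this point, so these plain writes do
    // not race with other completers.
    value_ = value;
    ++notifications_;
    return true;
  }

  bool reject(std::string error) {
    if (!try_claim()) return false;
    error_ = std::move(error);
    ++notifications_;
    return true;
  }

  const std::optional<int> &value() const { return value_; }
  const std::optional<std::string> &error() const { return error_; }
  int notifications() const { return notifications_; }

private:
  // Atomically flips is_done_ from false to true; succeeds for one caller only.
  bool try_claim() {
    bool expected = false;
    return is_done_.compare_exchange_strong(expected, true);
  }

  std::atomic<bool> is_done_{false};
  std::optional<int> value_;
  std::optional<std::string> error_;
  int notifications_ = 0;
};
```

The guard decides which completer wins, but it does not by itself make reads of the stored state from other threads safe; a consumer still needs a happens-before relationship with the winner.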
-
bool await_ready() const noexcept#
-
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept#
Launches the asynchronous operation that was previously stored and sets the awaiting coroutine as continuation. Suspends the current coroutine, i.e. returns true. Used only when wrapping a callback-based API.
-
void notify()#
Calls the continuation coroutine.
-
auto get()#
Gets the result of the promise: re-throws the exception if one was stored, otherwise returns the state.
-
std::suspend_never initial_suspend() const noexcept#
We do not use the structured concurrency approach; we want to start the coroutine directly.
-
FinalAwaiter final_suspend() const noexcept#
-
void set_continuation(std::coroutine_handle<> continuation)#
-
void detach()#
-
bool is_detached() const#
-
struct FinalAwaiter#
Keep completed coroutines suspended while an
icey::Promise wrapper still owns them. If the wrapper was destroyed before completion (detached), destroy the frame automatically.
-
template<class _Value, class _Error = Nothing>
class Promise : public icey::impl::PromiseBase<_Value, Nothing># A Promise is used in ICEY for async/await. It is returned from asynchronous operations such as service calls or TF lookups. By calling co_await on a Promise, an icey::Result is returned.
Dev doc: The whole point of PromiseBase and Promise is the weird design of the promise interface that is required for C++20 coroutines: the operator co_return, which isn’t actually called operator co_return, consists of two differently named functions, return_value and return_void. Exactly one of these two functions must be declared, depending on whether a Promise
holds a value or not, i.e. is the void version. If you now think “can’t I just choose between
return_value and return_void using SFINAE?”: no, this does not work (Reference:
https://devblogs.microsoft.com/oldnewthing/20210330-00/?p=105019). So we need to use different classes and partial specialization.
Public Types
-
using Base = PromiseBase<_Value, _Error>#
Public Functions
-
Promise()#
-
void return_value(const _Value &x)#
return_value (aka. operator co_return) sets the value if called with an argument, very confusing, I know (Reference: https://devblogs.microsoft.com/oldnewthing/20210330-00/?p=105019)
-
template<class _Value, class _Error = Nothing>
class Promise# This is the type that users writing coroutines use as the return type. It is what is returned when calling promise_type::get_return_object(). It is necessary because the C++20 coroutines spec apparently tries to optimize the copying of the promise inside the coroutine state to the caller. To not confuse users with the intricacies of the C++ coroutine spec, we just call this “Promise”. Note that this is not a “Task”: this term is used seemingly exclusively for the structured concurrency approach of lazily started coroutines. I.e. it is not a Task as implemented in Lewis Baker’s cppcoro library and described in https://www.open-std.org/JTC1/SC22/WG21/docs/papers/2018/p1056r1.html. We cannot use the structured concurrency approach because it requires a custom executor, but we want to use the existing ROS executor.
Public Types
Public Functions
-
explicit Promise(std::coroutine_handle<promise_type> coroutine)#
-
~Promise()#
-
void detach()#
You need to detach a Promise if you are not awaiting it but it should still continue to run. Detaching this outer promise means the coroutine state won’t be destroyed when the outer promise is destroyed. This also avoids the warning about a discarded return value.
-
bool await_ready() const noexcept#
-
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept#
-
auto await_resume()#
Actual implementation of the stream (impl Stream)#
-
template<class _Value, class _Error, class Base, class DefaultBase>
class Stream : public icey::impl::StreamImplBase, public Base# A stream, an abstraction over an asynchronous sequence of values. It has a state of type Result and a list of callbacks that get notified when this state changes. It is conceptually very similar to a promise in JavaScript but the state transitions are not final.
- Template Parameters:
_Value – the type of the value
_Error – the type of the error. It can also be an exception.
Base – a class from which this class derives, used as an extension point.
DefaultBase – When new
Streams get created using then and except, this is used as a template parameter for Base so that a default extension does not get lost when we call then or except.
Public Types
Public Functions
-
Stream() = default#
-
Stream(const Self&) = delete#
A Stream is non-copyable since it has members that reference it and therefore it should not change its address.
-
Stream(Self&&) = delete#
A Stream is non-movable since it has members that reference it and therefore it should not change its address.
-
Stream &operator=(const Self&) = delete#
A Stream is non-copyable since it has members that reference it and therefore it should not change its address.
-
Stream &operator=(Self&&) = delete#
A Stream is non-movable since it has members that reference it and therefore it should not change its address.
-
~Stream() = default#
-
bool has_none() const#
-
bool has_value() const#
-
bool has_error() const#
-
void register_handler(Handler &&cb)#
Register a handler (i.e. a callback) that gets called when the state changes. It receives the new state as an argument. TODO If there is any value to take, we should immediately call the handler
-
void set_none()#
Sets the state to hold none, but does not notify about this state change.
-
void set_value(const Value &x)#
Sets the state to hold a value, but does not notify about this state change.
-
void set_error(const Error &x)#
Sets the state to hold an error, but does not notify about this state change.
-
MaybeResult take()#
Returns the current state and sets it to none. If no error is possible (Error is Nothing), it returns just the Value to not force the user to write unnecessary error handling/unwrapping code.
-
void notify()#
It takes (calls take) the current state and notifies the callbacks. It notifies only if the state holds an error or a value; if the state is none, it does not notify. If the state is an error, the
Error is an exception type (a subclass of std::runtime_error), and no handlers were registered, the exception is re-thrown. TODO We should take the value of this stream after notifying
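How take and notify interact can be sketched with a three-way variant. ToyCell, ToyState and NoneState are hypothetical names fixed to int values and std::string errors; the sketch leaves out the re-throwing of unhandled exceptions and is not the impl::Stream code.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <variant>
#include <vector>

struct NoneState {};
// none | value | error
using ToyState = std::variant<NoneState, int, std::string>;

// Toy state holder (not icey::impl::Stream).
class ToyCell {
public:
  using Handler = std::function<void(const ToyState &)>;

  void register_handler(Handler cb) { handlers_.push_back(std::move(cb)); }

  // The setters change the state but do not notify.
  void set_value(int v) { state_.emplace<int>(v); }
  void set_error(std::string e) { state_.emplace<std::string>(std::move(e)); }
  bool has_none() const { return std::holds_alternative<NoneState>(state_); }

  // Returns the current state and resets it to none.
  ToyState take() { return std::exchange(state_, ToyState{NoneState{}}); }

  // Takes the state and passes it to every handler; does nothing if none.
  void notify() {
    if (has_none()) return;
    const ToyState taken = take();
    for (auto &handler : handlers_) handler(taken);
  }

private:
  ToyState state_{NoneState{}};
  std::vector<Handler> handlers_;
};
```

Because notify takes the state before calling the handlers, a second notify without a new set_value or set_error call is a no-op.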
Traits#
-
template<class T>
using icey::ErrorOf = remove_shared_ptr_t<T>::Error# The error type of the given Stream type.
C++20 coroutines support#
-
template<class S>
struct Awaiter# An awaiter required to implement the operator
co_await for Streams. It is needed for supporting C++ coroutines.
Public Functions
-
void await_suspend(std::coroutine_handle<> continuation) noexcept#
Registers the continuation (that’s the code that follows the
co_await statement, in the form of a function pointer) as a callback of the stream. This callback then gets called by the ROS executor.
-
auto await_resume() const#
Returns the current value of the stream. If an exception occurred (but was not handled) previously, here it is re-thrown.
-
using icey::Clock = std::chrono::system_clock#