Asynchronous primitives

Contents

Asynchronous primitives#

The asynchronous primitives like Promise and Stream are the building blocks of ICEY.

template<class T>
struct Ok#

Public Functions

explicit Ok(const T &v)#
explicit Ok(T &&v)#

Public Members

T value#
template<class T>
struct Err#

Public Functions

explicit Err(const T &v)#
explicit Err(T &&v)#

Public Members

T value#
template<class _Value, class _Error>
struct Result : public std::variant<_Value, _Error>#

A result-type is a sum type that holds either a value or error. I is ike Rust’s Result and like C++23’s std::expected, but for C++20.

Public Types

using Value = _Value#
using Error = _Error#

Public Functions

Result() = delete#
Result(const Ok<Value> &ok_value)#
Result(Ok<Value> &&ok_value)#
Result(const Err<Error> &err_value)#
Result(Err<Error> &&err_value)#
bool has_value() const#
bool has_error() const#
Value &value() &#
const Value &value() const &#
Value &&value() &&#
const Value &&value() const &&#
Error &error() &#
const Error &error() const &#
Error &&error() &&#
const Error &&error() const &&#
void set_value(const Value &x)#
void set_value(Value &&x)#
void set_error(const Error &x)#
void set_error(Error &&x)#
explicit operator bool() const#

Warning

doxygenstruct: Cannot find class “icey::Nothing” in doxygen xml output for project “icey” from directory: .build/doxygenxml/

template<class _Value, class _Error = Nothing, class ImplBase = Nothing>
class Stream : public icey::StreamTag#

A stream, an abstraction over an asynchronous sequence of values. It has a state of type Result and a list of callbacks that get notified when this state changes. It is conceptually very similar to a promise in JavaScript except that state transitions are not final. This is the base class for all the other streams.

Note

This class does not have any fields except a weak pointer to the actual implementation (i.e. it uses the PIMPL idiom). You should not add any fields when inheriting form this class. Instead, put the additional fields that you need in a separate struct and pass it as the ImplBase template parameter. These fields become available through impl().<my_field>, (i.e. the Impl-class will derive from ImplBase).

Template Parameters:
  • _Value – the type of the value

  • _Error – the type of the error. It can also be an exception.

  • ImplBase – a class from which the implementation (impl::Stream) derives, used as an extension point.

Subclassed by icey::ParameterStream< _Value >

Public Types

using Value = _Value#
using Error = _Error#
using Self = Stream<_Value, _Error, ImplBase>#
using Impl = impl::Stream<Value, Error, WithDefaults<ImplBase>, WithDefaults<Nothing>>#

The actual implementation of the Stream.

using promise_type = Self#

[Coroutine support]

Public Functions

Stream() = default#
explicit Stream(Context &ctx)#

Create s new stream using the context.

explicit Stream(std::shared_ptr<Impl> impl)#
Self &get_return_object()#

[Coroutine support]

std::suspend_never initial_suspend()#

[Coroutine support]

std::suspend_never final_suspend() const noexcept#

[Coroutine support]

auto return_value()#

[Coroutine support] return_value returns the value of the Steam.

void unhandled_exception()#

[Coroutine support] Store the unhandled exception in case it occurs: We will re-throw it when it’s time. (The compiler can’t do this for us because of reasons)

Awaiter<Self> operator co_await()#

[Coroutine support] Allow this stream to be awaited with co_await stream in C++20 coroutines.

void return_value(const Value &x)#

[Coroutine support] Implementation of the operator co_return(x): It sets the value of the Stream object that is about to get returned (the compiler creates it beforehand)

Weak<Impl> impl() const#

Returns a weak pointer to the implementation.

template<class F>
auto then(F &&f)#

Calls the given function (synchronous or asynchronous) f every time this stream receives a value. /// It returns a new stream that receives the values that this function f returns. The returned Stream also passes though the errors of this stream so that chaining thens with an except works.

Template Parameters:

F – Must be (X) -> Y, where X is:

  • V_1, …, V_n if Value is std::tuple<V_1, …, V_n>

  • Value otherwise

Returns:

A new Stream that changes it’s value to y every time this stream receives a value x, where y = f(x). The type of the returned stream is:

  • Stream<Nothing, _Error> if F is (X) -> void

  • Stream<NewValue, NewError> if F is (X) -> Result<NewValue, NewError>

  • Stream<NewValue, _Error> if F is (X) -> std::optional<NewValue>

  • Stream<Y, _Error> otherwise

template<class F>
auto except(F &&f)#

Calls the given function (synchronous or asynchronous) f every time this Stream receives an error. It returns a new Stream that receives the values that this function f returns.

Template Parameters:

F – Must be (X) -> Y, where X is:

  • V_1, …, V_n if Value is std::tuple<V_1, …, V_n>

  • Value otherwise

Returns:

A new Stream that changes it’s value to y every time this stream receives an error x, where y = f(x). The type of the returned stream is:

  • Stream<Nothing, Nothing> if F is (X) -> void

  • Stream<NewValue, NewError> if F is (X) -> Result<NewValue, NewError>

  • Stream<NewValue, Nothing> if F is (X) -> std::optional<NewValue>

  • Stream<Y, Nothing> otherwise

template<AnyStream Output>
void connect_values(Output output)#

Connect this Stream to the given output stream so that the output stream receives all the values.

Todo:

remove this, use unwrap

void publish(const std::string &topic_name, const rclcpp::QoS &qos = rclcpp::SystemDefaultsQoS(), const rclcpp::PublisherOptions publisher_options = {})#

Creates a publisher so that every new value of this Stream will get published.

See also

PublisherStream

template<AnyStream PublisherType, class ...Args>
void publish(Args&&... args)#

Creates a custom publisher so that every new value of this Stream will get published.

See also

PublisherStream

void publish_transform()#

Creates a transform publisher so that every new value of this Stream, which must be of type geometry_msgs::msg::TransformStamped, will get published.

auto unpack()#

Unpacks an Stream holding a tuple as value to multiple Streams for each tuple element. Given that Value is of type std::tuple<Value1, Value2, ..., ValueN>, it returns std::tuple< Stream<Value1>, Stream<Value2>, ..., Stream<ValueN>>

template<class F>
Stream<Value, Nothing> unwrap_or(F &&f)#

Unwraps, i.e. creates an ErrorFreeStream by handling the error with the given function f. The returned Stream will receive only the values of this stream.

Template Parameters:

F – Function receiving the Error of this Stream (it is unpacked if it’s a tuple) and returning void.

template<class F>
Stream<Value, Error> filter(F f)#

Outputs the Value only if the given predicate f returns true.

Template Parameters:

F – Function receiving the Value of this Stream (it is unpacked if it’s a tuple) and returning bool

Buffer<Value> buffer(std::size_t N) const#

Buffers N elements. Each time N elements were accumulated, the returned Stream will yield an array of exactly N elements.

TimeoutFilter<Value> timeout(const Duration &max_age, bool create_extra_timer = true)#
Parameters:
  • max_age – the maximum age a message is allowed to have before the timeout occurs

  • create_extra_timer – If set to false, the timeout will only be detected after at least one message was received. If set to true, an extra timer is created so that timeouts can be detected even if no message is received.

Returns:

A new Stream that errors on a timeout, i.e. when this stream has not received any value for some time max_age.

TransformSynchronizer<Value> synchronize_with_transform(const std::string &target_frame, const Duration &lookup_timeout)#

Synchronizes a topic with a transform using the tf2_ros::MessageFilter.

Example:

  /// Synchronize with a transform: This will yield the message and the transform from the child_frame_id of the header message
  /// and the given target_frame ("map") at the time of the header stamp. It will wait up to 200ms for the transform.
  node->icey().create_subscription<sensor_msgs::msg::Image>("camera")
    .synchronize_with_transform("map", 200ms)
    .unwrap_or([&](std::string error) { RCLCPP_INFO_STREAM(node->get_logger(), "Transform lookup error: " << error);})
    .then([](sensor_msgs::msg::Image::SharedPtr image,
        const geometry_msgs::msg::TransformStamped &transform_to_map) {

    });

Parameters:
  • target_frame – the transform on which we wait is specified by source_frame and target_frame, where source_frame is the frame in the header of the message.

  • lookup_timeout – The maximum time to wait until the transform gets available for a message

template<AnyStream S, class ...Args>
S create_stream(Args&&... args) const#

Creates a new stream of type S using the Context. See Context::create_stream.

template<class _Value = Nothing, class _Error = Nothing>
class PromiseBase : public icey::impl::PromiseTag#

A Promise is an asynchronous abstraction that yields a value or an error. It can be used with async/await syntax coroutines in C++20. It also allows for wrapping an existing callback-based API. It does not use dynamic memory allocation to store the value.

Subclassed by icey::impl::Promise< _Value, _Error >

Public Types

using Value = _Value#
using Error = _Error#
using State = PromiseState<_Value, _Error>#
using Self = PromiseBase<Value, Error>#
using LaunchAsync = std::function<void(Self&)>#

Public Functions

PromiseBase(LaunchAsync l)#
PromiseBase()#
PromiseBase(const PromiseBase&) = delete#
PromiseBase(PromiseBase&&) = delete#
PromiseBase &operator=(const Self&) = delete#
PromiseBase &operator=(Self&&) = delete#
~PromiseBase()#

calls the cancel function if it was set

void unhandled_exception()#

Store the unhandled exception in case it occurs: We will re-throw it when it’s time. (The compiler can’t do this for us because of reasons)

const Value &value() const#
const Error &error() const#
void set_value(const Value &x)#

Sets the state to hold a value, but does not notify about this state change. Thread-safety: not synchronized; use resolve/reject/put_state for concurrent producers.

void set_error(const Error &x)#

Sets the state to hold an error, but does not notify about this state change. Thread-safety: not synchronized; use resolve/reject/put_state for concurrent producers.

void set_state(const State &x)#

Sets the state but does not notify. Thread-safety: not synchronized.

void resolve(const Value &value)#

Tries to complete the promise with a value and notifies once. Thread-safety: concurrent resolve/reject/put_state calls are synchronized via is_done_ CAS. After the first successful completion, all subsequent completion attempts are ignored.

void reject(const Error &error)#

Tries to complete the promise with an error and notifies once. Thread-safety: concurrent resolve/reject/put_state calls are synchronized via is_done_ CAS. After the first successful completion, all subsequent completion attempts are ignored.

void put_state(const State &error)#

Tries to complete the promise with an explicit state and notifies once. Thread-safety: concurrent resolve/reject/put_state calls are synchronized via is_done_ CAS. After the first successful completion, all subsequent completion attempts are ignored.

bool await_ready() const noexcept#
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept#

Launches the asynchronous operation that was previously stored and sets the awaiting coroutine as continuation. Suspends the current coroutine, i.e. returns true. Used only when wrapping a callback-based API.

auto await_resume()#

get()s this promise. Used only when wrapping a callback-based API.

void notify()#

Calls the continuation coroutine.

auto get()#

Get the result of the promise: Re-throws an exception if any was stored, other gets the state.

std::suspend_never initial_suspend() const noexcept#

We do not use structured concurrency approach, we want to start the coroutine directly.

FinalAwaiter final_suspend() const noexcept#
void set_continuation(std::coroutine_handle<> continuation)#
void detach()#
bool is_detached() const#
struct FinalAwaiter#

Keep completed coroutines suspended while a icey::Promise wrapper still owns them. If the wrapper was destroyed before completion (detached), destroy the frame automatically.

Public Functions

bool await_ready() const noexcept#
template<class PromiseT>
bool await_suspend(std::coroutine_handle<PromiseT> h) const noexcept#
void await_resume() const noexcept#
template<class _Value, class _Error = Nothing>
class Promise : public icey::impl::PromiseBase<_Value, Nothing>#

A Promise is used in ICEY for async/await. It is returned from asynchronous operations such as a service calls or TF lookups. By calling co_await on a Promise, an icey::Result is returned.

Dev doc: The whole point of this PromiseBase and Promise is the weird design of the promise interface that is required for C++20 coroutines: The operator co_return that isn’t actually called operator co_return consists of two differently named functions, return_value and return_void. Only one of these two functions must be declared, depending on whether a Promise

holds a value or not, i.e. is the void-version. If you now think “can’t I just choose between

return_value and return_void using SFINAE?”, nope this does not work (Reference:

https://devblogs.microsoft.com/oldnewthing/20210330-00/?p=105019) So we need to use different classes and partial specialization.

Public Types

using Self = Promise<_Value, _Error>#
using Base = PromiseBase<_Value, _Error>#
using State = Base::State#

Public Functions

Promise()#
::icey::Promise<_Value, _Error> get_return_object() noexcept#
void return_value(const _Value &x)#

return_value (aka. operator co_return) sets the value if called with an argument, very confusing, I know (Reference: https://devblogs.microsoft.com/oldnewthing/20210330-00/?p=105019)

void return_value(const State &x)#

Sets the state to the given one:

template<class _Value, class _Error = Nothing>
class Promise#

This is the type that users writing coroutines use as the return type. It is what is returned when calling promise_type::get_return_value(). It is necessary because of the C++20 coroutines spec that apparently tries to optimize the copying of the promise inside the coroutine state to the caller. To not confuse the users with C++ coroutine spec’s intricacies, we just call this “Promise”. Note that this is not a “Task”: this term is used seemingly exclusively for the structured programming approach of lazily started coroutines. I.e. it is not a Task as implemented in in Lewis Bakers’s cppcoro library and described in https://www.open-std.org/JTC1/SC22/WG21/docs/papers/2018/p1056r1.html. We cannot use the structured programming approach because it requires a custom executor but we want to use the existing ROS executor.

Public Types

using Self = Promise<_Value, _Error>#
using Value = _Value#
using Error = _Error#
using promise_type = impl::Promise<_Value, _Error>#

Public Functions

explicit Promise(std::coroutine_handle<promise_type> coroutine)#
Promise(const Promise&) = delete#
Promise(Promise &&other) = delete#
Promise &operator=(const Promise&) = delete#
Promise &operator=(Promise &&other) = delete#
~Promise()#
void detach()#

You need to detach a Promise if you are not awaiting it but it should still continue to run. Detaching this outer promise means the coroutine state won’t be destroyed on destruction of outer promise. This also avoid the discarting warning

bool await_ready() const noexcept#
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept#
auto await_resume()#

Actual implementation of the stream (impl Stream)#

template<class _Value, class _Error, class Base, class DefaultBase>
class Stream : public icey::impl::StreamImplBase, public Base#

A stream, an abstraction over an asynchronous sequence of values. It has a state of type Result and a list of callbacks that get notified when this state changes. It is conceptually very similar to a promise in JavaScript but the state transitions are not final.

Template Parameters:
  • _Value – the type of the value

  • _Error – the type of the error. It can also be an exception.

  • Base – a class from which this class derives, used as an extention point.

  • DefaultBase – When new Streams get created using then and except, this is used as a template parameter for Base so that a default extention does not get lost when we call then or except.

Public Types

using Value = _Value#
using Error = _Error#
using Self = Stream<Value, Error, Base, DefaultBase>#
using State = PromiseState<Value, Error>#
using MaybeResult = std::conditional_t<std::is_same_v<Error, Nothing>, Value, Result<Value, Error>>#

If no error is possible (Error is Nothing), this it just the Value instead of the State to not force the user to write unnecessary error handling/unwraping code.

using Handler = std::function<void(const State&)>#

Public Functions

Stream() = default#
Stream(const Self&) = delete#

A Stream is non-copyable since it has members that reference it and therefore it should change it’s address.

Stream(Self&&) = delete#

A Stream is non-movable since it has members that reference it and therefore it should change it’s address.

Stream &operator=(const Self&) = delete#

A Stream is non-copyable since it has members that reference it and therefore it should change it’s address.

Stream &operator=(Self&&) = delete#

A Stream is non-movable since it has members that reference it and therefore it should change it’s address.

~Stream() = default#
bool has_none() const#
bool has_value() const#
bool has_error() const#
const Value &value() const#
const Error &error() const#
State &get_state()#
const State &get_state() const#
void register_handler(Handler &&cb)#

Register a handler (i.e. a callback) that gets called when the state changes. It receives the new state as an argument. TODO If there is any value to take, we should immediately call the handler

void set_none()#

Sets the state to hold none, but does not notify about this state change.

void set_value(const Value &x)#

Sets the state to hold a value, but does not notify about this state change.

void set_error(const Error &x)#

Sets the state to hold an error, but does not notify about this state change.

void set_state(const State &x)#
State take_state()#

Returns the current state and sets it to none.

MaybeResult take()#

Returns the current state and sets it to none. If no error is possible (Error is not Nothing), it just the Value to not force the user to write unnecessary error handling/unwraping code.

void notify()#

It takes (calls take) the current state and notifies the callbacks. It notifies only in case we have an error or value. If the state is none, it does not notify. If the state is an error and the Error is an exception type (a subclass of std::runtime_error) and also no handlers were registered, the exception is re-thrown. TODO We should take the value of this stream after notifying

void put_value(const Value &x)#

Sets the state to a value and notifies.

void put_error(const Error &x)#

Sets the state to an error and notifies.

void put_state(const State &x)#

Sets the state to the given one and notifies.

template<class F>
auto then(F &&f)#
template<class F>
auto except(F &&f)#

Traits#

template<class T>
using icey::ErrorOf = remove_shared_ptr_t<T>::Error#

The value type of the given Stream type.

template<class T>
using icey::ValueOf = typename remove_shared_ptr_t<T>::Value#

The error type of the given Stream type.

template<class T>
using icey::MessageOf = remove_shared_ptr_t<ValueOf<T>>#

The ROS-message of the given Stream type.

C++20 coroutines support#

template<class S>
struct Awaiter#

An awaiter required to implement the operator co_await for Streams. It it needed for supporting C++ coroutines.

Public Functions

Awaiter(S &s)#
bool await_ready() const noexcept#
Returns:

Returns whether this Stream already has a value.

void await_suspend(std::coroutine_handle<> continuation) noexcept#

Registers the continuation (that’s the code that follows the co_await statement, in form of a function pointer) as a callback of the stream. This callback then gets called by the ROS executor.

auto await_resume() const#

Returns the current value of the stream. If an exception occurred (but was not handled) previously, here it is re-thrown.

Public Members

S &stream#
using icey::Clock = std::chrono::system_clock#
using icey::Time = std::chrono::time_point<Clock>#
using icey::Duration = Clock::duration#