Asynchronous primitives#

The asynchronous primitives like Promise and Stream are the building blocks of ICEY.

template<class _Value, class _Error>
struct Result : private std::variant<std::monostate, _Value, _Error>, public icey::ResultTag#

A Result-type is a sum type that can either hold Value or Error, or, different to Rust, none. It is used as the state for the Stream. Note that this Result type is API-compatible with std::expected (C++23), so a change is easily possible once we target C++23.

Public Types

using Value = _Value#
using Error = _Error#
using Self = Result<_Value, _Error>#

Public Functions

bool has_none() const#
bool has_value() const#
bool has_error() const#
const Value &value() const#
const Error &error() const#
void set_none()#
void set_value(const Value &x)#
void set_error(const Error &x)#
auto get() const#

Public Static Functions

static Self None()#
static Self Ok(const _Value &x)#
static Self Err(const _Error &x)#
struct Nothing#

A special type that indicates that there is no value. (Using void for this would cause many problems, so defining an extra struct is easier.)

template<class _Value, class _Error = Nothing, class ImplBase = Nothing>
class Stream : public icey::StreamTag#

A stream, an abstraction over an asynchronous sequence of values. It has a state of type Result and a list of callbacks that get notified when this state changes. It is conceptually very similar to a promise in JavaScript except that state transitions are not final. This is the base class for all the other streams.

Note

This class does not have any fields except a weak pointer to the actual implementation (i.e. it uses the PIMPL idiom). You should not add any fields when inheriting form this class. Instead, put the additional fields that you need in a separate struct and pass it as the ImplBase template parameter. These fields become available through impl().<my_field>, (i.e. the Impl-class will derive from ImplBase).

Template Parameters:
  • _Value – the type of the value

  • _Error – the type of the error. It can also be an exception.

  • ImplBase – a class from which the implementation (impl::Stream) derives, used as an extention point.

Subclassed by icey::ParameterStream< _Value >

Public Types

using Value = _Value#
using Error = _Error#
using Self = Stream<_Value, _Error, ImplBase>#
using Impl = impl::Stream<Value, Error, WithDefaults<ImplBase>, WithDefaults<Nothing>>#

The actual implementation of the Stream.

using promise_type = Self#

[Coroutine support]

Public Functions

Stream() = default#
explicit Stream(Context &ctx)#

Create s new stream using the context.

explicit Stream(std::shared_ptr<Impl> impl)#
Self &get_return_object()#

[Coroutine support]

std::suspend_never initial_suspend()#

[Coroutine support]

std::suspend_never final_suspend() const noexcept#

[Coroutine support]

auto return_value()#

[Coroutine support] return_value returns the value of the Steam.

void unhandled_exception()#

[Coroutine support] Store the unhandled exception in case it occurs: We will re-throw it when it’s time. (The compiler can’t do this for us because of reasons)

Awaiter<Self> operator co_await()#

[Coroutine support] Allow this stream to be awaited with co_await stream in C++20 coroutines.

void return_value(const Value &x)#

[Coroutine support] Implementation of the operator co_return(x): It sets the value of the Stream object that is about to get returned (the compiler creates it beforehand)

Weak<Impl> impl() const#

Returns a weak pointer to the implementation.

template<class F>
auto then(F &&f)#

Calls the given function (synchronous or asynchronous) f every time this stream receives a value. /// It returns a new stream that receives the values that this function f returns. The returned Stream also passes though the errors of this stream so that chaining thens with an except works.

Template Parameters:

F – Must be (X) -> Y, where X is:

  • V_1, …, V_n if Value is std::tuple<V_1, …, V_n>

  • Value otherwise

Returns:

A new Stream that changes it’s value to y every time this stream receives a value x, where y = f(x). The type of the returned stream is:

  • Stream<Nothing, _Error> if F is (X) -> void

  • Stream<NewValue, NewError> if F is (X) -> Result<NewValue, NewError>

  • Stream<NewValue, _Error> if F is (X) -> std::optional<NewValue>

  • Stream<Y, _Error> otherwise

template<class F>
auto except(F &&f)#

Calls the given function (synchronous or asynchronous) f every time this Stream receives an error. It returns a new Stream that receives the values that this function f returns.

Template Parameters:

F – Must be (X) -> Y, where X is:

  • V_1, …, V_n if Value is std::tuple<V_1, …, V_n>

  • Value otherwise

Returns:

A new Stream that changes it’s value to y every time this stream receives an error x, where y = f(x). The type of the returned stream is:

  • Stream<Nothing, Nothing> if F is (X) -> void

  • Stream<NewValue, NewError> if F is (X) -> Result<NewValue, NewError>

  • Stream<NewValue, Nothing> if F is (X) -> std::optional<NewValue>

  • Stream<Y, Nothing> otherwise

template<AnyStream Output>
void connect_values(Output output)#

Connect this Stream to the given output stream so that the output stream receives all the values.

Todo:

remove this, use unwrap

void publish(const std::string &topic_name, const rclcpp::QoS &qos = rclcpp::SystemDefaultsQoS(), const rclcpp::PublisherOptions publisher_options = {})#

Creates a publisher so that every new value of this Stream will get published.

See also

PublisherStream

template<AnyStream PublisherType, class ...Args>
void publish(Args&&... args)#

Creates a custom publisher so that every new value of this Stream will get published.

See also

PublisherStream

void publish_transform()#

Creates a transform publisher so that every new value of this Stream, which must be of type geometry_msgs::msg::TransformStamped, will get published.

auto unpack()#

Unpacks an Stream holding a tuple as value to multiple Streams for each tuple element. Given that Value is of type std::tuple<Value1, Value2, ..., ValueN>, it returns std::tuple< Stream<Value1>, Stream<Value2>, ..., Stream<ValueN>>

template<class F>
Stream<Value, Nothing> unwrap_or(F &&f)#

Unwraps, i.e. creates an ErrorFreeStream by handling the error with the given function f. The returned Stream will receive only the values of this stream.

Template Parameters:

F – Function receiving the Error of this Stream (it is unpacked if it’s a tuple) and returning void.

template<class F>
Stream<Value, Error> filter(F f)#

Outputs the Value only if the given predicate f returns true.

Template Parameters:

F – Function receiving the Value of this Stream (it is unpacked if it’s a tuple) and returning bool

Buffer<Value> buffer(std::size_t N) const#

Buffers N elements. Each time N elements were accumulated, the returned Stream will yield an array of exactly N elements.

TimeoutFilter<Value> timeout(const Duration &max_age, bool create_extra_timer = true)#
Parameters:
  • max_age – the maximum age a message is allowed to have before the timeout occurs

  • create_extra_timer – If set to false, the timeout will only be detected after at least one message was received. If set to true, an extra timer is created so that timeouts can be detected even if no message is received.

Returns:

A new Stream that errors on a timeout, i.e. when this stream has not received any value for some time max_age.

TransformSynchronizer<Value> synchronize_with_transform(const std::string &target_frame, const Duration &lookup_timeout)#

Synchronizes a topic with a transform using the tf2_ros::MessageFilter.

Example:

  /// Synchronize with a transform: This will yield the message and the transform from the child_frame_id of the header message
  /// and the given target_frame ("map") at the time of the header stamp. It will wait up to 200ms for the transform. 
  node->icey().create_subscription<sensor_msgs::msg::Image>("camera")
    .synchronize_with_transform("map", 200ms)
    .unwrap_or([&](std::string error) { RCLCPP_INFO_STREAM(node->get_logger(), "Transform lookup error: " << error);})
    .then([](sensor_msgs::msg::Image::SharedPtr image, 
        const geometry_msgs::msg::TransformStamped &transform_to_map) {

    });

Parameters:
  • target_frame – the transform on which we wait is specified by source_frame and target_frame, where source_frame is the frame in the header of the message.

  • lookup_timeout – The maximum time to wait until the transform gets available for a message

template<AnyStream S, class ...Args>
S create_stream(Args&&... args) const#

Creates a new stream of type S using the Context. See Context::create_stream.

Warning

doxygenclass: Cannot find class “icey::PromiseBase” in doxygen xml output for project “icey” from directory: .build/doxygenxml/

template<class _Value, class _Error = Nothing>
class Promise#

This is the type that users writing coroutines use as the return type. It is what is returned when calling promise_type::get_return_value(). It is necessary because of the C++20 coroutines spec that apparently tries to optimize the copying of the promise inside the coroutine state to the caller. To not confuse the users with C++ coroutine spec’s intricacies, we just call this “Promise”. Note that this is not a “Task”: this term is used seemingly exclusively for the structured programming approach of lazily started coroutines. I.e. it is not a Task as implemented in in Lewis Bakers’s cppcoro library and described in https://www.open-std.org/JTC1/SC22/WG21/docs/papers/2018/p1056r1.html. We cannot use the structured programming approach because it requires a custom executor but we want to use the existing ROS executor.

Public Types

using Self = Promise<_Value, _Error>#
using Value = _Value#
using Error = _Error#
using promise_type = impl::Promise<_Value, _Error>#

Public Functions

explicit Promise(std::coroutine_handle<promise_type> coroutine)#
Promise(const Promise&) = delete#
Promise(Promise &&other) = delete#
Promise &operator=(const Promise&) = delete#
Promise &operator=(Promise &&other) = delete#
~Promise()#
bool await_ready() const noexcept#
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept#
auto await_resume()#

Actual implementation of the stream (impl Stream)#

template<class _Value, class _Error, class Base, class DefaultBase>
class Stream : public icey::impl::StreamImplBase, public Base#

A stream, an abstraction over an asynchronous sequence of values. It has a state of type Result and a list of callbacks that get notified when this state changes. It is conceptually very similar to a promise in JavaScript but the state transitions are not final.

Template Parameters:
  • _Value – the type of the value

  • _Error – the type of the error. It can also be an exception.

  • Base – a class from which this class derives, used as an extention point.

  • DefaultBase – When new Streams get created using then and except, this is used as a template parameter for Base so that a default extention does not get lost when we call then or except.

Public Types

using Value = _Value#
using Error = _Error#
using Self = Stream<Value, Error, Base, DefaultBase>#
using State = Result<Value, Error>#
using MaybeResult = std::conditional_t<std::is_same_v<Error, Nothing>, Value, State>#

If no error is possible (Error is Nothing), this it just the Value instead of the State to not force the user to write unnecessary error handling/unwraping code.

using Handler = std::function<void(const State&)>#

Public Functions

Stream() = default#
Stream(const Self&) = delete#

A Stream is non-copyable since it has members that reference it and therefore it should change it’s address.

Stream(Self&&) = delete#

A Stream is non-movable since it has members that reference it and therefore it should change it’s address.

Stream &operator=(const Self&) = delete#

A Stream is non-copyable since it has members that reference it and therefore it should change it’s address.

Stream &operator=(Self&&) = delete#

A Stream is non-movable since it has members that reference it and therefore it should change it’s address.

~Stream() = default#
bool has_none() const#
bool has_value() const#
bool has_error() const#
const Value &value() const#
const Error &error() const#
State &get_state()#
const State &get_state() const#
void register_handler(Handler &&cb)#

Register a handler (i.e. a callback) that gets called when the state changes. It receives the new state as an argument. TODO If there is any value to take, we should immediately call the handler

void set_none()#

Sets the state to hold none, but does not notify about this state change.

void set_value(const Value &x)#

Sets the state to hold a value, but does not notify about this state change.

void set_error(const Error &x)#

Sets the state to hold an error, but does not notify about this state change.

void set_state(const State &x)#
State take_state()#

Returns the current state and sets it to none.

MaybeResult take()#

Returns the current state and sets it to none. If no error is possible (Error is not Nothing), it just the Value to not force the user to write unnecessary error handling/unwraping code.

void notify()#

It takes (calls take) the current state and notifies the callbacks. It notifies only in case we have an error or value. If the state is none, it does not notify. If the state is an error and the Error is an exception type (a subclass of std::runtime_error) and also no handlers were registered, the exception is re-thrown. TODO We should take the value of this stream after notifying

void put_value(const Value &x)#

Sets the state to a value and notifies.

void put_error(const Error &x)#

Sets the state to an error and notifies.

void put_state(const State &x)#

Sets the state to the given one and notifies.

template<class F>
auto then(F &&f)#
template<class F>
auto except(F &&f)#

Traits#

template<class T>
using icey::ErrorOf = remove_shared_ptr_t<T>::Error#

The value type of the given Stream type.

template<class T>
using icey::ValueOf = typename remove_shared_ptr_t<T>::Value#

The error type of the given Stream type.

template<class T>
using icey::MessageOf = remove_shared_ptr_t<ValueOf<T>>#

The ROS-message of the given Stream type.

C++20 coroutines support#

template<class S>
struct Awaiter#

An awaiter required to implement the operator co_await for Streams. It it needed for supporting C++ coroutines.

Public Functions

Awaiter(S &s)#
bool await_ready() const noexcept#
Returns:

Returns whether this Stream already has a value.

void await_suspend(std::coroutine_handle<> continuation) noexcept#

Registers the continuation (that’s the code that follows the co_await statement, in form of a function pointer) as a callback of the stream. This callback then get’s called by the ROS executor.

auto await_resume() const#

Returns the current value of the stream. If an exception occurred (but was not handled) previously, here it is re-thrown.

Public Members

S &stream#
using icey::Clock = std::chrono::system_clock#
using icey::Time = std::chrono::time_point<Clock>#
using icey::Duration = Clock::duration#