2022-03-09
这里不打算完整地解释 Seastar 中 future&promise 的实现,里面旁枝末节实在太多,比如:
所以还是希望抓住其中的一些重难点以及做的一些优化进行分析。
这个系列的第一篇文章1中已经 future&promise 的大致实现流程进行了解释,Seastar 整体思想与之类似,但是还是有一些不同2:
std::mutex
和 std::condition_variable
吧std::shared_ptr<SharedState>
吧首先看看 FPC 的类结构,这里直接放出其 UML 图:
非常复杂的层次结构,关于图片有几点注意:
uninitialized_wrapper_base
有两种偏特化实现,但是这里只标注了其一public
和 private
继承,但是在 UML
图中没有表示出来从之前的 SharedState
我们可以总结出,future
和 proimise
的共享状态需要包含 future
的状态,异常值、数据;在 Seastar 中这一角色由 future_state
承担:其中数据从 uninitialize_wrapper
中继承,状态、异常值则从 future_state_base
中继承。
主要介绍 fpc 中几个重要的操作。
future
的串联:也就是 then
方法的实现,它会在 future ready
时调度一段代码执行,由于这个方法返回的也是一个
future
,所以可以实现 future
的串联;由于
then()
涉及到两队 future&promise
以及好几个函数,所以在看代码之前我们有必要约定几个叫法:
future
:即 then()
的调用者promise
:即「调用方
future
」关联的 promsie
future
:即 then()
方法的返回值(也是一个 future
)promise
:即「返回方
future
」关联的 promsie
then()
方法的参数(func
)then()
方法内部对「用户函数」的一个封装(wrapper
)此外,对于 future
中的值从待定变为确定的过程(实际上是future_state_base::any::st
的变化),我们称之为
fulfil/resolve/ready。
好,来看看代码:
template <typename Func, typename Result = futurize_t<typename call_then_impl::template result_type<Func>>>
( requires std::invocable<Func, T SEASTAR_ELLIPSIS> || internal::CanInvokeWhenAllSucceed<Func, T SEASTAR_ELLIPSIS>)
SEASTAR_CONCEPT
Result(Func&& func) noexcept {
then#ifndef SEASTAR_TYPE_ERASE_MORE
return call_then_impl::run(*this, std::move(func));
#else
using func_type = typename call_then_impl::template func_type<Func>;
<func_type> ncf;
noncopyable_function{
::scoped_critical_alloc_section _;
memory= noncopyable_function<func_type>([func = std::forward<Func>(func)](auto&&... args) mutable {
ncf return futurize_invoke(func, std::forward<decltype(args)>(args)...);
});
}
return call_then_impl::run(*this, std::move(ncf));
#endif
}
先不管函数签名里面的各种
typename
、XXX::template
以及
SEASTAR_CONCEPT
;直接看函数体,其中会根据
SEASTAR_TYPE_ERASE_MORE
宏进行条件编译,但不管是哪种情况,最终都会调用
call_then_impl::run()
函数:
template <typename SEASTAR_ELLIPSIS T>
class SEASTAR_NODISCARD future : private internal::future_base {
using call_then_impl = internal::call_then_impl<future>;
...
};
这个类根据调用 then
的 future
的类型(更准确地说是 future
中包含的值的类型)进行了偏特化,但是我们只关注 generic
case,发现最终还是走到了 future::then_impl()
方法:
template <typename Func, typename Result = futurize_t<internal::future_result_t<Func, T SEASTAR_ELLIPSIS>>>
Result(Func&& func) noexcept {
then_implusing futurator = futurize<internal::future_result_t<Func, T SEASTAR_ELLIPSIS>>;
if (failed()) {
return futurator::make_exception_future(static_cast<future_state_base&&>(get_available_state_ref()));
} else if (available()) {
return futurator::invoke(std::forward<Func>(func), get_available_state_ref().take_value());
}
return then_impl_nrvo<Func, Result>(std::forward<Func>(func));
}
把里面的宏都去掉了,可以发现分为两段逻辑:
future
调用 then
时已经被 fulfil
了:如果 future
成功(available()
),那么就原地执行(in-place,后面 reactor
中执行 func
的情况,则为 out-place) func
;如果
future
失败(failed()
),直接返回一个 exception
futurefuture
调用 then
时还是 unavailable
状态,那就调用 then_impl_nvro
:template <typename Func, typename Result>
(Func&& func) noexcept {
Result then_impl_nrvousing futurator = futurize<internal::future_result_t<Func, T SEASTAR_ELLIPSIS>>;
typename futurator::type fut(future_for_get_promise_marker{});
using pr_type = decltype(fut.get_promise());
(fut.get_promise(), std::move(func), [](pr_type&& pr, Func& func, future_state&& state) {
schedule...
});
return fut;
}
从名字就可以看出来,这个函数主要是为了利用 C++ 的 Named Return Value
Optimization 特性3,但是这不是我们今天的重点;首先
futurator
是一个 helper 类,会根据 func
以及「调用方
future
」存储的值的类型萃取出许多有用的类型,比如func
返回的 future
类型(也就是后面的
futurator::type
),future
关联的
promise
的类型…,,然后创建一个 future
作为整个 then
的返回值,最终将「返回方
promise
」,以及「用户函数」func
包装成一个
lambda,这里先不用管这个 lambda
的签名,也不管它做了什么,只知道它被调度了:
template <typename Pr, typename Func, typename Wrapper>
void schedule(Pr&& pr, Func&& func, Wrapper&& wrapper) noexcept {
::scoped_critical_alloc_section _;
memoryauto tws = new continuation<Pr, Func, Wrapper, T SEASTAR_ELLIPSIS>(std::move(pr), std::move(func), std::move(wrapper));
(tws);
schedule._u.st = future_state_base::state::invalid;
_state}
void schedule(continuation_base<T SEASTAR_ELLIPSIS>* tws) noexcept {
::schedule(tws, &tws->_state);
future_base}
对于这个 schedule
方法,我们时刻记住:
pr
是「返回方 promise
」func
是「用户函数」,即实际传给 then()
执行的函数wrapper
才是真正调用 func
的地方:[](pr_type&& pr, Func& func, future_state&& state) {
if (state.failed()) {
.set_exception(static_cast<future_state_base&&>(std::move(state)));
pr} else {
::satisfy_with_result_of(std::move(pr), [&func, &state] {
futuratorreturn internal::future_invoke(func, std::move(state).get_value());
});
}
}
第一个参数是「返回方
promise
」,第二个参数是「用户函数」,第三个参数
state
指向的是创建的 continuation
中的本地状态(continuation::_state
);首先通过这个状态我们检查「调用方
future
」被 resolve 时的状态:如果它是成功的,那么就执行
func
并通过 pr.set_value()
(相关逻辑在
futurator::satisfy_with_result_of
中)
以正常的方式激活「返回方
future
」;否则如果它失败了,那么就不执行
func
,而是直接 pr.set_exception
以异常的方式激活「返回方 future
」从而将异常传播出去。
所以我们知道为什么除了 func
之外,还需要一个
wrapper
:因为 func
只有在特定条件下(future
无异常)才会被执行;并且当「调用方
future
」resolve 之后,除了调用
func
,我们还得激活「返回方
future
」,这些逻辑都是 wrapper
需要做的。
那么现在问题来了,为什么我们可以通过 continuation
中的本地状态就可以得到调用方 future
的结果呢? 那得继续看
future_base::schedule
:
void schedule(task* tws, future_state_base* state) noexcept {
* p = detach_promise();
promise_base->_state = state;
p->_task = tws;
p}
* detach_promise() noexcept {
promise_base->_state = nullptr;
_promise->_future = nullptr;
_promisereturn std::exchange(_promise, nullptr);
}
可以看到里面干了两件事:
future
」的 promise
指针置空,并取消二者的关联关系;promise
」中的状态指针指向新创建的
continuation 的本地状态前面4提到过,在调用 then
之后,「调用方 future」理应要失效;由于 ready 的 「调用方
future
」早已在 then_impl()
方法中就原地执行了「用户函数」,所以这里它肯定是 unavailable
的,而此处取消了它和「调用方
promise
」之间的联系,那么「调用方
promise
」就再也不会激活了,他也就失效了。
而我们也知道了,「调用方
future
」的值必定是要喂给「用户函数」的,而现在「调用方
future
」还没有 ready,所以它的值肯定得等 「调用方
promise
」设置进去之后才会有,所以这里直接让「调用方
promise
」中的状态指针指向
continuation::_state
,这样就可以直接将值设置进来从而喂给「用户函数」了。
这里可以说是「调用方 future」失效了,但是也可以这样理解:「调用方
future」没有失效,但是由用户所持有的那个 future
不能再使用了,它其实是被转移进了 continuation(虽然
continuation
中没有直接存储
future
,但是「调用方 promise
」的状态指针指向了
continuation
的本地状态,所以也可以这样理解),所以说
continuation 中包含有「调用方 future」以及「返回方
promise
」
按照这个思路,那么 future
的串联在物理结构上应该是这样的:
continuation
continuation
中都包含了 「调用方
future
」和「返回方 promise
」future
和 promise
为一对future
是调用层级越外面的「调用方
future
」还有最后一个问题,这个 continuation(或者说
func
/wrapper
)什么时候会执行呢?
我们可以想到,必然是在「调用方 future
」 被
resolve之后,也就是「调用方 promise
」调用了
set_value
或者 set_exception
:
// future.hh
template <typename... A>
void set_value(A&&... a) noexcept {
if (auto *s = get_state()) {
->set(std::forward<A>(a)...);
s<urgent::no>();
make_ready}
}
// future.cc
template <promise_base::urgent Urgent>
void promise_base::make_ready() noexcept {
if (_task) {
if (Urgent == urgent::yes) {
::seastar::schedule_urgent(std::exchange(_task, nullptr));
} else {
::seastar::schedule(std::exchange(_task, nullptr));
}
}
}
// reactor.cc
void schedule(task* t) noexcept {
().add_task(t);
engine}
最终调用的是
seastar::schedule
,这个很名字已经很直观,就不贴内部实现细节了;其实就是把
continuation
添加到当前线程的 reactor 的任务队列里面去,在
reactor 的工作循环中会从队列中取出任务并执行,并调用其
run_and_dispose
方法(对 task
基类虚函数的重写):
virtual void run_and_dispose() noexcept override {
try {
(std::move(this->_pr), _func, std::move(this->_state));
_wrapper} catch (...) {
this->_pr.set_to_current_exception();
}
delete this;
}
这里和前一篇文章5是类似的,都会捕获在执行「用户函数」以及激活「返回方
future
」过程中抛出的异常,并设置到「返回方
future
」中以异常的方式激活它;注意
continuation
是 new
出来的,所以在执行完毕之后需要将其销毁(delete this
,又是我从来没有用过的骚操作)。
总结一下 then
方法的流程:
future
」与「返回方
promise
」,前者作为 then()
方法的返回值返回给调用者,后者用于在 continuation
中激活「返回方 future」continuation
结构,并保存「返回方 promise
」future
」的状态指针指向
continuation
的本地状态promise
」会激活
continuation
——将其放入当前线程的 reactor
的任务队列中等待执行continuation
得到执行后,会通过保存的「返回方
promise
」激活「返回方 future
」前面主要讲了 Seastar 中 future
的 then()
方法的实现,这只是整个 FPC
中的一角,还有一些其他重要细节也值得去了解。
future
的状态前面说过,Seastar 中 FPC 的共享状态数据来自两部分,其中 future
的状态(和共享状态不是一回事)和异常值从 future_state_base
继承而来:
struct future_state_base {
enum class state : uintptr_t {
= 0,
invalid = 1,
future = 2,
result_unavailable = 3,
result = 4,
exception_min };
union any {
;
state ststd::exception_ptr ex;
} _u;
}
其中 future
状态很好理解,就是发起了异步操作但是还没有返回时的状态;result
状态就是异步返回而且成功,而所有 >= exception_min
的状态,都是异常状态,即异步操作返回了但是却失败了。
invalid
的状态,表示这个 future
不能再使用了,比如一个 future
执行了 then
操作之后它就是 invalid 的,被转移后源也是 invalid
…
根据 Seastar 中的相关 commit6 中我们可以找到
result_unavailable
这个状态的作用,它其实是为了解耦「禁止从
future
中多次获取结果」以及「释放 future
包裹的对象」这两个问题的处理流程。如果一个 future
曾经存储过值,那么释放它的时候需要做一些特殊处理,所以如果调用
get
之后直接将其设置为
invalid
,那么就不知道它是否存储过值也就无法进行特殊处理了,所以加上这个状态用来表示这个
future
「曾经存储过值,但是后面被取走了」。
以下是 union any
提供的几个用于判断当前共享状态的状态的方法:
// union any
bool valid() const noexcept { return st != state::invalid && st != state::result_unavailable; }
bool available() const noexcept { return st == state::result || st >= state::exception_min; }
bool failed() const noexcept { return __builtin_expect(st >= state::exception_min, false); }
bool has_result() const noexcept { return st == state::result || st == state::result_unavailable; }
从使用者的角度来看,valid
肯定是共享状态中存有值,而
result_unavailable
只是表示「之前有过值,但是现在没有」,所以肯定不是 valid 的;而
avaiable
表示异步操作已经完成,可能是成功,也可能是失败(异常)。
以下是 future
提供的几个用于判断状态的
public
函数,从 future
使用者的角度来看只有两种状态:
// class future
bool available() const noexcept { return _state.available(); }
bool failed() const noexcept { return _state.failed(); }
即「异步操作是否完成」以及「异步操作是否失败」,没有提供「异步操作是否成功」的接口,因为异步操作完成了不是失败就是成功;二者都是在
union any
提供的方法上做的封装,没啥好说的。
tagged pointer
另外一个值得讲一讲的是为什么 future_state
需要从两个地方分别继承它所需要的数据,为什么要将状态和异常值放一起?
之所以要将状态和异常值放在一个 union
中,是因为这样可以利用 tagged pointer7
技术节省一个指针大小的内存。FPC 是 Seastar
中异步编程的基石,在整个项目中被大量使用,所以可以在这样一个基础功能上节省内存带来的收益将是巨大的。
关于 tagged pointer,大多数架构都是字节可寻址的,但某些类型的数据通常会与数据的大小对齐,比如说指针,在 64 位架构上,指针为 8 字节,所以指针的值总是 8 的倍数,也就意味着其最低 3 位总是为 0,我们可以用它来存储一些信息;相当于给指针加了一个 tag,所以叫做 tagged pointer。
我们再来重新审视一下 union any
的定义:
enum class state : uintptr_t {
= 0,
invalid = 1,
future = 2,
result_unavailable = 3,
result = 4,
exception_min };
union any {
;
state ststd::exception_ptr ex;
} _u;
union any
可以被作为一个 state
解释,也可以作为一个 std::exception_ptr
解释。
state
是一个 uintptr_t
即一个指针大小,但是其中 0/1/2/3 状态只使用了 uintptr_t
的最低两位;std::exception_ptr
其实就是一个指针,所以当
union any
作为一个 std::exception_ptr
解释时,其最低两位是用不上的,而这正好可以给 state
使用;而
std::exception_ptr
只要有值,其值必然是 >=8 的,所以从
state
角度来看,只要它 >=4,说明里面存储的肯定是一个
std::exception_ptr
,直接取出 ex
即可。
但是这里没有直接将二者作为一个字段使用,而是采用了 union 的方式使用两个字段(同一块内存,两种解释),这样会更加直观。
而为什么需要从两个地方分别继承它所需要的数据,我想是为了让各自的职责分明。
在前一篇文章8中解释过,promise
要想激活
future
,那么必须有一个共享的状态关联二者,之前使用的是
std::shared_ptr<SharedState>
,是一个堆上的对象,future
和 promise
都有一个指向该对象的指针从而实现状态的共享;但是
Seastar 中的 future&promise
为了极致的性能,并不希望动态分配内存,从而采用了另外一种复杂但是更加高效的方案。
Seastar 采用的方案是在 future
中存储一个本地状态,promise
有一个状态指针指向它,从而在调用
set_value
/set_exception
时可以直接更新
future
的状态。通常我们先创建一个
promise
,然后获取其关联的 future
返回给异步调用的发起者,此时 promise::_state
指向
future::_state
;这是最核心的逻辑,但是还有一些问题需要考虑。
promise
中的本地状态
倘若 future&promise 的使用模式,都是先创建
promise
,再获取 future
,最后再使
future
ready,那么以上策略就可以了;但是倘若我们在
get_future
之前就调用了
set_value/set_exception
,那么数据该写到哪里呢,毕竟
promise::_state
只是一个指针?所以 promise
中也有一个本地状态用于处理这种情况:
<int> pr;
Promise.set_value(13); pr
倘若在这之后再调用 get_future
,那么获取的就是一个 ready
future:
template <typename SEASTAR_ELLIPSIS T>
inline
<T SEASTAR_ELLIPSIS>
future<T SEASTAR_ELLIPSIS>::get_future() noexcept {
promiseassert(!this->_future && this->_state && !this->_task);
return future<T SEASTAR_ELLIPSIS>(this);
}
class future {
private:
(promise<T SEASTAR_ELLIPSIS>* pr) noexcept : future_base(pr, &_state), _state(std::move(pr->_local_state)) { }
future};
class future_base {
protected:
(promise_base* promise, future_state_base* state) noexcept : _promise(promise) {
future_base->_future = this;
_promise->_state = state;
_promise}
};
可以看到做了三件事情:
future
和 promise
之间的联系promise
中的状态指针指向 future
的本地状态promise
的本地状态移动到 future
的本地状态中去如果 promise
中已经有值了,那么第三步会使得返回的
future
是 ready 的。
continuation
中的本地状态
这一点我们已经在 then()
方法实现中已经提到过了;continuation
作为「调用方
promise」和「返回方 future」之间的桥梁;then
调用之后,「调用方 promise
」需要激活的是
continuation
,再由 continuation
激活「返回方
future」;所以此时 continuation
必须有一个状态供「调用方
promise
」设置,所以它也有一个本地状态。
注意「调用方 promise
」不能将数据直接写入「返回方
future
」,因为「返回方
future
」接收的是「用户函数」在「调用方
promise」产生数据上的调用结果。
总结
future
中有一个本地状态,promise
则有一个状态的指针指向该状态,从而实现状态共享promise
没有和 future
关联(即还没有调用 promise::get_future
)之前,就调用了
promise::set_value/set_exception()
这种情况,promise
中也有一个本地状态用于存储数据then
之后,「调用方
future
」失效(或者说被移入
continuation
),「调用方
promise
」的状态指针指向 continuation
的本地状态,异步函数完成后,「调用方 promise
」激活
continuation
,continuation
执行完「用户函数」后激活「返回方 future
」future
和
promise
的关联以上 future&promise 模型并没有解决 future 或者 promise
移动的问题,倘若 future
被移动,promise
指向的状态不就失效了吗?因此 future
和 promise
中各有一个指向对方的指针,这两个指针将二者关联起来,从而在被
std::move
之后可以更新对状态的引用:
// future.cc
void promise_base::move_it(promise_base&& x) noexcept {
// Don't use std::exchange to make sure x's values are nulled even
// if &x == this.
= x._task;
_task ._task = nullptr;
x= x._state;
_state ._state = nullptr;
x= x._future;
_future if (auto* fut = _future) {
->detach_promise();
fut->_promise = this;
fut}
}
// future.hh
void future_base::move_it(future_base&& x, future_state_base* state) noexcept {
= x._promise;
_promise if (auto* p = _promise) {
.detach_promise();
x->_future = this;
p->_state = state;
p}
}
那么有一个问题,为什么 future 调用 then
之后,「调用方
future
」没有和 continuation
建立这样的相互联系呢?毕竟「调用方
promise
」的状态指针指向了 continuation
的本地状态。这是因为 continuation
是通过 new
动态分配出来的,所以它不会(也不需要)被移动,也就不存在更新状态指针的问题了。
future
通常发起一个异步请求后得到的 future
并不是 ready
的,
<Response> async_get_response(Connection *conn) {
future<> handshake = make_ready_future<>();
futureif (!conn->reused) {
= conn->handshake();
handshake }
return handshake.then([conn]() {
/* TODO: send header/body, recv header/body... */
});
}
上面代码模拟的是带有连接复用的 HTTP
请求流程,如果这个连接是一个新的连接,就需要先进行 TCP
三次握手,这也是一个异步流程,所以它返回一个 future,通过这个 future
可以串联后面的其他操作(发送头部发送 body,接收头部,接收
body);而如果这个连接是复用已有连接,那么就不用再执行 TCP
握手流程,直接发送数据接收数据即可;为了这两种情况的后续流程得以统一(使用
future::then
串联操作),我们需要为连接复用的流程创建一个
ready
的 future,Seastar 为此提供了
make_ready_future
这个 helper。
从前面 then
的实现中我们已经看到,ready
的
future 在执行 then()
方法时,会原地执行
「用户函数」。同样道理,Seastar 还提供了
make_exception_future
这个 ready future 的异常版本。
future
并不非得存储数据(而且也不一定只能存储一个值——即使是,那也是一个
std::tuple
,其中又可以存储任意个异构类型的值),也就是
future<>
,不存储数据的值相当于是一个信号,我们通过
ready
方法判断异步请求是否完成。
Seastar 中使用 future_stored_type
来表示 future
中存储的数据类型(同时还有一个 future_stored_type_t
别名):
struct monostate {};
template<typename T...>
struct future_stored_type;
template<>
struct future_stored_type<> {
using type = monostate;
};
template<typename T>
struct future_stored_type<T> {
using type = std::conditional_t<std::is_void_t<T>, monostate, T>;
};
template<typename ...T>
using future_stored_type_t = typename future_stored_type<T...>::type;
对于存了数据和没有存数据的 future,Seastar 使用模板特化来对不同的情况设置不同的数据类型:
future_stored_type_t
就是monostate
future_stored_type_t
就是数据类型T
但是这里有个问题,为什么要引入monostate
这个类型,直接引入void
不行么?
这是是因为void
虽然表示空,但是它并不是一种类型(尽管它还可以放在函数返回类型的位置上),所以如果直接使用void
作为future_stored_type
,就表示这是一种类型,然后事实并不是这样,毕竟我们无法使用
void
来声明一个变量(void*
倒是可以):
template<>
struct future_stored_type<> {
using type = void; // ok
}
using ft = future_stored_type_t<>;
; // error: Variable has incomplete type 'ft' (aka 'void') ft v1
而类型的作用就是让我们创建变量,所以直接使用void
是不可取的,因此使用了monostate
这个类型。而实际上
C++17 标准库也提供了std::monostate
类型,但是 Seastar
考虑到这只是一个非常简单的类型,没有必要为此引入
<variant>
头文件。
随后在future_stored_type
之外,Seastar
还定义了future_tuple_type_t
:
template<typename T>
using future_tuple_type_t = std::conditional_t<std::is_same_v<T, monostate>, tuple<>, tuple<T>;
这个就是根据future_stored_type
,将包装成 tuple
类型。
https://chenjianyong.com/blog/2022/03/seastar_fpc_1.html↩︎
http://seastar.io/futures-promises/↩︎
https://stackoverflow.com/questions/12953127/what-are-copy-elision-and-return-value-optimization↩︎
https://chenjianyong.com/blog/2022/03/seastar_fpc_1.html↩︎
https://chenjianyong.com/blog/2022/03/seastar_fpc_1.html↩︎
https://github.com/scylladb/seastar/commit/589b24b3535714d24db875f6d7bb3a6207f09ef7↩︎
https://www.wikiwand.com/en/Tagged_pointer↩︎
https://chenjianyong.com/blog/2022/03/seastar_fpc_1.html↩︎