2022-04-18
在 Seastar 中,文件相关的操作并不直接由 reactor
线程(工作线程)执行,而是被 offload 到一个专门的线程中去执行,为此
Seastar 在 reactor
中提供了诸多文件操作相关的方法;比如重命名一个文件就可以使用
rename_file
方法:
static ss::future<> f() {
return ss::engine()
.rename_file("hello.txt", "world.txt")
.then_wrapped([](ss::future<> &&fut) {
if (fut.failed()) {
::print("rename file failed: {}", fut.get_exception());
fmt}
return ss::make_ready_future<>();
});
}
所有这些文件操作相关的函数都返回 future
每个 reactor 中都有一个类型为 thread_pool
的变量,但是我觉得有点名不副实,因为它并不是一个
pool,而只是一个单独的线程——仅仅用于执行文件相关的 syscall,所以叫
syscall thread 或许会更好一些;不过这些都是细枝末节,先看看
rename_file
的实现:
<>
future::rename_file(std::string_view old_pathname, std::string_view new_pathname) noexcept {
reactor// Allocating memory for a sstring can throw, hence the futurize_invoke
return futurize_invoke([old_pathname, new_pathname] {
return engine()._thread_pool->submit<syscall_result<int>>([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] {
return wrap_syscall<int>(::rename(old_pathname.c_str(), new_pathname.c_str()));
}).then([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] (syscall_result<int> sr) {
.throw_fs_exception_if_error("rename failed", old_pathname, new_pathname);
srreturn make_ready_future<>();
});
});
}
逻辑非常简单,就是往 syscall thread
中提交一个任务,该任务也不过是简单地调用了一下 rename
1
这个系统调用;如果该调用出错,则抛出异常——当然被
futurize_invoke
捕获并转换为一个 exceptional future
返回给调用方。
现在还没有看到 thread_pool
的实现,不过在此之前我们可以先想象一下它的大致实现——依靠我们对 Seastar
核间通信机制2的了解,毕竟他俩都是线程之间的通信:
class thread_pool {
* _reactor;
reactoruint64_t _aio_threaded_fallbacks = 0;
;
syscall_work_queue inter_thread_wq;
posix_thread _worker_threadstd::atomic<bool> _stopped = { false };
std::atomic<bool> _main_thread_idle = { false };
};
目前我们只需要理解 inter_thread_wq
和
_worker_thread
这俩成员,前者自然是任务队列,后者则是一个线程实体
class syscall_work_queue {
static constexpr size_t queue_length = 128;
struct work_item;
using lf_queue = boost::lockfree::spsc_queue<work_item*,
boost::lockfree::capacity<queue_length>>;
;
lf_queue _pending;
lf_queue _completed;
writeable_eventfd _start_eventfd= { queue_length };
semaphore _queue_has_room };
_pending
和 _completed
分别是提交队列与完成队列,_queue_has_room
是一个
seastar::semaphore
,主要是限制向 syscall thread
提交任务的数目,毕竟线程的执行能力是有限的;最后一个是
_start_eventfd
,是一个 eventfd3,用于通知 syscall thread
有任务到来
整个任务的执行分为三部分:
其中第一步和第三步都是由 reactor 发起;这里不打算讲
void syscall_work_queue::submit_item(std::unique_ptr<syscall_work_queue::work_item> item) {
(void)_queue_has_room.wait().then_wrapped([this, item = std::move(item)] (future<> f) mutable {
// propagate wait failure via work_item
if (f.failed()) {
->set_exception(f.get_exception());
itemreturn;
}
.push(item.release());
_pending.signal(1);
_start_eventfd});
}
非常简单,首先根据 reactor 提交的 callable object 构建一个
work_item
插入到 task queue
的末尾;和前面提到的一样,这里使用了 semaphore 来限制提交的数目;然后往
eventfd 中写入 1 通知 syscall thread
在构建 posix_thread
对象时需要传入一个函数传给
pthread_craete
创建线程;对于 syscall
thread,这个函数就是它的工作循环:
void thread_pool::work(sstring name) {
(pthread_self(), name.c_str());
pthread_setname_npsigset_t mask;
(&mask);
sigfillsetauto r = ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
(r);
throw_pthread_errorstd::array<syscall_work_queue::work_item*, syscall_work_queue::queue_length> tmp_buf;
while (true) {
uint64_t count;
auto r = ::read(inter_thread_wq._start_eventfd.get_read_fd(), &count, sizeof(count));
assert(r == sizeof(count));
if (_stopped.load(std::memory_order_relaxed)) {
break;
}
auto end = tmp_buf.data();
._pending.consume_all([&] (syscall_work_queue::work_item* wi) {
inter_thread_wq*end++ = wi;
});
for (auto p = tmp_buf.data(); p != end; ++p) {
auto wi = *p;
->process();
wi._completed.push(wi);
inter_thread_wq}
if (_main_thread_idle.load(std::memory_order_seq_cst)) {
uint64_t one = 1;
::write(_reactor->_notify_eventfd.get(), &one, 8);
}
}
}
首先是为这个线程改名字,便于用命令行工具查看;然后屏蔽所有的信号,避免被误伤;最后开始工作循环,里面主要干了 4 件事情:
_start_eventfd
:如果 reactor
中没有任务,那么该线程会被阻塞在这个 read
操作上;否则通过读取到的数值可以确定任务队列中任务的个数(应该是至少有那么多个?)_stopped
,如果为 true
则跳出循环——外部会通过该标志位让 syscall thread 退出提交队列中的任务被 syscall thread 执行完后其结果会被放在完成队列中,reactor 中为此注册了 syscall poller 收割这些执行结果:
class reactor::syscall_pollfn final : public reactor::pollfn {
public:
virtual bool poll() final override {
return _r._thread_pool->complete();
}
};
unsigned syscall_work_queue::complete() {
std::array<work_item*, queue_length> tmp_buf;
auto end = tmp_buf.data();
auto nr = _completed.consume_all([&] (work_item* wi) {
*end++ = wi;
});
for (auto p = tmp_buf.data(); p != end; ++p) {
auto wi = *p;
->complete();
widelete wi;
}
.signal(nr);
_queue_has_roomreturn nr;
}
由于 Seastar 以轮询代替了中断(比如各种
poller),所以当没有任务需要执行时,Seastar 会进入休眠避免无谓的 CPU
消耗,当有外部事件发生时再唤醒它继续执行(相当于从轮询模式转变至中断模式);所以每个
poller 都需要实现 try_enter_interrupt_mode
和
exit_interrupt_mode
两个方法,当 reactor 进入休眠时会调用
enter 方法通知 poller 自己已经进入了休眠,此时 poller 不能再期待 reactor
会主动轮询它,而应该在有事件完成时唤醒 reactor;reactor
退出休眠状态时则会调用 exit 方法通知 poller 状态变更:
void enter_interrupt_mode() { _main_thread_idle.store(true, std::memory_order_seq_cst); }
void exit_interrupt_mode() { _main_thread_idle.store(false, std::memory_order_relaxed); }
当 reactor 进入休眠时,syscall thread 会记录下该状态,并且在其 run
loop 中检查 _main_thread_idle
标志位,这个标志位如果设置了,说明 reactor 进入了休眠,而由于 syscall
thread 只有执行了任务时才会检查该状态,说明此时有结果等待着 reactor
去收割,也就是说它有活干了,所以通过
reactor::_notify_eventfd
通知 reactor 停止休眠:通过写该 fd
产生一个可读事件,从而被 worker 的 epoll 感知到并唤醒线程继续执行其 main
run loop
当 reactor 结束时,syscall thread 也要退出;一般的方式时向它投递一个
pthread_exit()
的任务让它执行;这里也是类似的,不过并没有实际投递任务,而是将
_stopped
标记位设置为 true 并通知 syscall thread:
::~thread_pool() {
thread_pool.store(true, std::memory_order_relaxed);
_stopped._start_eventfd.signal(1);
inter_thread_wq.join();
_worker_thread}
syscall thread 在其 run loop 中发现有任务之后,首先会检查
_stopped
标记并决定是否要退出,这样也达到了目的