Seastar: sharded service(1)

Jianyong Chen

Preface

Seastar 是一个多线程异步库, 基于它的 App 通常都有在多个 shard(也称 logic core) 上对称部署服务的诉求; 为此我们需要在多个 shard 上创建服务的实例, 并且让这些实例执行某些操作; 这些涉及到 shard 之间的通信, Seastar 提倡使用显式的消息传递(message passing)而不是传统多线程编程常用的共享内存&加锁的方式进行通信, 为此提供了 smp 工具; 所以我们其实可以这样写:

std::vector<Service> instances; 
instances.resive(smp::count);

smp::invoke_on_all([&instances]() {
    instance[this_shard_id()] = new Service();
});

使用 smp 中的工具, 我们可以实现这些操作, 但是如果有很多这样的服务, 那么就需要重复大量类似的代码(以上代码只是该场景下很小的一部分), 所以我们应该将其沉淀为一个通用的组件, 而 Seastar 也是这样做的, 这就是 seastar::sharded.

一开始它并不叫这个名字, 而是 seastar::distributed, 即分布式服务, 后面考虑到 distributed 经常用于跨机器的场景, 而这里实际上是同机器跨核, 所以改名为 seastar::sharded, 不过考虑到向前兼容还是把 seastar::distributed 作为 seastar::sharded 的一个别名保留, 只是不再推荐使用

一个 🌰

在了解它的实现之前, 先看看它的一些用法, Seastar 提供了一个简易的 HTTP server, 以此为例, 看看如何使用 sharded 在多个 core 上部署它; 为了简化逻辑, 使用 seastar::async 以串行的方式写:

return ss::async([]() {
  ss::sharded<ss::httpd::http_server> server;
  auto deferred = ss::defer([&server]() noexcept {
    server.stop().get0();
  });

  ss::sstring name = "sharded httpd";
  server.start(std::move(name)).get0();
  server.invoke_on_all([](ss::httpd::http_server &s) {}).get0();

  uint16_t port = 12345;
  ss::socket_address sa(port);
  server
      .invoke_on_all<ss::future<> (ss::httpd::http_server::*)(
          ss::socket_address)>(&ss::httpd::http_server::listen, sa)
      .get0();

  auto handler = new my_handler();
  server
      .invoke_on_all([handler](ss::httpd::http_server &s) {
        s._routes.add_default_handler(handler);
      })
      .get0();

  return 0;
});

有几个比较重要的步骤:

  1. 首先调用 ss::shardedstart() 方法, 该方法负责在所有 shard 上创建 Service(此处即 ss::httpd::http_server)实例, 该方法可接收变长参数, 会拷贝转发给 Service 的构造函数
  2. 调用 invoke_on_all 方法在所有 shard 的实例上执行某个函数, 这里的函数可以是自由函数(更准确地说是 functor), 也可以是 Service 的类方法; 这是非常常用的操作, ss::sharded 为此重载了许多变体
  3. 调用 ss::shardedstop() 方法释放/清理实例, 这一步是必需的(这里是作为一个 deferred action 通过 RAII 来确保释放)

内部实现

template<typename Service>
class sharded {
    struct entry {
        shared_ptr<Service> service;
        promise<> freed;
    };
    std::vector<entry> _instances;
};

很简单的一个结构, _instance 则保存着所有 shard 上的实例, 以 ss::this_shard_id() 作为数组索引, 每个 shard 只访问属于自己的实例, 所以不会有并发读写的问题; 暂时可以忽略 entry 中的 freed 成员, 它比较特殊就留到后面再介绍.

创建实例

前面我们讲过, 使用 ss::sharded 首先要调用其 start() 方法在各个 shard 上创建实例:

template <typename Service>
template <typename... Args>
future<> sharded<Service>::start(Args &&...args) noexcept {
  return sharded_parallel_for_each(
      [this, args = std::make_tuple(std::forward<Args>(args)...)](
          unsigned c) mutable {
        return smp::submit_to(c, [this, args]() mutable {
          _instances[this_shard_id()].service = std::apply(
              [this](Args... args) {
                return create_local_service(std::forward<Args>(args)...);
              },
              args);
        });
      });
}

逻辑非常简单直观, 其实就是借助 smp::submit_to 将一个创建 Service 的任务分发到各个 shard 去执行, 不过有几个地方需要注意:

执行函数

这是最常用的操作, 为此 Seastar 在三个维度上提供了许多 invoke_on_* 变体:

这些 invoke_on_* 变体大都大同小异, 并且是以一种统一、通用的方式实现; 比如对于需要在所有 shard 上执行函数的场景, Seastar 提供了以下两个 invoke_on_all 变体(以及另外两个带 smp_submit_options 的变体):

future<> invoke_on_all(std::function<future<> (Service&)> func) noexcept;

template <typename Func, typename... Args>
future<> invoke_on_all(Func func, Args... args) noexcept;

看起来第一个其实没有必要——它也可以通过第二个模板函数实现: 该模板可以接受自由函数, 也可以接受 Service 类中的方法, 并且允许函数接受除 Service& 之外的其余参数; 但是考虑到编译速度, 模板实例化导致的代码膨胀, Seastar 还是提供了第一个方法, 该方法中通过 std::function 从而避免了模板, 所以 Seastar 称之为 type-erased 的版本

模板版本其实也是也是将要执行的函数包装为非模板版本所需要的函数类型, 从而转发给他去执行:


template <typename Service>
template <typename Func, typename... Args>
SEASTAR_CONCEPT(requires std::invocable<Func, Service &,
                                        internal::sharded_unwrap_t<Args>...>)
inline future<> sharded<Service>::invoke_on_all(smp_submit_to_options options,
                                                Func func,
                                                Args... args) noexcept 
  return invoke_on_all(
      options, invoke_on_all_func_type([func = std::move(func),
                                        args = std::tuple(std::move(args)...)](
                                           Service &service) mutable {
        return std::apply(
            [&service, &func](Args &&...args) mutable {
              return futurize_apply(
                  func, std::tuple_cat(std::forward_as_tuple(service),
                                       std::tuple(internal::unwrap_sharded_arg(
                                           std::forward<Args>(args))...)));
            },
            std::move(args));
      }));
}

这里的 func 既可以是自由函数, 也可以是 Service 类的方法; 如果是自由函数, 那么它的第一个参数一定是 Service &, 而如果是 Service 类方法, 则没有此限定(通常也不会是 Service, 毕竟可以直接通过 this 方法), 不过由于 futurize_apply 底层使用 std::apply 去执行 functor, 所以对于 Service 类方法的指针, 需要额外传入一个 Service 实例作为第一个参数, 这样就统一了二者

至于非模板的版本, 其内部逻辑和创建实例的逻辑类似, 用 parall_for_each + smp::submit_to 即可实现

清理服务

调用 stop() 即可清理服务, 这个方法不接受任何参数:

return sharded_parallel_for_each([this] (unsigned c) mutable {
    return smp::submit_to(c, [this] () mutable {
        auto inst = _instances[this_shard_id()].service;
        if (!inst) {
            return make_ready_future<>();
        }
        return internal::stop_sharded_instance(*inst);
    });
}).then_wrapped([this] (future<> fut) {
    return sharded_parallel_for_each([this] (unsigned c) {
        return smp::submit_to(c, [this] {
            if (_instances[this_shard_id()].service == nullptr) {
                return make_ready_future<>();
            }
            _instances[this_shard_id()].service = nullptr;
            return _instances[this_shard_id()].freed.get_future();
        });
    }).finally([this, fut = std::move(fut)] () mutable {
        _instances.clear();
        _instances = std::vector<sharded<Service>::entry>();
        return std::move(fut);
    });
});

可以看到这是一个二段式的清理逻辑:

  1. 第一个 sharded_parallel_for_each 通过 internal::stop_sharded_instance() 调用 Service 自己的 stop() 方法(如果有的话)并等待它们 resolve
  2. 第二个 sharded_parallel_for_each 则是释放实例(递减引用计数), 并等待其 freed promise 被 resolve

有几个点需要注意:

Reference