点击次数:333      更新时间:2017-04-17 19:42:00       作者:本站整理      来源:www.lan6.net        QQ交流群:626957820


 

一、ServerBootstrap介绍

      ServerBootstrap,顾名思义,它是作为Wangle服务端的一个启动辅助类,熟悉Netty的朋友都知道builder模式,它的唯一目的就是以方便的、统一的方式启动一个Server。

二、示例

      下面以官方提供了一个Echo demo为例,先宏观了解一下ServerBootstrap的用法,代码如下:

typedef Pipeline EchoPipeline;
class EchoPipelineFactory : public PipelineFactory {
 public:
  EchoPipeline::Ptr newPipeline(std::shared_ptr sock) {
    auto pipeline = EchoPipeline::create();
    pipeline->addBack(AsyncSocketHandler(sock));
    pipeline->addBack(LineBasedFrameDecoder(8192));
    pipeline->addBack(StringCodec());
    pipeline->addBack(EchoHandler());
    pipeline->finalize();
    return pipeline;
  }
};
int main(int argc, char** argv) {
  google::ParseCommandLineFlags(&argc, &argv, true);

  ServerBootstrap server;
  server.childPipeline(std::make_shared());
  server.bind(FLAGS_port);
  server.waitForStop();

  return 0;
}

 乍一看,是不是觉得和Netty很像,的确,Wangle在设计思想上很大部分都是借鉴了Netty。在定义ServerBootstrap时需要指定一个模板参数,这个参数为新连接对应的Pipleline的类型,上例中是一个Read类型为IOBufQueue&、Write类型为std::string的Pipeline。有了Pipeline类型还不够,还需要一个Pipeline类型工厂专门用于创建该类型的Pipeline(上例中为EchoPipelineFactory),使用ServerBootstrap的childPipeline方法设置。设置完成之后就可以调用bind进行端口监听了。其实ServerBootstrap上还可以设置很多选项,比如使用group方法设置acceptor线程池和io线程池等。

三、bind过程源码分析

  bind有多个重载版本,本文以直接绑定一个本地端口为例(这是最常用的用法),源码为:

     void bind(int port) {
            CHECK(port >= 0);
            folly::SocketAddress address;
            // 设置本地地址
            address.setFromLocalPort(port);
            bindImpl(address);
        }

  bind只是使用setFromLocalPort初始化了一个folly::SocketAddress,然后直接调用bindImpl,继续看bindImpl实现:

 void bindImpl(folly::SocketAddress &address) {
            // 之前没有手动设置group
            if (!workerFactory_) {
                group(nullptr);
            }

            // 如果accept线程数大于1,那么就在所有的accept线程中重用端口进行监听
            bool reusePort = reusePort_ || (acceptor_group_->numThreads() > 1);

            std::mutex sock_lock;
            std::vector> new_sockets;

            std::exception_ptr exn;

            // 定义一个lamda表达式,执行ServerSocket创建和accept操作,该函数一定会在accept线程中执行
            auto startupFunc = [&](std::shared_ptr> barrier) {

                try {
                    // 创建服务端监听socket
                    // 此函数不会阻塞
                    // AsyncServerSocketFactory
                    auto socket = socketFactory_->newSocket(address, socketConfig.acceptBacklog, reusePort,
                                                            socketConfig);
                    sock_lock.lock();
                    new_sockets.push_back(socket);
                    sock_lock.unlock();
                    // 获取socket绑定的本地地址
                    socket->getAddress(&address);
                    // 唤醒
                    barrier->post();
                } catch (...) {
                    // 先把异常记录下来
                    exn = std::current_exception();
                    barrier->post();

                    return;
                }

            };

            auto wait0 = std::make_shared>();
            // 在acceptor_group_线程池中添加并执行startupFunc任务(异步)
            acceptor_group_->add(std::bind(startupFunc, wait0));
            wait0->wait();//等待

            // 从1开始,在剩下的acceptor线程中启动监听
            for (size_t i = 1; i < acceptor_group_->numThreads(); i++) {
                auto barrier = std::make_shared>();
                acceptor_group_->add(std::bind(startupFunc, barrier));
                barrier->wait();
            }

            // 如果前面有异常
            if (exn) {
                // 异常重新抛出
                std::rethrow_exception(exn);
            }

            // 遍历new_sockets(所有新创建的listening中的socket)
            for (auto &socket : new_sockets) {
                // Startup all the threads
                workerFactory_->forEachWorker([this, socket](Acceptor *worker) {
                    // 在工作线程中
                    socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
                            [this, worker, socket]() {
                                // 异步的添加accept回调worker
                                socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
                            });
                });
                // 缓存所有处于listening状态的socket
                sockets_->push_back(socket);
            }
        }

  首先,检查workerFactory_是否为空,如果为空,说明之前没有手动调用group设置过acceptor、io线程池,那么此时会先调用group(nullptr)进行线程池的默认设置,group函数源码如下:

 ServerBootstrap *group(std::shared_ptr io_group) {
            return group(nullptr, io_group);
        }

  进一步调用group的重载版本:

 ServerBootstrap *group(
                std::shared_ptr accept_group,  // acceptor线程
                std::shared_ptr io_group) {    // io线程
            // 如果没有设置accept线程
            if (!accept_group) {
                // 就创建一个只有一个线程的线程池负责accept
                accept_group = std::make_shared(
                        1, std::make_shared("Acceptor Thread"));
            }
            // 如果没有设置IO线程池
            if (!io_group) {
                auto threads = std::thread::hardware_concurrency();// 返回CPU核数
                if (threads <= 0) {
                    // Reasonable mid-point for concurrency when actual value unknown
                    threads = 8;
                }
                // 创建IO线程,线程数为CPU核数(这一步会真正的创建threads个线程)
                io_group = std::make_shared(
                        threads, std::make_shared("IO Thread"));
            }

            // TODO better config checking
            // CHECK(acceptorFactory_ || childPipelineFactory_);
            CHECK(!(acceptorFactory_ && childPipelineFactory_));

            // 如果自己提供了定制的ServerWorkerPool
            if (acceptorFactory_) {
                workerFactory_ = std::make_shared(
                        acceptorFactory_,
                        io_group.get(),
                        sockets_,
                        socketFactory_);
            } else {
                // 否则就是用默认的
                workerFactory_ = std::make_shared(
                        /* ServerAcceptorFactory用于创建一个ServerAcceptor,这个ServerAcceptor
                         * 负责新建acceptPipeline_,并且它自己还是一个wangle::InboundHandler
                         * 将自己添加到ServerAcceptor,负责对新的连接的创建和管理
                         * 注意在不设置时,acceptPipelineFactory_的值默认无DefaultAcceptPipelineFactory(只是单纯的创建了一个空白Pipeline)
                         * */
                        std::make_shared>(acceptPipelineFactory_, childPipelineFactory_,
                                                                          accConfig_),
                        io_group.get(),
                        sockets_,// listening中的sockets,在bind调用之前这里的sockets_为空
                        socketFactory_);
            }

            // 为IO线程池添加观察者!这一步会出发调用每一个线程的threadPreviouslyStarted方法
            // workerFactory_是一个ThreadPoolExecutor::Observer
            io_group->addObserver(workerFactory_);

            acceptor_group_ = accept_group;
            io_group_ = io_group;

            return this;
        }

 该group函数有两个参数,分别为aceeptor线程池和io线程池,如果两个参数都为nullptr,那么acceptor线程数默认为1,而io线程数为cpu硬件核数,否则就使用参数提供的值,线程池设置完毕之后,会创建workerFactory_。workerFactory_实际上是一个ServerWorkerPool,而ServerWorkerPool实际上是一个线程池的观察者(实现了观察者Observer接口的threadStarted、threadStopped、threadPreviouslyStarted、threadNotYetStoppe方法),ServerWorkerPool构造方法需要四个参数,分别为:创建Acceptor的工厂AcceptorFactory、IO线程池、处于listening中的socket集合、以及用于创建ServerSocket的工厂ServerSocketFactory。其中AcceptorFactory和ServerSocketFactory分别可以通过ServerBootstrap的childHandler方法和channelFactory手动设置,没有设置时分别默认为ServerAcceptorFactory和AsyncServerSocketFactory(大多数情况下都是使用默认值)。在默认情况下,创建ServerAcceptorFactory时,需要显示提供3个参数,他们分别为:AcceptPipelineFactory、childPipelineFactory以及ServerSocketConfig。其中AcceptPipelineFactory的值可以使用ServerBootstrap的pipeline进行设置,如果没有手动设置,默认为DefaultAcceptPipelineFactory(大多数情况都是如此),DefaultAcceptPipelineFactory只是创建一个空白的Pipeline(空白指的是没有添加任何Handler)。childPipelineFactory通过ServerBootstrap的childPipeline方法设置,它主要用来为每个新到来的连接创建Pipeline。上文提到,workerFactory_本质是一个线程池观察者(Observer),那么它是用来观察谁呢?从代码“io_group->addObserver(workerFactory_)”可以看到,它是用来观察io_group(IO线程池),addObserver代码如下:

void ThreadPoolExecutor::addObserver(std::shared_ptr o) {
  RWSpinLock::ReadHolder r{&threadListLock_};
  observers_.push_back(o);
  // 遍历线程列表,分别为每个线程调用观察者的threadPreviouslyStarted
  for (auto& thread : threadList_.get()) {
    o->threadPreviouslyStarted(thread.get());
  }
}

 首先是遍历线程池中的每一个线程,然后为每个线程调用观察者的threadPreviouslyStarted方法,threadPreviouslyStarted方法代码如下:

   virtual void threadPreviouslyStarted(ThreadHandle* h) {
      threadStarted(h);
    }

  进而调用threadStarted:

    void ServerWorkerPool::threadStarted(wangle::ThreadPoolExecutor::ThreadHandle *h) {
        // 创建一个ServerAcceptor,该Acceptor绑定到一个线程池中,此处的exec_为IO线程池
        // exec_->getEventBase(h) 表示获取io线程句柄h对应的eventbase
        auto worker = acceptorFactory_->newAcceptor(exec_->getEventBase(h));
        {
            Mutex::WriteHolder holder(workersMutex_.get());
            // 插入映射(IO线程句柄、ServerAcceptor)
            workers_->insert({h, worker});
        }

        // 遍历所有Listening中的socket,理论上在调用bind之前这里应该直接为空
        for (auto socket : *sockets_) {
            // 在eventbase中执行
            socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
                    [this, worker, socket]() {
                        // 添加accept回调为ServerAcceptor,也就是会在io线程池中执行ServerAcceptor回调
                        // 这个回调有connectionAccepted、acceptError、acceptStarted、acceptStopped
                        socketFactory_->addAcceptCB(socket, worker.get(), worker->getEventBase());
                    });
        }
    }

 threadStarted函数主要作用就是使用前文设置的acceptorFactory_(默认为ServerAcceptorFactory)来创建一个acceptor(默认为ServerAcceptor),并将其放到workers_(本质为一个map)映射起来,后面还会用到。其中,重点看一下ServerAcceptorFactory的newAcceptor实现:

   std::shared_ptr newAcceptor(folly::EventBase *base) {
            auto acceptor = std::make_shared>(acceptPipelineFactory_, childPipelineFactory_,
                                                                       accConfig_);
            // 初始化这个acceptor
            acceptor->init(nullptr, base, nullptr);
            return acceptor;
}

  首先是创建一个ServerAcceptor,然后调用init对其进行初始化。

void init(folly::AsyncServerSocket *serverSocket,
                  folly::EventBase *eventBase,
                  SSLStats *stats = nullptr) override {

            // eventBase为io线程的
            Acceptor::init(serverSocket, eventBase, stats);

            // 创建acceptPipeline,参数为Acceptor
            acceptPipeline_ = acceptPipelineFactory_->newPipeline(this);

            // 如果设置了childPipelineFactory,这就意味着没有自己提供定制的AcceptPipelineFactory
            // 而是采用了默认的,因此需要将ServerAcceptor(本身也是一个Inbound Handler)也添加到
            // AcceptPipeline
            if (childPipelineFactory_) {
                // This means a custom AcceptPipelineFactory was not passed in via
                // pipeline() and we're using the DefaultAcceptPipelineFactory.
                // Add the default inbound handler here.
                acceptPipeline_->addBack(this);
            }

            acceptPipeline_->finalize();
}

 这里需要注意的一点是,ServerAcceptor本身还是一个wangle::InboundHandler类型的Handler,所以将其加入到aceeptor Pipeline中(也是acceptor种唯一一个Handler)。当ServerSocket acceptor一个新连接之后,会调用AcceptorCB回调函数,回调函数相应的方法经过一些处理之后就会在acceptor Pipeline中触发相应的事件。具体的过程后文在讲解一个新连接到来过程的时候还会具体的说明。

  分析完了group函数,继续回到bindImpl函数:

bool reusePort = reusePort_ || (acceptor_group_->numThreads() > 1);

  这行代码的意思是,如果代码显示设置过reusePort_或者是acceptor的线程池中的线程数大于1,那么就开起重用端口设置,也就是说,当存在多个acceptor线程池时,同一个端口会在多个acceptor线程池上同时启用监听。接下来了定义了一个startupFunc lambda表达式,它使用上文设置的socketFactory_的newSocket创建一个AsyncServerSocket,newSocket一共需要四个参数,分别为:要监听的ServerSocket地址、acceptor的backlog大小、是否重用端口标识、ServerSocket配置信息。以AsyncServerSocketFactory为例,代码如下:

std::shared_ptr newSocket(
      folly::SocketAddress address, int /*backlog*/, bool reuse,
      ServerSocketConfig& config) override {
    //获取当前线程的eventbase(一定会在accept线程)
    auto* evb = folly::EventBaseManager::get()->getEventBase();
    // 创建AsyncServerSocket
    std::shared_ptr socket(new folly::AsyncServerSocket(evb),ThreadSafeDestructor());
    //是否重用端口
    socket->setReusePortEnabled(reuse);
    // 是否使能tcp的fastopen
    if (config.enableTCPFastOpen) {
      socket->setTFOEnabled(true, config.fastOpenQueueSize);
    }
    // 绑定的地址
    socket->bind(address);
    // 设置监听参数,启动监听
    socket->listen(config.acceptBacklog);
    // 开始accept,这里不会阻塞,只是向事件层注册了持久的read事件
    socket->startAccepting();

    return socket;
  }

 前面只是定义了一个startupFunc函数并没有执行,那么它在哪里执行呢?继续看bindImpl后面的代码:

  auto wait0 = std::make_shared>();
  // 在acceptor_group_线程池中添加并执行startupFunc任务(异步)
  acceptor_group_->add(std::bind(startupFunc, wait0));
  wait0->wait();//等待

可以看到,startupFunc函数被放在了acceptor_group_线程池中执行。同时,如前文所说,如果acceptor线程数大于1,那么会在所有的acceptor线程池中启用监听,代码如下:

for (size_t i = 1; i < acceptor_group_->numThreads(); i++) {
     auto barrier = std::make_shared>();
     acceptor_group_->add(std::bind(startupFunc, barrier));
     barrier->wait();
}

 bindImpl函数的最后:

 // 遍历new_sockets(所有新创建的listening中的socket)
for (auto &socket : new_sockets) {
     // 遍历IO线程池
     workerFactory_->forEachWorker([this, socket](Acceptor *worker) {
     // 在acceptor线程中执行
     socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
                            [this, worker, socket]() {
                                // 异步的添加accept回调worker
                                socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
                            });
                });
       // 缓存所有处于listening状态的socket
       sockets_->push_back(socket);
}

  遍历处于listening状态的socket,并为每一个scoket设置AcceptCB(acceptor回调函数,在每一个socket acceptor一个连接后会执行acceptor回调函数)和回调函数执行的线程(此处为IO线程)。这里的回调函数其实就是一个Acceptor,它是在什么时候创建的呢?还记得前文为IO线程池添加观察者时的threadStarted函数吗?是的,就是在threadStarted中为了IO线程池创建了一个Acceptor,来看一下Acceptor的继承关系:

class Acceptor :
  public folly::AsyncServerSocket::AcceptCallback,
  public wangle::ConnectionManager::Callback,
  public folly::AsyncUDPServerSocket::Callback 

  重点看一下folly::AsyncServerSocket::AcceptCallback:

class AcceptCallback {
   public:
    virtual ~AcceptCallback() = default;

    virtual void connectionAccepted(int fd,const SocketAddress& clientAddr)noexcept = 0;

    virtual void acceptError(const std::exception& ex) noexcept = 0;

    virtual void acceptStarted() noexcept {}

    virtual void acceptStopped() noexcept {}
  };

  该回调接口分别对应了acceptor的不同状态。

四、完整的acceptor过程

前文重点讲解了bind的过程,概括来说主要完成看:创建AcceptorPipeline、启动端口监听、为ServerSocket设置Acceptor回调函数等。

 那么当一个处于listening状态的ServerSocket accept一个新连接时会发生什么事情呢?

 前文提到过,每一个处于listening状态的ServerSocker都设置了一个AcceptorCB,这里AcceptorCB就是ServerAcceptor,也就是当有一个新连接被accept时,Acceptor中的connectionAccepted就会被调用,connectionAccepted只是记录了一下accept时间然后调用onDoneAcceptingConnection:

  void Acceptor::connectionAccepted(
            int fd, const SocketAddress &clientAddr) noexcept {
        namespace fsp = folly::portability::sockets;
        if (!canAccept(clientAddr)) {
            // Send a RST to free kernel memory faster
            struct linger optLinger = {1, 0};
            fsp::setsockopt(fd, SOL_SOCKET, SO_LINGER, &optLinger, sizeof(optLinger));
            close(fd);
            return;
        }
        // 记录accept开始时间
        auto acceptTime = std::chrono::steady_clock::now();
        for (const auto &opt: socketOptions_) {
            opt.first.apply(fd, opt.second);
        }

        onDoneAcceptingConnection(fd, clientAddr, acceptTime);
    }

onDoneAcceptingConnection定义了连接的TransportInfo,继续调用processEstablishedConnection。

void Acceptor::onDoneAcceptingConnection(
            int fd,
            const SocketAddress &clientAddr,
            std::chrono::steady_clock::time_point acceptTime) noexcept {
        TransportInfo tinfo;
        processEstablishedConnection(fd, clientAddr, acceptTime, tinfo);
}

  processEstablishedConnection中主要处理了一大堆和ssl相关的问题,关键代码如下:

            tinfo.secure = false;
            tinfo.acceptTime = acceptTime;
            // 创建AsyncSocket,此处的base_是之前init的时候传进来的IO线程
            AsyncSocket::UniquePtr sock(makeNewAsyncSocket(base_, fd));
            tinfo.tfoSucceded = sock->getTFOSucceded();
            plaintextConnectionReady(
                    std::move(sock),
                    clientAddr,
                    empty_string,
                    SecureTransportType::NONE,
                    tinfo);

 最终processEstablishedConnection会调用plaintextConnectionReady函数,代码如下:

void Acceptor::plaintextConnectionReady(
            AsyncTransportWrapper::UniquePtr sock,
            const SocketAddress &clientAddr,
            const string &nextProtocolName,
            SecureTransportType secureTransportType,
            TransportInfo &tinfo) {
        connectionReady(
                std::move(sock),
                clientAddr,
                nextProtocolName,
                secureTransportType,
                tinfo);
}

plaintextConnectionReady会调用connectionReady,connectionReady最终会调用onNewConnection,而onNewConnection是Acceptor定义的一个抽象方法 ,ServerAcceptor将其实现为:

void onNewConnection(folly::AsyncTransportWrapper::UniquePtr transport,
                             const folly::SocketAddress *clientAddr,
                             const std::string &nextProtocolName,
                             SecureTransportType secureTransportType,
                             const TransportInfo &tinfo) override {

            ConnInfo connInfo = {transport.release(), clientAddr, nextProtocolName, secureTransportType, tinfo};
            // 在acceptPipeline传播read
            acceptPipeline_->read(connInfo);
}

  重点关注“acceptPipeline_->read(connInfo)”,该连接最终会在accept pipeline中以read时间传播,前文也说过,accept pipeline中默认只有一个唯一的Handler,就是ServerAcceptor本身,那么看一下ServerAcceptor中的read方法都干了什么:

void read(Context *, AcceptPipelineType conn) override {
            if (conn.type() != typeid(ConnInfo &)) {
                return;
            }

            auto connInfo = boost::get(conn);

            folly::AsyncTransportWrapper::UniquePtr transport(connInfo.sock);

            // Setup local and remote addresses
            auto tInfoPtr = std::make_shared(connInfo.tinfo);

            tInfoPtr->localAddr = std::make_shared(accConfig_.bindAddress);

            transport->getLocalAddress(tInfoPtr->localAddr.get());

            tInfoPtr->remoteAddr = std::make_shared(*connInfo.clientAddr);

            tInfoPtr->appProtocol = std::make_shared(connInfo.nextProtoName);

            // 为新连接创建一个pipeline(参数为AsyncTransport)
            auto pipeline = childPipelineFactory_->newPipeline(
                    std::shared_ptr(
                            transport.release(), folly::DelayedDestruction::Destructor()));

            // 设置TransportInfo
            pipeline->setTransportInfo(tInfoPtr);

            // 创建一个新的可被管理的连接,并绑定pipeline(相当于Netty中的Channel)
            auto connection = new ServerConnection(std::move(pipeline));
            // 将连接管理起来
            Acceptor::addConnection(connection);
            // 初始化这个连接
            connection->init();
 }

  首先,设置了远端地址和本地地址,然后使用childPipelineFactory_创建了一个新连接的Pipeline,最后初始化这个新连接,init方法非常简单:

 void init() {
      // 在该连接绑定的pipeline中引发Active事件
      pipeline_->transportActive();
}

  至此,是不是看到了熟悉的身影,没错,从此处,事件便开始在你的业务handler中传播了。如果要从最源头考虑一个新连接建立的过程(包括folly、libevent那一层),那么可以简单归纳为:

handlerReady->consumeMessages->messageAvailable->connectionAccepted->onDoneAcceptingConnection
->processEstablishedConnection->plaintextConnectionReady->connectionReady->onNewConnection

下面来一张图,便于理解:

          

             

<