ceph internal 之 ThreadPool和WorkQueue

| 分类 ceph-internal  | 标签 ceph-internal 

前言

线程池和工作队列其实是密不可分的,从ceph的代码中也可以看出来。让任务推入工作队列,而线程池中的线程负责从工作队列中取出任务进行处理。这种处理任务的模式任何一个有C编程经验的人都不会陌生。当年我在ZTE和Trend工作的时候,都写过或者维护过类似的代码。

工作队列和线程池的关系,类似于狡兔和走狗的关系,正是因为有任务,所以才需要雇佣线程来完成任务,没有了狡兔,走狗也就失去了存在的意义。而线程必须要可以从工作队列中认领任务并完成,这就类似于猎狗要有追捕狡兔的功能。正因为两个数据结构拥有如此紧密的关系,因此,Ceph中他们的相关函数都位于WorkQueue.cc和WorkQueue.h中。

线程池线程个数调整

线程池的关键在于线程的主函数做的事情。首先是工作线程.

线程池中会有很多的WorkThread,它的基类就是Thread。线程的主函数为pool->worker,即ThreadPool::worker函数。

  struct WorkThread : public Thread {
    ThreadPool *pool;
    // cppcheck-suppress noExplicitConstructor
    WorkThread(ThreadPool *p) : pool(p) {}
    void *entry() {
      pool->worker(this);
      return 0;
    }
  };
void ThreadPool::worker(WorkThread *wt)

线程池和heartbeat是交织在一起的,后面会有专门的文章介绍,在此先略过。 线程池是支持动态调整线程个数的。所谓调整,有两种可能性,一种是线程个数增加,一种线程个数减少。我们知道,当添加OSD的时候,数据会重分布,恢复的速度可以调节,其中一个重要的参数为osd-max-recovery-threads,该值修改可以实时生效。

ceph tell osd.* injectargs '--osd-max-recovery-threads 8'

该值之所以可以实时生效,说到底,不过是因为OSD类中的recovery_tp就是一种普通的ThreadPool而已


private:

  ThreadPool osd_tp;
  ShardedThreadPool osd_op_tp;
  ThreadPool recovery_tp;
  ThreadPool disk_tp;
  ThreadPool command_tp;

线程个数减少

线程个数减少相对比较简单,线程自杀即可。

void ThreadPool::worker(WorkThread *wt)
{  
  _lock.Lock();
  ldout(cct,10) << "worker start" << dendl;
  
  std::stringstream ss;
  ss << name << " thread " << (void*)pthread_self();
  heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str());

  while (!_stop) {

    // manage dynamic thread pool
    join_old_threads();
    if (_threads.size() > _num_threads) {
      ldout(cct,1) << " worker shutting down; too many threads (" << _threads.size() << " > " << _num_threads << ")" << dendl;
      _threads.erase(wt);
      _old_threads.push_back(wt);
      break;
    }

线程本身是一个loop,不停地处理WorkQueue中的任务,在一个loop的开头,线程个数是否超出了配置的个数,如果超出了,就需要自杀,所谓自杀即将自身推送到_old_threads中,然后跳出loop,直接返回了。线程池中的其他兄弟在busy-loop开头的join_old_threads函数会判断是否存在自杀的兄弟,如果存在的话,执行join,为兄弟收尸。

void ThreadPool::join_old_threads()
{
  assert(_lock.is_locked());
  while (!_old_threads.empty()) {
    ldout(cct, 10) << "join_old_threads joining and deleting " << _old_threads.front() << dendl;
    _old_threads.front()->join();
    delete _old_threads.front();
    _old_threads.pop_front();
  }
}

线程个数增加

线程池的线程个数如果不够用,也可以动态的增加,通过配置的变化来做到:

void ThreadPool::handle_conf_change(const struct md_config_t *conf,
				    const std::set <std::string> &changed)
{
  if (changed.count(_thread_num_option)) {
    char *buf;
    int r = conf->get_val(_thread_num_option.c_str(), &buf, -1);
    assert(r >= 0);
    int v = atoi(buf);
    free(buf);
    if (v > 0) {
      _lock.Lock();
      _num_threads = v;
      start_threads();
      _cond.SignalAll();
      _lock.Unlock();
    }
  }
}

void ThreadPool::start_threads()
{
  assert(_lock.is_locked());
  while (_threads.size() < _num_threads) {
    WorkThread *wt = new WorkThread(this);
    ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
    _threads.insert(wt);

    int r = wt->set_ioprio(ioprio_class, ioprio_priority);
    if (r < 0)
      lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl;

    wt->create(thread_name.c_str());
  }
}

start_threads函数不仅仅可以用在初始化时启动所有工作线程,而且可以用于动态增加,它会根据配置要求的线程数_num_threads和当前线程池中线程的个数,来创建WorkThread,当然了,他会调整线程的io优先级。

工作线程的暂停执行和恢复执行

线程池的工作线程,绝大部分时间内,自然是busy-loop中处理工作队列上的任务,但是有一种场景是,需要让工作暂时停下来,停止工作,不要处理WorkQueue中的任务。

线程池提供了一个标志为_pause,只要_pause不等于0,那么线程池中线程就在loop中就不会处理工作队列中的任务,而是空转。为了能够及时的醒来,也不是sleep,而是通过条件等待,等待执行的时间。

void ThreadPool::pause()
{
  ldout(cct,10) << "pause" << dendl;
  _lock.Lock();
  _pause++;
  while (processing)
    _wait_cond.Wait(_lock);
  _lock.Unlock();
  ldout(cct,15) << "paused" << dendl;
}

void ThreadPool::pause_new()
{
  ldout(cct,10) << "pause_new" << dendl;
  _lock.Lock();
  _pause++;
  _lock.Unlock();
}



void ThreadPool::worker(WorkThread *wt)
{
  while (!_stop) {

    // manage dynamic thread pool
    join_old_threads();
    if (_threads.size() > _num_threads) {
      ldout(cct,1) << " worker shutting down; too many threads (" << _threads.size() << " > " << _num_threads << ")" << dendl;
      _threads.erase(wt);
      _old_threads.push_back(wt);
      break;
    }

    if (!_pause && !work_queues.empty()) {
      ...
    }

    ldout(cct,20) << "worker waiting" << dendl;
    
    /*重新设置timeout时间,放置误判timeout*/
    cct->get_heartbeat_map()->reset_timeout(
      hb,
      cct->_conf->threadpool_default_timeout,
      0);
      
    /*此处相当于sleep,但是条件变量的存在,一旦情况有变(比如调用了unpause函数)能及时醒来*/
    _cond.WaitInterval(cct, _lock,
      utime_t(
	            cct->_conf->threadpool_empty_queue_max_wait, 0));
  }
  
  ...
}

那么处理pause_new和pause 函数做的事情差不多,两者有什么区别呢?关键在于

  while (processing)
    _wait_cond.Wait(_lock);

当下达pause指令的时候,很可能线程池中的某几个线程正在处理工作队列中的任务,这种情况下并不是立刻就能停下的,只有处理完手头的任务,在下一轮loop中检查_pause标志位才能真正地停下。那么pause指令就面临选择,要不要等工作线程WorkThread处理完手头的任务。pause函数是等,pauser_new函数并不等,pause_new函数只负责设置标志位,当其返回的时候,某几个线程可能仍然在处理工作队列中的任务。

工作线程的工作内容

讲了这么多,基本都是旁枝,而不是工作线程的主干,主干部分是处理工作队列中的任务:

if (!_pause && !work_queues.empty()) {
      WorkQueue_* wq;
      int tries = work_queues.size();
      bool did = false;
      while (tries--) {
	       last_work_queue++;
	       last_work_queue %= work_queues.size();
	       wq = work_queues[last_work_queue];
	       
	       void *item = wq->_void_dequeue();
	       if (item) {
	           processing++;
	           ldout(cct,12) << "worker wq " << wq->name << " start processing " << item
	               << " (" << processing << " active)" << dendl;
	           TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
	           /*重设heartbeat的超时时间*/
	           tp_handle.reset_tp_timeout();
	           _lock.Unlock();
	           wq->_void_process(item, tp_handle);
	           _lock.Lock();
	           wq->_void_process_finish(item);
	           processing--;
	           ldout(cct,15) << "worker wq " << wq->name << " done processing " << item
					    << " (" << processing << " active)" << dendl;
					 if (_pause || _draining)
					     _wait_cond.Signal();
					 did = true;
					 break;
			 }
      }
     if (did)
	    continue;
}

其中_void_process和_void_process_finish是WorkQueue基类中定义的函数,真正定义工作队列的时候,可以继承该基类,定义自己的_process函数和_process_finish 函数,来雇用线程完成特定的任务。

template<class T>
  class WorkQueue : public WorkQueue_ {
    ThreadPool *pool;
    
    /// Add a work item to the queue.
    virtual bool _enqueue(T *) = 0;
    /// Dequeue a previously submitted work item.
    virtual void _dequeue(T *) = 0;
    /// Dequeue a work item and return the original submitted pointer.
    virtual T *_dequeue() = 0;
    virtual void _process_finish(T *) {}

    // implementation of virtual methods from WorkQueue_
    void *_void_dequeue() {
      return (void *)_dequeue();
    }
    void _void_process(void *p, TPHandle &handle) {
      _process(static_cast<T *>(p), handle);
    }
    void _void_process_finish(void *p) {
      _process_finish(static_cast<T *>(p));
    }

  protected:
    /// Process a work item. Called from the worker threads.
    virtual void _process(T *t, TPHandle &) = 0;

  public:
    WorkQueue(string n, time_t ti, time_t sti, ThreadPool* p) : WorkQueue_(n, ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~WorkQueue() {
      pool->remove_work_queue(this);
    }

我们不妨以FileStore的op_tp和op_wq为例查看下该线程池中工作线程的日常任务。

ThreadPool op_tp;
  struct OpWQ : public ThreadPool::WorkQueue<OpSequencer> {
    FileStore *store;
    OpWQ(FileStore *fs, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
      : ThreadPool::WorkQueue<OpSequencer>("FileStore::OpWQ", timeout, suicide_timeout, tp), store(fs) {}

    bool _enqueue(OpSequencer *osr) {
      store->op_queue.push_back(osr);
      return true;
    }
    void _dequeue(OpSequencer *o) {
      assert(0);
    }
    bool _empty() {
      return store->op_queue.empty();
    }
    OpSequencer *_dequeue() {
      if (store->op_queue.empty())
	       return NULL;
      OpSequencer *osr = store->op_queue.front();
      store->op_queue.pop_front();
      return osr;
    }
    void _process(OpSequencer *osr, ThreadPool::TPHandle &handle) override {
      store->_do_op(osr, handle);
    }
    void _process_finish(OpSequencer *osr) {
      store->_finish_op(osr);
    }
    void _clear() {
      assert(store->op_queue.empty());
    }
  } op_wq;

工作队列和线程池建立合作关系

好像至今也没有介绍如何建立起合作关系,还是以FileStore的op_tp和op_wq为例:

  op_tp(g_ceph_context, "FileStore::op_tp", "tp_fstore_op", g_conf->filestore_op_threads, "filestore_op_threads"),
  
  op_wq(this, g_conf->filestore_op_thread_timeout,
	g_conf->filestore_op_thread_suicide_timeout, &op_tp),

FileStore实例化的时候,op_tp作为参数传递给了op_wq的构造函数。

我们不妨看看WorkQueue的构造函数和析构函数:

 public:
    WorkQueue(string n, time_t ti, time_t sti, ThreadPool* p) : WorkQueue_(n, ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~WorkQueue() {
      pool->remove_work_queue(this);
    }
    
    

而ThreadPool中的add_work_queue和remove_work_queue就是用来建立和移除与WorkQueue关联的函数

  /// assign a work queue to this thread pool
  void add_work_queue(WorkQueue_* wq) {
    Mutex::Locker l(_lock);
    work_queues.push_back(wq);
  }
  /// remove a work queue from this thread pool
  void remove_work_queue(WorkQueue_* wq) {
    Mutex::Locker l(_lock);
    unsigned i = 0;
    while (work_queues[i] != wq)
      i++;
    for (i++; i < work_queues.size(); i++) 
      work_queues[i-1] = work_queues[i];
    assert(i == work_queues.size());
    work_queues.resize(i-1);
  }

因此建立狼狈为奸的关系,需要先创建线程池,然后创建WorkQueue的时候,将线程池作为参数传递给WorkQueue,就能建立关系。


上一篇     下一篇