整体思路

创建N个线程,有M个协程,scheduler会调度N个线程执行M个协程的任务

  1. 创建Schedule类,传入需要的线程数量,是否使用当前线程作为主线程,线程名
  2. 调用Schedule::start()方法,根据线程数量创建线程,加入线程队列vector<Thread::ptr>
    1. 线程执行bind(&Scheduler::run, this)方法
    2. 该方法循环查找协程队列中待执行的协程或方法
  3. 调用Schedule::schdule()方法,传入函数或协程,加入调度队列list<FiberAndThread>
  4. 调用Schedule::stop()方法结束

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Scheduler {
private:
MutexType m_mutex;
std::vector<Thread::ptr> m_threads; // 线程池
std::list<FiberAndThread> m_fibers; // 待执行的协程队列
Fiber::ptr m_rootFiber; // use_caller为true时有效, 调度协程
std::string m_name; // 协程调度器名称
protected:
std::vector<int> m_threadIds; // 协程下的线程id数组
size_t m_threadCount = 0; // 线程数量
std::atomic<size_t> m_activeThreadCount = { 0 }; // 工作线程数量
std::atomic<size_t> m_idleThreadCount = { 0 }; // 空闲线程数量
bool m_stopping = true; // 是否正在停止
bool m_autoStop = false; // 是否自动停止
int m_rootThread = 0; // 主线程id(use_caller)
};

成员函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class Scheduler {
public:
Scheduler(size_t threads = 1, bool use_caller = true, const std::string& name = "");
virtual ~Scheduler();
const std::string& getName() const { return m_name; }
void start();
void stop();
template<class FiberOrCb>
void schedule(FiberOrCb fc, int thread = -1) {
bool need_tickle = false;
{
MutexType::Lock lock(m_mutex);
need_tickle = scheduleNoLock(fc, thread);
}
if (need_tickle) {
tickle();
}
}

template<class InputIterator>
void schedule(InputIterator begin, InputIterator end) {
bool need_tickle = false;
{
MutexType::Lock lock(m_mutex);
while (begin != end) {
need_tickle = scheduleNoLock(&*begin, -1) || need_tickle;
++begin;
}
}
if (need_tickle) {
tickle();
}
}
void switchTo(int thread = -1);
std::ostream& dump(std::ostream& os);
protected:
virtual void tickle();
void run();
virtual bool stopping();
virtual void idle();
void setThis();
bool hasIdleThreads() { return m_idleThreadCount > 0; }
private:
template<class FiberOrCb>
bool scheduleNoLock(FiberOrCb fc, int thread) {
bool need_tickle = m_fibers.empty();
FiberAndThread ft(fc, thread);
if (ft.fiber || ft.cb) {
m_fibers.push_back(ft);
}
return need_tickle;
}
};

start、stop

  • start创建线程,指定执行函数run,将线程加入到线程池vector<Thread::ptr>
  • stop结束线程,设置自动停止m_autoStoptrue,设置正在停止m_stoppingtrue,结束线程join()
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    void Scheduler::start() {
    MutexType::Lock lock(m_mutex);
    if (!m_stopping) {
    return;
    }
    m_stopping = false;
    SYLAR_ASSERT(m_threads.empty());

    m_threads.resize(m_threadCount);
    for (size_t i = 0; i < m_threadCount; ++i) {
    m_threads[i].reset(new Thread(std::bind(&Scheduler::run, this)
    , m_name + "_" + std::to_string(i)));
    m_threadIds.push_back(m_threads[i]->getId());
    }
    lock.unlock();
    }

    void Scheduler::stop() {
    m_autoStop = true;
    if (m_rootFiber
    && m_threadCount == 0
    && (m_rootFiber->getState() == Fiber::TERM
    || m_rootFiber->getState() == Fiber::INIT)) {
    SYLAR_LOG_INFO(g_logger) << this << " stopped";
    m_stopping = true;

    if (stopping()) {
    return;
    }
    }

    if (m_rootThread != -1) {
    SYLAR_ASSERT(GetThis() == this);
    }
    else {
    SYLAR_ASSERT(GetThis() != this);
    }

    m_stopping = true;
    for (size_t i = 0; i < m_threadCount; ++i) {
    tickle();
    }

    if (m_rootFiber) {
    tickle();
    }

    if (m_rootFiber) {
    if (!stopping()) {
    m_rootFiber->call();
    }
    }

    std::vector<Thread::ptr> thrs;
    {
    MutexType::Lock lock(m_mutex);
    thrs.swap(m_threads);
    }

    for (auto& i : thrs) {
    i->join();
    }
    }

schedule

schedule需要传入Fiber::ptrstd::function<void()> cb或迭代器,将其加入到待执行的队列中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
template<class FiberOrCb>
void schedule(FiberOrCb fc, int thread = -1) {
bool need_tickle = false;
{
MutexType::Lock lock(m_mutex);
need_tickle = scheduleNoLock(fc, thread);
}
if (need_tickle) {
tickle();
}
}

template<class InputIterator>
void schedule(InputIterator begin, InputIterator end) {
bool need_tickle = false;
{
MutexType::Lock lock(m_mutex);
while (begin != end) {
need_tickle = scheduleNoLock(&*begin, -1) || need_tickle;
++begin;
}
}
if (need_tickle) {
tickle();
}
}

run

run函数是核心代码,线程真正执行的函数。具体运行参见第一节整体思路。该函数实现了循环获取待执行的协程函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
void Scheduler::run() {
SYLAR_LOG_DEBUG(g_logger) << m_name << " run";
setThis();
if (sylar::GetThreadId() != m_rootThread) {
t_scheduler_fiber = Fiber::GetThis().get(); // 创建当前线程的主协程
}

Fiber::ptr idle_fiber(new Fiber(std::bind(&Scheduler::idle, this))); // 创建空闲空转协程
Fiber::ptr cb_fiber; // 回调函数协程

FiberAndThread ft;
while (true) {
ft.reset();
bool tickle_me = false; // 用于标记是否需要唤醒其他线程
bool is_active = false; // 标记是否有活跃的协程
{ // 上锁, 保护共享资源m_fibers, 在队列中寻找一个协程or函数跑起来
MutexType::Lock lock(m_mutex);
auto it = m_fibers.begin();
while (it != m_fibers.end()) {
if (it->thread != -1 && it->thread != sylar::GetThreadId()) {
++it;
tickle_me = true; // 需要唤醒其他线程
continue;
}

SYLAR_ASSERT(it->fiber || it->cb);
if (it->fiber && it->fiber->getState() == Fiber::EXEC) {
++it;
continue;
}

ft = *it;
m_fibers.erase(it++);
++m_activeThreadCount; // 该线程准备执行*it中的回调, 活跃线程+1
is_active = true;
break;
}
tickle_me |= it != m_fibers.end(); // 如果队列未遍历完, 则需要唤醒其他线程
}

if (tickle_me) {
tickle(); // 唤醒其他线程
}

// 如果是协程
if (ft.fiber && (ft.fiber->getState() != Fiber::TERM
&& ft.fiber->getState() != Fiber::EXCEPT)) {
ft.fiber->swapIn(); // 切入协程执行, 执行完成后自动返回
--m_activeThreadCount; // 执行结束, 活跃线程-1

if (ft.fiber->getState() == Fiber::READY) {
schedule(ft.fiber); // 外部函数可能手动切换YieldToReady, 需要重新加入队列
}
else if (ft.fiber->getState() != Fiber::TERM
&& ft.fiber->getState() != Fiber::EXCEPT) {
ft.fiber->m_state = Fiber::HOLD;
}
ft.reset();
}
else if (ft.cb) {
if (cb_fiber) { // 回调协程已经存在时, 只需要调用协程类的重置回调函数
cb_fiber->reset(ft.cb);
}
else { // 回调协程不存在时, 创建一个回调携程, 并赋值回调函数
cb_fiber.reset(new Fiber(ft.cb));
}
ft.reset(); // 已经获取需要的数据, 重置 ft
cb_fiber->swapIn(); // 切换到该协程运行 与当前线程主线程完成切换 swapcontext(&Scheduler::GetMainFiber()->m_ctx, &m_ctx)
SYLAR_LOG_INFO(g_logger) << cb_fiber->getState();
--m_activeThreadCount;
if (cb_fiber->getState() == Fiber::READY) {
schedule(cb_fiber); // 通过YieldToReady()切换出来, 所以Ready状态需要重新加入队列
cb_fiber.reset(); // 释放 cb_fiber
}
else if (cb_fiber->getState() == Fiber::EXCEPT
|| cb_fiber->getState() == Fiber::TERM) {
cb_fiber->reset(nullptr); // 协程出错或执行结束, 将该线程执行的函数设置为空, 下次进入时再reset新的函数即可
}
else {
cb_fiber->m_state = Fiber::HOLD; // 其他情况从协程出来就设置为HOLD状态
cb_fiber.reset(); // 释放 cb_fiber
}
}
else {
if (is_active) { // 如果前面拿到了一个新的ft, 会对活跃线程+1, 但ft实际上是空的, 活跃线程-1
--m_activeThreadCount;
continue;
}
if (idle_fiber->getState() == Fiber::TERM) { // idle协程为终止时结束
SYLAR_LOG_INFO(g_logger) << "idle fiber term";
break;
}

++m_idleThreadCount; // 空闲线程+1
idle_fiber->swapIn(); // 切换入空闲协程执行
--m_idleThreadCount; // 空闲线程-1
if (idle_fiber->getState() != Fiber::TERM
&& idle_fiber->getState() != Fiber::EXCEPT) {
idle_fiber->m_state = Fiber::HOLD; // 空闲协程设置为暂停而非EXEC执行状态, 线程进入下一次循环
}
}
}
}

静态函数

1
2
3
4
5
class Scheduler {
public:
static Scheduler* GetThis();
static Fiber* GetMainFiber();
};