整体思路

轻量协程,完成用户栈上的协程上下文切换,图例解释使用方法,对应下文的示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include "log.h"
#include "fiber.h"


sylar::Logger::ptr g_logger = SYLAR_LOG_ROOT();

void run_in_fiber() {
SYLAR_LOG_INFO(g_logger) << "run_in_fiber begin";
sylar::Fiber::GetThis()->back(); // 切换回主协程
SYLAR_LOG_INFO(g_logger) << "run_in_fiber end";
sylar::Fiber::GetThis()->back();
}

void test_fiber() {
SYLAR_LOG_INFO(g_logger) << "main begin -1";
{
auto m = sylar::Fiber::GetThis(); // 创建主协程
SYLAR_LOG_INFO(g_logger) << "main begin 0";
sylar::Fiber::ptr fiber(new sylar::Fiber(run_in_fiber, 128*1024, true)); // 创建子协程
fiber->call(); // 切换至fiber子协程
SYLAR_LOG_INFO(g_logger) << "main after swapIn";
fiber->call();
SYLAR_LOG_INFO(g_logger) << "main after end";
fiber->call();
}
SYLAR_LOG_INFO(g_logger) << "main after end2";
}

int main() {
std::vector<sylar::Thread::ptr> thrs;
for (int i = 0; i < 1; ++i) {
thrs.push_back(sylar::Thread::ptr(
new sylar::Thread(test_fiber, "thread_" + std::to_string(i))));
}
for (auto i : thrs) {
i->join();
}
return 0;
}

成员变量

协程类成员变量包括

  • 协程id、栈指针、栈大小、运行状态
  • 协程上下文
  • 协程运行的函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Fiber : public std::enable_shared_from_this<Fiber>{
public:
typedef std::shared_ptr<Fiber> ptr;

enum State {
INIT, // 初始化状态
HOLD, // 暂停状态
EXEC, // 执行中状态
TERM, // 结束状态
READY, // 可执行状态
EXCEPT // 异常状态
};
private:
uint64_t m_id = 0; // 协程id
uint32_t m_stacksize = 0; // 协程运行栈大小
State m_state = INIT; // 协程状态
ucontext_t m_ctx; // 协程上下文
void* m_stack = nullptr; // 协程运行栈指针
std::function<void()> m_cb; // 协程运行函数
};

局部变量

1
2
3
4
5
static std::atomic<uint64_t> s_fiber_id{ 0 };
static std::atomic<uint64_t> s_fiber_count{ 0 };

static thread_local Fiber* t_fiber = nullptr; // 当前协程
static thread_local Fiber::ptr t_threadFiber = nullptr; // 记录主协程

静态函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Fiber : public std::enable_shared_from_this<Fiber>{
public:
static void SetThis(Fiber* f); // 设置当前线程的运行协程
static Fiber::ptr GetThis(); // 返回当前所在的协程

static void YieldToReady(); // 将当前协程切换到后台,并设置为READY状态
static void YieldToHold(); // 将当前协程切换到后台,并设置为HOLD状态

static void MainFunc(); // 协程执行函数 执行完成返回到线程主协程
static void CallerMainFunc(); // 协程执行函数 执行完成返回到线程调度协程

static uint64_t TotalFibers(); // 返回当前协程的总数量
static uint64_t GetFiberId(); // 获取当前协程的id
};

SetThis、GetThis

  • SetThis:设置当前正在执行的协程
  • GetThis:获取当前协程
    • 在线程中首次调用会创建主协程new Fiber(),其构造函数会设置当前协程为主协程
    • GetThis方法会设置线程的主协程t_threadFiber
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 设置当前协程
void Fiber::SetThis(Fiber* f) {
t_fiber = f;
}

// 返回当前协程
Fiber::ptr Fiber::GetThis() {
if (t_fiber) {
return t_fiber->shared_from_this();
}
Fiber::ptr main_fiber(new Fiber);
SYLAR_ASSERT(t_fiber == main_fiber.get());
t_threadFiber = main_fiber;
return t_fiber->shared_from_this();
}

YieldToReady、YieldToHold

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//协程切换到后台,并且设置为Ready状态
void Fiber::YieldToReady() {
Fiber::ptr cur = GetThis();
SYLAR_ASSERT(cur->m_state == EXEC);
cur->m_state = READY;
cur->swapOut();
}

//协程切换到后台,并且设置为Hold状态
void Fiber::YieldToHold() {
Fiber::ptr cur = GetThis();
SYLAR_ASSERT(cur->m_state == EXEC);
cur->swapOut();
}

MainFunc、CallerMainFunc

这两个静态方法是真正执行的函数,将传入的function<void()>方法进行了包装

  • 获取当前协程的指针,执行m_cb()协程函数并将其制空
  • MainFuncCallerMainFunc的区别是切换出去时调用swapOut()还是back()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
void Fiber::MainFunc() {
Fiber::ptr cur = GetThis();
SYLAR_ASSERT(cur);
try {
cur->m_cb();
cur->m_cb = nullptr;
cur->m_state = TERM;
}
catch (std::exception& ex) {
cur->m_state = EXCEPT;
SYLAR_LOG_ERROR(g_logger) << "Fiber Except: " << ex.what()
<< " fiber_id=" << cur->getId()
<< std::endl
<< sylar::BacktraceToString();
}
catch (...) {
cur->m_state = EXCEPT;
SYLAR_LOG_ERROR(g_logger) << "Fiber Except"
<< " fiber_id=" << cur->getId()
<< std::endl
<< sylar::BacktraceToString();
}

auto raw_ptr = cur.get();
cur.reset();
raw_ptr->swapOut();
SYLAR_ASSERT2(false, "never reach fiber_id=" + std::to_string(raw_ptr->getId()));
}

void Fiber::CallerMainFunc() {
Fiber::ptr cur = GetThis();
SYLAR_ASSERT(cur);
try {
cur->m_cb();
cur->m_cb = nullptr;
cur->m_state = TERM;
}
catch (std::exception& ex) {
cur->m_state = EXCEPT;
SYLAR_LOG_ERROR(g_logger) << "Fiber Except: " << ex.what()
<< " fiber_id=" << cur->getId()
<< std::endl
<< sylar::BacktraceToString();
}
catch (...) {
cur->m_state = EXCEPT;
SYLAR_LOG_ERROR(g_logger) << "Fiber Except"
<< " fiber_id=" << cur->getId()
<< std::endl
<< sylar::BacktraceToString();
}

auto raw_ptr = cur.get();
cur.reset();
raw_ptr->back();
SYLAR_ASSERT2(false, "never reach fiber_id=" + std::to_string(raw_ptr->getId()));

}

成员函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Fiber : public std::enable_shared_from_this<Fiber>{
private:
Fiber();
public:
Fiber(std::function<void()> cb, size_t stacksize = 0, bool use_caller = false);
~Fiber();

void reset(std::function<void()> cb);
void swapIn(); // 切换到当前协程执行
void swapOut(); // 将当前协程切换到后台
void call(); // 将当前线程切换到执行状态
void back(); // 将当前线程切换到后台

uint64_t getId() const { return m_id; }
State getState() const { return m_state; }
};

Fiber、~Fiber

  • Fiber():私有构造函数,用于构造主协程,通过GetThis()调用
  • Fiber(std::function<void()> cb, size_t stacksize, bool use_caller):构造函数,传入协程调用的方法、栈大小、以及是否使用当前线程
  • ~Fiber():析构函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
Fiber::Fiber() {
m_state = EXEC;
SetThis(this);

if (getcontext(&m_ctx)) {
SYLAR_ASSERT2(false, "getcontext");
}

++s_fiber_count;

SYLAR_LOG_DEBUG(g_logger) << "Fiber::Fiber main";
}

Fiber::Fiber(std::function<void()> cb, size_t stacksize, bool use_caller)
:m_id(++s_fiber_id)
, m_cb(cb) {
++s_fiber_count;
m_stacksize = stacksize ? stacksize : g_fiber_stack_size->getValue();

m_stack = StackAllocator::Alloc(m_stacksize);
if (getcontext(&m_ctx)) {
SYLAR_ASSERT2(false, "getcontext");
}
m_ctx.uc_link = nullptr;
m_ctx.uc_stack.ss_sp = m_stack;
m_ctx.uc_stack.ss_size = m_stacksize;

if (!use_caller) {
makecontext(&m_ctx, &Fiber::MainFunc, 0);
}
else {
makecontext(&m_ctx, &Fiber::CallerMainFunc, 0);
}

SYLAR_LOG_DEBUG(g_logger) << "Fiber::Fiber id=" << m_id;
}

Fiber::~Fiber() {
--s_fiber_count;
if (m_stack) {
SYLAR_ASSERT(m_state == TERM
|| m_state == EXCEPT
|| m_state == INIT);

StackAllocator::Dealloc(m_stack, m_stacksize);
}
else {
SYLAR_ASSERT(!m_cb);
SYLAR_ASSERT(m_state == EXEC);

Fiber* cur = t_fiber;
if (cur == this) {
SetThis(nullptr);
}
}
SYLAR_LOG_DEBUG(g_logger) << "Fiber::~Fiber id=" << m_id
<< " total=" << s_fiber_count;
}

swapIn、swapOut

用于在Scheduler控制的主协程和子协程之间进行切换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//切换到当前协程执行
void Fiber::swapIn() {
SetThis(this);
SYLAR_ASSERT(m_state != EXEC);
m_state = EXEC;
if (swapcontext(&Scheduler::GetMainFiber()->m_ctx, &m_ctx)) {
SYLAR_ASSERT2(false, "swapcontext");
}
}

//切换到后台执行
void Fiber::swapOut() {
SetThis(Scheduler::GetMainFiber());
if (swapcontext(&m_ctx, &Scheduler::GetMainFiber()->m_ctx)) {
SYLAR_ASSERT2(false, "swapcontext");
}
}

call、back

用于在主协程和子协程之间进行切换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void Fiber::call() {
SetThis(this);
m_state = EXEC;
if (swapcontext(&t_threadFiber->m_ctx, &m_ctx)) {
SYLAR_ASSERT2(false, "swapcontext");
}
}

void Fiber::back() {
SetThis(t_threadFiber.get());
if (swapcontext(&m_ctx, &t_threadFiber->m_ctx)) {
SYLAR_ASSERT2(false, "swapcontext");
}
}

reset

重置协程执行的函数,并重置状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void Fiber::reset(std::function<void()> cb) {
SYLAR_ASSERT(m_stack);
SYLAR_ASSERT(m_state == TERM
|| m_state == EXCEPT
|| m_state == INIT);
m_cb = cb;
if (getcontext(&m_ctx)) {
SYLAR_ASSERT2(false, "getcontext");
}

m_ctx.uc_link = nullptr;
m_ctx.uc_stack.ss_sp = m_stack;
m_ctx.uc_stack.ss_size = m_stacksize;

makecontext(&m_ctx, &Fiber::MainFunc, 0);
m_state = INIT;
}