整体思路 轻量协程,完成用户栈上的协程上下文切换,图例解释使用方法,对应下文的示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 #include "log.h" #include "fiber.h" sylar::Logger::ptr g_logger = SYLAR_LOG_ROOT (); void run_in_fiber () { SYLAR_LOG_INFO (g_logger) << "run_in_fiber begin" ; sylar::Fiber::GetThis ()->back (); SYLAR_LOG_INFO (g_logger) << "run_in_fiber end" ; sylar::Fiber::GetThis ()->back (); } void test_fiber () { SYLAR_LOG_INFO (g_logger) << "main begin -1" ; { auto m = sylar::Fiber::GetThis (); SYLAR_LOG_INFO (g_logger) << "main begin 0" ; sylar::Fiber::ptr fiber (new sylar::Fiber(run_in_fiber, 128 *1024 , true )) ; fiber->call (); SYLAR_LOG_INFO (g_logger) << "main after swapIn" ; fiber->call (); SYLAR_LOG_INFO (g_logger) << "main after end" ; fiber->call (); } SYLAR_LOG_INFO (g_logger) << "main after end2" ; } int main () { std::vector<sylar::Thread::ptr> thrs; for (int i = 0 ; i < 1 ; ++i) { thrs.push_back (sylar::Thread::ptr ( new sylar::Thread (test_fiber, "thread_" + std::to_string (i)))); } for (auto i : thrs) { i->join (); } return 0 ; }
成员变量 协程类成员变量包括
协程id、栈指针、栈大小、运行状态
协程上下文
协程运行的函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class Fiber : public std::enable_shared_from_this<Fiber>{public : typedef std::shared_ptr<Fiber> ptr; enum State { INIT, HOLD, EXEC, TERM, READY, EXCEPT }; private : uint64_t m_id = 0 ; uint32_t m_stacksize = 0 ; State m_state = INIT; ucontext_t m_ctx; void * m_stack = nullptr ; std::function<void ()> m_cb; };
局部变量 1 2 3 4 5 static std::atomic<uint64_t > s_fiber_id{ 0 };static std::atomic<uint64_t > s_fiber_count{ 0 };static thread_local Fiber* t_fiber = nullptr ; static thread_local Fiber::ptr t_threadFiber = nullptr ;
静态函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 class Fiber : public std::enable_shared_from_this<Fiber>{public : static void SetThis (Fiber* f) ; static Fiber::ptr GetThis () ; static void YieldToReady () ; static void YieldToHold () ; static void MainFunc () ; static void CallerMainFunc () ; static uint64_t TotalFibers () ; static uint64_t GetFiberId () ; };
SetThis、GetThis
SetThis
:设置当前正在执行的协程
GetThis
:获取当前协程
在线程中首次调用会创建主协程new Fiber()
,其构造函数会设置当前协程为主协程
GetThis
方法会设置线程的主协程t_threadFiber
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 void Fiber::SetThis (Fiber* f) { t_fiber = f; } Fiber::ptr Fiber::GetThis () { if (t_fiber) { return t_fiber->shared_from_this (); } Fiber::ptr main_fiber (new Fiber) ; SYLAR_ASSERT (t_fiber == main_fiber.get ()); t_threadFiber = main_fiber; return t_fiber->shared_from_this (); }
YieldToReady、YieldToHold 1 2 3 4 5 6 7 8 9 10 11 12 13 14 void Fiber::YieldToReady () { Fiber::ptr cur = GetThis (); SYLAR_ASSERT (cur->m_state == EXEC); cur->m_state = READY; cur->swapOut (); } void Fiber::YieldToHold () { Fiber::ptr cur = GetThis (); SYLAR_ASSERT (cur->m_state == EXEC); cur->swapOut (); }
MainFunc、CallerMainFunc 这两个静态方法是真正执行的函数,将传入的function<void()>
方法进行了包装
获取当前协程的指针,执行m_cb()
协程函数并将其制空
MainFunc
和CallerMainFunc
的区别是切换出去时调用swapOut()
还是back()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 void Fiber::MainFunc () { Fiber::ptr cur = GetThis (); SYLAR_ASSERT (cur); try { cur->m_cb (); cur->m_cb = nullptr ; cur->m_state = TERM; } catch (std::exception& ex) { cur->m_state = EXCEPT; SYLAR_LOG_ERROR (g_logger) << "Fiber Except: " << ex.what () << " fiber_id=" << cur->getId () << std::endl << sylar::BacktraceToString (); } catch (...) { cur->m_state = EXCEPT; SYLAR_LOG_ERROR (g_logger) << "Fiber Except" << " fiber_id=" << cur->getId () << std::endl << sylar::BacktraceToString (); } auto raw_ptr = cur.get (); cur.reset (); raw_ptr->swapOut (); SYLAR_ASSERT2 (false , "never reach fiber_id=" + std::to_string (raw_ptr->getId ())); } void Fiber::CallerMainFunc () { Fiber::ptr cur = GetThis (); SYLAR_ASSERT (cur); try { cur->m_cb (); cur->m_cb = nullptr ; cur->m_state = TERM; } catch (std::exception& ex) { cur->m_state = EXCEPT; SYLAR_LOG_ERROR (g_logger) << "Fiber Except: " << ex.what () << " fiber_id=" << cur->getId () << std::endl << sylar::BacktraceToString (); } catch (...) { cur->m_state = EXCEPT; SYLAR_LOG_ERROR (g_logger) << "Fiber Except" << " fiber_id=" << cur->getId () << std::endl << sylar::BacktraceToString (); } auto raw_ptr = cur.get (); cur.reset (); raw_ptr->back (); SYLAR_ASSERT2 (false , "never reach fiber_id=" + std::to_string (raw_ptr->getId ())); }
成员函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class Fiber : public std::enable_shared_from_this<Fiber>{private : Fiber (); public : Fiber (std::function<void ()> cb, size_t stacksize = 0 , bool use_caller = false ); ~Fiber (); void reset (std::function<void ()> cb) ; void swapIn () ; void swapOut () ; void call () ; void back () ; uint64_t getId () const { return m_id; } State getState () const { return m_state; } };
Fiber、~Fiber
Fiber()
:私有构造函数,用于构造主协程,通过GetThis()
调用
Fiber(std::function<void()> cb, size_t stacksize, bool use_caller)
:构造函数,传入协程调用的方法、栈大小、以及是否使用当前线程
~Fiber()
:析构函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 Fiber::Fiber () { m_state = EXEC; SetThis (this ); if (getcontext (&m_ctx)) { SYLAR_ASSERT2 (false , "getcontext" ); } ++s_fiber_count; SYLAR_LOG_DEBUG (g_logger) << "Fiber::Fiber main" ; } Fiber::Fiber (std::function<void ()> cb, size_t stacksize, bool use_caller) :m_id (++s_fiber_id) , m_cb (cb) { ++s_fiber_count; m_stacksize = stacksize ? stacksize : g_fiber_stack_size->getValue (); m_stack = StackAllocator::Alloc (m_stacksize); if (getcontext (&m_ctx)) { SYLAR_ASSERT2 (false , "getcontext" ); } m_ctx.uc_link = nullptr ; m_ctx.uc_stack.ss_sp = m_stack; m_ctx.uc_stack.ss_size = m_stacksize; if (!use_caller) { makecontext (&m_ctx, &Fiber::MainFunc, 0 ); } else { makecontext (&m_ctx, &Fiber::CallerMainFunc, 0 ); } SYLAR_LOG_DEBUG (g_logger) << "Fiber::Fiber id=" << m_id; } Fiber::~Fiber () { --s_fiber_count; if (m_stack) { SYLAR_ASSERT (m_state == TERM || m_state == EXCEPT || m_state == INIT); StackAllocator::Dealloc (m_stack, m_stacksize); } else { SYLAR_ASSERT (!m_cb); SYLAR_ASSERT (m_state == EXEC); Fiber* cur = t_fiber; if (cur == this ) { SetThis (nullptr ); } } SYLAR_LOG_DEBUG (g_logger) << "Fiber::~Fiber id=" << m_id << " total=" << s_fiber_count; }
swapIn、swapOut 用于在Scheduler
控制的主协程和子协程之间进行切换
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void Fiber::swapIn () { SetThis (this ); SYLAR_ASSERT (m_state != EXEC); m_state = EXEC; if (swapcontext (&Scheduler::GetMainFiber ()->m_ctx, &m_ctx)) { SYLAR_ASSERT2 (false , "swapcontext" ); } } void Fiber::swapOut () { SetThis (Scheduler::GetMainFiber ()); if (swapcontext (&m_ctx, &Scheduler::GetMainFiber ()->m_ctx)) { SYLAR_ASSERT2 (false , "swapcontext" ); } }
call、back 用于在主协程和子协程之间进行切换
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void Fiber::call () { SetThis (this ); m_state = EXEC; if (swapcontext (&t_threadFiber->m_ctx, &m_ctx)) { SYLAR_ASSERT2 (false , "swapcontext" ); } } void Fiber::back () { SetThis (t_threadFiber.get ()); if (swapcontext (&m_ctx, &t_threadFiber->m_ctx)) { SYLAR_ASSERT2 (false , "swapcontext" ); } }
reset 重置协程执行的函数,并重置状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void Fiber::reset (std::function<void ()> cb) { SYLAR_ASSERT (m_stack); SYLAR_ASSERT (m_state == TERM || m_state == EXCEPT || m_state == INIT); m_cb = cb; if (getcontext (&m_ctx)) { SYLAR_ASSERT2 (false , "getcontext" ); } m_ctx.uc_link = nullptr ; m_ctx.uc_stack.ss_sp = m_stack; m_ctx.uc_stack.ss_size = m_stacksize; makecontext (&m_ctx, &Fiber::MainFunc, 0 ); m_state = INIT; }
Author:
mxwu
Permalink:
https://mingxuanwu.com/2024/07/24/202407242052/
License:
Copyright (c) 2023 CC-BY-NC-4.0 LICENSE