整体思路
创建N个线程,有M个协程,scheduler
会调度N个线程执行M个协程的任务
- 创建
Scheduler
类,传入需要的线程数量,是否使用当前线程作为主线程,线程名
- 调用
Scheduler::start()
方法,根据线程数量创建线程,加入线程队列vector<Thread::ptr>
- 线程执行
bind(&Scheduler::run, this)
方法
- 该方法循环查找协程队列中待执行的协程或方法
- 调用
Scheduler::schedule()
方法,传入函数或协程,加入调度队列list<FiberAndThread>
- 调用
Scheduler::stop()
方法结束
成员变量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| // Scheduler state (excerpt showing only the data members).
class Scheduler {
private:
    // Held while schedule()/run() touch m_fibers, while start() creates the
    // worker threads, and while stop() swaps m_threads out.
    MutexType m_mutex;
    // Worker threads created by start(); stop() swaps them out and joins each one.
    std::vector<Thread::ptr> m_threads;
    // Pending tasks (a fiber or a callback, plus a target thread id where -1
    // means "any thread"); schedule() appends, run() removes.
    std::list<FiberAndThread> m_fibers;
    // Fiber that stop() inspects and, if stopping() is still false, resumes via call().
    // NOTE(review): presumably created when use_caller is true -- the constructor is not shown here.
    Fiber::ptr m_rootFiber;
    // Scheduler name: returned by getName() and used as the prefix of worker
    // thread names ("<name>_<index>").
    std::string m_name;

protected:
    // start() appends the id of every thread it creates.
    std::vector<int> m_threadIds;
    // Number of worker threads start() creates.
    size_t m_threadCount = 0;
    // Incremented in run() when a task is dequeued; decremented once that task
    // has run or has been discarded.
    std::atomic<size_t> m_activeThreadCount = { 0 };
    // Incremented in run() for the duration of each idle-fiber execution.
    std::atomic<size_t> m_idleThreadCount = { 0 };
    // True until start() clears it; stop() sets it back to true.
    bool m_stopping = true;
    // Set to true by stop().
    bool m_autoStop = false;
    // Thread id compared against sylar::GetThreadId() in run(); stop() checks it against -1.
    // NOTE(review): presumably -1 means the creating thread does not take part
    // in scheduling -- confirm in the constructor.
    int m_rootThread = 0;
};
|
成员函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| // Public and protected interface of Scheduler (excerpt showing only the member functions).
class Scheduler {
public:
    // threads:    number of threads requested.
    // use_caller: whether the constructing thread is used as the main (root) thread.
    // name:       scheduler name, also used to name the worker threads.
    Scheduler(size_t threads = 1, bool use_caller = true, const std::string& name = "");
    virtual ~Scheduler();

    // Name given at construction.
    const std::string& getName() const { return m_name; }

    // Creates the worker threads; each of them executes run().
    void start();
    // Requests shutdown and joins the worker threads.
    void stop();

    // Queues one task (anything FiberAndThread can be built from) for the
    // given thread id; -1 means any thread. Calls tickle() after releasing the
    // lock when the queue was empty before the insertion.
    template<class FiberOrCb>
    void schedule(FiberOrCb fc, int thread = -1) {
        const bool wake_up = [&] {
            MutexType::Lock guard(m_mutex);
            return scheduleNoLock(fc, thread);
        }();
        if (wake_up) {
            tickle();
        }
    }

    // Queues every element of [begin, end) under a single lock acquisition.
    // Each element is handed over by address with thread id -1. tickle() is
    // called once afterwards if any insertion found the queue empty.
    template<class InputIterator>
    void schedule(InputIterator begin, InputIterator end) {
        bool wake_up = false;
        {
            MutexType::Lock guard(m_mutex);
            for (; begin != end; ++begin) {
                if (scheduleNoLock(&*begin, -1)) {
                    wake_up = true;
                }
            }
        }
        if (wake_up) {
            tickle();
        }
    }

    void switchTo(int thread = -1);
    std::ostream& dump(std::ostream& os);

protected:
    // Notification hook invoked by schedule(), run() and stop(); overridable.
    virtual void tickle();
    // Worker loop executed by every thread created in start().
    void run();
    // Overridable stop predicate consulted by stop().
    virtual bool stopping();
    // Body of the idle fiber that run() executes when it has nothing to do.
    virtual void idle();
    void setThis();
    // True while at least one thread is executing its idle fiber.
    bool hasIdleThreads() { return m_idleThreadCount > 0; }

private:
    // Appends the task to m_fibers when it carries a fiber or a callback.
    // The caller must already hold m_mutex. Returns whether the queue was
    // empty before the call, regardless of whether anything was appended.
    template<class FiberOrCb>
    bool scheduleNoLock(FiberOrCb fc, int thread) {
        const bool was_empty = m_fibers.empty();
        FiberAndThread task(fc, thread);
        if (task.fiber || task.cb) {
            m_fibers.push_back(task);
        }
        return was_empty;
    }
};
|
start、stop
start
创建线程,指定执行函数run
,将线程加入到线程池vector<Thread::ptr>
中
stop
结束线程,设置自动停止m_autoStop
为true
,设置正在停止m_stopping
为true
,结束线程join()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| // Launches the worker threads. Does nothing when the scheduler is already
// running (m_stopping is false). Otherwise clears m_stopping, creates
// m_threadCount threads named "<m_name>_<index>" that each execute run(),
// and records every thread id in m_threadIds. The whole setup runs under m_mutex.
void Scheduler::start() {
    MutexType::Lock lock(m_mutex);
    if (!m_stopping) {
        return;
    }
    m_stopping = false;
    SYLAR_ASSERT(m_threads.empty());

    m_threads.resize(m_threadCount);
    size_t slot = 0;
    for (Thread::ptr& worker : m_threads) {
        worker.reset(new Thread(std::bind(&Scheduler::run, this)
                            , m_name + "_" + std::to_string(slot)));
        m_threadIds.push_back(worker->getId());
        ++slot;
    }
    lock.unlock();
}
void Scheduler::stop() { m_autoStop = true; if (m_rootFiber && m_threadCount == 0 && (m_rootFiber->getState() == Fiber::TERM || m_rootFiber->getState() == Fiber::INIT)) { SYLAR_LOG_INFO(g_logger) << this << " stopped"; m_stopping = true;
if (stopping()) { return; } }
if (m_rootThread != -1) { SYLAR_ASSERT(GetThis() == this); } else { SYLAR_ASSERT(GetThis() != this); }
m_stopping = true; for (size_t i = 0; i < m_threadCount; ++i) { tickle(); }
if (m_rootFiber) { tickle(); }
if (m_rootFiber) { if (!stopping()) { m_rootFiber->call(); } }
std::vector<Thread::ptr> thrs; { MutexType::Lock lock(m_mutex); thrs.swap(m_threads); }
for (auto& i : thrs) { i->join(); } }
|
schedule
schedule
需要传入Fiber::ptr
或std::function<void()> cb
或迭代器,将其加入到待执行的队列中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| // Queues one task (anything FiberAndThread can be built from) for the given
// thread id; -1 means any thread. The insertion happens under m_mutex;
// tickle() is called after the lock is released when the queue was empty
// before the insertion.
template<class FiberOrCb>
void schedule(FiberOrCb fc, int thread = -1) {
    const bool wake_up = [&] {
        MutexType::Lock guard(m_mutex);
        return scheduleNoLock(fc, thread);
    }();
    if (wake_up) {
        tickle();
    }
}
// Queues every element of [begin, end) under a single acquisition of m_mutex.
// Each element is handed to scheduleNoLock() by address with thread id -1.
// tickle() is called once, after the lock is released, if any of the
// insertions found the queue empty.
template<class InputIterator>
void schedule(InputIterator begin, InputIterator end) {
    bool wake_up = false;
    {
        MutexType::Lock guard(m_mutex);
        for (; begin != end; ++begin) {
            if (scheduleNoLock(&*begin, -1)) {
                wake_up = true;
            }
        }
    }
    if (wake_up) {
        tickle();
    }
}
|
run
run
函数是核心代码,线程真正执行的函数。具体运行参见第一节整体思路。该函数实现了循环获取待执行的协程函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
| // Thread entry point: start() binds this function to every worker thread it creates.
// Loops taking one task at a time from m_fibers and running it. When no task
// is runnable it runs the idle fiber, and it leaves the loop once the idle
// fiber has reached the TERM state.
void Scheduler::run() {
    SYLAR_LOG_DEBUG(g_logger) << m_name << " run";
    setThis();
    // On every thread other than m_rootThread, remember the fiber that is
    // current when the loop is entered.
    if (sylar::GetThreadId() != m_rootThread) {
        t_scheduler_fiber = Fiber::GetThis().get();
    }

    // Fiber that executes idle() whenever there is nothing to run.
    Fiber::ptr idle_fiber(new Fiber(std::bind(&Scheduler::idle, this)));
    // Fiber object reused across plain callbacks (see the ft.cb branch below).
    Fiber::ptr cb_fiber;

    FiberAndThread ft;
    while (true) {
        ft.reset();
        bool tickle_me = false;   // whether tickle() should be called after the scan
        bool is_active = false;   // whether this iteration dequeued a task
        {
            // Scan the queue under the lock for the first task this thread may take.
            MutexType::Lock lock(m_mutex);
            auto it = m_fibers.begin();
            while (it != m_fibers.end()) {
                // Task pinned to a different thread: leave it in place and
                // request a tickle().
                if (it->thread != -1 && it->thread != sylar::GetThreadId()) {
                    ++it;
                    tickle_me = true;
                    continue;
                }

                SYLAR_ASSERT(it->fiber || it->cb);
                // A fiber that is currently in the EXEC state is skipped.
                if (it->fiber && it->fiber->getState() == Fiber::EXEC) {
                    ++it;
                    continue;
                }

                // Take the task; `it` is advanced past it before the erase.
                ft = *it;
                m_fibers.erase(it++);
                ++m_activeThreadCount;
                is_active = true;
                break;
            }
            // If entries remain after the one just taken, request a tickle().
            // When nothing was taken, `it` is end() and this changes nothing.
            tickle_me |= it != m_fibers.end();
        }

        if (tickle_me) {
            tickle();
        }

        if (ft.fiber && (ft.fiber->getState() != Fiber::TERM
                        && ft.fiber->getState() != Fiber::EXCEPT)) {
            // Fiber task: run it until it switches back out.
            ft.fiber->swapIn();
            --m_activeThreadCount;

            if (ft.fiber->getState() == Fiber::READY) {
                // The fiber is READY again: put it back on the queue.
                schedule(ft.fiber);
            } else if (ft.fiber->getState() != Fiber::TERM
                    && ft.fiber->getState() != Fiber::EXCEPT) {
                // Neither finished nor failed: mark it HOLD.
                ft.fiber->m_state = Fiber::HOLD;
            }
            ft.reset();
        } else if (ft.cb) {
            // Callback task: run it on cb_fiber, re-initialising the existing
            // Fiber object when one is available.
            if (cb_fiber) {
                cb_fiber->reset(ft.cb);
            } else {
                cb_fiber.reset(new Fiber(ft.cb));
            }
            ft.reset();
            cb_fiber->swapIn();
            SYLAR_LOG_INFO(g_logger) << cb_fiber->getState();
            --m_activeThreadCount;
            if (cb_fiber->getState() == Fiber::READY) {
                // The callback's fiber is READY again: queue it and drop the
                // local pointer, so the next callback gets a new Fiber.
                schedule(cb_fiber);
                cb_fiber.reset();
            } else if (cb_fiber->getState() == Fiber::EXCEPT
                    || cb_fiber->getState() == Fiber::TERM) {
                // Finished or failed: clear the callback but keep the Fiber
                // object for the next callback.
                cb_fiber->reset(nullptr);
            } else {
                // Any other state: mark it HOLD and drop the local pointer.
                cb_fiber->m_state = Fiber::HOLD;
                cb_fiber.reset();
            }
        } else {
            // Nothing runnable. If a task was dequeued anyway (a fiber already
            // in TERM or EXCEPT), undo the active count and rescan.
            if (is_active) {
                --m_activeThreadCount;
                continue;
            }
            // The idle fiber has finished: leave the loop.
            if (idle_fiber->getState() == Fiber::TERM) {
                SYLAR_LOG_INFO(g_logger) << "idle fiber term";
                break;
            }

            // Run the idle fiber, counting this thread as idle meanwhile.
            ++m_idleThreadCount;
            idle_fiber->swapIn();
            --m_idleThreadCount;
            if (idle_fiber->getState() != Fiber::TERM
                    && idle_fiber->getState() != Fiber::EXCEPT) {
                idle_fiber->m_state = Fiber::HOLD;
            }
        }
    }
}
|
静态函数
1 2 3 4 5
| // Static accessors of Scheduler (excerpt; the definitions are not shown here).
class Scheduler {
public:
    // Returns a Scheduler pointer; stop() compares the result against `this`.
    // NOTE(review): presumably the scheduler registered for the calling thread
    // by setThis() -- confirm against the definition.
    static Scheduler* GetThis();
    // NOTE(review): presumably returns the fiber that run() stores in
    // t_scheduler_fiber -- confirm against the definition.
    static Fiber* GetMainFiber();
};
|
Author:
mxwu
Permalink:
https://mingxuanwu.com/2024/07/27/202407271733/
License:
Copyright (c) 2023 CC-BY-NC-4.0 LICENSE