C++基于消息隊列的多線程實現示例代碼
前言
實現消息隊列的關鍵因素是考量不同線程訪問消息隊列的同步問題。本實現涉及到幾個知識點
std::lock_guard 介紹
std::lock_gurad 是 C++11 中定義的模板類。定義如下:
template <class Mutex> class lock_guard;
lock_guard 對象通常用于管理某個鎖(Lock)對象,因此與 Mutex RAII 相關,方便線程對互斥量上鎖,即在某個 lock_guard 對象的聲明周期內,它所管理的鎖對象會一直保持上鎖狀態(tài);而 lock_guard 的生命周期結束之后,它所管理的鎖對象會被解鎖(注:類似 shared_ptr 等智能指針管理動態(tài)分配的內存資源 )。
模板參數 Mutex 代表互斥量類型,例如 std::mutex 類型,它應該是一個基本的 BasicLockable 類型,標準庫中定義幾種基本的 BasicLockable 類型,分別 std::mutex, std::recursive_mutex, std::timed_mutex,std::recursive_timed_mutex 以及 std::unique_lock
std::unique_lock 介紹
lock_guard 最大的缺點也是簡單,沒有給程序員提供足夠的靈活度,因此,C++11 標準中定義了另外一個與 Mutex RAII 相關類 unique_lock,該類與 lock_guard 類相似,也很方便線程對互斥量上鎖,但它提供了更好的上鎖和解鎖控制。
顧名思義,unique_lock 對象以獨占所有權的方式( unique owership)管理 mutex 對象的上鎖和解鎖操作,所謂獨占所有權,就是沒有其他的 unique_lock 對象同時擁有某個 mutex 對象的所有權。
新創(chuàng)建的 unique_lock 對象管理 Mutex 對象 m,并嘗試調用 m.lock() 對 Mutex 對象進行上鎖,如果此時另外某個 unique_lock 對象已經管理了該 Mutex 對象 m,則當前線程將會被阻塞。
std::condition介紹
當 std::condition_variable 對象的某個 wait 函數被調用的時候,它使用 std::unique_lock(通過 std::mutex) 來鎖住當前線程。當前線程會一直被阻塞,直到另外一個線程在相同的 std::condition_variable 對象上調用了 notification 函數來喚醒當前線程。
std::condition_variable 提供了兩種 wait() 函數。當前線程調用 wait() 后將被阻塞(此時當前線程應該獲得了鎖(mutex),不妨設獲得鎖 lck),直到另外某個線程調用 notify_* 喚醒了當前線程。
在線程被阻塞時,該函數會自動調用 lck.unlock() 釋放鎖,使得其他被阻塞在鎖競爭上的線程得以繼續(xù)執(zhí)行。另外,一旦當前線程獲得通知(notified,通常是另外某個線程調用 notify_* 喚醒了當前線程),wait() 函數也是自動調用 lck.lock(),使得 lck 的狀態(tài)和 wait 函數被調用時相同。
在第二種情況下(即設置了 Predicate),只有當 pred 條件為 false 時調用 wait() 才會阻塞當前線程,并且在收到其他線程的通知后只有當 pred 為 true 時才會被解除阻塞。因此第二種情況類似以下代碼:
while (!pred()) wait(lck);
std::function介紹
使用std::function可以將普通函數,lambda表達式和函數對象類統(tǒng)一起來。它們并不是相同的類型,然而通過function模板類,可以轉化為相同類型的對象(function對象),從而放入一個vector或其他容器里,方便回調。
代碼實現:
#pragma once #ifndef MESSAGE_QUEUE_H #define MESSAGE_QUEUE_H #include <queue> #include <mutex> #include <condition_variable> template<class Type> class CMessageQueue { public: CMessageQueue& operator = (const CMessageQueue&) = delete; CMessageQueue(const CMessageQueue& mq) = delete; CMessageQueue() :_queue(), _mutex(), _condition(){} virtual ~CMessageQueue(){} void Push(Type msg){ std::lock_guard <std::mutex> lock(_mutex); _queue.push(msg); //當使用阻塞模式從消息隊列中獲取消息時,由condition在新消息到達時提醒等待線程 _condition.notify_one(); } //blocked定義訪問方式是同步阻塞或者非阻塞模式 bool Pop(Type& msg, bool isBlocked = true){ if (isBlocked) { std::unique_lock <std::mutex> lock(_mutex); while (_queue.empty()) { _condition.wait(lock); } //注意這一段必須放在if語句中,因為lock的生命域僅僅在if大括號內 msg = std::move(_queue.front()); _queue.pop(); return true; } else { std::lock_guard<std::mutex> lock(_mutex); if (_queue.empty()) return false; msg = std::move(_queue.front()); _queue.pop(); return true; } } int32_t Size(){ std::lock_guard<std::mutex> lock(_mutex); return _queue.size(); } bool Empty(){ std::lock_guard<std::mutex> lock(_mutex); return _queue.empty(); } private: std::queue<Type> _queue;//存儲消息的隊列 mutable std::mutex _mutex;//同步鎖 std::condition_variable _condition;//實現同步式獲取消息 }; #endif//MESSAGE_QUEUE_H
線程池可以直接在構造函數中構造線程,并傳入回調函數,也可以寫一個Run函數顯示調用。這里我們選擇了第二種,對比:
- 在handler函數外部做循環(huán)接受消息,當消息到達后調用hanlder處理。這種實現在上層做封裝,但是會在線程中頻繁的切換調用函數。這種設計無法復用一些資源,如當在handler中做數據庫操作時,需要頻繁的連接和斷開連接,可以通過定義兩個虛函數Prehandler和AfterHandler來實現。
!?。嬙旌瘮抵姓{用虛函數并不會能真正的調用子類的實現?。?!
雖然可以對虛函數進行實調用,但程序員編寫虛函數的本意應該是實現動態(tài)聯編。在構造函數中調用虛函數,函數的入口地址是在編譯時靜態(tài)確定的,并未實現虛調用 - 寫一個Run函數,將這一部分實現放在run函數中,顯示調用。
《Effective C++ 》條款9:永遠不要在構造函數或析構函數中調用虛函數
#ifndef THREAD_POOL_H #define THREAD_POOL_H #include <functional> #include <vector> #include <thread> #include "MessageQueue.h" #define MIN_THREADS 1 template<class Type> class CThreadPool { CThreadPool& operator = (const CThreadPool&) = delete; CThreadPool(const CThreadPool& other) = delete; public: CThreadPool(int32_t threads, std::function<void(Type& record, CThreadPool<Type>* pSub)> handler); virtual ~CThreadPool(); void Run(); virtual void PreHandler(){} virtual void AfterHandler(){} void Submit(Type record); private: bool _shutdown; int32_t _threads; std::function<void(Type& record, CThreadPool<Type>* pSub)> _handler; std::vector<std::thread> _workers; CMessageQueue<Type> _tasks; }; template<class Type> CThreadPool<Type>::CThreadPool(int32_t threads, std::function<void(Type& record, CThreadPool<Type>* pSub)> handler) :_shutdown(false), _threads(threads), _handler(handler), _workers(), _tasks() { //第一種實現方案,注意這里的虛函數調用不正確 /*if (_threads < MIN_THREADS) _threads = MIN_THREADS; for (int32_t i = 0; i < _threads; i++) { _workers.emplace_back( [this]{ PreHandler(); while (!_shutdown){ Type record; _tasks.Pop(record, true); _handler(record, this); } AfterHandler(); } ); }*/ } //第二種實現方案 template<class Type> void CThreadPool<Type>::Run() { if (_threads < MIN_THREADS) _threads = MIN_THREADS; for (int32_t i = 0; i < _threads; i++) { _workers.emplace_back( [this]{ PreHandler(); while (!_shutdown){ Type record; _tasks.Pop(record, true); _handler(record, this); } AfterHandler(); } ); } } template<class Type> CThreadPool<Type>::~CThreadPool() { for (std::thread& worker : _workers) worker.join(); } template<class Type> void CThreadPool<Type>::Submit(Type record) { _tasks.Push(record); } #endif // !THREAD_POOL_H
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對我們的支持。
欄 目:C語言
下一篇:VC++文件監(jiān)控之ReadDirectoryChangesW
本文標題:C++基于消息隊列的多線程實現示例代碼
本文地址:http://mengdiqiu.com.cn/a1/Cyuyan/369.html
您可能感興趣的文章
- 04-02c語言沒有round函數 round c語言
- 01-10深入理解C++中常見的關鍵字含義
- 01-10使用C++實現全排列算法的方法詳解
- 01-10c++中inline的用法分析
- 01-10用C++實現DBSCAN聚類算法
- 01-10全排列算法的非遞歸實現與遞歸實現的方法(C++)
- 01-10C++大數模板(推薦)
- 01-10淺談C/C++中的static與extern關鍵字的使用詳解
- 01-10深入C/C++浮點數在內存中的存儲方式詳解
- 01-10基于atoi()與itoa()函數的內部實現方法詳解


閱讀排行
本欄相關
- 04-02c語言函數調用后清空內存 c語言調用
- 04-02func函數+在C語言 func函數在c語言中
- 04-02c語言的正則匹配函數 c語言正則表達
- 04-02c語言用函數寫分段 用c語言表示分段
- 04-02c語言中對數函數的表達式 c語言中對
- 04-02c語言編寫函數冒泡排序 c語言冒泡排
- 04-02c語言沒有round函數 round c語言
- 04-02c語言分段函數怎么求 用c語言求分段
- 04-02C語言中怎么打出三角函數 c語言中怎
- 04-02c語言調用函數求fibo C語言調用函數求
隨機閱讀
- 01-10C#中split用法實例總結
- 08-05dedecms(織夢)副欄目數量限制代碼修改
- 01-11Mac OSX 打開原生自帶讀寫NTFS功能(圖文
- 04-02jquery與jsp,用jquery
- 01-10delphi制作wav文件的方法
- 01-10使用C語言求解撲克牌的順子及n個骰子
- 01-11ajax實現頁面的局部加載
- 01-10SublimeText編譯C開發(fā)環(huán)境設置
- 08-05織夢dedecms什么時候用欄目交叉功能?
- 08-05DEDE織夢data目錄下的sessions文件夾有什