萬盛學電腦網

 萬盛學電腦網 >> 服務器教程 >> linux中編寫並發隊列類

linux中編寫並發隊列類

 這篇文章主要介紹了linux中編寫並發隊列類,功能有:並發阻塞隊列、有超時限制、有大小限制

設計並發隊列   代碼如下: #include <pthread.h> #include <list> using namespace std;   template <typename T> class Queue  {  public:      Queue( )      {          pthread_mutex_init(&_lock, NULL);      }      ~Queue( )      {          pthread_mutex_destroy(&_lock);     }      void push(const T& data);     T pop( );  private:      list<T> _list;      pthread_mutex_t _lock; };   template <typename T> void Queue<T>::push(const T& value )  {      pthread_mutex_lock(&_lock);     _list.push_back(value);     pthread_mutex_unlock(&_lock); }   template <typename T> T Queue<T>::pop( )  {      if (_list.empty( ))      {          throw "element not found";     }     pthread_mutex_lock(&_lock);      T _temp = _list.front( );     _list.pop_front( );     pthread_mutex_unlock(&_lock);     return _temp; }       上述代碼是有效的。但是,請考慮這樣的情況:您有一個很長的隊列(可能包含超過 100,000 個元素),而且在代碼執行期間的某個時候,從隊列中讀取數據的線程遠遠多於添加數據的線程。因為添加和取出數據操作使用相同的互斥鎖,所以讀取數據的速度會影響寫數據的線程訪問鎖。那麼,使用兩個鎖怎麼樣?一個鎖用於讀取操作,另一個用於寫操作。給出修改後的 Queue 類。   代碼如下: template <typename T> class Queue  {  public:      Queue( )      {          pthread_mutex_init(&_rlock, NULL);          pthread_mutex_init(&_wlock, NULL);     }      ~Queue( )      {          pthread_mutex_destroy(&_rlock);         pthread_mutex_destroy(&_wlock);     }      void push(const T& data);     T pop( );  private:      list<T> _list;      pthread_mutex_t _rlock, _wlock; };     template <typename T> void Queue<T>::push(const T& value )  {      pthread_mutex_lock(&_wlock);     _list.push_back(value);     pthread_mutex_unlock(&_wlock); }   template <typename T> T Queue<T>::pop( )  {      if (_list.empty( ))      {          throw "element not found";     }     pthread_mutex_lock(&_rlock);     T _temp = _list.front( );     _list.pop_front( );     pthread_mutex_unlock(&_rlock);     return _temp; }       設計並發阻塞隊列   目前,如果讀線程試圖從沒有數據的隊列讀取數據,僅僅會拋出異常並繼續執行。但是,這種做法不總是我們想要的,讀線程很可能希望等待(即阻塞自身),直到有數據可用時為止。這種隊列稱為阻塞的隊列。如何讓讀線程在發現隊列是空的之後等待?一種做法是定期輪詢隊列。但是,因為這種做法不保證隊列中有數據可用,它可能會導致浪費大量 CPU 周期。推薦的方法是使用條件變量,即 pthread_cond_t 類型的變量。    代碼如下: template <typename T> class BlockingQueue  {  public:      BlockingQueue ( )      {          pthread_mutexattr_init(&_attr);          // set lock recursive         pthread_mutexattr_settype(&_attr,PTHREAD_MUTEX_RECURSIVE_NP);          pthread_mutex_init(&_lock,&_attr);         pthread_cond_init(&_cond, NULL);     }      ~BlockingQueue ( )      {          pthread_mutex_destroy(&_lock);         pthread_cond_destroy(&_cond);     }      void push(const T& data);     bool push(const T& data, const int seconds); //time-out push     T pop( );     T pop(const int seconds); // time-out pop   private:      list<T> _list;      pthread_mutex_t _lock;     pthread_mutexattr_t _attr;     pthread_cond_t _cond; };   template <typename T> T BlockingQueue<T>::pop( )  {      pthread_mutex_lock(&_lock);     while (_list.empty( ))      {          pthread_cond_wait(&_cond, &_lock) ;     }     T _temp = _list.front( );     _list.pop_front( );     pthread_mutex_unlock(&_lock);     return _temp; }   template <typename T> void BlockingQueue <T>::push(const T& value )  {      pthread_mutex_lock(&_lock);     const bool was_empty = _list.empty( );     _list.push_back(value);     pthread_mutex_unlock(&_lock);     if (was_empty)          pthread_cond_broadcast(&_cond); }       並發阻塞隊列設計有兩個要注意的方面:   1.可以不使用 pthread_cond_broadcast,而是使用 pthread_cond_signal。但是,pthread_cond_signal 會釋放至少一個等待條件變量的線程,這個線程不一定是等待時間最長的讀線程。盡管使用 pthread_cond_signal 不會損害阻塞隊列的功能,但是這可能會導致某些讀線程的等待時間過長。   2.可能會出現虛假的線程喚醒。因此,在喚醒讀線程之後,要確認列表非空,然後再繼續處理。強烈建議使用基於 while 循環的 pop()。   設計有超時限制的並發阻塞隊列   在許多系統中,如果無法在特定的時間段內處理新數據,就根本不處理數據了。例如,新聞頻道的自動收報機顯示來自金融交易所的實時股票行情,它每 n 秒收到一次新數據。如果在 n 秒內無法處理以前的一些數據,就應該丟棄這些數據並顯示最新的信息。根據這個概念,我們來看看如何給並發隊列的添加和取出操作增加超時限制。這意味著,如果系統無法在指定的時間限制內執行添加和取出操作,就應該根本不執行操作。     代碼如下: template <typename T> bool BlockingQueue <T>::push(const T& data, const int seconds)  {     struct timespec ts1, ts2;     const bool was_empty = _list.empty( );     clock_gettime(CLOCK_REALTIME, &ts1);     pthread_mutex_lock(&_lock);     clock_gettime(CLOCK_REALTIME, &ts2);     if ((ts2.tv_sec – ts1.tv_sec) <seconds)      {         was_empty = _list.empty( );         _list.push_back(value);     }     pthread_mutex_unlock(&_lock);     if (was_empty)          pthread_cond_broadcast(&_cond); }   template <typename T> T BlockingQueue <T>::pop(const int seconds)  {      struct timespec ts1, ts2;      clock_gettime(CLOCK_REALTIME, &ts1);      pthread_mutex_lock(&_lock);     clock_gettime(CLOCK_REALTIME, &ts2);       // First Check: if time out when get the _lock      if ((ts1.tv_sec – ts2.tv_sec) < seconds)      {          ts2.tv_sec += seconds; // specify wake up time         while(_list.empty( ) && (result == 0))          {              result = pthread_cond_ti
copyright © 萬盛學電腦網 all rights reserved