• Main Page
  • Related Pages
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

include/FCam/TSQueue.h

Go to the documentation of this file.
00001 #ifndef FCAM_TSQUEUE_H
00002 #define FCAM_TSQUEUE_H
00003 
00004 #include <deque>
00005 #include <iterator>
00006 #include <semaphore.h>
00007 #include <pthread.h>
00008 
00009 #include <errno.h>
00010 #include <string.h>
00011 
00012 //#include "../../src/Debug.h"
00013 
00023 namespace FCam {
00024 
00029     template<typename T>
00030     class TSQueue {
00031     public:
00032         TSQueue();
00033         ~TSQueue();
00034 
00037         class locking_iterator;
00038 
00040         void push(const T& val);
00042         void pop();
00043 
00045         void pushFront(const T& val);
00047         void popBack();
00048 
00051         T& front();
00052         const T& front() const;
00053 
00056         T& back();
00057         const T& back() const;
00058 
00060         bool empty() const;
00062         size_t size() const;
00063         
00066         bool wait(unsigned int timeout=0);
00067 
00075         T pull();
00076 
00078         T pullBack();
00079 
00082         bool tryPull(T *);
00083         bool tryPullBack(T *);
00084 
00088         locking_iterator begin();
00092         locking_iterator end();
00100         bool erase(TSQueue<T>::locking_iterator);
00101         
00102     private:
00103         std::deque<T> q;
00104         mutable pthread_mutex_t mutex;
00105         sem_t *sem;
00106 
00107         friend class locking_iterator;
00108     };
00109 
00110     template<typename T>
00111     class TSQueue<T>::locking_iterator: public std::iterator< std::random_access_iterator_tag, T> {
00112     public:
00113         locking_iterator();
00114         locking_iterator(TSQueue<T> *, typename std::deque<T>::iterator i);
00115         locking_iterator(const TSQueue<T>::locking_iterator &);
00116         ~locking_iterator();
00117         locking_iterator& operator=(const TSQueue<T>::locking_iterator &);
00118       
00119         locking_iterator& operator++();
00120         locking_iterator operator++(int);
00121         locking_iterator& operator--();
00122         locking_iterator operator--(int);
00123       
00124         locking_iterator operator+(int);
00125         locking_iterator operator-(int);
00126 
00127         locking_iterator& operator+=(int);
00128         locking_iterator& operator-=(int);
00129 
00130         int operator-(const TSQueue<T>::locking_iterator &);
00131 
00132         bool operator==(const TSQueue<T>::locking_iterator &other);
00133         bool operator!=(const TSQueue<T>::locking_iterator &other);
00134         bool operator<(const TSQueue<T>::locking_iterator &other);
00135         bool operator>(const TSQueue<T>::locking_iterator &other);
00136         bool operator<=(const TSQueue<T>::locking_iterator &other);
00137         bool operator>=(const TSQueue<T>::locking_iterator &other);
00138 
00139         T& operator*();
00140         T* operator->();
00141     private:
00142         TSQueue *parent;
00143         typename std::deque<T>::iterator qi;
00144       
00145         friend class TSQueue<T>;
00146     };
00147 
00148     template<typename T>
00149     TSQueue<T>::TSQueue() {
00150         pthread_mutexattr_t mutexAttr;
00151         pthread_mutexattr_init(&mutexAttr);
00152         pthread_mutexattr_settype(&mutexAttr, PTHREAD_MUTEX_RECURSIVE);
00153         pthread_mutex_init(&mutex, &mutexAttr);
00154 #ifdef FCAM_PLATFORM_OSX
00155         // unnamed semaphores not supported on OSX
00156         char semName[256];
00157         // Create a unique semaphore name for this TSQueue using its pointer value
00158         snprintf(semName, 256, "FCam::TSQueue::sem::%llx", (long long unsigned)this);
00159         sem = sem_open(semName, O_CREAT, 666, 0);
00160 #else
00161         sem = new sem_t;
00162         sem_init(sem, 0, 0);
00163 #endif
00164         //dprintf(6,"TSQueue %llx initialized\n", (long long unsigned)this);
00165     }
00166     
00167     template<typename T>
00168     TSQueue<T>::~TSQueue() {
00169         pthread_mutex_destroy(&mutex);       
00170 #ifdef FCAM_PLATFORM_OSX
00171         sem_close(sem);
00172         char semName[256];
00173         // Recreate the unique semaphore name for this TSQueue using its pointer value
00174         snprintf(semName, 256, "FCam::TSQueue::sem::%llx", (long long unsigned)this);
00175         sem_unlink(semName);
00176 #else
00177         sem_destroy(sem);
00178         delete sem;
00179 #endif
00180     }
00181     
00182     template<typename T>
00183     void TSQueue<T>::push(const T& val) {
00184         //dprintf(6, "Pushing to queue %llx\n",(long long unsigned)this);
00185         pthread_mutex_lock(&mutex);
00186         q.push_back(val);
00187         pthread_mutex_unlock(&mutex);
00188         sem_post(sem);
00189     }
00190 
00191     template<typename T>
00192     void TSQueue<T>::pushFront(const T& val) {
00193         pthread_mutex_lock(&mutex);
00194         q.push_front(val);
00195         pthread_mutex_unlock(&mutex);
00196         sem_post(sem);
00197     }
00198 
00199     template<typename T>
00200     void TSQueue<T>::pop() {
00201         // Only safe for a single consumer!!!
00202         sem_wait(sem);
00203         pthread_mutex_lock(&mutex);
00204         q.pop_front();
00205         pthread_mutex_unlock(&mutex);
00206     }
00207 
00208     template<typename T>
00209     void TSQueue<T>::popBack() {
00210         // Only safe for a single consumer!!!
00211         sem_wait(sem);
00212         pthread_mutex_lock(&mutex);
00213         q.pop_back();
00214         pthread_mutex_unlock(&mutex);
00215     }   
00216 
00217     template<typename T>
00218     T& TSQueue<T>::front() {
00219         pthread_mutex_lock(&mutex);
00220         T &val = q.front();
00221         pthread_mutex_unlock(&mutex);
00222         return val;
00223     }
00224 
00225     template<typename T>
00226     const T& TSQueue<T>::front() const{
00227         const T &val;
00228         pthread_mutex_lock(&mutex);
00229         val = q.front();
00230         pthread_mutex_unlock(&mutex);
00231         return val;
00232     }
00233 
00234     template<typename T>
00235     T& TSQueue<T>::back() {
00236         T &val;
00237         pthread_mutex_lock(&mutex);
00238         val = q.back();
00239         pthread_mutex_unlock(&mutex);
00240         return val;
00241     }
00242 
00243     template<typename T>
00244     const T& TSQueue<T>::back() const {
00245         const T &val;
00246         pthread_mutex_lock(&mutex);
00247         val = q.back();
00248         pthread_mutex_unlock(&mutex);
00249         return val;
00250     }
00251 
00252     template<typename T>
00253     bool TSQueue<T>::empty() const {
00254         bool _empty;
00255         pthread_mutex_lock(&mutex);
00256         _empty = q.empty();
00257         pthread_mutex_unlock(&mutex);
00258         return _empty;
00259     }
00260 
00261     template<typename T>
00262     size_t TSQueue<T>::size() const {
00263         size_t _size;
00264         pthread_mutex_lock(&mutex);
00265         _size = q.size();
00266         pthread_mutex_unlock(&mutex);
00267         return _size;
00268     }
00269 
00270     template<typename T>
00271     bool TSQueue<T>::wait(unsigned int timeout) {
00272         bool res;
00273         int err;        
00274         if (timeout == 0) {
00275             err=sem_wait(sem);
00276         } else {
00277 #ifndef FCAM_PLATFORM_OSX // No clock_gettime or sem_timedwait on OSX 
00278             timespec tv;
00279             // This will overflow around 1 hour or so of timeout
00280             clock_gettime(CLOCK_REALTIME, &tv);
00281             tv.tv_nsec += timeout*1000;
00282             tv.tv_sec += tv.tv_nsec / 1000000000;
00283             tv.tv_nsec = tv.tv_nsec % 1000000000;
00284             err=sem_timedwait(sem, &tv);
00285 #else
00286         err=sem_trywait(sem);
00287 #endif
00288         }
00289         if (err == -1) {
00290             switch (errno) {
00291             case EINTR:
00292             case ETIMEDOUT:
00293                 res = false;
00294                 break;
00295             default:
00296                 //error(Event::InternalError, "TSQueue::wait: Unexpected error from wait on semaphore: %s", strerror(errno));
00297                 res = false;
00298                 break;
00299             }
00300         } else {
00301             res = true;
00302             sem_post(sem); // Put back the semaphore since we're not actually popping
00303         }
00304 
00305         return res;
00306     }
00307 
00308     template<typename T>
00309     T TSQueue<T>::pull() {
00310         //dprintf(6, "Pulling from queue %llx\n",(long long unsigned)this);
00311         sem_wait(sem);
00312         pthread_mutex_lock(&mutex);
00313         T copyVal = q.front();
00314         q.pop_front();
00315         pthread_mutex_unlock(&mutex);
00316         //dprintf(6, "Done pull from queue %llx\n",(long long unsigned)this);
00317         return copyVal;
00318     }
00319 
00320     template<typename T>
00321     T TSQueue<T>::pullBack() {
00322         sem_wait(sem);
00323         pthread_mutex_lock(&mutex);
00324         T copyVal = q.back();
00325         q.pop_back();
00326         pthread_mutex_unlock(&mutex);
00327         return copyVal;
00328     }
00329     
00330     template<typename T>
00331     bool TSQueue<T>::tryPull(T *ptr) {
00332         if (sem_trywait(sem)) return false;
00333         pthread_mutex_lock(&mutex);
00334         T copyVal = q.front();
00335         q.pop_front();
00336         pthread_mutex_unlock(&mutex);
00337         *ptr = copyVal;
00338         return true;
00339     }
00340     
00341     template<typename T>
00342     bool TSQueue<T>::tryPullBack(T *ptr) {
00343         if (sem_trywait(sem)) return false;
00344         pthread_mutex_lock(&mutex);
00345         T copyVal = q.back();
00346         q.pop_back();
00347         pthread_mutex_unlock(&mutex);
00348         *ptr = copyVal;
00349         return true;
00350     }
00351 
00352     template<typename T>
00353     typename TSQueue<T>::locking_iterator TSQueue<T>::begin() {
00354         return locking_iterator(this, q.begin());
00355     }
00356 
00357     template<typename T>
00358     typename TSQueue<T>::locking_iterator TSQueue<T>::end() {
00359         return locking_iterator(this, q.end());
00360     }
00361 
00362     template<typename T>
00363     bool TSQueue<T>::erase(TSQueue<T>::locking_iterator li) {
00364         /* Since we're erasing, decrement semaphore. */
00365         if (sem_trywait(sem)) {
00366             return false;
00367         }
00368         q.erase(li.qi);
00369         return true;
00370     }
00371 
00372     template<typename T>
00373     TSQueue<T>::locking_iterator::locking_iterator() : parent(NULL), qi()
00374     {}
00375 
00376     template<typename T>
00377     TSQueue<T>::locking_iterator::locking_iterator(TSQueue<T> *p, 
00378                                                    typename std::deque<T>::iterator i) : 
00379         parent(p), qi(i) 
00380     {
00381         if (parent) {
00382             pthread_mutex_lock(&(parent->mutex));
00383         }
00384     }
00385 
00386     template<typename T>
00387     TSQueue<T>::locking_iterator::locking_iterator(const TSQueue<T>::locking_iterator &other):
00388         parent(other.parent), qi(other.qi)
00389     {
00390         if (parent) {
00391             pthread_mutex_lock(&(parent->mutex));
00392         }
00393     }
00394 
00395     template<typename T>
00396     TSQueue<T>::locking_iterator::~locking_iterator() {
00397         if (parent) {
00398             pthread_mutex_unlock(&(parent->mutex));
00399         }
00400     }
00401 
00402     template<typename T>
00403     typename TSQueue<T>::locking_iterator &TSQueue<T>::locking_iterator::operator=(const TSQueue<T>::locking_iterator &other) {
00404         if (&other == this) return (*this);
00405         if (parent &&
00406             other.qi == qi) return (*this);
00407         
00408         if (parent) pthread_mutex_unlock(&(parent->mutex));
00409         parent = other.parent;
00410         qi = other.qi;
00411         if (parent) pthread_mutex_lock(&(parent->mutex));
00412 
00413     }
00414 
00415     template<typename T>
00416     typename TSQueue<T>::locking_iterator &TSQueue<T>::locking_iterator::operator++() {
00417         qi++;
00418         return (*this);
00419     }
00420 
00421     template<typename T>
00422     typename TSQueue<T>::locking_iterator TSQueue<T>::locking_iterator::operator++(int) {
00423         typename TSQueue<T>::locking_iterator temp(*this);
00424         qi++;
00425         return temp;
00426     }
00427 
00428     template<typename T>
00429     typename TSQueue<T>::locking_iterator &TSQueue<T>::locking_iterator::operator--() {
00430         qi--;
00431         return (*this);
00432     }
00433 
00434     template<typename T>
00435     typename TSQueue<T>::locking_iterator TSQueue<T>::locking_iterator::operator--(int) {
00436         typename TSQueue<T>::locking_iterator temp(*this);
00437         qi--;
00438         return temp;
00439     }
00440 
00441     template<typename T>
00442     typename TSQueue<T>::locking_iterator TSQueue<T>::locking_iterator::operator+(int n) {
00443         typename TSQueue<T>::locking_iterator temp(*this);
00444         temp+=n;
00445         return temp;
00446     }
00447 
00448     template<typename T>
00449     typename TSQueue<T>::locking_iterator operator+(int n, 
00450                                                     typename TSQueue<T>::locking_iterator l) {
00451         return l+n;
00452     }
00453 
00454     template<typename T>
00455     typename TSQueue<T>::locking_iterator TSQueue<T>::locking_iterator::operator-(int n) {
00456         typename TSQueue<T>::locking_iterator temp(*this);
00457         temp-=n;
00458         return temp;
00459     }
00460 
00461     template<typename T>
00462     typename TSQueue<T>::locking_iterator &TSQueue<T>::locking_iterator::operator+=(int n) {
00463         qi += n;
00464         return *this;
00465     }
00466 
00467     template<typename T>
00468     typename TSQueue<T>::locking_iterator &TSQueue<T>::locking_iterator::operator-=(int n) {
00469         qi -= n;
00470         return *this;
00471     }
00472 
00473     template<typename T>
00474     int TSQueue<T>::locking_iterator::operator-(const TSQueue<T>::locking_iterator &other) {
00475         return qi - other.qi;
00476     }
00477     
00478     template<typename T>
00479     bool TSQueue<T>::locking_iterator::operator==(const TSQueue<T>::locking_iterator &other) {
00480         return qi == other.qi;
00481     }
00482 
00483     template<typename T>
00484     bool TSQueue<T>::locking_iterator::operator!=(const TSQueue<T>::locking_iterator &other) {
00485         return qi != other.qi;
00486     }
00487 
00488     template<typename T>
00489     bool TSQueue<T>::locking_iterator::operator<(const TSQueue<T>::locking_iterator &other) {
00490         return qi < other.qi;
00491     }
00492 
00493     template<typename T>
00494     bool TSQueue<T>::locking_iterator::operator>(const TSQueue<T>::locking_iterator &other) {
00495         return qi > other.qi;
00496     }
00497 
00498     template<typename T>
00499     bool TSQueue<T>::locking_iterator::operator<=(const TSQueue<T>::locking_iterator &other) {
00500         return qi <= other.qi;
00501     }
00502 
00503     template<typename T>
00504     bool TSQueue<T>::locking_iterator::operator>=(const TSQueue<T>::locking_iterator &other) {
00505         return qi >= other.qi;
00506     }
00507 
00508     template<typename T>
00509     T& TSQueue<T>::locking_iterator::operator*() {
00510         return *qi;
00511     }
00512 
00513     template<typename T>
00514     T* TSQueue<T>::locking_iterator::operator->() {
00515         return &(*qi);
00516     }
00517 
00518 }
00519 
00520 #endif

Generated on Thu Jul 15 2010 17:51:28 for FCam by  doxygen 1.7.1