dlvhex
2.5.0
|
00001 /* DMCS -- Distributed Nonmonotonic Multi-Context Systems. 00002 * Copyright (C) 2006-2015 Thomas Krennwallner 00003 * 00004 * This file is part of DMCS. 00005 * 00006 * DMCS is free software: you can redistribute it and/or modify 00007 * it under the terms of the GNU General Public License as published by 00008 * the Free Software Foundation, either version 3 of the License, or 00009 * (at your option) any later version. 00010 * 00011 * DMCS is distributed in the hope that it will be useful, 00012 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00014 * GNU General Public License for more details. 00015 * 00016 * You should have received a copy of the GNU General Public License 00017 * along with DMCS. If not, see <http://www.gnu.org/licenses/>. 00018 */ 00019 00029 #ifndef _CONCURRENT_MESSAGE_QUEUE_OWNING_H 00030 #define _CONCURRENT_MESSAGE_QUEUE_OWNING_H 00031 00032 #include <boost/shared_ptr.hpp> 00033 #include <boost/thread/mutex.hpp> 00034 #include <boost/thread/condition_variable.hpp> 00035 #include <boost/thread/thread_time.hpp> 00036 #include <boost/date_time/time_duration.hpp> 00037 00038 #include <queue> 00039 00040 namespace dlvhex 00041 { 00042 00047 template<class MessageBase> 00048 class ConcurrentMessageQueueOwning 00049 { 00050 private: 00051 typedef boost::shared_ptr<MessageBase> MessagePtr; 00052 00054 std::queue<MessagePtr> q; 00056 const std::size_t n; 00058 std::size_t enq; 00060 std::size_t deq; 00061 00063 mutable boost::mutex mtx; 00065 boost::condition_variable cnd; 00066 00068 inline void 00069 notifyConsumer() { 00070 if (deq > 0) { // is some consumer waiting? 00071 // notify one consuming thread 00072 cnd.notify_one(); 00073 } 00074 } 00075 00077 inline void 00078 notifyProducer() { 00079 if (enq > 0) { // is some producer waiting? 00080 // notify one producing thread 00081 cnd.notify_one(); 00082 } 00083 } 00084 00086 inline void 00087 waitOnCapacity(boost::mutex::scoped_lock& lock) { 00088 // maximum capacity reached 00089 while (n == q.size()) { 00090 ++enq; 00091 cnd.wait(lock); 00092 --enq; 00093 } 00094 00095 notifyConsumer(); 00096 } 00097 00099 inline void 00100 waitOnEmpty(boost::mutex::scoped_lock& lock) { 00101 // minimum capacity reached 00102 while (q.empty()) { 00103 ++deq; 00104 cnd.wait(lock); 00105 --deq; 00106 } 00107 00108 notifyProducer(); 00109 } 00110 00115 inline bool 00116 waitOnTimedCapacity(boost::mutex::scoped_lock& lock, const boost::posix_time::time_duration& t) { 00117 bool no_timeout = true; 00118 00119 // maximum capacity reached 00120 while (n == q.size() && no_timeout) { 00121 ++enq; 00122 no_timeout = cnd.timed_wait(lock, t); 00123 --enq; 00124 } 00125 00126 notifyConsumer(); 00127 00128 return no_timeout; 00129 } 00130 00135 inline bool 00136 waitOnTimedEmpty(boost::mutex::scoped_lock& lock, const boost::posix_time::time_duration& t) { 00137 bool no_timeout = true; 00138 00139 // minimum capacity reached 00140 while (q.empty() && no_timeout) { 00141 ++deq; 00142 no_timeout = cnd.timed_wait(lock, t); 00143 --deq; 00144 } 00145 00146 notifyProducer(); 00147 00148 return no_timeout; 00149 } 00150 00153 inline void 00154 pushMessage (MessagePtr m) { 00155 q.push(m); 00156 } 00157 00160 inline void 00161 popMessage (MessagePtr& m) { 00162 m = q.front(); 00163 q.pop(); 00164 } 00165 00166 public: 00167 00169 ConcurrentMessageQueueOwning() 00170 : n(1), enq(0), deq(0) 00171 { } 00172 00178 ConcurrentMessageQueueOwning(std::size_t capacity) 00179 : n(capacity > 0 ? capacity : 1), enq(0), deq(0) 00180 { } 00181 00184 ConcurrentMessageQueueOwning(const ConcurrentMessageQueueOwning<MessageBase>& q) 00185 : n(q.n), enq(0), deq(0) 00186 { } 00187 00189 virtual 00190 ~ConcurrentMessageQueueOwning() { 00191 flush(); 00192 } 00193 00195 void flush() { { 00196 boost::mutex::scoped_lock lock(mtx); 00197 // just pop all elements from the queue (the smart pointers automatically 00198 // destruct the elements and free the memory) 00199 while (!q.empty()) 00200 q.pop(); 00201 } 00202 notifyProducer(); 00203 } 00204 00207 bool 00208 empty () const 00209 { 00210 boost::mutex::scoped_lock lock(mtx); 00211 return q.empty(); 00212 } 00213 00216 bool 00217 size () const 00218 { 00219 return n; 00220 } 00221 00225 void 00226 send (MessagePtr m, unsigned int prio) { 00227 boost::mutex::scoped_lock lock(mtx); 00228 waitOnCapacity(lock); 00229 pushMessage(m); 00230 } 00231 00236 bool 00237 try_send (MessagePtr m, unsigned int prio) { 00238 boost::mutex::scoped_lock lock(mtx); 00239 00240 if (q.size() < n) { 00241 pushMessage(m); 00242 notifyConsumer(); 00243 return true; 00244 } 00245 00246 return false; 00247 } 00248 00254 bool 00255 timed_send (MessagePtr m, unsigned int prio, const boost::posix_time::time_duration& t) { 00256 boost::mutex::scoped_lock lock(mtx); 00257 00258 if (waitOnTimedCapacity(lock, t)) { 00259 pushMessage(m); 00260 return true; 00261 } 00262 00263 return false; 00264 } 00265 00269 void 00270 receive (MessagePtr& m, unsigned int& prio) { 00271 boost::mutex::scoped_lock lock(mtx); 00272 waitOnEmpty(lock); 00273 popMessage(m); 00274 } 00275 00280 bool 00281 try_receive (MessagePtr& m, unsigned int prio) { 00282 boost::mutex::scoped_lock lock(mtx); 00283 00284 if (!q.empty()) { 00285 popMessage(m); 00286 notifyProducer(); 00287 return true; 00288 } 00289 00290 return false; 00291 } 00292 00298 bool 00299 timed_receive (MessagePtr& m, unsigned int& prio, const boost::posix_time::time_duration& t) { 00300 boost::mutex::scoped_lock lock(mtx); 00301 00302 if (waitOnTimedEmpty(lock, t)) { 00303 popMessage(m); 00304 return true; 00305 } 00306 00307 return false; 00308 } 00309 00310 }; 00311 00312 } // namespace dlvhex 00313 #endif // _CONCURRENT_MESSAGE_QUEUE_H 00314 00315 00316 // vim:expandtab:ts=4:sw=4: 00317 // mode: C++ 00318 // End: