The producer and consumer problem we visited earlier used CRITICAL_SECTION
for synchronization and the threads were spinning and checking over the queue size to add or remove the elements. In this post, we are going to use Windows Event objects for synchronizing the Producer and Consumer threads.
Key Points
1. Events are kernel objects which contain an usage count, flag indicating the type of event (manual-reset or auto-reset) and another flag indicating the state (signaled or non-signaled)
2. Applications use Event Objects to notify the waiting threads that an operation has been completed
3. An event is signaled using the callSetEvent(Handle hEvent)
4. When a manual-reset event is signaled, all threads waiting on the event become schedulable. When an auto-reset event is signaled, only one of the threads waiting on the event becomes schedulable
Event uses five functions
CreateEvent
, OpenEvent
, SetEvent
, ResetEvent
, and PulseEvent
. The functions I will be using in my examples are:HANDLE CreateEvent( LPSECURITY_ATTRIBUTES lpEventAttributes, BOOL bManualReset, BOOL bInitialState, LPCTSTR lpName );
BOOL SetEvent( HANDLE hEvent );
BOOL ResetEvent( HANDLE hEvent );
The Event class below is a wrapper over windows manual-reset Event Object, provides easy interface to use and also takes care of the cleanup activities for the event handles.
#ifndef _Event_H_ #define _Event_H_ //File: Event.h #include <windows.h> #include <process.h> #include <iostream> namespace examples { class Event : public NonCopyable { public: Event() { m_evt = (HANDLE) ::CreateEvent(0, true, false, 0); if(NULL == m_evt) { std::cout << "ERROR: Cannot create event" << std::endl; } } ~Event() { ::CloseHandle(m_evt); } bool wait(size_t timeout) { bool retval = false; switch(::WaitForSingleObjectEx(m_evt, timeout == 0 ? INFINITE : timeout, false)) { case WAIT_OBJECT_0: retval = true; break; default: std::cout << "ERROR: Wait Failed " << ::GetLastError() << std::endl; break; } ::ResetEvent(m_evt); return retval; } bool signal() { return ::SetEvent(m_evt); } private: HANDLE m_evt; }; } #endif //_Event_H_The Shared Message Queue Class
#ifndef _MQ_H_ #define _MQ_H_ //File: MQ.h #include <iostream> #include <deque> #include "Lock.h" #include "Event.h" using namespace std; namespace examples { class MQ { public: MQ(size_t size = 10) : m_max_size(size){} ~MQ() { m_q.clear(); } void add(const string& elem) { m_qlock.acquire(); while(isFull()) { m_qlock.release(); m_evtProducer.wait(0); m_qlock.acquire(); } m_q.push_back(elem); debug_print("Producer"); m_evtConsumer.signal(); m_qlock.release(); } void remove() { m_qlock.acquire(); while(isEmpty()) { m_qlock.release(); m_evtConsumer.wait(0); m_qlock.acquire(); } m_q.pop_front(); debug_print("Consumer"); m_evtProducer.signal(); m_qlock.release(); } bool isEmpty() const { return m_q.size() == 0; } bool isFull() const { return m_q.size() >= m_max_size - 1; } private: void debug_print(const std::string& str) const { std::cout << str.c_str() << " [" << ::GetCurrentThreadId() << "] Size=[" << m_q.size() << "]"; std::cout << endl; } const size_t m_max_size; deque<const string> m_q; Lock m_qlock; Event m_evtProducer; Event m_evtConsumer; }; } #endif //_MQ_H_Visit earlier posts for Thread creation and Implementation: 1. Producer and Consumer Problem Using Windows Threads 2. Encapsulating Windows Threads in C++ Objects The Client Code
#include <iostream> #include "MQ.h" #include "Thread.h" #include "Runnable.h" #include "Producer.h" #include "Consumer.h" //File: Main.cpp using namespace examples; int main() { try { MQ q(10); Producer producer(q); Consumer consumer(q); Thread t[4] = { producer, producer, consumer, consumer }; //start the producer and consumer threads t[0].start(); t[1].start(); t[2].start(); t[3].start(); t[0].join(20000); t[1].join(20000); t[2].join(20000); t[3].join(20000); std::cout << std::endl << "Threads timed out!!" << std::endl; }catch(std::exception& e) { std::cerr << e.what() << std::endl; }catch(...) { std::cerr << "Unknown Exception" << std::endl; } return 0; }The Producer Header File
#ifndef _Producer_H_ #define _Producer_H_ //File: Producer.h #include "Runnable.h" #include "MQ.h" namespace examples { class Producer : public Runnable { public: Producer(MQ &q); ~Producer(); virtual void run(); private: MQ& m_queue; }; } #endif //_Producer_H_The Producer Class File
#include "Producer.h" //File: Producer.cpp using namespace examples; Producer::Producer(MQ &q) : m_queue(q) {} Producer::~Producer() {} void Producer::run() { while(true) { m_queue.add("Object"); } }The Consumer Header File
#ifndef _Consumer_H_ #define _Consumer_H_ //File: Consumer.h #include "Runnable.h" #include "MQ.h" namespace examples { class Consumer : public Runnable { public: Consumer(MQ &q); ~Consumer(); virtual void run(); private: MQ& m_queue; }; } #endif //_Consumer_H_The Consumer Class File
#include "Consumer.h" //File: Consumer.cpp using namespace examples; Consumer::Consumer(MQ &q) : m_queue(q) {} Consumer::~Consumer() {} void Consumer::run() { while(true) { m_queue.remove(); } }Sample Run:
No comments :
Post a Comment