A Developer's Diary

Mar 2, 2011

Producer and Consumer Problem revisited with Events

The producer and consumer problem we visited earlier used a CRITICAL_SECTION for synchronization, and the threads kept spinning, repeatedly checking the queue size before adding or removing elements. In this post, we are going to use Windows Event objects to synchronize the Producer and Consumer threads.

Key Points
1. Events are kernel objects which contain a usage count, a flag indicating the type of event (manual-reset or auto-reset), and another flag indicating the state (signaled or non-signaled)
2. Applications use Event Objects to notify the waiting threads that an operation has been completed
3. An event is signaled using the call
SetEvent(HANDLE hEvent)

4. When a manual-reset event is signaled, all threads waiting on the event become schedulable. When an auto-reset event is signaled, only one of the threads waiting on the event becomes schedulable

Event objects are manipulated with five functions: CreateEvent, OpenEvent, SetEvent, ResetEvent, and PulseEvent. The functions I will be using in my examples are:

HANDLE CreateEvent(
  LPSECURITY_ATTRIBUTES lpEventAttributes,
  BOOL bManualReset,
  BOOL bInitialState,
  LPCTSTR lpName
);
BOOL SetEvent(
  HANDLE hEvent
);
BOOL ResetEvent(
  HANDLE hEvent
);

The Event class below is a wrapper over the Windows manual-reset Event object. It provides an easy-to-use interface and also takes care of cleaning up the event handle.
#ifndef _Event_H_
#define _Event_H_

//File: Event.h

#include <windows.h>
#include <process.h>
#include <iostream>

namespace examples
{
    class Event : public NonCopyable
    {
    public:
        Event()
        {
            m_evt = (HANDLE) ::CreateEvent(0, true, false, 0);
            if(NULL == m_evt)
            {
                std::cout <<  "ERROR: Cannot create event" << std::endl;
            }
        }

        ~Event()
        {
            ::CloseHandle(m_evt);
        }

        bool wait(size_t timeout)
        {
            bool retval = false;

            switch(::WaitForSingleObjectEx(m_evt, timeout == 0 ? INFINITE : timeout, false))
            {
            case WAIT_OBJECT_0:
                retval = true;
                break;
            default:
                std::cout << "ERROR: Wait Failed " << ::GetLastError() << std::endl;
                break;
            }
            ::ResetEvent(m_evt);

            return retval;
        }

        bool signal()
        {
            return ::SetEvent(m_evt);
        }

    private:
        HANDLE m_evt;
    };
}

#endif //_Event_H_
The Shared Message Queue Class
#ifndef _MQ_H_
#define _MQ_H_

//File: MQ.h

#include <iostream>
#include <deque>
#include "Lock.h"
#include "Event.h"

using namespace std;

namespace examples
{
    /**
     * Bounded FIFO queue of strings shared between producer and consumer
     * threads.
     *
     * add() blocks while the queue is full and remove() blocks while it is
     * empty. m_qlock guards the container; the two events are used to wake
     * the side that is waiting for room or for data.
     */
    class MQ
    {
    public:
        /**
         * @param size  Maximum number of elements the queue may hold.
         *              A size of 0 is treated as 1, because a queue without
         *              any capacity could never accept an element and every
         *              producer would block forever.
         */
        MQ(size_t size = 10) : m_max_size(size == 0 ? 1 : size){}

        ~MQ()
        {
            m_q.clear();
        }

        /**
         * Appends a copy of elem to the back of the queue, blocking while the
         * queue is full, then signals the consumer event.
         */
        void add(const std::string& elem)
        {
            m_qlock.acquire();
            while(isFull())
            {
                // Drop the lock while sleeping so a consumer can make room,
                // then re-take it and re-check the condition.
                m_qlock.release();
                m_evtProducer.wait(0);
                m_qlock.acquire();
            }
            m_q.push_back(elem);
            debug_print("Producer");
            m_evtConsumer.signal();
            m_qlock.release();
        }

        /**
         * Discards the element at the front of the queue, blocking while the
         * queue is empty, then signals the producer event.
         */
        void remove()
        {
            m_qlock.acquire();
            while(isEmpty())
            {
                // Drop the lock while sleeping so a producer can add data,
                // then re-take it and re-check the condition.
                m_qlock.release();
                m_evtConsumer.wait(0);
                m_qlock.acquire();
            }
            m_q.pop_front();
            debug_print("Consumer");
            m_evtProducer.signal();
            m_qlock.release();
        }

        /// True when the queue holds no elements. Takes no lock itself.
        bool isEmpty() const
        {
            return m_q.empty();
        }

        /// True when the queue holds m_max_size elements. Takes no lock itself.
        bool isFull() const
        {
            // Compare against the full capacity: subtracting one here would
            // waste a slot and, for a capacity of 1, report "full" forever.
            return m_q.size() >= m_max_size;
        }

    private:

        /// Prints the given label, the calling thread's id and the current size.
        void debug_print(const std::string& str) const
        {
            std::cout << str.c_str() << "  [" << ::GetCurrentThreadId() << "] Size=[" << m_q.size() << "]";
            std::cout << std::endl;
        }

        const size_t             m_max_size;     // Capacity, always >= 1.
        // Standard containers require a non-const element type.
        std::deque<std::string>  m_q;
        Lock                     m_qlock;        // Guards m_q in add()/remove().
        Event                    m_evtProducer;  // Signaled by remove(): room may be available.
        Event                    m_evtConsumer;  // Signaled by add(): data may be available.
    };

}

#endif //_MQ_H_
Visit the earlier posts for thread creation and implementation:
1. Producer and Consumer Problem Using Windows Threads
2. Encapsulating Windows Threads in C++ Objects

The Client Code
#include <iostream>
#include "MQ.h"
#include "Thread.h"
#include "Runnable.h"
#include "Producer.h"
#include "Consumer.h"

//File: Main.cpp

using namespace examples;

int main()
{
    try
    {
        MQ q(10);
        Producer producer(q);
        Consumer consumer(q);
        Thread t[4] = { producer, producer, consumer, consumer };

        //start the producer and consumer threads
        t[0].start();
        t[1].start();
        t[2].start();
        t[3].start();

        t[0].join(20000);
        t[1].join(20000);
        t[2].join(20000);
        t[3].join(20000);
        std::cout << std::endl << "Threads timed out!!" << std::endl;

    }catch(std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }catch(...)
    {
        std::cerr << "Unknown Exception" << std::endl;
    }
    return 0;
}
The Producer Header File
#ifndef _Producer_H_
#define _Producer_H_

//File: Producer.h

#include "Runnable.h"
#include "MQ.h"

namespace examples
{
    // Runnable task bound to a shared MQ. The queue is held by reference,
    // so it must outlive the Producer.
    class Producer : public Runnable
    {
        MQ& m_queue;    // Shared queue; referenced, not owned.

    public:
        Producer(MQ& q);
        ~Producer();

        // Thread body; defined in Producer.cpp.
        virtual void run();
    };

}

#endif //_Producer_H_
The Producer Class File
#include "Producer.h"

//File: Producer.cpp

using namespace examples;

// Binds the producer to the queue it will add to; only a reference is stored.
Producer::Producer(MQ& q)
    : m_queue(q)
{
}

// Intentionally empty: the destructor performs no cleanup.
Producer::~Producer()
{
}

// Thread body: adds the string "Object" to the queue over and over.
// The loop has no exit condition, so this function never returns normally.
void Producer::run()
{
    for(;;)
    {
        m_queue.add("Object");
    }
}
The Consumer Header File
#ifndef _Consumer_H_
#define _Consumer_H_

//File: Consumer.h

#include "Runnable.h"
#include "MQ.h"

namespace examples
{
    // Runnable task bound to a shared MQ. The queue is held by reference,
    // so it must outlive the Consumer.
    class Consumer : public Runnable
    {
        MQ& m_queue;    // Shared queue; referenced, not owned.

    public:
        Consumer(MQ& q);
        ~Consumer();

        // Thread body; defined in Consumer.cpp.
        virtual void run();
    };

}

#endif //_Consumer_H_
The Consumer Class File
#include "Consumer.h"

//File: Consumer.cpp

using namespace examples;

// Binds the consumer to the queue it will remove from; only a reference is stored.
Consumer::Consumer(MQ& q)
    : m_queue(q)
{
}

// Intentionally empty: the destructor performs no cleanup.
Consumer::~Consumer()
{
}

// Thread body: removes one element from the queue over and over.
// The loop has no exit condition, so this function never returns normally.
void Consumer::run()
{
    for(;;)
    {
        m_queue.remove();
    }
}
Sample Run:

No comments :

Post a Comment