Feb 14, 2011

Producer and Consumer Problem using Windows Threads

Above is the class diagram for a simple producer/consumer problem.
#include <iostream>
#include "MQ.h"
#include "Thread.h"
#include "Runnable.h"
#include "ProductionTask.h"
#include "ConsumptionTask.h"

//File: Main.cpp

using namespace examples;

// Program entry point.
//
// Creates one bounded queue shared by a producer task and a consumer task,
// runs each task on its own thread and waits for them with a timeout.
// Returns 0 on normal completion and 1 if an exception was reported.
int main()
{
    try
    {
        MQ q(10);
        ProductionTask producerTask(q);
        ConsumptionTask consumerTask(q);
        Thread t[2] = { producerTask, consumerTask };

        //start the producer and consumer threads
        t[0].start();
        t[1].start();

        //wait up to 50000 ms for each thread; the two waits are sequential,
        //and join() returns false when the wait did not end with the thread
        //having finished
        const bool producerFinished = t[0].join(50000);
        const bool consumerFinished = t[1].join(50000);

        //only claim a timeout when a join actually reported one
        if(!producerFinished || !consumerFinished)
        {
            std::cout << std::endl
                << "Threads timed out!!" << std::endl;
        }

    }catch(const std::exception& e)
    {
        std::cerr << e.what() << std::endl;
        return 1;
    }catch(...)
    {
        std::cerr << "Unknown Exception" << std::endl;
        return 1;
    }
    return 0;
}
Key Features

1. Threads and Tasks are decoupled. A thread can execute any task, provided the task implements the Runnable interface.
2. Windows threads are encapsulated using the technique demonstrated in the earlier blog post Encapsulating Windows Threads in C++ Objects
3. Two Tasks ProductionTask and ConsumptionTask share the common MQ instance q and try to execute add and remove operations on q concurrently. Synchronization is achieved here by means of Lock implemented using Windows CRITICAL_SECTION
4. The threads are allowed to run for 50 seconds before they are terminated. This is not a graceful termination, and in real-world applications it may lead to unexpected results.
#ifndef _Runnable_H_
#define _Runnable_H_

//File: Runnable.h

#include <iostream>
#include "NonCopyable.h"

namespace examples
{
    // Abstract unit of work: anything that exposes run() can be handed to
    // a thread for execution.
    class Runnable
    {
    public:
        // Virtual so that objects can be destroyed through a Runnable pointer.
        virtual ~Runnable()
        {
        }

        // The work to perform; every concrete task must implement this.
        virtual void run() = 0;
    };

}

#endif //_Runnable_H_
#ifndef _ConsumptionTask_H_
#define _ConsumptionTask_H_

//File: ConsumptionTask.h

#include "Runnable.h"
#include "MQ.h"

namespace examples
{
    // Runnable task bound to an MQ; its run() implementation lives in
    // ConsumptionTask.cpp.
    class ConsumptionTask : public Runnable
    {
        // Queue this task operates on; held by reference, not owned.
        MQ& m_queue;

    public:
        // Binds the task to the queue q.
        ConsumptionTask(MQ &q);

        ~ConsumptionTask();

        // Runnable entry point.
        virtual void run();
    };

}

#endif //_ConsumptionTask_H_
#include "ConsumptionTask.h"

//File: ConsumptionTask.cpp

using namespace examples;

// Stores a reference to the queue; nothing else is initialised.
ConsumptionTask::ConsumptionTask(MQ &q)
    : m_queue(q)
{
}

// Nothing to release: the queue is only referenced, never owned.
ConsumptionTask::~ConsumptionTask()
{
}

// Consumer loop: calls remove() on the queue, pauses 550 ms, and repeats.
// The loop has no exit condition, so this function never returns.
void ConsumptionTask::run()
{
    for(;;)
    {
        m_queue.remove();
        ::Sleep(550);
    }
}
#ifndef _ProductionTask_H_
#define _ProductionTask_H_

//File: ProductionTask.h

#include "Runnable.h"
#include "MQ.h"

namespace examples
{
    // Runnable task bound to an MQ; its run() implementation lives in
    // ProductionTask.cpp.
    class ProductionTask : public Runnable
    {
        // Queue this task operates on; held by reference, not owned.
        MQ& m_queue;

    public:
        // Binds the task to the queue q.
        ProductionTask(MQ &q);

        ~ProductionTask();

        // Runnable entry point.
        virtual void run();
    };

}

#endif //_ProductionTask_H_
#include "ProductionTask.h"

//File: ProductionTask.cpp

using namespace examples;

// Stores a reference to the queue; nothing else is initialised.
ProductionTask::ProductionTask(MQ &q)
    : m_queue(q)
{
}

// Nothing to release: the queue is only referenced, never owned.
ProductionTask::~ProductionTask()
{
}

// Producer loop: adds the string "Object" to the queue, pauses 500 ms, and
// repeats. The loop has no exit condition, so this function never returns.
void ProductionTask::run()
{
    for(;;)
    {
        m_queue.add("Object");
        ::Sleep(500);
    }
}

#ifndef _Lock_H_
#define _Lock_H_

//File: Lock.h

#include <windows.h>
#include <process.h>
#include "NonCopyable.h"

namespace examples
{
    // Thin wrapper around a Win32 CRITICAL_SECTION. The critical section is
    // initialised in the constructor and deleted in the destructor;
    // acquire()/release() enter and leave it.
    class Lock : public NonCopyable
    {
        // The wrapped Win32 critical section.
        CRITICAL_SECTION m_cs;

    public:
        Lock()  { ::InitializeCriticalSection(&m_cs); }

        ~Lock() { ::DeleteCriticalSection(&m_cs); }

        // Enters the critical section.
        void acquire() { ::EnterCriticalSection(&m_cs); }

        // Leaves the critical section.
        void release() { ::LeaveCriticalSection(&m_cs); }
    };

}

#endif //_Lock_H_
#ifndef _MQ_H_
#define _MQ_H_

//File: MQ.h

#include <iostream>
#include <deque>
#include "Lock.h"

using namespace std;


namespace examples
{
    class MQ
    {
    public:
        MQ(size_t SIZE = 10) : Q_MAX_SIZE(SIZE){}

        ~MQ()
        {
            m_q.clear();
        }

        void add(const string& elem)
        {
            m_lock.acquire();
            if(m_q.size() == Q_MAX_SIZE - 1)
            {
                std::cout << "Queue full" << std::endl;
                m_lock.release();
                ::Sleep(5000);
                return;
            }
            m_q.push_back(elem);
            debug_print();
            m_lock.release();
        }

        void remove()
        {
            m_lock.acquire();
            if(m_q.size() == 0)
            {
                std::cout << "Queue empty" << std::endl;
                m_lock.release();
                return;
            }
            m_q.pop_front();
            debug_print();
            m_lock.release();
        }

        void debug_print()
        {
            std::cout << "\r\b";
            std::cout << "Size=[" << m_q.size() << "]";
        }

    private:
        const size_t Q_MAX_SIZE;
        deque<const string>   m_q;
        Lock           m_lock;
    };

}

#endif //_MQ_H_
#ifndef _Thread_H_
#define _Thread_H_

//File: Thread.h

#include "NonCopyable.h"
#include "Runnable.h"

namespace examples
{
    // Implementation type; only handled through a pointer in this header.
    class ThreadImpl;

    // Thread facade: every operation is forwarded to a ThreadImpl object
    // reached through m_impl.
    class Thread : public NonCopyable
    {
    public:
        // Creates a thread object for the given task.
        Thread(Runnable&);
        ~Thread();

        // Starts the thread.
        void start();

        // Waits for the thread without a time limit.
        bool join();
        // Waits for the thread for at most ms milliseconds.
        bool join(size_t ms);

        // Accessors for the thread's name.
        void setName(const std::string&);
        const char* getName() const;

    private:
        // Private default constructor: a Thread is always built from a task.
        Thread();

        ThreadImpl *m_impl;
    };

}

#endif //_Thread_H_
#include "Thread.h"
#include "ThreadImpl.h"
#include <iostream>

//File: Thread.cpp

using namespace examples;

// Allocates the implementation object that wraps the thread for task.
Thread::Thread(Runnable& task)
    : m_impl(new ThreadImpl(task))
{
}

// Destroys the implementation object allocated in the constructor.
// Without this the ThreadImpl (and the thread handle its destructor closes)
// would leak. Thread is NonCopyable, so m_impl has exactly one owner.
Thread::~Thread()
{
    delete m_impl;
    m_impl = 0;
}

// Forwards to ThreadImpl::join() and returns its result.
bool Thread::join()
{
    const bool finished = m_impl->join();
    return finished;
}

// Forwards to ThreadImpl::join(ms) and returns its result.
bool Thread::join(size_t ms)
{
    const bool finished = m_impl->join(ms);
    return finished;
}

// Returns the name reported by the implementation object.
const char* Thread::getName() const
{
    const char* const name = m_impl->getName();
    return name;
}

// Passes the new name on to the implementation object.
void Thread::setName(const std::string& thrName)
{
    m_impl->setName(thrName);
}

void Thread::start()
{
    m_impl->start();
}
#ifndef _ThreadImpl_H_
#define _ThreadImpl_H_

//File: ThreadImpl.h

#include <windows.h>
#include <process.h>
#include <iostream>
#include <string>
#include "NonCopyable.h"
#include "Runnable.h"

namespace examples
{
    // Win32 implementation behind Thread: holds the thread handle, the
    // thread id and an optional name.
    class ThreadImpl : public NonCopyable
    {
    public:
        // Construction / destruction.
        ThreadImpl(Runnable&);
        ThreadImpl(const std::string&, Runnable&);
        ~ThreadImpl();

        // Thread control.
        void start() const;
        bool join() const;
        bool join(size_t) const;

        // Thread name accessors.
        const char* getName() const;
        void setName(const std::string&);

    private:
        // Private default constructor: a task is always required.
        ThreadImpl();

        // Thread entry point handed to the runtime; receives the task as void*.
        static unsigned __stdcall dispatch(void *);

        // Creates the underlying thread for the given task.
        bool spawn(Runnable&);

        HANDLE      m_hthread;  // thread handle
        unsigned    m_thrdid;   // thread id
        std::string m_thrName;  // thread name
    };

}

#endif //_ThreadImpl_H_
#include "ThreadImpl.h"
#include "Runnable.h"

//File: ThreadImpl.cpp

using namespace examples;

// Thread entry point: args is the Runnable* that spawn() passed to
// _beginthreadex. Runs the task and returns 0 as the thread's exit code.
unsigned __stdcall ThreadImpl::dispatch(void *args)
{
    static_cast<Runnable *>(args)->run();
    return 0;
}

// Builds an unnamed thread for task. The thread is created immediately via
// spawn(); the result of spawn() is not checked here.
ThreadImpl::ThreadImpl(Runnable& task)
    : m_hthread(0)
    , m_thrdid(0)
    , m_thrName()
{
    spawn(task);
}

// Builds a thread named thrName for task. The thread is created immediately
// via spawn(); the result of spawn() is not checked here.
ThreadImpl::ThreadImpl(const std::string& thrName, Runnable& task)
    : m_hthread(0)
    , m_thrdid(0)
    , m_thrName(thrName)
{
    spawn(task);
}

// Closes the thread handle. This only releases the handle; it does not stop
// the thread. The handle is NULL when spawn() failed to create the thread,
// in which case there is nothing to close.
ThreadImpl::~ThreadImpl()
{
    if(m_hthread != NULL)
    {
        ::CloseHandle(m_hthread);
        m_hthread = NULL;
    }
}

// Returns the stored name as a C string. The pointer refers to m_thrName's
// internal buffer.
const char* ThreadImpl::getName() const
{
    const char* const name = m_thrName.c_str();
    return name;
}

// Replaces the stored thread name with a copy of thrName.
void ThreadImpl::setName(const std::string& thrName)
{
    m_thrName.assign(thrName);
}

// Creates the thread in the suspended state, with dispatch() as its entry
// point and the address of task as its argument. Stores the handle and
// thread id, and returns true when a non-NULL handle was obtained.
bool ThreadImpl::spawn(Runnable &task)
{
    m_hthread = reinterpret_cast<HANDLE>(
        ::_beginthreadex(0, 0, &ThreadImpl::dispatch, &task, CREATE_SUSPENDED, &m_thrdid));
    const bool created = (m_hthread != NULL);
    return created;
}

bool ThreadImpl::join() const

{
    DWORD retval = ::WaitForSingleObjectEx(m_hthread, INFINITE, false);
    return retval == WAIT_OBJECT_0;
}

bool ThreadImpl::join(size_t ms) const
{
    DWORD retval = ::WaitForSingleObjectEx(m_hthread, ms, false);
    return retval == WAIT_OBJECT_0;
}

// Resumes the thread that spawn() created suspended. The return value of
// ResumeThread is not inspected.
void ThreadImpl::start() const
{
    (void)::ResumeThread(m_hthread);
}

Sample Run of the Program:

1 comment :

Priya Kannan said...

Somebody necessarily help to make severely posts I might state. This is the first time I frequented your website page and to this point? I surprised with the research you made to create this particular post extraordinary. Well done admin..
SQL Server Training in Chennai

Post a Comment