A Developer's Diary

Feb 14, 2011

Producer and Consumer Problem using Windows Threads

Above is the class diagram for a simple producer/consumer problem.
#include <iostream>
#include "MQ.h"
#include "Thread.h"
#include "Runnable.h"
#include "ProductionTask.h"
#include "ConsumptionTask.h"

//File: Main.cpp

using namespace examples;

int main()
{
    try
    {
        MQ q(10);
        ProductionTask producerTask(q);
        ConsumptionTask consumerTask(q);
        Thread t[2] = { producerTask, consumerTask };

        //start the producer and consumer threads
        t[0].start();
        t[1].start();

        //wait up to 50000 ms for each thread in turn; the tasks loop forever, so both waits time out
        t[0].join(50000);
        t[1].join(50000);
        std::cout << std::endl
            << "Threads timed out!!" << std::endl;

    }catch(std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }catch(...)
    {
        std::cerr << "Unknown Exception" << std::endl;
    }
    return 0;
}
Key Features

1. Threads and tasks are decoupled. A thread can execute any task, provided the task implements the Runnable interface.
2. Windows threads are encapsulated using the technique demonstrated in the earlier blog post, Encapsulating Windows Threads in C++ Objects.
3. The two tasks, ProductionTask and ConsumptionTask, share the common MQ instance q and execute add and remove operations on it concurrently. Synchronization is achieved by means of Lock, which is implemented using a Windows CRITICAL_SECTION.
4. The main thread waits up to 50 seconds on each worker thread in turn and then returns from main, which ends the process while both tasks are still looping. This is not a graceful termination and in real-world applications may lead to unexpected results.
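Item 4 points out that the threads are not stopped gracefully. One common alternative is a cooperative stop flag that the task checks on every loop iteration. The following is only a sketch under assumptions: StoppableTask, requestStop and runForAWhile are hypothetical names that do not appear in the post, and std::thread and std::atomic (C++11) stand in for the Windows thread wrapper so that the example is portable.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical task with a cooperative stop flag (not part of the original post).
class StoppableTask
{
public:
    StoppableTask() : m_stop(false), m_iterations(0) {}

    // Ask the loop in run() to finish after its current iteration.
    void requestStop() { m_stop.store(true); }

    int iterations() const { return m_iterations.load(); }

    void run()
    {
        while (!m_stop.load())
        {
            ++m_iterations; // stands in for m_queue.add(...) or m_queue.remove()
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

private:
    std::atomic<bool> m_stop;
    std::atomic<int>  m_iterations;
};

// Lets the task run for roughly runMs milliseconds, then stops it cleanly.
// Returns the number of loop iterations that were started.
inline int runForAWhile(int runMs)
{
    StoppableTask task;
    std::thread worker(&StoppableTask::run, &task);
    std::this_thread::sleep_for(std::chrono::milliseconds(runMs));
    task.requestStop();
    worker.join(); // returns promptly because run() observes the flag
    return task.iterations();
}
```

With this shape the join needs no timeout: the worker leaves run() on its own, so the queue and the task objects are never destroyed while a thread is still using them.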
#ifndef _Runnable_H_
#define _Runnable_H_

//File: Runnable.h

#include <iostream>
#include "NonCopyable.h"

namespace examples
{
    class Runnable
    {
    public:
        virtual ~Runnable(){}
        virtual void run() = 0;
    };

}

#endif //_Runnable_H_
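Because the thread only sees the Runnable interface, any new task can be run without touching the thread code. A minimal sketch of that idea follows. SumTask and runOnThread are hypothetical names introduced for illustration, the Runnable class is repeated so the example stands alone, and std::thread is used in place of the post's Thread class so that it compiles on any platform.

```cpp
#include <cassert>
#include <thread>

// Same shape as the Runnable interface above, repeated so the sketch stands alone.
class Runnable
{
public:
    virtual ~Runnable() {}
    virtual void run() = 0;
};

// Hypothetical task: sums the integers 1..n when run.
class SumTask : public Runnable
{
public:
    explicit SumTask(int n) : m_n(n), m_sum(0) {}

    virtual void run()
    {
        for (int i = 1; i <= m_n; ++i)
            m_sum += i;
    }

    long sum() const { return m_sum; }

private:
    int  m_n;
    long m_sum;
};

// Runs any Runnable on a new thread and waits for it to finish.
// The call goes through the base-class pointer, so run() is dispatched virtually.
inline void runOnThread(Runnable& task)
{
    std::thread worker(&Runnable::run, &task);
    worker.join();
}
```

Calling runOnThread on a SumTask constructed with 100 leaves 5050 in sum(); the same helper would accept a ProductionTask or ConsumptionTask unchanged.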
#ifndef _ConsumptionTask_H_
#define _ConsumptionTask_H_

//File: ConsumptionTask.h

#include "Runnable.h"
#include "MQ.h"

namespace examples
{
    class ConsumptionTask : public Runnable
    {
    public:
        ConsumptionTask(MQ &q);
        ~ConsumptionTask();
        virtual void run();

    private:
        MQ&  m_queue;
    };

}

#endif //_ConsumptionTask_H_
#include "ConsumptionTask.h"

//File: ConsumptionTask.cpp

using namespace examples;

ConsumptionTask::ConsumptionTask(MQ &q) : m_queue(q)
{}

ConsumptionTask::~ConsumptionTask()
{}

void ConsumptionTask::run()
{
    while(true)
    {
        m_queue.remove();
        ::Sleep(550);
    }
}
#ifndef _ProductionTask_H_
#define _ProductionTask_H_

//File: ProductionTask.h

#include "Runnable.h"
#include "MQ.h"

namespace examples
{
    class ProductionTask : public Runnable
    {
    public:
        ProductionTask(MQ &q);
        ~ProductionTask();
        virtual void run();

    private:
        MQ&  m_queue;
    };

}

#endif //_ProductionTask_H_
#include "ProductionTask.h"

//File: ProductionTask.cpp

using namespace examples;

ProductionTask::ProductionTask(MQ &q) : m_queue(q)
{}

ProductionTask::~ProductionTask()
{}

void ProductionTask::run()
{
    while(true)
    {
        m_queue.add("Object");
        ::Sleep(500);
    }
}

#ifndef _Lock_H_
#define _Lock_H_

//File: Lock.h

#include <windows.h>
#include <process.h>
#include "NonCopyable.h"

namespace examples
{
    class Lock : public NonCopyable
    {
    public:
        Lock()
        {
            ::InitializeCriticalSection(&m_cs);
        }

        ~Lock()
        {
            ::DeleteCriticalSection(&m_cs);
        }

        void acquire()
        {
            ::EnterCriticalSection(&m_cs);
        }

        void release()
        {
            ::LeaveCriticalSection(&m_cs);
        }

    private:

        CRITICAL_SECTION m_cs;
    };

}

#endif //_Lock_H_
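A Lock with explicit acquire() and release() calls is easy to misuse: every early return has to remember to release. A scoped guard that releases in its destructor removes that burden. The sketch below is an assumption and not part of the original code. ScopedLock, CountingLock and removeIfNotEmpty are hypothetical names, and CountingLock is a stand-in for the post's Lock that only counts calls, so the example compiles without windows.h.

```cpp
#include <cassert>

// Stand-in for the post's Lock: same acquire()/release() interface,
// but it only counts calls. (Hypothetical, for illustration.)
struct CountingLock
{
    int acquired;
    int released;
    CountingLock() : acquired(0), released(0) {}
    void acquire() { ++acquired; }
    void release() { ++released; }
};

// Hypothetical RAII guard: acquires in the constructor, releases in the destructor.
template <typename LockT>
class ScopedLock
{
public:
    explicit ScopedLock(LockT& lock) : m_lock(lock) { m_lock.acquire(); }
    ~ScopedLock() { m_lock.release(); }

private:
    ScopedLock(const ScopedLock&);            // non-copyable
    ScopedLock& operator=(const ScopedLock&); // non-assignable
    LockT& m_lock;
};

// Mimics the early-return shape of a guarded remove operation.
inline bool removeIfNotEmpty(CountingLock& lock, int& size)
{
    ScopedLock<CountingLock> guard(lock);
    if (size == 0)
        return false; // guard releases the lock here
    --size;
    return true;      // and here
}
```

Since the guard is a template over any type with acquire() and release(), it should work with the Lock class above as ScopedLock<Lock> without modification.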
#ifndef _MQ_H_
#define _MQ_H_

//File: MQ.h

#include <iostream>
#include <deque>
#include <string>
#include "Lock.h"

using namespace std;


namespace examples
{
    class MQ
    {
    public:
        MQ(size_t SIZE = 10) : Q_MAX_SIZE(SIZE){}

        ~MQ()
        {
            m_q.clear();
        }

        void add(const string& elem)
        {
            m_lock.acquire();
            if(m_q.size() >= Q_MAX_SIZE) //full only once Q_MAX_SIZE elements are stored
            {
                std::cout << "Queue full" << std::endl;
                m_lock.release();
                ::Sleep(5000);
                return;
            }
            m_q.push_back(elem);
            debug_print();
            m_lock.release();
        }

        void remove()
        {
            m_lock.acquire();
            if(m_q.size() == 0)
            {
                std::cout << "Queue empty" << std::endl;
                m_lock.release();
                return;
            }
            m_q.pop_front();
            debug_print();
            m_lock.release();
        }

        void debug_print()
        {
            std::cout << "\r\b";
            std::cout << "Size=[" << m_q.size() << "]";
        }

    private:
        const size_t Q_MAX_SIZE;
        deque<string>  m_q; //standard containers cannot hold const elements
        Lock           m_lock;
    };

}

#endif //_MQ_H_
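The MQ above reports "Queue full" or "Queue empty" and returns, so the tasks have to poll with Sleep. A blocking queue is a common alternative: add waits until there is room and remove waits until there is an element. The sketch below swaps in a different technique from the post, using std::mutex and std::condition_variable (C++11) instead of CRITICAL_SECTION, purely so that it is portable. BlockingMQ and transfer are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical blocking variant of MQ (not the author's implementation).
class BlockingMQ
{
public:
    explicit BlockingMQ(std::size_t maxSize) : m_maxSize(maxSize) {}

    // Blocks while the queue is full.
    void add(const std::string& elem)
    {
        std::unique_lock<std::mutex> guard(m_mutex);
        m_notFull.wait(guard, [this] { return m_q.size() < m_maxSize; });
        m_q.push_back(elem);
        m_notEmpty.notify_one();
    }

    // Blocks while the queue is empty, then returns the oldest element.
    std::string remove()
    {
        std::unique_lock<std::mutex> guard(m_mutex);
        m_notEmpty.wait(guard, [this] { return !m_q.empty(); });
        std::string elem = m_q.front();
        m_q.pop_front();
        m_notFull.notify_one();
        return elem;
    }

private:
    const std::size_t       m_maxSize;
    std::deque<std::string> m_q;
    std::mutex              m_mutex;
    std::condition_variable m_notFull;
    std::condition_variable m_notEmpty;
};

// Pushes count items through a queue of capacity 2 and returns
// how many of them the consumer thread received.
inline int transfer(int count)
{
    BlockingMQ q(2);
    int received = 0;
    std::thread consumer([&] {
        for (int i = 0; i < count; ++i)
            if (q.remove() == "Object")
                ++received;
    });
    for (int i = 0; i < count; ++i)
        q.add("Object");
    consumer.join(); // after join, reading received is safe
    return received;
}
```

On Windows Vista and later the same structure could presumably be built on CRITICAL_SECTION together with CONDITION_VARIABLE, but that variant is not shown or tested here.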
#ifndef _Thread_H_
#define _Thread_H_

//File: Thread.h

#include <string>
#include "NonCopyable.h"
#include "Runnable.h"

namespace examples
{
    class Thread : public NonCopyable
    {
    public:
        Thread(Runnable&);
        ~Thread();

        bool join();
        bool join(size_t ms);
        void start();
        void setName(const std::string&);
        const char* getName() const;

    private:
        Thread();

        class ThreadImpl *m_impl;
    };

}

#endif //_Thread_H_
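Thread, ThreadImpl and Lock all derive from NonCopyable, but NonCopyable.h itself is not listed in this post. The following is a guess at what such a header typically contains, written in the pre-C++11 style that matches the rest of the code. It is a hypothetical reconstruction and may differ from the author's file. Handle is an illustrative class only, and the static_assert checks need C++11.

```cpp
#include <cassert>
#include <type_traits>

// Hypothetical reconstruction of a NonCopyable base class.
class NonCopyable
{
protected:
    NonCopyable() {}
    ~NonCopyable() {}

private:
    NonCopyable(const NonCopyable&);            // declared, never defined
    NonCopyable& operator=(const NonCopyable&); // declared, never defined
};

// Illustrative derived class: it can be constructed but not copied.
class Handle : public NonCopyable
{
public:
    explicit Handle(int id) : m_id(id) {}
    int id() const { return m_id; }

private:
    int m_id;
};

// Compile-time checks that copying is really disabled.
static_assert(!std::is_copy_constructible<Handle>::value, "Handle must not be copyable");
static_assert(!std::is_copy_assignable<Handle>::value, "Handle must not be assignable");
```

In C++11 and later the same effect is usually written by marking the copy constructor and copy assignment operator as = delete.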
#include "Thread.h"
#include "ThreadImpl.h"
#include <iostream>

//File: Thread.cpp

using namespace examples;

Thread::Thread(Runnable& task) : m_impl(new ThreadImpl(task))
{}

Thread::~Thread()
{
    delete m_impl; //release the implementation object allocated in the constructor
}

bool Thread::join()
{
    return m_impl->join();
}

bool Thread::join(size_t ms)
{
    return m_impl->join(ms);
}

const char* Thread::getName() const
{
    return m_impl->getName();
}

void Thread::setName(const std::string& thrName)
{
    m_impl->setName(thrName);
}

void Thread::start()
{
    m_impl->start();
}
#ifndef _ThreadImpl_H_
#define _ThreadImpl_H_

//File: ThreadImpl.h

#include <windows.h>
#include <process.h>
#include <iostream>
#include <string>
#include "NonCopyable.h"
#include "Runnable.h"

namespace examples
{
    class ThreadImpl : public NonCopyable
    {
    public:
        ThreadImpl(Runnable&);
        ThreadImpl(const std::string&, Runnable&);
        ~ThreadImpl();

        bool join() const;
        bool join(size_t) const;
        void start() const;
        void setName(const std::string&);
        const char* getName() const;

    private:
        ThreadImpl();

        static unsigned __stdcall dispatch(void *);
        bool spawn(Runnable&);

        HANDLE      m_hthread;
        unsigned    m_thrdid;
        std::string m_thrName;
    };

}

#endif //_ThreadImpl_H_
#include "ThreadImpl.h"
#include "Runnable.h"

//File: ThreadImpl.cpp

using namespace examples;

unsigned __stdcall ThreadImpl::dispatch(void *args)
{
    Runnable *task = static_cast<Runnable *>(args);
    task->run();
    return 0;
}

ThreadImpl::ThreadImpl(Runnable& task) : m_hthread(0), m_thrdid(0), m_thrName("")
{
    spawn(task);
}

ThreadImpl::ThreadImpl(const std::string& thrName, Runnable& task) : m_hthread(0), m_thrdid(0), m_thrName(thrName)
{
    spawn(task);
}

ThreadImpl::~ThreadImpl()
{
    //spawn() leaves the handle NULL if _beginthreadex fails
    if(m_hthread != NULL)
    {
        ::CloseHandle(m_hthread);
        m_hthread = NULL;
    }
}

const char* ThreadImpl::getName() const
{
    return m_thrName.c_str();
}

void ThreadImpl::setName(const std::string& thrName)
{
    m_thrName = thrName;
}

bool ThreadImpl::spawn(Runnable &task)
{
    m_hthread = (HANDLE) ::_beginthreadex(0, 0, &ThreadImpl::dispatch, &task, CREATE_SUSPENDED, &m_thrdid);
    return m_hthread != NULL;
}

bool ThreadImpl::join() const
{
    DWORD retval = ::WaitForSingleObjectEx(m_hthread, INFINITE, false);
    return retval == WAIT_OBJECT_0;
}

bool ThreadImpl::join(size_t ms) const
{
    DWORD retval = ::WaitForSingleObjectEx(m_hthread, static_cast<DWORD>(ms), FALSE);
    return retval == WAIT_OBJECT_0;
}

void ThreadImpl::start() const
{
    ::ResumeThread(m_hthread);
}

Sample Run of the Program:
