Giving some work to the I/O service

Now, it is time for us to give some work to the io_service object. Knowing more about boost::bind and boost::mutex will help us to give the io_service object work to do. There are two member functions in the io_service object: the post() and dispatch() functions, which we will frequently use to do this. The post() function is used to request the io_service object to run the io_service object's work after we queue up all the work, so it does not allow us to run the work immediately. While the dispatch() function is also used to make a request to the io_service object to run the io_service object's work, but it will execute the work right away without queuing it up.

Using the post() function

Let's examine the post() function by creating the following code. We will use the mutexbind.cpp file as our base code, since we will just modify the source code:

/* post.cpp */
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex global_stream_lock;

void WorkerThread(boost::shared_ptr<boost::asio::io_service> iosvc, int counter) {
  global_stream_lock.lock();
  std::cout << counter << ".
";
  global_stream_lock.unlock();

  iosvc->run();

  global_stream_lock.lock();
  std::cout << "End.
";
  global_stream_lock.unlock();
}

size_t fac(size_t n) {
  if ( n <= 1 ) {
    return n;
  }
  boost::this_thread::sleep(
    boost::posix_time::milliseconds(1000)
  );
  return n * fac(n - 1);
}

void CalculateFactorial(size_t n) {
  global_stream_lock.lock();
  std::cout << "Calculating " << n << "! factorial" << std::endl;
  global_stream_lock.unlock();

  size_t f = fac(n);

  global_stream_lock.lock();
  std::cout << n << "! = " << f << std::endl;
  global_stream_lock.unlock();
}

int main(void) {
  boost::shared_ptr<boost::asio::io_service> io_svc(
    new boost::asio::io_service
  );

  boost::shared_ptr<boost::asio::io_service::work> worker(
    new boost::asio::io_service::work(*io_svc)
  );

  global_stream_lock.lock();
  std::cout << "The program will exit once all work has finished." << std::endl;
  global_stream_lock.unlock();

  boost::thread_group threads;
  for(int i=1; i<=5; i++)
    threads.create_thread(boost::bind(&WorkerThread, io_svc, i));

  io_svc->post(boost::bind(CalculateFactorial, 5));
  io_svc->post(boost::bind(CalculateFactorial, 6));
  io_svc->post(boost::bind(CalculateFactorial, 7));

  worker.reset();

  threads.join_all();

  return 0;
}

Name the preceding code as post.cpp and compile it using the following command:

g++ -Wall -ansi -I ../boost_1_58_0 post.cpp -o post -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 -l boost_thread-mgw49-mt-1_58

Before we run the program, let's examine the code to understand its behavior:

size_t fac(size_t n) {
  if (n <= 1) {
    return n;
  }
  boost::this_thread::sleep(
    boost::posix_time::milliseconds(1000)
  );
  return n * fac(n - 1);
}

We add the fac() function to calculate the n factorial recursively. There is a time delay to slow down the process in order to see the work of our worker threads:

io_svc->post(boost::bind(CalculateFactorial, 5));
io_svc->post(boost::bind(CalculateFactorial, 6));
io_svc->post(boost::bind(CalculateFactorial, 7));

In the main block, we post three function objects on the io_service object, using the post() function. We do this just after we initialize the five worker threads. However, because we call the run() function of the io_service object inside each thread, the work of the io_service object will run. This means that the post() function will do its job.

Now, let's run post.cpp and take a look at what has happened here:

Using the post() function

As we can see in the output of the preceding screenshot, the program runs the thread from the pool of threads, and after it finishes one thread, it calls the post() function from the io_service object until all three post() functions and all five threads have been called. Then, it calculates the factorial for each three n number. After it gets the worker.reset() function, it is notified that the work has been finished, and then it joins all the threads via the threads.join_all() function.

Using the dispatch() function

Now, let's examine the dispatch() function to give the io_service function some work. We will still use the mutexbind.cpp file as our base code and we will modify it a little so that it becomes like this:

/* dispatch.cpp */
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex global_stream_lock;

void WorkerThread(boost::shared_ptr<boost::asio::io_service> iosvc) {
  global_stream_lock.lock();
  std::cout << "Thread Start.
";
  global_stream_lock.unlock();

  iosvc->run();

  global_stream_lock.lock();
  std::cout << "Thread Finish.
";
  global_stream_lock.unlock();
}

void Dispatch(int i) {
  global_stream_lock.lock();
  std::cout << "dispath() Function for i = " << i <<  std::endl;
  global_stream_lock.unlock();
}

void Post(int i) {
  global_stream_lock.lock();
  std::cout << "post() Function for i = " << i <<  std::endl;
  global_stream_lock.unlock();
}

void Running(boost::shared_ptr<boost::asio::io_service> iosvc) {
  for( int x = 0; x < 5; ++x ) {
    iosvc->dispatch(boost::bind(&Dispatch, x));
    iosvc->post(boost::bind(&Post, x));
    boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
  }
}

int main(void) {
  boost::shared_ptr<boost::asio::io_service> io_svc(
    new boost::asio::io_service
  );

  boost::shared_ptr<boost::asio::io_service::work> worker(
    new boost::asio::io_service::work(*io_svc)
  );

  global_stream_lock.lock();
  std::cout << "The program will exit automatically once all work has finished." << std::endl;
  global_stream_lock.unlock();

  boost::thread_group threads;

  threads.create_thread(boost::bind(&WorkerThread, io_svc));

  io_svc->post(boost::bind(&Running, io_svc));

  worker.reset();

  threads.join_all();

  return 0;
}

Give the preceding code the name dispatch.cpp and compile it using the following command:

g++ -Wall -ansi -I ../boost_1_58_0 dispatch.cpp -o dispatch -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 -l boost_thread-mgw49-mt-1_58

Now, let's run the program to get the following output:

Using the dispatch() function

Different than the post.cpp file, in the dispatch.cpp file, we just create one worker thread. Also, we add two functions, dispatch(), and post() to understand the difference between both functions:

iosvc->dispatch(boost::bind(&Dispatch, x));
iosvc->post(boost::bind(&Post, x));

If we look at the preceding code snippet inside the Running() function, we expect to get the ordered output between the dispatch() and post() functions. However, when we see the output, we find that the result is different because the dispatch() function is called first and the post() function is called after it. This happens because the dispatch() function can be invoked from the current worker thread, while the post() function has to wait until the handler of the worker is complete before it can be invoked. In other words, the dispatch() function's events can be executed from the current worker thread even if there are other pending events queued up, while the post() function's events have to wait until the handler completes the execution before being allowed to be executed.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset