Chapter 6. Creating a Client-server Application

In the previous chapter, we delved into the Boost.Asio library, which is essential for developing network applications. Now, we will move on to a deeper discussion of client-server applications, in which two or more computers communicate with each other over a network. One of them is called the client and the other one is the server.

We are going to develop a server that is able to send data to and receive data from a client, and also create a client-side program that receives that data. In this chapter, we will discuss the following topics:

  • Establishing a connection between the client and server
  • Sending and receiving data between the client and server
  • Wrapping the most frequently used code to simplify the programming process by avoiding code duplication

Establishing a connection

In Chapter 2, Understanding the Networking Concepts, we talked about two transport protocols that run on top of the Internet Protocol (IP): Transmission Control Protocol (TCP) and User Datagram Protocol (UDP). TCP is connection-oriented, which means data can be sent only after a connection has been established. In contrast, UDP is connectionless, which means it sends data directly to the destination device without establishing a connection first. In this chapter, we will only talk about TCP; therefore, we have to establish the connection first. A connection can only be established if both parties, in this case the client and the server, accept it. Here, we will establish a connection both synchronously and asynchronously.

A synchronous client

We start by establishing a synchronous connection to a remote host. The program acts as a client that opens a connection to the Packt Publishing website (www.packtpub.com). We will use TCP, as we discussed earlier in Chapter 2, Understanding the Networking Concepts. Here is the code:

/* connectsync.cpp */
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
#include <iostream>
#include <string>

boost::mutex global_stream_lock;

void WorkerThread(boost::shared_ptr<boost::asio::io_service> iosvc, int counter) {
  global_stream_lock.lock();
  std::cout << "Thread " << counter << " Start.\n";
  global_stream_lock.unlock();

  while(true) {
    try {
      boost::system::error_code ec;
      iosvc->run(ec);
      if(ec) {
        global_stream_lock.lock();
        std::cout << "Message: " << ec << ".\n";
        global_stream_lock.unlock();
      }
      break;
    }
    catch(std::exception &ex) {
      global_stream_lock.lock();
      std::cout << "Message: " << ex.what() << ".\n";
      global_stream_lock.unlock();
    }
  }

  global_stream_lock.lock();
  std::cout << "Thread " << counter << " End.\n";
  global_stream_lock.unlock();
}

int main(void) {
  boost::shared_ptr<boost::asio::io_service> io_svc(
    new boost::asio::io_service
  );

  boost::shared_ptr<boost::asio::io_service::work> worker(
    new boost::asio::io_service::work(*io_svc)
  );
  boost::shared_ptr<boost::asio::io_service::strand> strand(
    new boost::asio::io_service::strand(*io_svc)
  );

  global_stream_lock.lock();
  std::cout << "Press ENTER to exit!\n";
  global_stream_lock.unlock();

  boost::thread_group threads;
  for(int i=1; i<=2; i++)
    threads.create_thread(boost::bind(&WorkerThread, io_svc, i));

  boost::asio::ip::tcp::socket sckt(*io_svc);

  try {
    boost::asio::ip::tcp::resolver resolver(*io_svc);
    boost::asio::ip::tcp::resolver::query query("www.packtpub.com", 
      boost::lexical_cast<std::string>(80)
    );
    boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve(query);
    boost::asio::ip::tcp::endpoint endpoint = *iterator;

    global_stream_lock.lock();
    std::cout << "Connecting to: " << endpoint << std::endl;
    global_stream_lock.unlock();

    sckt.connect(endpoint);
    global_stream_lock.lock();
    std::cout << "Connected!\n";
    global_stream_lock.unlock();
  }
  catch(std::exception &ex) {
    global_stream_lock.lock();
    std::cout << "Message: " << ex.what() << ".\n";
    global_stream_lock.unlock();
  }

  std::cin.get();

  boost::system::error_code ec;
  sckt.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
  sckt.close(ec);

  io_svc->stop();

  threads.join_all();

  return 0;
}

Save the preceding code as connectsync.cpp and run the following command to compile the code:

g++ -Wall -ansi -I ../boost_1_58_0 connectsync.cpp -o connectsync -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 -l libboost_thread-mgw49-mt-1_58

Run the program by typing connectsync in the console, and we should get the following output:

[Figure: A synchronous client]

The program will exit as soon as we press the Enter key.

Now, let us analyze the code. As we can see in the preceding code, we reuse our previous sample code and add a few lines in order to establish a connection. Let us turn our attention to the lines we have inserted:

boost::asio::ip::tcp::socket sckt(*io_svc);

We now have a socket variable, sckt, which is declared locally in main(). This variable is used to provide socket functionality. Its type is the socket class nested in boost::asio::ip::tcp because we use TCP as our protocol:

boost::asio::ip::tcp::resolver resolver(*io_svc);
boost::asio::ip::tcp::resolver::query query("www.packtpub.com",
  boost::lexical_cast<std::string>(80)
);
boost::asio::ip::tcp::resolver::iterator iterator =
  resolver.resolve(query);

We also use the boost::asio::ip::tcp::resolver class. It is used to get the address of the remote host that we want to connect to. To the constructor of the query class, we pass the Internet address and the port as parameters. Because the port is expected as a string and we have it as an integer, we convert it by using lexical_cast. The query class describes the query that is passed to the resolver. Then, the resolve() member function returns an iterator over the results found by the resolver:

boost::asio::ip::tcp::endpoint endpoint = *iterator;

After the iterator is successfully created, we dereference it and assign the result to the endpoint variable. The endpoint stores the IP address and port number of the first entry returned by the resolver:

sckt.connect(endpoint);

Then, the connect() member function will connect the socket to the endpoint, which we specified before. If everything runs properly and no error or exception is thrown, the connection is now established:

boost::system::error_code ec;
sckt.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
sckt.close(ec);

To release the connection, we first disable sending and receiving on the socket by using the shutdown() member function; then, we invoke the close() member function to close the socket.
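The resolver query shown earlier takes the service as a string, which is why the integer port goes through lexical_cast. As a rough illustration of what that conversion amounts to, the following sketch formats a port number with a standard string stream. PortToString is a hypothetical helper introduced only for this example; it is not part of Boost or of the chapter's code.

```cpp
#include <sstream>
#include <string>

// Hypothetical helper (not part of Boost): formats a port number as the
// decimal string that a resolver query expects for its service argument.
std::string PortToString(unsigned short port) {
  std::ostringstream stream;
  stream << port;  // stream insertion produces the decimal digits
  return stream.str();
}
```

When the port is a fixed value, it should also be possible to skip the conversion and pass a string literal such as "80" as the second argument of the query.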

When we run the program and get output like the preceding image, it tells us that the connection has been established. We can change the port number in the query, for example, to 110, which is assigned to Post Office Protocol version 3 (POP3), as follows:

boost::asio::ip::tcp::resolver::query query("www.packtpub.com",
  boost::lexical_cast<std::string>(110)
);

Then, the program will throw an exception, and the output will be as follows:

[Figure: A synchronous client]

From the output, we can conclude that the connection has been refused by the target machine because the port we tried to connect to is closed. In contrast, by using port 80, which is the port for Hypertext Transfer Protocol (HTTP), we can make a connection with the Packt Publishing website.

An asynchronous client

We have already been able to establish a connection synchronously. But what if we need to connect asynchronously so that the program does not block while trying to make a connection? Let us take a look at the following code to find the answer:

/* connectasync.cpp */
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
#include <iostream>
#include <string>

boost::mutex global_stream_lock;

void WorkerThread(boost::shared_ptr<boost::asio::io_service> iosvc, int counter) {
  global_stream_lock.lock();
  std::cout << "Thread " << counter << " Start.\n";
  global_stream_lock.unlock();

  while(true) {
    try {
      boost::system::error_code ec;
      iosvc->run(ec);
      if(ec) {
        global_stream_lock.lock();
        std::cout << "Message: " << ec << ".\n";
        global_stream_lock.unlock();
      }
      break;
    }
    catch(std::exception &ex) {
      global_stream_lock.lock();
      std::cout << "Message: " << ex.what() << ".\n";
      global_stream_lock.unlock();
    }
  }

  global_stream_lock.lock();
  std::cout << "Thread " << counter << " End.\n";
  global_stream_lock.unlock();
}

void OnConnect(const boost::system::error_code &ec) {
  if(ec) {
    global_stream_lock.lock();
    std::cout << "OnConnect Error: " << ec << ".\n";
    global_stream_lock.unlock();
  }
  else {
    global_stream_lock.lock();
    std::cout << "Connected!.\n";
    global_stream_lock.unlock();
  }
}

int main(void) {
  boost::shared_ptr<boost::asio::io_service> io_svc(
    new boost::asio::io_service
  );

  boost::shared_ptr<boost::asio::io_service::work> worker(
    new boost::asio::io_service::work(*io_svc)
  );

  boost::shared_ptr<boost::asio::io_service::strand> strand(
    new boost::asio::io_service::strand(*io_svc)
  );

  global_stream_lock.lock();
  std::cout << "Press ENTER to exit!\n";
  global_stream_lock.unlock();

  boost::thread_group threads;
  for(int i=1; i<=2; i++)
    threads.create_thread(boost::bind(&WorkerThread, io_svc, i));

  boost::shared_ptr<boost::asio::ip::tcp::socket> sckt(
    new boost::asio::ip::tcp::socket(*io_svc)
  );

  try {
    boost::asio::ip::tcp::resolver resolver(*io_svc);
    boost::asio::ip::tcp::resolver::query query("www.packtpub.com",
      boost::lexical_cast<std::string>(80)
    );
    boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve( query );
    boost::asio::ip::tcp::endpoint endpoint = *iterator;

    global_stream_lock.lock();
    std::cout << "Connecting to: " << endpoint << std::endl;
    global_stream_lock.unlock();

    sckt->async_connect(endpoint, boost::bind(OnConnect, _1));
  }
  catch(std::exception &ex) {
    global_stream_lock.lock();
    std::cout << "Message: " << ex.what() << ".\n";
    global_stream_lock.unlock();
  }

  std::cin.get();

  boost::system::error_code ec;
  sckt->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
  sckt->close(ec);

  io_svc->stop();

  threads.join_all();

  return 0;
}

Then, save the preceding code as connectasync.cpp and run the following command to compile the code:

g++ -Wall -ansi -I ../boost_1_58_0 connectasync.cpp -o connectasync -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 -l libboost_thread-mgw49-mt-1_58

Try to run the program, and you should get the following output:

[Figure: An asynchronous client]

As we can see in the preceding code, we have added the OnConnect() function. Because the socket object is noncopyable, and we need to ensure that it is still valid while the handler is waiting to be called, we hold it in a boost::shared_ptr. We also use boost::bind to create the handler that invokes the OnConnect() function.
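The lifetime concern can be illustrated without any networking. The sketch below is a variation on the listing rather than a copy of it: it binds a copy of the shared pointer into the handler itself, so the object stays alive for as long as the handler is pending. It assumes C++11 and uses std::shared_ptr, std::bind, and std::function instead of their Boost counterparts. FakeSocket and MakePendingHandler are hypothetical names made up for this example.

```cpp
#include <functional>
#include <memory>
#include <string>

// Hypothetical stand-in for a noncopyable socket; not a Boost.Asio type.
struct FakeSocket {
  FakeSocket() : open(true) {}
  FakeSocket(const FakeSocket &) = delete;
  FakeSocket &operator=(const FakeSocket &) = delete;
  bool open;
};

// The handler takes the shared pointer, so it can still use the object
// when it is finally invoked.
std::string OnConnect(std::shared_ptr<FakeSocket> sckt) {
  return sckt->open ? "Connected!" : "Closed";
}

// Binding stores a copy of the shared pointer inside the returned handler.
// That copy keeps the FakeSocket alive until the handler is destroyed.
std::function<std::string()> MakePendingHandler(
    std::shared_ptr<FakeSocket> sckt) {
  return std::bind(&OnConnect, sckt);
}
```

Even if the caller resets its own pointer after creating the handler, the object is released only once the handler itself goes away.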

An asynchronous server

We already know how to connect to a remote host synchronously and asynchronously. Now, we are going to create the server program to talk with the client-side program that we created before. Because we mainly deal with asynchronous programs in Boost.Asio, we will discuss only an asynchronous server. Let us take a look at the following code:

/* serverasync.cpp */
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
#include <iostream>
#include <string>

boost::mutex global_stream_lock;

void WorkerThread(boost::shared_ptr<boost::asio::io_service> iosvc, int counter) {
  global_stream_lock.lock();
  std::cout << "Thread " << counter << " Start.\n";
  global_stream_lock.unlock();

  while(true) {
    try {
      boost::system::error_code ec;
      iosvc->run(ec);
      if(ec) {
        global_stream_lock.lock();
        std::cout << "Message: " << ec << ".\n";
        global_stream_lock.unlock();
      }
      break;
    }
    catch(std::exception &ex) {
      global_stream_lock.lock();
      std::cout << "Message: " << ex.what() << ".\n";
      global_stream_lock.unlock();
    }
  }

  global_stream_lock.lock();
  std::cout << "Thread " << counter << " End.\n";
  global_stream_lock.unlock();
}

void OnAccept(const boost::system::error_code &ec) {
  if(ec) {
    global_stream_lock.lock();
    std::cout << "OnAccept Error: " << ec << ".\n";
    global_stream_lock.unlock();
  }
  else {
    global_stream_lock.lock();
    std::cout << "Accepted!" << ".\n";
    global_stream_lock.unlock();
  }
}

int main(void) {
  boost::shared_ptr<boost::asio::io_service> io_svc(
    new boost::asio::io_service
  );

  boost::shared_ptr<boost::asio::io_service::work> worker(
    new boost::asio::io_service::work(*io_svc)
  );

  boost::shared_ptr<boost::asio::io_service::strand> strand(
    new boost::asio::io_service::strand(*io_svc)
  );

  global_stream_lock.lock();
  std::cout << "Press ENTER to exit!\n";
  global_stream_lock.unlock();

  boost::thread_group threads;
  for(int i=1; i<=2; i++)
    threads.create_thread(boost::bind(&WorkerThread, io_svc, i));

  boost::shared_ptr< boost::asio::ip::tcp::acceptor > acceptor(
    new boost::asio::ip::tcp::acceptor(*io_svc)
  );

  boost::shared_ptr<boost::asio::ip::tcp::socket> sckt(
    new boost::asio::ip::tcp::socket(*io_svc)
  );

  try {
    boost::asio::ip::tcp::resolver resolver(*io_svc);
    boost::asio::ip::tcp::resolver::query query(
      "127.0.0.1", 
      boost::lexical_cast<std::string>(4444)
    );
    boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
    acceptor->open(endpoint.protocol());
    acceptor->set_option(
      boost::asio::ip::tcp::acceptor::reuse_address(false));
    acceptor->bind(endpoint);
    acceptor->listen(boost::asio::socket_base::max_connections);
    acceptor->async_accept(*sckt, boost::bind(OnAccept, _1));

    global_stream_lock.lock();
    std::cout << "Listening on: " << endpoint << std::endl;
    global_stream_lock.unlock();
  }
  catch(std::exception &ex) {
    global_stream_lock.lock();
    std::cout << "Message: " << ex.what() << ".\n";
    global_stream_lock.unlock();
  }

  std::cin.get();

  boost::system::error_code ec;
  acceptor->close(ec);

  sckt->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
  sckt->close(ec);

  io_svc->stop();

  threads.join_all();

  return 0;
}

Save the preceding code as serverasync.cpp and run the following command to compile the code:

g++ -Wall -ansi -I ../boost_1_58_0 serverasync.cpp -o serverasync -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 -l libboost_thread-mgw49-mt-1_58 -l mswsock

Before we run the program, let us examine the code. We now have a new object of type tcp::acceptor. This object is used for accepting new socket connections. Because of the use of the async_accept() function, we need to add the mswsock library to our compilation process:

acceptor->open(endpoint.protocol());
acceptor->set_option(
  boost::asio::ip::tcp::acceptor::reuse_address(false));
acceptor->bind(endpoint);
acceptor->listen(boost::asio::socket_base::max_connections);
acceptor->async_accept(*sckt, boost::bind(OnAccept, _1));

From the preceding code snippet, we can see that the program calls the open() function to open the acceptor by using the protocol that is retrieved from the endpoint variable. Then, by using the set_option function, we set an option on the acceptor to not reuse the address. The acceptor is also bound to the endpoint using the bind() function. After that, we invoke the listen() function to put the acceptor into the state where it will listen for new connections. Finally, the acceptor will accept new connections by using the async_accept() function, which will start an asynchronous accept.
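To make the open, set option, bind, and listen sequence more concrete, the following sketch performs the same steps with plain BSD socket calls. It is only an approximation under stated assumptions: it targets a POSIX system rather than the MinGW build used in this chapter, it is synchronous, and it does not show how Boost.Asio is implemented internally. OpenLoopbackListener and BoundPort are hypothetical helper names.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

// Returns a listening descriptor bound to 127.0.0.1:port, or -1 on failure.
// Passing port 0 lets the operating system pick a free port.
int OpenLoopbackListener(unsigned short port) {
  // Step 1, like acceptor->open(): create an IPv4 TCP socket.
  int fd = socket(AF_INET, SOCK_STREAM, 0);
  if (fd < 0) return -1;

  // Step 2, like set_option(reuse_address(false)): keep SO_REUSEADDR off.
  int reuse = 0;
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) {
    close(fd);
    return -1;
  }

  // Step 3, like acceptor->bind(): attach the socket to the loopback address.
  sockaddr_in addr = sockaddr_in();
  addr.sin_family = AF_INET;
  addr.sin_port = htons(port);
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  if (bind(fd, reinterpret_cast<sockaddr *>(&addr), sizeof(addr)) < 0) {
    close(fd);
    return -1;
  }

  // Step 4, like acceptor->listen(): start queuing incoming connections.
  if (listen(fd, SOMAXCONN) < 0) {
    close(fd);
    return -1;
  }
  return fd;
}

// Reports the port the descriptor is actually bound to, or 0 on failure.
unsigned short BoundPort(int fd) {
  sockaddr_in addr = sockaddr_in();
  socklen_t len = sizeof(addr);
  if (getsockname(fd, reinterpret_cast<sockaddr *>(&addr), &len) < 0) return 0;
  return ntohs(addr.sin_port);
}
```

The order of the calls matters in the same way as in the acceptor code: the socket has to exist before an option can be set on it, and it has to be bound before it can listen.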

Now, it is time to run the program. We need to open two command consoles here. The first console is for the program itself and the second is for running the telnet command to make a connection to the server. We only need to run the command telnet 127.0.0.1 4444 just after we run the serverasync program (we can refer to Chapter 2, Understanding the Networking Concepts, to see how to run the telnet command in the command prompt). The output should look like the following:

[Figure: An asynchronous server]

From the preceding image, we can see that the program is listening on port 4444 when it starts, and after we run the telnet command to start a connection to port 4444, the program accepts the connection. However, because we only have one socket object and invoke the async_accept() function just once, the program will accept only one connection.
