Reading and writing to the socket

We are officially able to make a client-server connection. Now, we are going to read from and write to the socket to make the connection more useful. We will modify our previous code, serverasync.cpp, and use the basic_stream_socket object (the ip::tcp::socket type in the code), which provides stream-oriented socket functionality.

Note

To get more detailed information about the basic_stream_socket object, you can visit www.boost.org/doc/libs/1_58_0/doc/html/boost_asio/reference/basic_stream_socket.html.

Now, take a look at the following code containing the read and write socket process:

/* readwritesocket.cpp */
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/cstdint.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <iostream>
#include <string>
#include <vector>
#include <list>
#include <algorithm>
#include <iterator>

boost::mutex global_stream_lock;

void WorkerThread(boost::shared_ptr<boost::asio::io_service> iosvc, int counter) {
  global_stream_lock.lock();
  std::cout << "Thread " << counter << " Start.
";
  global_stream_lock.unlock();

  while(true) {
    try {
      boost::system::error_code ec;
      iosvc->run(ec);
      if(ec) {
        global_stream_lock.lock();
        std::cout << "Message: " << ec << ".
";
        global_stream_lock.unlock();
      }
      break;
    }
    catch(std::exception &ex) {
      global_stream_lock.lock();
      std::cout << "Message: " << ex.what() << ".
";
      global_stream_lock.unlock();
    }
  }

  global_stream_lock.lock();
  std::cout << "Thread " << counter << " End.
";
  global_stream_lock.unlock();
}

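// The ClientContext class wraps the per-connection socket together with a
// receive buffer and a queue of pending send buffers. Deriving from
// enable_shared_from_this lets the asynchronous handlers hold a shared_ptr
// to the object so that it stays alive while operations are in flight.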
struct ClientContext : public boost::enable_shared_from_this<ClientContext> {
  boost::asio::ip::tcp::socket m_socket;

  std::vector<boost::uint8_t> m_recv_buffer;
  size_t m_recv_buffer_index;

  std::list<std::vector<boost::uint8_t> > m_send_buffer;

  ClientContext(boost::asio::io_service & io_service)
  : m_socket(io_service), m_recv_buffer_index(0) {
    m_recv_buffer.resize(4096);
  }

  ~ClientContext() {
  }

  void Close() {
    boost::system::error_code ec;
    m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
    m_socket.close(ec);
  }

  void OnSend(const boost::system::error_code &ec, std::list<std::vector<boost::uint8_t> >::iterator itr) {
    if(ec) {
      global_stream_lock.lock();
      std::cout << "OnSend Error: " << ec << ".
";
      global_stream_lock.unlock();

      Close();
    }
    else {
      global_stream_lock.lock();
      std::cout << "Sent " << (*itr).size() << " bytes." << std::endl;
      global_stream_lock.unlock();
    }
    m_send_buffer.erase(itr);

    // Start the next pending send
    if(!m_send_buffer.empty()) {
      boost::asio::async_write(
        m_socket,
        boost::asio::buffer(m_send_buffer.front()),
        boost::bind(
          &ClientContext::OnSend,
          shared_from_this(),
          boost::asio::placeholders::error,
          m_send_buffer.begin()
        )
      );
    }
  }

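  // Copy the caller's data into our own buffer and queue it. An
  // asynchronous write is started only if no other write is in flight;
  // otherwise OnSend() will pick the buffer up later.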
  void Send(const void * buffer, size_t length) {
    bool can_send_now = false;

    std::vector<boost::uint8_t> output;
    std::copy((const boost::uint8_t *)buffer, (const boost::uint8_t *)buffer + length, std::back_inserter(output));

    // Store if this is the only current send or not
    can_send_now = m_send_buffer.empty();

    // Save the buffer to be sent
    m_send_buffer.push_back(output);

    // Only send if there are no more pending buffers waiting!
    if(can_send_now) {
      // Start the next pending send
      boost::asio::async_write(
        m_socket,
        boost::asio::buffer(m_send_buffer.front()),
        boost::bind(
          &ClientContext::OnSend,
          shared_from_this(),
          boost::asio::placeholders::error,
          m_send_buffer.begin()
        )
      );
    }
  }

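  // Called when async_read_some() completes: dump what has been received
  // so far to the console and post the next read.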
  void OnRecv(const boost::system::error_code &ec, size_t bytes_transferred) {
    if(ec) {
      global_stream_lock.lock();
      std::cout << "OnRecv Error: " << ec << ".
";
      global_stream_lock.unlock();

      Close();
    }
    else {
      // Increase how many bytes we have saved up
      m_recv_buffer_index += bytes_transferred;

      // Debug information
      global_stream_lock.lock();
      std::cout << "Recv " << bytes_transferred << " bytes." << std::endl;
      global_stream_lock.unlock();

      // Dump all the data
      global_stream_lock.lock();
      for(size_t x = 0; x < m_recv_buffer_index; ++x) {
        std::cout << (char)m_recv_buffer[x] << " ";
        if((x + 1) % 16 == 0) {
          std::cout << std::endl;
        }
      }
      std::cout << std::endl << std::dec;
      global_stream_lock.unlock();

      // Clear all the data
      m_recv_buffer_index = 0;

      // Start the next receive cycle
      Recv();
    }
  }

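  // Post an asynchronous read into the unused part of the receive buffer.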
  void Recv() {
    m_socket.async_read_some(
      boost::asio::buffer(
        &m_recv_buffer[m_recv_buffer_index],
        m_recv_buffer.size() - m_recv_buffer_index),
      boost::bind(&ClientContext::OnRecv, shared_from_this(), _1, _2)
    );
  }
};

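// Called when the acceptor finishes an asynchronous accept: greet the new
// client and start reading from its socket.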
void OnAccept(const boost::system::error_code &ec, boost::shared_ptr<ClientContext> clnt) {
  if(ec) {
    global_stream_lock.lock();
    std::cout << "OnAccept Error: " << ec << ".
";
    global_stream_lock.unlock();
  }
  else {
    global_stream_lock.lock();
    std::cout << "Accepted!" << ".
";
    global_stream_lock.unlock();

    // Send a greeting to the client and start receiving data from it
    clnt->Send("Hi there!", 9);
    clnt->Recv();
  }
}

int main(void) {
  boost::shared_ptr<boost::asio::io_service> io_svc(
    new boost::asio::io_service
  );

  boost::shared_ptr<boost::asio::io_service::work> worker(
    new boost::asio::io_service::work(*io_svc)
  );

  boost::shared_ptr<boost::asio::io_service::strand> strand(
    new boost::asio::io_service::strand(*io_svc)
  );

  global_stream_lock.lock();
  std::cout << "Press ENTER to exit!
";
  global_stream_lock.unlock();

  // We use only one worker thread
  // so that we do not run into thread safety issues
  boost::thread_group threads;
  threads.create_thread(boost::bind(&WorkerThread, io_svc, 1));

  boost::shared_ptr< boost::asio::ip::tcp::acceptor > acceptor(
    new boost::asio::ip::tcp::acceptor(*io_svc)
  );

  boost::shared_ptr<ClientContext> client(
    new ClientContext(*io_svc)
  );

  try {
    boost::asio::ip::tcp::resolver resolver(*io_svc);
    boost::asio::ip::tcp::resolver::query query(
      "127.0.0.1",
      boost::lexical_cast<std::string>(4444)
    );
    boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
    acceptor->open(endpoint.protocol());
    acceptor->set_option(boost::asio::ip::tcp::acceptor::reuse_address(false));
    acceptor->bind(endpoint);
    acceptor->listen(boost::asio::socket_base::max_connections);
    acceptor->async_accept(client->m_socket, boost::bind(OnAccept, _1, client));

    global_stream_lock.lock();
    std::cout << "Listening on: " << endpoint << std::endl;
    global_stream_lock.unlock();
  }
  catch(std::exception &ex) {
    global_stream_lock.lock();
    std::cout << "Message: " << ex.what() << ".
";
    global_stream_lock.unlock();
  }

  std::cin.get();

  boost::system::error_code ec;
  acceptor->close(ec);

  io_svc->stop();

  threads.join_all();

  return 0;
}

Save the preceding code as readwritesocket.cpp and compile the code using the following command:

g++ -Wall -ansi -I ../boost_1_58_0 readwritesocket.cpp -o readwritesocket -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 -l libboost_thread-mgw49-mt-1_58 -l mswsock

If we compare the code of the readwritesocket.cpp file with the serverasync.cpp file, we will find that we have added a new class called ClientContext. It contains five member functions: Send(), OnSend(), Recv(), OnRecv(), and Close().

The Send() and OnSend() functions

In the Send() function, we pass in an array of characters and its length. Before the function queues the data, it checks whether the m_send_buffer list is empty. If it is empty, no write is currently in progress, so the function starts an asynchronous write immediately; otherwise, the new buffer simply joins the queue and will be written once the pending sends have completed.
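For example, queuing two messages back to back works like this (a short usage sketch, assuming clnt is the shared_ptr<ClientContext> created in main()):

std::string first  = "first message";
std::string second = "second message";

// The queue is empty, so this call starts async_write() immediately.
clnt->Send(first.c_str(), first.size());

// A write is already in flight; this buffer is only queued and will be
// written by OnSend() once the first write completes.
clnt->Send(second.c_str(), second.size());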

The boost::asio::async_write() function writes the data to the socket and then invokes the OnSend() function handler. The handler erases the buffer that has just been sent and starts the next pending write, if there is any. When we connect to the server with telnet, the greeting Hi there! appears in the telnet window, and everything we type in the telnet window is received by the server and displayed in the readwritesocket console.
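If we also want the server to echo the received data back to the telnet window, a small optional modification of OnRecv() is enough: instead of dumping the bytes to the console, pass them to Send(). The following is only a sketch of that variation, not part of the original listing:

// Inside ClientContext::OnRecv(), in the success branch, replace the dump
// loop with an echo. Send() copies the data into its own buffer, so it is
// safe to reset the index right afterwards.
Send(&m_recv_buffer[0], m_recv_buffer_index);

m_recv_buffer_index = 0;

// Start the next receive cycle, as before.
Recv();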

The Recv() and OnRecv() functions

In contrast to the Send() function, the Recv() function calls the async_read_some() function to receive a chunk of data, and the OnRecv() function handler dumps the received bytes to the console, sixteen per line, before starting the next receive cycle.
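If we prefer a true hexadecimal dump instead of printing the raw characters, the dump loop inside OnRecv() can be rewritten with the standard stream manipulators. This is only a sketch of such a variation; it also assumes that we add #include <iomanip> to the top of the file:

// Print each received byte as a two-digit hexadecimal value,
// sixteen bytes per line.
for(size_t x = 0; x < m_recv_buffer_index; ++x) {
  std::cout << std::hex << std::setfill('0') << std::setw(2)
            << (int)m_recv_buffer[x] << " ";
  if((x + 1) % 16 == 0) {
    std::cout << std::endl;
  }
}
std::cout << std::endl << std::dec;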
