Creating the streaming server

In this section, we will use Node to create the streaming server. The server will handle the client connections and the connection to the Twitter-streaming API; it will also manage the list of topics for all the clients and deliver the matching tweets as they arrive.

We will begin by creating Twitter authentication tokens. Then, we will learn how to use the Twit module to manage the connection to the Twitter API and how to use Socket.IO to handle real-time communication between the server and the client application.

To follow the examples in this section, open the chirp-server project directory in the terminal and run the following command to install the project dependencies:

$ npm install

This will create the node_modules directory and download the project dependencies. If you haven't installed Node, download the binaries for your platform from http://nodejs.org/download/ and follow the instructions on the page.

Using the Twitter-streaming API

Twitter provides a streaming API that allows developers to access Twitter's global stream of tweets through several endpoints. The following endpoints allow access to different streams:

  • statuses/sample: This allows access to a small random sample of public statuses. All the applications connected to this endpoint will receive the same tweets.
  • statuses/filter: This returns public statuses that match one or more predicates. We will use this endpoint in the project.
  • statuses/firehose: This returns all the public statuses. This endpoint requires special access.

Also, there are the user and site streams: user streams deliver the data corresponding to a single user's view of Twitter, and site streams deliver that data for multiple users on behalf of a website. When applications establish a connection with a Twitter streaming endpoint, they receive a feed of tweets.

To run the examples in this chapter, you need to go to the Twitter Application Management page (https://apps.twitter.com/), create a new application, and generate API keys for the application. You will need the consumer key, the consumer secret, the access token, and the token secret.

The credentials.js file in the root directory of the project contains placeholders for the authentication tokens of the application. You can either replace the placeholder strings in this file with your own keys or create a new file with the same structure. In either case, make sure that the keys stay secret. It is a good idea to add this file to the .gitignore file in order to avoid accidentally pushing it to GitHub:

// Authentication tokens (replace with your own)
module.exports = {
    "consumer_key":        "xxx",
    "consumer_secret":     "xxx",
    "access_token":        "xxx",
    "access_token_secret": "xxx"
}
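If you would rather keep the keys out of the project files altogether, one common alternative is to read them from environment variables. The following is a minimal sketch of that approach; the loadCredentials helper and the TWITTER_* variable names are hypothetical, and the returned object only mirrors the structure of credentials.js so that it could be passed along in the same way.

```javascript
// Hypothetical mapping from credential keys to environment variable names.
// These names are assumptions, not a convention required by Twit or Twitter.
var ENV_NAMES = {
    consumer_key:        'TWITTER_CONSUMER_KEY',
    consumer_secret:     'TWITTER_CONSUMER_SECRET',
    access_token:        'TWITTER_ACCESS_TOKEN',
    access_token_secret: 'TWITTER_ACCESS_TOKEN_SECRET'
};

// Hypothetical helper: builds an object with the same structure as
// credentials.js, throwing if any value is missing or still a placeholder.
function loadCredentials(env) {
    var credentials = {},
        missing = [];

    Object.keys(ENV_NAMES).forEach(function(key) {
        var value = env[ENV_NAMES[key]];
        // Treat empty values and the 'xxx' placeholder as missing
        if (!value || value === 'xxx') {
            missing.push(ENV_NAMES[key]);
        } else {
            credentials[key] = value;
        }
    });

    if (missing.length > 0) {
        throw new Error('Missing Twitter credentials: ' + missing.join(', '));
    }
    return credentials;
}

// Usage (in credentials.js): module.exports = loadCredentials(process.env);
```

The placeholder check makes a forgotten key fail immediately at startup instead of surfacing later as a failed connection.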

Using Twit to access the Twitter-streaming API

As mentioned earlier, we will use the Twit Node module (you can access the documentation in the project's repository at https://github.com/ttezel/twit/) to connect to the Twitter-streaming API. Twit handles both the REST and streaming APIs, keeping the connection alive and reconnecting automatically if the connection drops. The 01-twitter-sample.js file contains the code to connect to the statuses/sample stream. We begin by importing the twit module and loading the configuration module:

// Import node modules
var Twit   = require('twit'),              // Twitter API Client
    config = require('./credentials.js');  // Credentials

The config object will contain the authentication tokens from the credentials.js file. We can now set the Twitter credentials and connect to the statuses/sample stream as follows:

// Configure the Twit object with the application credentials
var T = new Twit(config);

// Subscribe to the sample stream and begin listening
var stream = T.stream('statuses/sample');

The stream object is an instance of EventEmitter, a built-in Node class that allows you to emit custom events and add listener functions to these events. To begin listening to tweets, we can just attach a listener to the tweet event:

// The callback will be invoked on each tweet. 
stream.on('tweet', function(tweet) { 
    // Do something with the tweet
});
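Because the stream is an EventEmitter, the listener mechanism can be tried out without Twitter credentials. The following minimal sketch uses Node's built-in events module to stand in for the Twit stream; the fakeStream object and the tweets it emits are hypothetical and only imitate the shape of real tweets.

```javascript
// Node's built-in event emitter class
var EventEmitter = require('events').EventEmitter;

// Hypothetical stand-in for the object returned by T.stream(...)
var fakeStream = new EventEmitter();

// Collect the text of each tweet received
var received = [];

// Same listener pattern as with the Twit stream
fakeStream.on('tweet', function(tweet) {
    received.push(tweet.text);
});

// Simulate two tweets; emit() calls the listeners synchronously
fakeStream.emit('tweet', {text: 'good morning!', lang: 'en'});
fakeStream.emit('tweet', {text: 'breakfast time', lang: 'en'});

console.log(received);  // [ 'good morning!', 'breakfast time' ]
```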

The tweet object will contain information about the tweet, such as the date of creation, the tweet ID in numeric and string formats, the tweet text, information about the user who generated the tweet, the language, and the tweet entities that may be present, such as hashtags and mentions. For a complete reference to the tweet attributes, refer to the Field Guide at https://dev.twitter.com/docs/platform-objects/tweets. A typical tweet will have the following structure, along with many other attributes:

{ 
  created_at: 'Thu May 15 22:27:37 +0000 2014',
  ...
  text: 'tweet text...',
  user: { 
    name: 'Pablo Navarro',
    screen_name: 'pnavarrc',
    ...
  },
  ...
  coordinates: { 
    type: 'Point', 
    coordinates: [ -76.786264, -33.234588 ] 
  },
  ...
  entities: { 
    hashtags: [],
    ...
  },
  lang: 'en'
}

You may have noticed that the coordinates attribute is a GeoJSON object; in particular, a GeoJSON point, which lists the longitude first and the latitude second. This point is an approximation of where the tweet was generated. Fewer than 10 percent of the tweets in the sample stream contain this information, but it is still useful and interesting to explore.
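Since GeoJSON stores the position as [longitude, latitude], it is easy to swap the two by mistake. A small helper such as the following hypothetical getLonLat function can skip tweets without coordinates and name the components explicitly; it is a sketch, not part of Twit.

```javascript
// Hypothetical helper: returns {lon, lat} for a geotagged tweet, or null
// if the tweet has no GeoJSON point in its coordinates attribute.
function getLonLat(tweet) {
    var geo = tweet && tweet.coordinates;

    // Tweets without a location have coordinates set to null
    if (!geo || geo.type !== 'Point' || !Array.isArray(geo.coordinates)) {
        return null;
    }

    // GeoJSON positions list the longitude first, then the latitude
    return {lon: geo.coordinates[0], lat: geo.coordinates[1]};
}

var sample = {
    text: 'tweet text...',
    coordinates: {type: 'Point', coordinates: [-76.786264, -33.234588]}
};

console.log(getLonLat(sample));              // { lon: -76.786264, lat: -33.234588 }
console.log(getLonLat({coordinates: null})); // null
```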

The stream object will emit other events that we might need to handle. The connect event will be emitted when Twit attempts to connect to the Twitter-streaming API:

stream.on('connect', function(msg) {
    console.log('Connection attempt.');
});

If the connection is successful, the connected event will be triggered. Note that when the application is running, the connection to the Twitter stream can be interrupted several times. Twit will automatically try to reconnect following the reconnection guidelines from Twitter:

// The connection is successful.
stream.on('connected', function(msg) {
    console.log('Connection successful.');
});

If a reconnection is scheduled, the reconnect event will be emitted, passing the request, the response, and the interval in milliseconds until the next reconnection attempt, as follows:

// Emitted when a reconnection is scheduled.
stream.on('reconnect', function(req, res, interval) {
    console.log('Reconnecting in ' + (interval / 1e3) + ' seconds.');
});
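The interval passed to the reconnect listener is computed by Twit. To illustrate the idea behind reconnection guidelines of this kind, here is a sketch of an exponential backoff schedule. The hypothetical backoffInterval function and its constants (a 5-second initial wait, doubling on each attempt, capped at 320 seconds) are assumptions modeled on Twitter's advice for HTTP errors as we understand it; Twit's actual values may differ.

```javascript
// Assumed constants: start at 5 s, double on each failed attempt, cap at 320 s
var INITIAL_MS = 5000,
    MAX_MS     = 320000;

// Hypothetical helper: attempt is 0 for the first reconnection, 1 for the second...
function backoffInterval(attempt) {
    var interval = INITIAL_MS * Math.pow(2, attempt);
    return Math.min(interval, MAX_MS);
}

// Print the first few waits in seconds, as the reconnect listener does
for (var attempt = 0; attempt < 8; attempt++) {
    console.log('Attempt ' + attempt + ': reconnecting in ' +
        (backoffInterval(attempt) / 1e3) + ' seconds.');
}
```

Spacing the attempts out this way avoids hammering the API while it is unavailable, which is why clients are asked to back off instead of reconnecting immediately.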

Twitter creates a queue with the tweets to be delivered to our application. If our program doesn't process the tweets fast enough, the queue will get longer, and Twitter will send a warning message to the application notifying us about the issue. If this happens, Twit will emit the warning event, passing the warning message as an argument to the callback function:

// The application is not processing the tweets fast enough.
stream.on('warning', function(msg) {
    console.warn('Warning:', msg);
});

Twitter can disconnect the stream for several reasons. Before actually dropping the connection, Twitter will notify us about the disconnection, and Twit will emit the corresponding event as well. The complete list of events can be consulted in the Twit project repository at https://github.com/ttezel/twit.

In the project repository, there are examples of connections to the statuses/sample and statuses/filter streams. In the 01-twitter-sample.js file, we configure the Twitter credentials and use the statuses/sample endpoint to print the tweets in the terminal. To run this example (remember to add your credentials), type the following command in the root directory of the project:

$ node 01-twitter-sample.js

This will print the tweets on the console as they are received. As mentioned earlier, there are more streaming endpoints available. The statuses/filter stream allows you to track specific topics. The endpoint receives the words to be matched as a list of comma-separated phrases: the commas act as a logical OR, and the spaces within a phrase act as a logical AND. If we pass the topics good morning and breakfast, the stream will contain tweets that match either the word breakfast or both good and morning. Twit allows us to specify the topics that need to be matched as a list of strings.
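The OR/AND rule for tracked phrases can be expressed in a few lines. The following sketch approximates it for the tweet text only: the hypothetical matchesTrack function splits the parameter on commas (OR) and each phrase on spaces (AND), then compares lowercase words. Twitter's real matching also looks at other fields and tokenizes text in its own way, so treat this as an illustration.

```javascript
// Hypothetical helper: joins a list of topics into a comma-separated track string
function buildTrackParam(topics) {
    return topics.join(',');
}

// Approximates the track semantics on the tweet text only:
// commas separate alternatives (OR), spaces separate required words (AND).
function matchesTrack(text, track) {
    // Split the text into lowercase words, ignoring punctuation
    var words = text.toLowerCase().split(/[^a-z0-9#@_]+/);

    return track.split(',').some(function(phrase) {
        var required = phrase.trim().toLowerCase().split(/\s+/);
        return required.every(function(word) {
            return words.indexOf(word) !== -1;
        });
    });
}

var track = buildTrackParam(['good morning', 'breakfast']);
console.log(track);                                            // good morning,breakfast
console.log(matchesTrack('Morning! Have a good day', track));  // true
console.log(matchesTrack('Good night everyone', track));       // false
console.log(matchesTrack('Pancakes for Breakfast', track));    // true
```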

In the 02-twitter-filter.js file, we have the same setup as in the first example; however, in this case, we define the list of topics to track and configure the stream to connect to the statuses/filter endpoint, passing along the list of topics to track:

// List of topics to track
var topics = ['good morning', 'breakfast'];

// Subscribe to a specific list of topics
var stream = T.stream('statuses/filter', {track: topics});

To determine a match, Twitter will compare the tracking topics with the tweet text, the user name, screen name, and entities, such as hashtags and URLs. Note that the tweets won't include information about which term was matched; as our application will need this information, we will check which terms were matched in the tweet callback.
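Because the tweet does not say which term matched, the application has to find out by itself. A minimal sketch, assuming the fields shown in the tweet structure above (text, user.name, user.screen_name, and entities.hashtags, whose items carry a text attribute) plus entities.urls, whose items carry an expanded_url attribute, could collect that content and return the topics it contains. findMatchedTopics is a hypothetical name.

```javascript
// Hypothetical helper: returns the topics that appear in the tweet text,
// the user's name or screen name, its hashtags, or its expanded URLs.
function findMatchedTopics(tweet, topics) {
    var user = tweet.user || {},
        entities = tweet.entities || {},
        parts = [tweet.text, user.name, user.screen_name];

    // Hashtag entities store the tag without the '#' in their text attribute
    (entities.hashtags || []).forEach(function(tag) { parts.push(tag.text); });

    // URL entities store the full address in expanded_url
    (entities.urls || []).forEach(function(url) { parts.push(url.expanded_url); });

    // Drop missing fields and lowercase once for a case-insensitive comparison
    var content = parts.filter(Boolean).join(' ').toLowerCase();

    return topics.filter(function(topic) {
        return content.indexOf(topic.toLowerCase()) !== -1;
    });
}

var tweet = {
    text: 'Time for a #Coffee',
    user: {name: 'Pablo Navarro', screen_name: 'pnavarrc'},
    entities: {hashtags: [{text: 'Coffee'}], urls: []}
};

console.log(findMatchedTopics(tweet, ['coffee', 'tea', 'pablo']));  // [ 'coffee', 'pablo' ]
```

This uses plain substring matching, so a topic such as tea would also match a word like steam; matching whole words would need a tokenizer.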

Using Socket.IO

Socket.IO is a JavaScript library that allows real-time communication between the client and the server. The library has two parts: the client-side library, which runs in the browser, and the server-side library for Node.

In this example, we will create an application that will allow you to send text messages to the server, which will send a text message back to inform you that the message was received. To follow the code in this example, open the 03-socketio-example.js file for the server-side code and the socketio-example.html file for the client-side code. A screenshot of the client-side application is shown as follows:

[Figure: Using Socket.IO]

We will begin by implementing the server-side code. In the server-side code, we import the socket.io module. This will expose the Socket.IO server:

// Import the Socket.IO module
var IOServer = require('socket.io');

We can now create an instance of the Socket.IO server. We can either use the built-in server or an instance of a different server, such as those provided by the HTTP or express modules. We will use the built-in version, passing along the port number for the server:

// Start the server, listening on port 7000
var io = new IOServer(7000);

The server is ready to receive connections and messages. At this point, the server won't do anything other than listen; we need to attach a callback function for the connection event. The callback will receive a socket object as the argument. The socket is the client-side endpoint of the connection. Refer to the following code:

// Listen for connections from incoming sockets
io.on('connection', function(socket) {

    // Print the socket ID on connection
    console.log('Client ID ' + socket.id + ' connected.');

    // Attach listeners to socket events... 
});

This callback will display a log message with the socket ID when a client connects to the server:

$ node 03-socketio-example.js
Client ID pk0XiCmUNgDVRP6zAAAC connected.

Inside the connection callback, we can now attach listeners to the socket events. We will add a callback for the disconnect event, which will display a log message:

// Displays a log message if the client disconnects.
socket.on('disconnect', function() {
    console.log('client disconnected.');
});

We can attach listeners for custom events as well and pass any serializable JavaScript object as the argument of the callback. Socket.IO also supports sending binary data. We will add a callback for the client-message custom event:

// The server will emit a response message.
socket.on('client-message', function (data) {
    socket.emit('server-message', {
        msg: 'Message "' + data.msg + '" received.'
    });
});

The callback for the client-message event will just send a message back to the client, indicating that the message was received. If we want to send a message to all the connected clients, we can use io.emit('event name', parameters).
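The difference between socket.emit and io.emit is a matter of who receives the event. The following self-contained simulation makes that concrete with Node's EventEmitter. FakeServer, its connect method, and its emit method are hypothetical stand-ins written for this illustration, not Socket.IO's API; in the real library, socket.emit sends the event over the network, whereas here a local listener plays the role of the client.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical stand-in for a Socket.IO server that tracks client sockets
function FakeServer() {
    this.sockets = [];
}

// Register a new client and return its socket (an EventEmitter)
FakeServer.prototype.connect = function(id) {
    var socket = new EventEmitter();
    socket.id = id;
    socket.inbox = [];
    // The listener plays the client: it records every 'server-message'
    socket.on('server-message', function(data) { socket.inbox.push(data.msg); });
    this.sockets.push(socket);
    return socket;
};

// Broadcast: deliver the event to every connected socket (like io.emit)
FakeServer.prototype.emit = function(eventName, data) {
    this.sockets.forEach(function(socket) { socket.emit(eventName, data); });
};

var io = new FakeServer(),
    alice = io.connect('alice'),
    bob = io.connect('bob');

// Reply to a single client (like socket.emit inside a listener)
alice.emit('server-message', {msg: 'Message "hi" received.'});

// Notify every client
io.emit('server-message', {msg: 'Server restarting.'});

console.log(alice.inbox);  // [ 'Message "hi" received.', 'Server restarting.' ]
console.log(bob.inbox);    // [ 'Server restarting.' ]
```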

The Socket.IO server also serves the client-side library, which is convenient: the served version is available by adding /socket.io/socket.io.js to the server's URL. Alternatively, we can download the client-side library separately. To follow the client-side code, open the socketio-example.html file from the project directory. In this example, we will use the served version, adding the following line in the page header:

<script src="http://localhost:7000/socket.io/socket.io.js"></script>

Note that if you want to use the server from an external device, you should replace localhost with the hostname or IP address of the server. We will add an input element to the page, where the user will type the messages for the server. Under the input element, a list will display the messages from both the client and the server. Older messages will be displaced to the bottom as new messages are received. Refer to the following code:

<div class="container">

<h1>Socket.IO Example</h1>

<!-- Input element to send messages -->
<form role="form" class="form-horizontal" id="msgForm">
  <div class="form-group">
    <label for="msgToServer" class="col-sm-1">Message</label>
    <div class="col-sm-9">
      <input type="text" class="form-control input-sm" id="msgToServer" placeholder="Send a message to the server.">
    </div>
    <button type="submit" class="btn btn-default btn-sm">Send</button>
  </div>
</form>

<!-- List with messages -->
<ul id='msg-list' class='list-unstyled'></ul>

</div>

We will add a script that establishes the connection with the server and updates the list of messages. We will use D3 to handle the user input and to update the list. We begin by opening a connection with the Socket.IO server:

// Open a connection with the Socket.IO server
var socket = io('http://localhost:7000');

Note that in this case, the socket variable refers to the server's endpoint. The client API is almost identical to the server API; the socket object will also trigger the connect event once the connection with the server is established:

// The callback will be invoked when the connection is established
socket.on('connect', function() {
    console.log('Successful connection to the server.');
});

We will store each message, along with its sender and timestamp, in an array to display the messages in a list. We will also create a time formatter to display the timestamp of each message in a friendly format:

// Declare variables for the message list and the time formatter
var messages = [],
    dateFmt = d3.time.format('[%H:%M:%S]');
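The '[%H:%M:%S]' specifier produces the hours (on a 24-hour clock), minutes, and seconds of the local time, each padded to two digits and wrapped in brackets. To check the expected output without loading D3, here is a dependency-free sketch of an equivalent formatter; pad2 and formatTime are hypothetical names, and this is not how D3 implements it.

```javascript
// Pad a number to two digits, e.g. 7 -> '07'
function pad2(n) {
    return (n < 10 ? '0' : '') + n;
}

// Equivalent of d3.time.format('[%H:%M:%S]') for local time
function formatTime(date) {
    return '[' + pad2(date.getHours()) + ':' +
                 pad2(date.getMinutes()) + ':' +
                 pad2(date.getSeconds()) + ']';
}

// Months are zero-based in the Date constructor: 4 is May
console.log(formatTime(new Date(2014, 4, 15, 9, 5, 7)));  // [09:05:07]
```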

We will select the form element with the #msgForm ID and attach a listener for the submit event. The callback for the event will retrieve the content of the input element (#msgToServer) and send the message to the server. We use d3.event.preventDefault() to prevent the browser from submitting the form values and reloading the page:

d3.select('#msgForm').on('submit', function() {

    var inputElement = d3.select('#msgToServer').node(),
        message = inputElement.value.trim();

    // Check that the message is not empty
    if (message) {
        // Sends the message to the server...
    }

    // Prevent the default form submission
    d3.event.preventDefault();
});

We retrieve the contents of the input element and verify that the message is not empty before we emit the client-message event. We also reset the form, clearing the input element so that the user can write a new message without having to erase the one already sent:

    // Check that the message is not empty
    if (message) {
        // Sends the message to the server
        socket.emit('client-message', {msg: message});

        // Append the message to the message list
        messages.push({
            from: 'client', 
            date: new Date(), 
            msg: message
        });

        // Update the list of messages
        updateMessages();

        // Resets the form, clearing the input element
        this.reset();     
    }

In the updateMessages function, we will sort the messages by date, bind them to a selection of li elements, and append new li elements on the enter selection. Each li element will contain the time of the message, who sent it, and its contents. We will also add a class indicating who sent the message in order to set different colors for the server and the client:

// Update the message list
function updateMessages() {

    // Sort the messages, most recent first
    messages.sort(function(a, b) { return b.date - a.date; });

    // Create the selection for the list elements
    var li = d3.select('#msg-list')
        .selectAll('li').data(messages);

    // Append the list elements on enter
    li.enter().append('li');

    // Update the list class and content.
    li
        .attr('class', function(d) { return d.from; })
        // Use text() so that message contents are not parsed as HTML
        .text(function(d) {
          return [dateFmt(d.date), d.from, ':', d.msg].join(' ');
        });
}

The message list will be updated both when the user sends a message to the server and when the server sends a message to the client. The application lets us send messages to the server and receive its replies, as shown in the following screenshot:

[Figure: Using Socket.IO]

Implementing the streaming server

In the previous sections, we learned how to use the Twit module to connect to the Twitter-streaming API's endpoints and receive tweets. We also learned how to implement bidirectional communication between a Node server and the connected clients using the Socket.IO module. In this section, we will use both modules (Twit and Socket.IO) to create a server that allows multiple clients to track their own topics on Twitter in real time.

When the user accesses the application, a connection with the streaming server is established. Socket.IO will manage this connection, reconnecting and sending heartbeats if necessary. The user can then add up to five words to be tracked by the streaming server.

The streaming server will manage connections with the connected clients and one connection with the Twitter-streaming API. When a client adds a new topic, the streaming server will add it to the topics list. When a new tweet arrives, the server will examine its contents to check whether it matches any of the terms in the topic list and send a simplified version of the tweet to the corresponding client.

The code of the streaming server is in the chirp.js file in the top level of the project directory. We will begin by importing the Node modules and the credentials.js file, which exports the Twitter authentication tokens:

// Import the Node modules
var Twit     = require('twit'),
    IOServer = require('socket.io'),
    config   = require('./credentials.js');

To keep track of the correspondence between the topics and the clients, we will store each topic word in the topics list together with a reference to the socket of the client that added it. For instance, if the client with the socket socket adds the word 'breakfast', we will add the {word: 'breakfast', socket: socket} object to the topics list:

// List of topics to track
var topics = [];

As mentioned in the previous sections, we can use either the statuses/sample or the statuses/filter endpoint to capture tweets. We will use the statuses/filter endpoint in our application, but instead of filtering by topic, we will filter by location and language. We will set the locations parameter to '-180,-90,180,90', meaning that we want results from anywhere in the world, and set the language parameter to 'en' to receive tweets in English only.

Passing the list of words to the statuses/filter endpoint would force us to reset the connection every time a user adds a new topic. This is a waste of resources, and Twitter could apply rate limits if we open and close connections frequently. Instead, we will keep a single connection that receives the tweets matching the location and language filters, and we will match the topics ourselves by comparing each topic word with the tweet text. This allows us to add or remove words from the topics list without having to reset the connection.

We will initialize the Twit object, which will read the Twitter credentials and use them to create the streaming requests to Twitter. We will also create the stream to the statuses/filter endpoint and store a reference to it in the twitterStream variable:

// Configure the Twit object with the application credentials
var T = new Twit(config);

// Filter by location (the world) and tweets in English
var filterOptions = {
    locations: '-180,-90,180,90', 
    language: 'en'};

// Creates a new stream object, filtering by location and language
var twitterStream = T.stream('statuses/filter', filterOptions);
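The locations parameter describes a bounding box as the longitude and latitude of its southwest corner followed by those of its northeast corner, so '-180,-90,180,90' covers the whole globe. The following sketch builds such a string and checks whether a GeoJSON point falls inside a box; both helper names are hypothetical, and Twitter performs its own location matching on its side.

```javascript
// Hypothetical helper: builds a locations string for one bounding box,
// southwest corner first, longitude before latitude.
function locationsParam(box) {
    return [box.west, box.south, box.east, box.north].join(',');
}

// Hypothetical helper: checks whether a GeoJSON point lies inside the box
// (boxes crossing the 180th meridian are not handled)
function pointInBox(point, box) {
    var lon = point.coordinates[0],
        lat = point.coordinates[1];
    return lon >= box.west && lon <= box.east &&
           lat >= box.south && lat <= box.north;
}

var world = {west: -180, south: -90, east: 180, north: 90};

console.log(locationsParam(world));  // -180,-90,180,90

var point = {type: 'Point', coordinates: [-76.786264, -33.234588]};
console.log(pointInBox(point, world));                                      // true
console.log(pointInBox(point, {west: -10, south: 35, east: 5, north: 44}));  // false
```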

We will define functions to handle the most important Twit stream events. Most of these callbacks will just log a message in the console, stating that the event has occurred. We will define a callback for the connect event, which will be triggered when a connection is attempted, and a callback for the connected event, which will be emitted when the connection to the Twitter stream is established:

// Connection attempt ('connect' event)
function twitOnConnect(req) {
    console.log('[Twitter] Connecting...');
}

// Successful connection ('connected' event)
function twitOnConnected(res) {
    console.log('[Twitter] Connection successful.');
}

We will display a log message if a reconnection is scheduled, indicating the interval in seconds:

// Reconnection scheduled ('reconnect' event).
function twitOnReconnect(req, res, interval) {
    var secs = Math.round(interval / 1e3);
    console.log('[Twitter] Disconnected. Reconnection scheduled in ' + secs + ' seconds.');
}

We will also add callbacks for the disconnect and limit events, which will occur when Twitter sends a disconnect or limit message, respectively. Note that Twit will close the connection if it receives the disconnect message, but not if it receives the limit message. In the limit callback, we will display a message and stop the stream explicitly:

// Disconnect message from Twitter ('disconnect' event)
function twitOnDisconnect(disconnectMessage) {
    // Twit will stop itself before emitting the event
    console.log('[Twitter] Disconnected.');
}

// Limit message from Twitter ('limit' event)
function twitOnLimit(limitMessage) {
    // We stop the stream explicitly.
    console.log('[Twitter] Limit message received. Stopping.');
    twitterStream.stop();
}

Adding log messages for these events can help debug issues or help us know what is happening if we don't receive messages for a while. The event that we should certainly listen for is the tweet event, which will be emitted when a tweet is delivered by the streaming endpoint. The callback of the event will receive the tweet object as the argument.

We will only send geotagged tweets to the connected clients, so the callback returns immediately if the tweet has no coordinates. Then, we check whether the tweet text matches any of the terms in the topics list. If a term is found in the tweet text, we send a simplified version of the tweet to the client who added the term:

// A tweet is received ('tweet' event)
function twitOnTweet(tweet) {

    // Exits if the tweet doesn't have geographic coordinates
    if (!tweet.coordinates) { return; }

    // Convert the tweet text to lowercase to find the topics
    var tweetText = tweet.text.toLowerCase();

    // Check if any of the topics are contained in the tweet text
    topics.forEach(function(topic) {

        // Checks if the tweet text contains the topic
        if (tweetText.indexOf(topic.word) !== -1) {

            // Sends a simplified version of the tweet to the client
            topic.socket.emit('tweet', {
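                // Use the string ID; numeric tweet IDs can exceed
                // the range of integers that JavaScript represents exactly
                id: tweet.id_str,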
                coordinates: tweet.coordinates,
                word: topic.word
            });
        }
    });
}

As we are not using the tweet text or its creation date, we will send just the tweet ID, the coordinates, and the matched word to the client. We can now attach the listeners to their corresponding events as follows:

// Add listeners for the stream events to the stream instance
twitterStream.on('tweet',      twitOnTweet);
twitterStream.on('connect',    twitOnConnect);
twitterStream.on('connected',  twitOnConnected);
twitterStream.on('reconnect',  twitOnReconnect);
twitterStream.on('limit',      twitOnLimit);
twitterStream.on('disconnect', twitOnDisconnect);

We have initialized the connection to the Twitter-streaming API, but as we don't have any topics in our list, nothing will happen. We need to create the Socket.IO server to handle connections with clients, which can add topics to the list. We will begin by defining the port where the Socket.IO server will listen and creating an instance of the server. Note that we can create an instance of the Socket.IO server with or without the new keyword:

// Create a new instance of the Socket.IO Server
var port = 9720,
    io = new IOServer(port);

// Displays a message at startup
console.log('Listening for incoming connections in port ' + port);

The io server will begin listening for incoming connections on port 9720. We can use other port numbers too; remember that ports between 0 and 1023 are privileged, as they require a higher level of permission to bind.

When a client connects, the connection event will be emitted by the io server, passing the socket as an argument to the event callback. In this case, we will display a log message in the console, indicating that a new connection was established, and add listeners for the socket events:

// A client has established a connection with the server
io.on('connection', function(socket) {

    // Displays a message in the console when a client connects
    console.log('Client ' + socket.id + ' connected.');

    // Add listeners for the socket events...
});

The socket object is a reference to the client endpoint in the communication. If the client adds a new topic, the add custom event will be emitted, passing the added topic to the event callback. In the callback for this event, we will append the word and a reference to the socket to the topic list and display a log message in the console:

    // The client adds a new topic
    socket.on('add', function(topic) {
        // Adds the new topic to the topic list
        topics.push({
            word: topic.word.toLowerCase(), 
            socket: socket
        });

        console.log('Adding the topic "' + topic.word + '"');
    });

When the client disconnects, we will remove its topics from the list and display a log message in the terminal:

    // If the client disconnects, we remove its topics from the list
    socket.on('disconnect', function() {
        console.log('Client ' + socket.id + ' disconnected.');
        topics = topics.filter(function(topic) {
            return topic.socket.id !== socket.id;
        });
    });
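To check the topic bookkeeping without a Twitter connection or a Socket.IO server, the add, disconnect, and tweet handlers can be exercised against fake sockets. In the following sketch, the sockets are plain EventEmitter instances, and the hypothetical registerClient, onTweet, and fakeSocket functions mirror the listeners above; the emitted payload is reduced to the matched word for brevity.

```javascript
var EventEmitter = require('events').EventEmitter;

// List of topics, with the same {word, socket} structure as in chirp.js
var topics = [];

// Attach the same 'add' and 'disconnect' listeners as the server does
function registerClient(socket) {
    socket.on('add', function(topic) {
        topics.push({word: topic.word.toLowerCase(), socket: socket});
    });
    socket.on('disconnect', function() {
        topics = topics.filter(function(topic) {
            return topic.socket.id !== socket.id;
        });
    });
}

// Same matching logic as twitOnTweet, with a reduced payload
function onTweet(tweet) {
    if (!tweet.coordinates) { return; }
    var tweetText = tweet.text.toLowerCase();
    topics.forEach(function(topic) {
        if (tweetText.indexOf(topic.word) !== -1) {
            topic.socket.emit('tweet', {word: topic.word});
        }
    });
}

// Hypothetical fake client socket: received 'tweet' events are recorded in sent
function fakeSocket(id) {
    var socket = new EventEmitter();
    socket.id = id;
    socket.sent = [];
    socket.on('tweet', function(data) { socket.sent.push(data.word); });
    return socket;
}

var a = fakeSocket('A'),
    b = fakeSocket('B');
registerClient(a);
registerClient(b);

// Simulate the clients adding topics
a.emit('add', {word: 'Coffee'});
b.emit('add', {word: 'tea'});

var point = {type: 'Point', coordinates: [-76.786264, -33.234588]};
onTweet({text: 'Coffee or tea?', coordinates: point});
onTweet({text: 'More coffee', coordinates: null});  // ignored: no coordinates

// Client A disconnects, so its topics are removed
a.emit('disconnect');
onTweet({text: 'coffee and tea', coordinates: point});

console.log(a.sent);         // [ 'coffee' ]
console.log(b.sent);         // [ 'tea', 'tea' ]
console.log(topics.length);  // 1
```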

At this point, the server can handle multiple clients connected at the same time, each adding its own terms to the topic list. Once we implement the client-side application and open it in the browser, the streaming server will generate logs such as the following:

$ node chirp.js
Listening for incoming connections in port 9720
[Twitter] Connecting...
[Twitter] Connection successful.
Client 4WDFIrqsbxtf_NO_AAAA connected.
Adding the topic "day"
Adding the topic "night"
Client 4WDFIrqsbxtf_NO_AAAA disconnected.
Client P8mb97GiLOhc-noLAAAB connected.
Adding the topic "coffee"
Adding the topic "tea"
Adding the topic "milk"
Adding the topic "beer"
[Twitter] Disconnected. Reconnection scheduled in 0 seconds.
[Twitter] Connecting...
[Twitter] Connection successful.
Client P8mb97GiLOhc-noLAAAB disconnected.
Client p3lFgVrxGI0bLPOFAAAC connected.
  ...

In the next section, we will use the client-side Socket.IO library, D3, and Backbone to create a visualization that shows the geographic distribution of tweets matching the user-defined topics.
