Votes from Twitter

In your $GOPATH/src folder, alongside other projects, create a new folder called socialpoll for this chapter. This folder won't be a Go package or program by itself, but will contain our three component programs. Inside socialpoll, create a new folder called twittervotes and add the obligatory main.go template (this is important as main packages without a main function won't compile):

package main
func main(){}

Our twittervotes program is going to:

  • Load all polls from the MongoDB database using mgo, and collect all options from the options array in each document
  • Open and maintain a connection to Twitter's streaming APIs looking for any mention of the options
  • For each tweet that matches the filter, figure out which option is mentioned and push that option through to NSQ
  • If the connection to Twitter is dropped (which is common for long-running connections, and is something Twitter's streaming API specification tells clients to expect), wait a short time so that we do not bombard Twitter with connection requests, then reconnect and continue
  • Periodically re-query MongoDB for the latest polls and refresh the connection to Twitter to make sure we are always looking out for the right options
  • Gracefully stop itself when the user terminates the program by pressing Ctrl + C

Authorization with Twitter

In order to use the streaming API, we will need authentication credentials from Twitter's Application Management console, much in the same way we did for our Gomniauth service providers in Chapter 3, Three Ways to Implement Profile Pictures. Head over to https://apps.twitter.com and create a new app called something like SocialPoll (the names have to be unique, so you can have some fun here; the choice of name doesn't affect the code either way). When your app has been created, visit the API Keys tab and locate the Your access token section where you need to create a new access token. After a short delay, refresh the page and notice that you in fact have two sets of keys and secrets; an API key and a secret, and an access token and the corresponding secret. Following good coding practices, we are going to set these values as environment variables so that our program can have access to them without us having to hardcode them in our source files.

The keys we will use in this chapter are:

  • SP_TWITTER_KEY
  • SP_TWITTER_SECRET
  • SP_TWITTER_ACCESSTOKEN
  • SP_TWITTER_ACCESSSECRET

You can set the environment variables however you like, but since the app relies on them in order to work, creating a new file called setup.sh (for bash shells) or setup.bat (on Windows) is a good idea since you can check such files into your source code repository. Insert the following code in setup.sh or setup.bat by copying the appropriate values from the Twitter app page:

#!/bin/bash
export SP_TWITTER_KEY=yCwwKKnuBnUBrelyTN...
export SP_TWITTER_SECRET=6on0YRYniT1sI3f...
export SP_TWITTER_ACCESSTOKEN=2427-13677...
export SP_TWITTER_ACCESSSECRET=SpnZf336u...

Run the file with the source or call commands to have the values appropriately set, or add them to your .bashrc or C:\cmdauto.cmd files to save you running them every time you open a new terminal window.
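As a quick sanity check, a small Go program can confirm that the four variables are actually visible to the processes you launch from your terminal. The following is a minimal sketch that uses only the standard library; the missingEnv helper is a hypothetical name introduced for illustration and is not part of the chapter's code.

```go
package main

import (
	"fmt"
	"os"
)

// twitterEnvVars lists the variable names used in this chapter.
var twitterEnvVars = []string{
	"SP_TWITTER_KEY",
	"SP_TWITTER_SECRET",
	"SP_TWITTER_ACCESSTOKEN",
	"SP_TWITTER_ACCESSSECRET",
}

// missingEnv (hypothetical helper) returns the names that are unset or empty.
func missingEnv(names []string) []string {
	var missing []string
	for _, name := range names {
		if os.Getenv(name) == "" {
			missing = append(missing, name)
		}
	}
	return missing
}

func main() {
	if missing := missingEnv(twitterEnvVars); len(missing) > 0 {
		fmt.Println("missing environment variables:", missing)
		return
	}
	fmt.Println("all Twitter credentials are set")
}
```

Running it in a fresh terminal before and after sourcing setup.sh should show the difference.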

Extracting the connection

The Twitter streaming API supports HTTP connections that stay open for a long time, and given the design of our solution, we are going to need to access the net.Conn object in order to close it from outside of the goroutine in which requests occur. We can achieve this by providing our own dial method to an http.Transport object that we will create.

Create a new file called twitter.go inside twittervotes (which is where all things Twitter-related will live), and insert the following code:

var conn net.Conn
func dial(netw, addr string) (net.Conn, error) {
  if conn != nil {
    conn.Close()
    conn = nil
  }
  netc, err := net.DialTimeout(netw, addr, 5*time.Second)
  if err != nil {
    return nil, err
  }
  conn = netc
  return netc, nil
}

Our bespoke dial function first ensures conn is closed, and then opens a new connection keeping the conn variable updated with the current connection. If a connection dies (Twitter's API will do this from time to time) or is closed by us, we can redial without worrying about zombie connections.

We will periodically close the connection ourselves and initiate a new one, because we want to reload the options from the database at regular intervals. To do this, we need a function that closes the connection, and also closes an io.ReadCloser that we will use to read the body of the responses. Add the following code to twitter.go:

var reader io.ReadCloser
func closeConn() {
  if conn != nil {
    conn.Close()
  }
  if reader != nil {
    reader.Close()
  }
}

Now we can call closeConn at any time to break the ongoing connection with Twitter and tidy things up. In most cases, our code will load the options from the database again and open a new connection right away, but if we're shutting the program down (in response to a Ctrl + C hit) then we can call closeConn just before we exit.
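Because conn is written inside dial and read inside closeConn, and those two functions can run in different goroutines, one option is to guard the variable with a mutex. The following is a sketch of that idea under stated assumptions, not the chapter's code: the connTracker type, its dialFn field, and the pipeDial and isClosed helpers are hypothetical names, and net.Pipe stands in for a real network connection so that the example runs offline.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net"
	"sync"
	"time"
)

// connTracker (hypothetical) remembers the most recent connection so that
// it can be closed from another goroutine.
type connTracker struct {
	mu     sync.Mutex
	conn   net.Conn
	dialFn func(network, addr string) (net.Conn, error)
}

// dial closes any previous connection, opens a new one and remembers it.
func (t *connTracker) dial(network, addr string) (net.Conn, error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.conn != nil {
		t.conn.Close()
		t.conn = nil
	}
	c, err := t.dialFn(network, addr)
	if err != nil {
		return nil, err
	}
	t.conn = c
	return c, nil
}

// close shuts down the current connection, if there is one.
func (t *connTracker) close() {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.conn != nil {
		t.conn.Close()
		t.conn = nil
	}
}

// pipeDial pretends to dial by returning one end of an in-memory pipe.
func pipeDial(network, addr string) (net.Conn, error) {
	client, _ := net.Pipe()
	return client, nil
}

// isClosed reports whether a net.Pipe connection has been closed. The short
// write deadline stops the check from blocking on a connection that is open.
func isClosed(c net.Conn) bool {
	c.SetWriteDeadline(time.Now().Add(20 * time.Millisecond))
	_, err := c.Write([]byte("x"))
	return errors.Is(err, io.ErrClosedPipe)
}

func main() {
	t := &connTracker{dialFn: pipeDial}
	first, _ := t.dial("tcp", "example.invalid:443")
	second, _ := t.dial("tcp", "example.invalid:443")
	fmt.Println("first closed after redial:", isClosed(first))
	fmt.Println("second closed after redial:", isClosed(second))
	t.close()
	fmt.Println("second closed after close:", isClosed(second))
}
```

In a real program, dialFn would wrap net.DialTimeout, and the tracker's dial method would be handed to the http.Transport in place of the package-level function.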

Reading environment variables

Next we are going to write a function that will read the environment variables and set up the OAuth objects we'll need in order to authenticate the requests. Add the following code in the twitter.go file:

var (
  authClient *oauth.Client
  creds *oauth.Credentials
)
func setupTwitterAuth() {
  var ts struct {
    ConsumerKey    string `env:"SP_TWITTER_KEY,required"`
    ConsumerSecret string `env:"SP_TWITTER_SECRET,required"`
    AccessToken    string `env:"SP_TWITTER_ACCESSTOKEN,required"`
    AccessSecret   string `env:"SP_TWITTER_ACCESSSECRET,required"`
  }
  if err := envdecode.Decode(&ts); err != nil {
    log.Fatalln(err)
  }
  creds = &oauth.Credentials{
    Token:  ts.AccessToken,
    Secret: ts.AccessSecret,
  }
  authClient = &oauth.Client{
    Credentials: oauth.Credentials{
      Token:  ts.ConsumerKey,
      Secret: ts.ConsumerSecret,
    },
  }
}

Here we define a struct type to store the environment variables that we need to authenticate with Twitter. Since we don't need to use the type elsewhere, we define it inline and create a variable called ts of this anonymous type (that's why we have the somewhat unusual var ts struct… code). We then use Joe Shaw's elegant envdecode package to pull in those environment variables for us. You will need to run go get github.com/joeshaw/envdecode and also import the log package. Our program will try to load appropriate values for all the fields marked required, and return an error if it fails to do so, which reminds people that the program won't work without Twitter credentials.

The strings inside the back ticks alongside each field in struct are called tags, and are available through a reflection interface, which is how envdecode knows which variables to look for. Tyler Bunnell and I added the required argument to this package, which indicates that it is an error for any of the environment variables to be missing (or empty).
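To make the role of tags more concrete, here is a small sketch of how a package could read them through the reflect package. It is only an illustration of the mechanism and makes no claim about how envdecode is implemented internally; the tagInfo type and the envTags helper are hypothetical names.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// tagInfo (hypothetical) describes one field that carries an env tag.
type tagInfo struct {
	Field    string
	EnvName  string
	Required bool
}

// envTags (hypothetical helper) lists the env tags found on the fields of a
// struct, or of a pointer to a struct.
func envTags(v interface{}) []tagInfo {
	t := reflect.TypeOf(v)
	if t == nil {
		return nil
	}
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t.Kind() != reflect.Struct {
		return nil
	}
	var out []tagInfo
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		tag, ok := f.Tag.Lookup("env")
		if !ok {
			continue // field has no env tag
		}
		parts := strings.Split(tag, ",")
		info := tagInfo{Field: f.Name, EnvName: parts[0]}
		for _, opt := range parts[1:] {
			if opt == "required" {
				info.Required = true
			}
		}
		out = append(out, info)
	}
	return out
}

func main() {
	var ts struct {
		ConsumerKey string `env:"SP_TWITTER_KEY,required"`
		Optional    string `env:"SP_OPTIONAL"`
		Untagged    string
	}
	for _, info := range envTags(&ts) {
		fmt.Printf("%s -> %s (required: %v)\n", info.Field, info.EnvName, info.Required)
	}
}
```

The SP_OPTIONAL name and the Optional and Untagged fields exist only to show how tagged, optional, and untagged fields are treated differently.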

Once we have the keys, we use them to create oauth.Credentials and an oauth.Client object from Gary Burd's go-oauth package, which will allow us to authorize requests with Twitter.

Now that we have the ability to control the underlying connection and authorize requests, we are ready to write the code that will actually build the authorized request, and return the response. In twitter.go, add the following code:

var (
  authSetupOnce sync.Once
  httpClient    *http.Client
)
func makeRequest(req *http.Request, params url.Values) (*http.Response, error) {
  authSetupOnce.Do(func() {
    setupTwitterAuth()
    httpClient = &http.Client{
      Transport: &http.Transport{
        Dial: dial,
      },
    }
  })
  formEnc := params.Encode()
  req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
  req.Header.Set("Content-Length", strconv.Itoa(len(formEnc)))
  req.Header.Set("Authorization", authClient.AuthorizationHeader(creds, "POST", req.URL, params))
  return httpClient.Do(req)
}

We use sync.Once to ensure our initialization code only gets run once, no matter how many times we call makeRequest. After calling the setupTwitterAuth function, we create a new http.Client using an http.Transport that uses our custom dial function. We then set the appropriate headers needed for authorization with Twitter by encoding the specified params object that will contain the options we are querying for.
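The behavior of sync.Once is easy to observe in isolation. The following minimal sketch calls a setup function from ten goroutines and counts how often the initialization body actually runs; ensureSetup and setupCalls are hypothetical names used only for this demonstration.

```go
package main

import (
	"fmt"
	"sync"
)

var (
	setupOnce  sync.Once
	setupCalls int
)

// ensureSetup runs the initialization body at most once, however many
// goroutines call it.
func ensureSetup() {
	setupOnce.Do(func() {
		setupCalls++ // sync.Once guarantees this executes a single time
	})
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ensureSetup()
		}()
	}
	wg.Wait()
	fmt.Println("setup ran", setupCalls, "time(s)")
}
```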

Reading from MongoDB

In order to load the polls, and therefore the options to search Twitter for, we need to connect to and query MongoDB. In main.go, add the two functions dialdb and closedb:

var db *mgo.Session
func dialdb() error {
  var err error
  log.Println("dialing mongodb: localhost")
  db, err = mgo.Dial("localhost")
  return err
}
func closedb() {
  db.Close()
  log.Println("closed database connection")
}

These two functions will connect to and disconnect from the locally running MongoDB instance using the mgo package, and store mgo.Session (the database connection object) in a global variable called db.

Tip

As an additional assignment, see if you can find an elegant way to make the location of the MongoDB instance configurable so that you don't need to run it locally.

Assuming MongoDB is running and our code is able to connect, we need to load the poll objects and extract all the options from the documents, which we will then use to search Twitter. Add the following loadOptions function to main.go:

type poll struct {
  Options []string
}
func loadOptions() ([]string, error) {
  var options []string
  iter := db.DB("ballots").C("polls").Find(nil).Iter()
  var p poll
  for iter.Next(&p) {
    options = append(options, p.Options...)
  }
  iter.Close()
  return options, iter.Err()
}

Our poll document contains more than just Options, but our program doesn't care about anything else, so there's no need for us to bloat our poll struct. We use the db variable to access the polls collection from the ballots database, and call the mgo package's fluent Find method, passing nil (meaning no filtering).

Note

A fluent interface (first coined by Eric Evans and Martin Fowler) refers to an API design that aims to make the code more readable by allowing you to chain together method calls. This is achieved by each method returning the context object itself, so that another method can be called directly afterwards. For example, mgo allows you to write queries such as this:

query := col.Find(q).Sort("field").Limit(10).Skip(10)
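To see how such chaining works under the hood, consider this small self-contained sketch. The query type here is a toy written for illustration and is not mgo's Query; each method mutates the receiver and returns it so that the next call can follow directly.

```go
package main

import "fmt"

// query is a toy type for illustration only; it is not mgo's Query.
type query struct {
	filter string
	sortBy string
	limit  int
	skip   int
}

// find starts a new query.
func find(filter string) *query { return &query{filter: filter} }

// Each method returns the receiver, which is what makes chaining possible.
func (q *query) Sort(field string) *query { q.sortBy = field; return q }
func (q *query) Limit(n int) *query       { q.limit = n; return q }
func (q *query) Skip(n int) *query        { q.skip = n; return q }

// String describes the query that has been built up.
func (q *query) String() string {
	return fmt.Sprintf("find(%s) sort(%s) limit(%d) skip(%d)",
		q.filter, q.sortBy, q.limit, q.skip)
}

func main() {
	q := find("polls").Sort("title").Limit(10).Skip(20)
	fmt.Println(q)
}
```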

We then get an iterator by calling the Iter method, which allows us to access each poll one by one. This is a very memory-efficient way of reading the poll data, because it only ever uses a single poll object. If we were to use the All method instead, the amount of memory we'd use would depend on the number of polls we had in our database, which would be out of our control.

When we have a poll, we use the append method to build up the options slice. Of course, with millions of polls in the database, this slice too would grow large and unwieldy. For that kind of scale, we would probably run multiple twittervotes programs, each dedicated to a portion of the poll data. A simple way to do this would be to break polls into groups based on the letters the titles begin with, such as group A-N and O-Z. A somewhat more sophisticated approach would be to add a field to the poll document grouping it up in a more controlled manner, perhaps based on the stats for the other groups so that we are able to balance the load across many twittervotes instances.
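The simple letter-based split could look something like the following sketch. The groupFor function and the "other" bucket for titles that do not start with a letter from A to Z are assumptions made for this example; the chapter does not define them.

```go
package main

import (
	"fmt"
	"unicode"
)

// groupFor (hypothetical) assigns a poll to a group based on the first
// letter of its title, ignoring case.
func groupFor(title string) string {
	runes := []rune(title)
	if len(runes) == 0 {
		return "other"
	}
	first := unicode.ToUpper(runes[0])
	switch {
	case first >= 'A' && first <= 'N':
		return "A-N"
	case first >= 'O' && first <= 'Z':
		return "O-Z"
	default:
		return "other" // digits, punctuation, non-Latin letters
	}
}

func main() {
	for _, title := range []string{"Best phone", "zebra or lion", "2014 election"} {
		fmt.Printf("%q -> %s\n", title, groupFor(title))
	}
}
```

Each twittervotes instance would then load only the polls belonging to its own group.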

Tip

The append built-in function is actually a variadic function, which means you can pass multiple elements for it to append. If you have a slice of the correct type, you can add ... to the end, which simulates the passing of each item of the slice as a different argument.
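A short, self-contained sketch of both forms of append follows. The collect helper is a hypothetical function written for this example; it flattens several slices into one, much as loadOptions does with the options of each poll.

```go
package main

import "fmt"

// collect (hypothetical helper) flattens several slices into one.
func collect(groups ...[]string) []string {
	var all []string
	for _, g := range groups {
		all = append(all, g...) // spread the slice into separate arguments
	}
	return all
}

func main() {
	options := []string{"happy"}
	options = append(options, "sad", "fail") // several elements in one call
	more := []string{"win"}
	options = append(options, more...) // every element of another slice
	fmt.Println(options)
	fmt.Println(collect([]string{"a", "b"}, []string{"c"}))
}
```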

Finally, we close the iterator and clean up any used memory before returning the options and any errors that occurred while iterating (by calling the Err method on the mgo.Iter object).

Reading from Twitter

Now we are able to load the options and make authorized requests to the Twitter API. We are thus ready to write the code that initiates the connection, and continuously reads from the stream until either we call our closeConn method, or Twitter closes the connection for one reason or another. The structure contained in the stream is a complex one containing all kinds of information about the tweet—who made it and when, and even what links or mentions of users occur in the body (see Twitter's API documentation for more details). However, we are only interested in the tweet text itself so you need not worry about all the other noise; add the following structure to twitter.go:

type tweet struct {
  Text string
}

Tip

This may feel incomplete, but think about how clear it makes our intentions to other programmers who might see our code: a tweet has some text, and that is all we care about.

Using this new structure, in twitter.go, add the following readFromTwitter function that takes a send-only channel called votes; this is how this function will inform the rest of our program that it has noticed a vote on Twitter:

func readFromTwitter(votes chan<- string) {
  options, err := loadOptions()
  if err != nil {
    log.Println("failed to load options:", err)
    return
  }
  u, err := url.Parse("https://stream.twitter.com/1.1/statuses/filter.json")
  if err != nil {
    log.Println("creating filter request failed:", err)
    return
  }
  query := make(url.Values)
  query.Set("track", strings.Join(options, ","))
  req, err := http.NewRequest("POST", u.String(), strings.NewReader(query.Encode()))
  if err != nil {
    log.Println("creating filter request failed:", err)
    return
  }
  resp, err := makeRequest(req, query)
  if err != nil {
    log.Println("making request failed:", err)
    return
  }
  reader = resp.Body // assign the package-level reader (not :=) so closeConn can close it
  decoder := json.NewDecoder(reader)
  for {
    var tweet tweet
    if err := decoder.Decode(&tweet); err != nil {
      break
    }
    for _, option := range options {
      if strings.Contains(
        strings.ToLower(tweet.Text),
        strings.ToLower(option),
      ) {
        log.Println("vote:", option)
        votes <- option
      }
    }
  }
}

In the preceding code, after loading the options from all the polls data (by calling the loadOptions function), we use url.Parse to create a url.URL object describing the appropriate endpoint on Twitter. We build a url.Values object called query, and set the options as a comma-separated list. As per the API, we make a new POST request using the encoded url.Values object as the body, and pass it to makeRequest along with the query object itself. All being well, we make a new json.Decoder from the body of the response, and keep reading inside an infinite for loop by calling the Decode method. If there is an error (probably due to the connection being closed), we simply break the loop and exit the function. If there is a tweet to read, it will be decoded into the tweet variable, which will give us access to the Text property (the 140 characters of the tweet itself). We then iterate over all possible options, and if the tweet mentions one, we send that option on the votes channel. This technique also allows a tweet to contain many votes at the same time, something you may or may not decide to change based on the rules of the election.
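The decoding and matching steps can be tried out without a Twitter connection by feeding json.Decoder from an in-memory reader. The following is a minimal sketch under that assumption; matchOptions, countVotes, and sampleCounts are hypothetical helpers, and the three sample tweets are invented for the demonstration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

type tweet struct {
	Text string
}

// matchOptions (hypothetical) returns every option mentioned in text,
// ignoring case.
func matchOptions(text string, options []string) []string {
	var matched []string
	lower := strings.ToLower(text)
	for _, option := range options {
		if strings.Contains(lower, strings.ToLower(option)) {
			matched = append(matched, option)
		}
	}
	return matched
}

// countVotes (hypothetical) decodes a stream of JSON tweets from r and
// tallies how often each option is mentioned.
func countVotes(r io.Reader, options []string) map[string]int {
	counts := make(map[string]int)
	decoder := json.NewDecoder(r)
	for {
		var t tweet
		if err := decoder.Decode(&t); err != nil {
			break // io.EOF or malformed input ends the loop
		}
		for _, option := range matchOptions(t.Text, options) {
			counts[option]++
		}
	}
	return counts
}

// sampleCounts runs countVotes over three invented tweets.
func sampleCounts() map[string]int {
	stream := strings.NewReader(`{"text":"So HAPPY today"}
{"text":"win some, fail some"}
{"text":"nothing relevant"}`)
	return countVotes(stream, []string{"happy", "sad", "fail", "win"})
}

func main() {
	fmt.Println(sampleCounts())
}
```

Note that substring matching is deliberately loose: an option such as "win" would also match a tweet containing "winter".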

Note

The votes channel is send-only (which means we cannot receive on it), since it is of the type chan<- string. Think of the little "arrow" telling us which way messages will flow: either into the channel or out of it. This is a great way to express intent—it's clear that we never intend to read votes using our readFromTwitter function; rather we will only send them on that channel.

Returning from readFromTwitter whenever Decode returns an error doesn't, on its own, provide a very robust solution. This is because the Twitter API documentation states that the connection will drop from time to time, and clients should consider this when consuming the services. And remember, we are going to terminate the connection periodically too, so we need to think about a way to reconnect once the connection is dropped.

Signal channels

A great use of channels in Go is to signal events between code running in different goroutines. We are going to see a real-world example of this when we write our next function.

The purpose of the function is to start a goroutine that continually calls the readFromTwitter function (with the specified votes channel to receive the votes on), until we signal that we want it to stop. And once it has stopped, we want to be notified through another signal channel. The return of the function will be a channel of struct{}; a signal channel.

Signal channels have some interesting properties that are worth taking a closer look at. Firstly, the type sent down the channels is an empty struct{}, instances of which actually take up zero bytes, since it has no fields. So struct{}{} is a great memory-efficient option for signaling events. Some people use bool types, which is also fine, although true and false both take up a byte of memory.

Note

Head over to http://play.golang.org and try this out for yourself.

The size of a bool is 1:

fmt.Println(reflect.TypeOf(true).Size())
= 1

Whereas the size of struct{}{} is 0:

fmt.Println(reflect.TypeOf(struct{}{}).Size())
= 0

The signal channels also have a buffer size of 1, which means that sending a signal will not block, even if nothing has read from the channel yet.
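The effect of the buffer can be observed with a non-blocking send. In this minimal sketch, trySend is a hypothetical helper that uses select with a default case to report whether a send would have gone through immediately.

```go
package main

import "fmt"

// trySend (hypothetical helper) reports whether a value could be sent on ch
// without blocking.
func trySend(ch chan struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	buffered := make(chan struct{}, 1)
	unbuffered := make(chan struct{})

	fmt.Println("buffered, empty:", trySend(buffered))           // true
	fmt.Println("buffered, full:", trySend(buffered))            // false
	fmt.Println("unbuffered, no receiver:", trySend(unbuffered)) // false
}
```

With a buffer of 1, the first signal is stored even though no goroutine is receiving yet; a second signal would have to wait until the first has been read.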

We are going to employ two signal channels in our code, one that we pass into our function that tells our goroutine that it should stop, and another (provided by the function) that signals once stopping is complete.

In twitter.go, add the following function:

func startTwitterStream(stopchan <-chan struct{}, votes chan<- string) <-chan struct{} {
  stoppedchan := make(chan struct{}, 1)
  go func() {
    defer func() {
      stoppedchan <- struct{}{}
    }()
    for {
      select {
      case <-stopchan:
        log.Println("stopping Twitter...")
        return
      default:
        log.Println("Querying Twitter...")
        readFromTwitter(votes)
        log.Println("  (waiting)")
        time.Sleep(10 * time.Second) // wait before reconnecting
      }
    }
  }()
  return stoppedchan
}

In the preceding code, the first argument stopchan is a channel of type <-chan struct{}, a receive-only signal channel. It is this channel that code outside the function will signal on, which will tell our goroutine to stop. Remember that it's receive-only only inside this function; the actual channel itself will be capable of sending. The second argument is the votes channel on which votes will be sent. The return type of our function is also a signal channel of type <-chan struct{}; a receive-only channel that we will use to indicate that we have stopped.

These channels are necessary because our function triggers its own goroutine, and immediately returns, so without this, calling code would have no idea if the spawned code were still running or not.

The first thing we do in the startTwitterStream function is make our stoppedchan. Inside the goroutine, we defer the sending of a struct{}{} to indicate that we have finished when the goroutine exits. Notice that stoppedchan is a normal channel, so even though it is returned as receive-only, we are able to send on it from within this function.

We then start an infinite for loop in which we select from one of two channels. The first is the stopchan (the first argument), which would indicate that it was time to stop, and return (thus triggering the deferred signaling on stoppedchan). If that hasn't happened, we will call readFromTwitter (passing in the votes channel), which will go and load the options from the database and open the connection to Twitter.

When the Twitter connection dies, our code will return here where we sleep for ten seconds using the time.Sleep function. This is to give the Twitter API a rest in case it closed the connection due to overuse. Once we've rested, we re-enter the loop and check again on the stopchan channel to see if the calling code wants us to stop or not.
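The same stop and stopped signaling can be exercised without Twitter by swapping readFromTwitter for an arbitrary work function. The sketch below is a variation rather than a copy of the chapter's code: startWorker is a hypothetical name, and the pause uses a select on time.After instead of time.Sleep so that a stop signal arriving during the wait is noticed straight away.

```go
package main

import (
	"fmt"
	"time"
)

// startWorker (hypothetical) calls work repeatedly, pausing for delay
// between calls, until a value arrives on stop. The returned channel
// receives one value once the goroutine has finished.
func startWorker(stop <-chan struct{}, delay time.Duration, work func()) <-chan struct{} {
	stopped := make(chan struct{}, 1)
	go func() {
		defer func() {
			stopped <- struct{}{}
		}()
		for {
			select {
			case <-stop:
				return
			default:
				work()
			}
			// Wait before the next call, but give up early if asked to stop.
			select {
			case <-stop:
				return
			case <-time.After(delay):
			}
		}
	}()
	return stopped
}

func main() {
	stop := make(chan struct{}, 1)
	calls := 0
	stopped := startWorker(stop, 10*time.Millisecond, func() { calls++ })

	time.Sleep(50 * time.Millisecond)
	stop <- struct{}{}
	<-stopped // calls is safe to read once the worker has signalled
	fmt.Println("worker stopped after", calls, "call(s)")
}
```

Whether an interruptible wait is worth the extra select depends on how long the delay is; with a ten-second pause it noticeably shortens shutdown.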

To make this flow clear, we are logging out key statements that will not only help us debug our code, but also let us peek into the inner workings of this somewhat complicated mechanism.

Publishing to NSQ

Once our code is successfully noticing votes on Twitter and sending them down the votes channel, we need a way to publish them into an NSQ topic; after all, this is the point of the twittervotes program.

We will write a function called publishVotes that will take the votes channel, this time of type <-chan string (a receive-only channel) and publish each string that is received from it.

Note

In our previous functions, the votes channel was of type chan<- string, but this time it's of the type <-chan string. You might think this is a mistake, or even that it means we cannot use the same channel for both, but you would be wrong. The channel we create later will be made with make(chan string), which is neither receive-only nor send-only, and can act in both roles. The reason for using the <- operator on a channel in arguments is to make clear the intent of what the channel will be used for; or in the case where it is the return type, to prevent users from accidentally sending on channels intended for receiving or vice versa. The compiler will actually produce an error if they use such a channel incorrectly.
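Here is a minimal sketch of one bidirectional channel being handed to two functions with opposite directional views of it. The produce and consume functions are hypothetical names for this example.

```go
package main

import "fmt"

// produce can only send on out; receiving from it would not compile.
func produce(out chan<- string, values []string) {
	for _, v := range values {
		out <- v
	}
	close(out) // closing is permitted on a send-only channel
}

// consume can only receive from in; sending on it would not compile.
func consume(in <-chan string) []string {
	var got []string
	for v := range in {
		got = append(got, v)
	}
	return got
}

func main() {
	votes := make(chan string) // bidirectional where it is created
	go produce(votes, []string{"happy", "win"})
	fmt.Println(consume(votes))
}
```

Go converts the bidirectional channel to the narrower type automatically at each call site, so no explicit conversion is needed.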

Once the votes channel is closed (this is how external code will tell our function to stop working), we will stop publishing and send a signal down the returned stop signal channel.

Add the publishVotes function to main.go:

func publishVotes(votes <-chan string) <-chan struct{} {
  stopchan := make(chan struct{}, 1)
  pub, _ := nsq.NewProducer("localhost:4150", nsq.NewConfig())
  go func() {
    for vote := range votes {
      pub.Publish("votes", []byte(vote)) // publish vote
    }
    log.Println("Publisher: Stopping")
    pub.Stop()
    log.Println("Publisher: Stopped")
    stopchan <- struct{}{}
  }()
  return stopchan
}

Again the first thing we do is to create the stopchan, which we later return, this time not deferring the signaling but doing it inline by sending a struct{}{} down stopchan.

Note

The difference is to show alternative options: within one codebase you should pick a style you like and stick with it, until a standard emerges within the community; in which case we should all go with that.

We then create an NSQ producer by calling NewProducer and connecting to the default NSQ port on localhost, using a default configuration. We start a goroutine, which uses another great built-in feature of the Go language that lets us continually pull values from a channel (in our case the votes channel) just by doing a normal for…range operation on it. Whenever the channel has no values, execution will be blocked until a value comes down the line. If the votes channel is closed, the for loop will exit.

Tip

To learn more about the power of channels in Go, it is highly recommended that you seek out blog posts and videos by John Graham-Cumming, in particular one entitled A Channel Compendium that he presented at Gophercon 2014 and which contains a brief history of channels, including their origin. (Interestingly, John was also the guy who successfully petitioned the British Government to officially apologize for its treatment of Alan Turing.)

When the loop exits (after the votes channel is closed) the publisher is stopped, following which the stopchan signal is sent.
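To watch this shutdown sequence without a running NSQ instance, the producer can be replaced by an in-memory stand-in. In this sketch the publisher interface, the memoryPublisher type, and the publishAll function are hypothetical; the interface simply lists the two methods that publishVotes calls on the producer. Unlike the chapter's listing, the sketch also logs a failed Publish instead of ignoring the error.

```go
package main

import "fmt"

// publisher (hypothetical) lists the two methods publishVotes relies on.
type publisher interface {
	Publish(topic string, body []byte) error
	Stop()
}

// memoryPublisher (hypothetical) records messages instead of sending them.
type memoryPublisher struct {
	messages []string
	stopped  bool
}

func (m *memoryPublisher) Publish(topic string, body []byte) error {
	m.messages = append(m.messages, topic+":"+string(body))
	return nil
}

func (m *memoryPublisher) Stop() { m.stopped = true }

// publishAll publishes every vote until the channel is closed, then stops
// the publisher and signals on the returned channel.
func publishAll(votes <-chan string, pub publisher) <-chan struct{} {
	done := make(chan struct{}, 1)
	go func() {
		for vote := range votes { // exits when votes is closed
			if err := pub.Publish("votes", []byte(vote)); err != nil {
				fmt.Println("publish failed:", err)
			}
		}
		pub.Stop()
		done <- struct{}{}
	}()
	return done
}

func main() {
	votes := make(chan string)
	pub := &memoryPublisher{}
	done := publishAll(votes, pub)

	votes <- "happy"
	votes <- "win"
	close(votes) // tells the publishing goroutine to finish
	<-done
	fmt.Println(pub.messages, "stopped:", pub.stopped)
}
```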

Gracefully starting and stopping

When our program is terminated, we want to do a few things before actually exiting; namely closing our connection to Twitter and stopping the NSQ publisher (which actually deregisters its interest in the queue). To achieve this, we have to override the default Ctrl + C behavior.

Tip

The upcoming code blocks all go inside the main function; they are broken up so we can discuss each section before continuing.

Add the following code inside the main function:

var stoplock sync.Mutex
stop := false
stopChan := make(chan struct{}, 1)
signalChan := make(chan os.Signal, 1)
go func() { 
  <-signalChan
  stoplock.Lock()
  stop = true
  stoplock.Unlock()
  log.Println("Stopping...")
  stopChan <- struct{}{}
  closeConn()
}()
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

Here we create a stop bool with an associated sync.Mutex so that we can safely access it from many goroutines at the same time. We then create two more signal channels, stopChan and signalChan, and use signal.Notify to ask Go to send the signal down signalChan when someone tries to halt the program (with either the SIGINT interrupt or the SIGTERM termination POSIX signal). The stopChan is how we indicate that we want our processes to terminate, and we pass it as an argument to startTwitterStream later.

We then run a goroutine that blocks waiting for the signal by trying to read from signalChan; this is what the <- operator does in this case (it's trying to read from the channel). Since we don't care about the type of signal, we don't bother capturing the object returned on the channel. Once a signal is received, we set stop to true, send a signal on stopChan, and close the connection. Only when one of the specified signals is sent will the rest of the goroutine code run, which is how we are able to perform teardown code before exiting the program.
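The signal-handling pattern can be tried in isolation with the following sketch. So that the program finishes by itself, it places a SIGINT value on the channel directly rather than waiting for a real Ctrl + C; that injection, along with the onSignal and simulateInterrupt names, is an assumption made for the demonstration.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// onSignal (hypothetical) waits for one value on signalChan, runs cleanup,
// and then signals on the returned channel.
func onSignal(signalChan <-chan os.Signal, cleanup func()) <-chan struct{} {
	done := make(chan struct{}, 1)
	go func() {
		<-signalChan // block until a signal arrives
		cleanup()
		done <- struct{}{}
	}()
	return done
}

// simulateInterrupt (hypothetical) wires up the handler, injects a fake
// SIGINT and reports whether the cleanup function ran.
func simulateInterrupt() bool {
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(signalChan)

	cleaned := false
	done := onSignal(signalChan, func() { cleaned = true })

	signalChan <- syscall.SIGINT // stand-in for the user pressing Ctrl + C
	<-done
	return cleaned
}

func main() {
	fmt.Println("cleanup ran:", simulateInterrupt())
}
```

In a real program you would omit the injected value and simply block until the operating system delivers the signal.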

Add the following piece of code (inside the main function) to open and defer the closing of the database connection:

if err := dialdb(); err != nil {
  log.Fatalln("failed to dial MongoDB:", err)
}
defer closedb()

Since the readFromTwitter function reloads the options from the database each time it is called, and because we want to keep our program updated without having to restart it, we are going to introduce one final goroutine. This goroutine will simply call closeConn every minute, causing the connection to die and readFromTwitter to be called again. Insert the following code at the bottom of the main function to start all of these processes, and then wait for them to gracefully stop:

// start things
votes := make(chan string) // chan for votes
publisherStoppedChan := publishVotes(votes)
twitterStoppedChan := startTwitterStream(stopChan, votes)
go func() {
  for {
    time.Sleep(1 * time.Minute)
    closeConn()
    stoplock.Lock()
    if stop {
      stoplock.Unlock()
      break
    }
    stoplock.Unlock()
  }
}()
<-twitterStoppedChan
close(votes)
<-publisherStoppedChan

First we make the votes channel that we have been talking about throughout this section, which is a simple channel of string. Notice that it is neither a send-only (chan<-) nor a receive-only (<-chan) channel; in fact, making such channels makes little sense. We then call publishVotes, passing in the votes channel for it to receive from, and capturing the returned stop signal channel as publisherStoppedChan. Similarly, we call startTwitterStream passing in our stopChan from the beginning of the main function, and the votes channel for it to send to, while capturing the resulting stop signal channel as twitterStoppedChan.

We then start our refresher goroutine, which immediately enters an infinite for loop before sleeping for a minute and closing the connection via the call to closeConn. If the stop bool has been set to true (in that previous goroutine), we will break the loop and exit, otherwise we will loop around and wait another minute before closing the connection again. The use of the stoplock is important because we have two goroutines that might try to access the stop variable at the same time but we want to avoid collisions.

Once the goroutine has started, we then block on the twitterStoppedChan by attempting to read from it. When successful (which means the signal was sent on the stopChan), we close the votes channel which will cause the publisher's for…range loop to exit, and the publisher itself to stop, after which the signal will be sent on the publisherStoppedChan, which we wait for before exiting.

Testing

To make sure our program works, we need to do two things: first we need to create a poll in the database, and second, we need to peer inside the messaging queue to see if the messages are indeed being generated by twittervotes.

In a terminal, run the mongo command to open a database shell that allows us to interact with MongoDB. Then enter the following commands to add a test poll:

> use ballots
switched to db ballots
> db.polls.insert({"title":"Test poll","options":["happy","sad","fail","win"]})

The preceding commands add a new item to the polls collection in the ballots database. We are using some common words for options that are likely to be mentioned by people on Twitter so that we can observe real tweets being translated into messages. You might notice that our poll object is missing the results field; this is fine since we are dealing with unstructured data where documents do not have to adhere to a strict schema. The counter program we are going to write in the next section will add and maintain the results data for us later.

Press Ctrl + C to exit the MongoDB shell and type the following command:

nsq_tail --topic="votes" --lookupd-http-address=localhost:4161

The nsq_tail tool connects to the specified messaging queue topic and outputs any messages that it notices. This is where we will validate that our twittervotes program is sending messages.

In a separate terminal window, let's build and run the twittervotes program:

go build -o twittervotes
./twittervotes

Now switch back to the window running nsq_tail and notice that messages are indeed being generated in response to live Twitter activity.

Tip

If you aren't seeing much activity, try looking up trending hashtags on Twitter and adding another poll containing those options.
