In your $GOPATH/src folder, alongside other projects, create a new folder called socialpoll for this chapter. This folder won't be a Go package or program by itself, but will contain our three component programs. Inside socialpoll, create a new folder called twittervotes and add the obligatory main.go template (this is important as main packages without a main function won't compile):
package main

func main() {}
Our twittervotes program is going to:

Load all polls from the MongoDB database using mgo, and collect all options from the options array in each document

In order to use the streaming API, we will need authentication credentials from Twitter's Application Management console, much in the same way we did for our Gomniauth service providers in Chapter 3, Three Ways to Implement Profile Pictures. Head over to https://apps.twitter.com and create a new app called something like SocialPoll (the names have to be unique, so you can have some fun here; the choice of name doesn't affect the code either way). When your app has been created, visit the API Keys tab and locate the Your access token section, where you need to create a new access token. After a short delay, refresh the page and notice that you in fact have two sets of keys and secrets: an API key and a secret, and an access token and the corresponding secret. Following good coding practices, we are going to set these values as environment variables so that our program can have access to them without us having to hardcode them in our source files.
The keys we will use in this chapter are:
SP_TWITTER_KEY
SP_TWITTER_SECRET
SP_TWITTER_ACCESSTOKEN
SP_TWITTER_ACCESSSECRET
You can set the environment variables however you like, but since the app relies on them in order to work, creating a new file called setup.sh (for bash shells) or setup.bat (on Windows) is a good idea since you can check such files into your source code repository. Insert the following code in setup.sh by copying the appropriate values from the Twitter app page (in setup.bat, drop the first line and use set in place of export):
#!/bin/bash
export SP_TWITTER_KEY=yCwwKKnuBnUBrelyTN...
export SP_TWITTER_SECRET=6on0YRYniT1sI3f...
export SP_TWITTER_ACCESSTOKEN=2427-13677...
export SP_TWITTER_ACCESSSECRET=SpnZf336u...
Run the file with the source command (or call on Windows) to have the values appropriately set, or add them to your .bashrc or C:\cmdauto.cmd files to save you running them every time you open a new terminal window.
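Before going further, it can help to confirm that all four variables are actually visible to a Go process. The following is a minimal sketch of such a check; the missingVars helper and the requiredVars slice are names made up for this illustration and are not part of the chapter's code.

```go
package main

import (
	"fmt"
	"os"
)

// requiredVars lists the environment variable names used in this chapter.
var requiredVars = []string{
	"SP_TWITTER_KEY",
	"SP_TWITTER_SECRET",
	"SP_TWITTER_ACCESSTOKEN",
	"SP_TWITTER_ACCESSSECRET",
}

// missingVars returns the names for which lookup reports no value or an
// empty value. lookup has the same shape as os.LookupEnv, so the check
// can be exercised without touching the real environment.
func missingVars(names []string, lookup func(string) (string, bool)) []string {
	var missing []string
	for _, name := range names {
		if v, ok := lookup(name); !ok || v == "" {
			missing = append(missing, name)
		}
	}
	return missing
}

func main() {
	if missing := missingVars(requiredVars, os.LookupEnv); len(missing) > 0 {
		fmt.Println("missing environment variables:", missing)
		return
	}
	fmt.Println("all Twitter credentials are set")
}
```

Running this in a terminal where the setup file has been sourced should report that all credentials are set; in a fresh terminal it lists whichever names are absent.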
The Twitter streaming API supports HTTP connections that stay open for a long time, and given the design of our solution, we are going to need to access the net.Conn
object in order to close it from outside of the goroutine in which requests occur. We can achieve this by providing our own dial
method to an http.Transport
object that we will create.
Create a new file called twitter.go
inside twittervotes
(which is where all things Twitter-related will live), and insert the following code:
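var conn net.Conn

// dial closes any existing connection before opening a new one,
// keeping conn pointed at the current connection.
func dial(netw, addr string) (net.Conn, error) {
    if conn != nil {
        conn.Close()
        conn = nil
    }
    netc, err := net.DialTimeout(netw, addr, 5*time.Second)
    if err != nil {
        return nil, err
    }
    conn = netc
    return netc, nil
}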
Our bespoke dial
function first ensures conn
is closed, and then opens a new connection keeping the conn
variable updated with the current connection. If a connection dies (Twitter's API will do this from time to time) or is closed by us, we can redial without worrying about zombie connections.
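The "close the old connection before keeping the new one" idea can be tried out in isolation. The sketch below is a variation rather than the chapter's code: it wraps the tracked connection in a hypothetical connTracker type guarded by a mutex (which the dial function above does not use), and it substitutes a fakeConn for a real net.Conn so that it runs without a network.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// connTracker remembers the most recent connection and closes the
// previous one whenever it is replaced. It is a hypothetical helper,
// not part of the chapter's code.
type connTracker struct {
	mu      sync.Mutex
	current io.Closer
}

// replace closes any connection being tracked and starts tracking c.
func (t *connTracker) replace(c io.Closer) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.current != nil {
		t.current.Close()
	}
	t.current = c
}

// closeCurrent closes the tracked connection, if there is one.
func (t *connTracker) closeCurrent() {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.current != nil {
		t.current.Close()
		t.current = nil
	}
}

// fakeConn stands in for a net.Conn so the example runs offline.
type fakeConn struct {
	closed bool
}

func (f *fakeConn) Close() error {
	f.closed = true
	return nil
}

func main() {
	var tracker connTracker
	first, second := &fakeConn{}, &fakeConn{}
	tracker.replace(first)
	tracker.replace(second) // closes first, keeps second
	fmt.Println(first.closed, second.closed)
	tracker.closeCurrent()
	fmt.Println(second.closed)
}
```

The mutex matters once the connection is closed from a different goroutine than the one that dials, which is the situation this chapter sets up.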
We will periodically close the connection ourselves and initiate a new one, because we want to reload the options from the database at regular intervals. To do this, we need a function that closes the connection, and also closes an io.ReadCloser
that we will use to read the body of the responses. Add the following code to twitter.go
:
var reader io.ReadCloser

// closeConn closes the current connection and response body, if any.
func closeConn() {
    if conn != nil {
        conn.Close()
    }
    if reader != nil {
        reader.Close()
    }
}
Now we can call closeConn
at any time to break the ongoing connection with Twitter and tidy things up. In most cases, our code will load the options from the database again and open a new connection right away, but if we're shutting the program down (in response to a Ctrl + C hit) then we can call closeConn
just before we exit.
Next we are going to write a function that will read the environment variables and set up the OAuth
objects we'll need in order to authenticate the requests. Add the following code in the twitter.go
file:
var (
    authClient *oauth.Client
    creds      *oauth.Credentials
)

func setupTwitterAuth() {
    // ts holds the credentials read from the environment.
    var ts struct {
        ConsumerKey    string `env:"SP_TWITTER_KEY,required"`
        ConsumerSecret string `env:"SP_TWITTER_SECRET,required"`
        AccessToken    string `env:"SP_TWITTER_ACCESSTOKEN,required"`
        AccessSecret   string `env:"SP_TWITTER_ACCESSSECRET,required"`
    }
    if err := envdecode.Decode(&ts); err != nil {
        log.Fatalln(err)
    }
    creds = &oauth.Credentials{
        Token:  ts.AccessToken,
        Secret: ts.AccessSecret,
    }
    authClient = &oauth.Client{
        Credentials: oauth.Credentials{
            Token:  ts.ConsumerKey,
            Secret: ts.ConsumerSecret,
        },
    }
}
Here we define a struct type to store the environment variables that we need to authenticate with Twitter. Since we don't need to use the type elsewhere, we define it inline and create a variable called ts of this anonymous type (that's why we have the somewhat unusual var ts struct… code). We then use Joe Shaw's elegant envdecode package to pull in those environment variables for us. You will need to run go get github.com/joeshaw/envdecode and also import the log package. Our program will try to load appropriate values for all the fields marked required, and return an error if it fails to do so, which reminds people that the program won't work without Twitter credentials.
The strings inside the back ticks alongside each field in struct
are called tags, and are available through a reflection interface, which is how envdecode
knows which variables to look for. Tyler Bunnell and I added the required argument to this package, which indicates that it is an error for any of the environment variables to be missing (or empty).
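To make the role of tags concrete, here is a small sketch that reads env tags through the standard reflect package. It only illustrates the general mechanism and is not a description of how envdecode is implemented internally; the envNames function is a name invented for this example.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// envNames returns the variable name from each field's `env` tag in the
// struct v, dropping options such as ",required". Fields without an env
// tag are skipped, and non-struct values yield nil.
func envNames(v interface{}) []string {
	t := reflect.TypeOf(v)
	if t == nil || t.Kind() != reflect.Struct {
		return nil
	}
	var names []string
	for i := 0; i < t.NumField(); i++ {
		tag, ok := t.Field(i).Tag.Lookup("env")
		if !ok {
			continue
		}
		names = append(names, strings.Split(tag, ",")[0])
	}
	return names
}

func main() {
	var ts struct {
		ConsumerKey    string `env:"SP_TWITTER_KEY,required"`
		ConsumerSecret string `env:"SP_TWITTER_SECRET,required"`
	}
	fmt.Println(envNames(ts))
}
```

A decoding package would go one step further and write the looked-up values back into the fields, which is why Decode is given a pointer to the struct.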
Once we have the keys, we use them to create oauth.Credentials
and an oauth.Client
object from Gary Burd's go-oauth
package, which will allow us to authorize requests with Twitter.
Now that we have the ability to control the underlying connection and authorize requests, we are ready to write the code that will actually build the authorized request, and return the response. In twitter.go
, add the following code:
var (
    authSetupOnce sync.Once
    httpClient    *http.Client
)

func makeRequest(req *http.Request, params url.Values) (*http.Response, error) {
    // Set up auth and the HTTP client the first time only.
    authSetupOnce.Do(func() {
        setupTwitterAuth()
        httpClient = &http.Client{
            Transport: &http.Transport{
                Dial: dial,
            },
        }
    })
    formEnc := params.Encode()
    req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
    req.Header.Set("Content-Length", strconv.Itoa(len(formEnc)))
    req.Header.Set("Authorization", authClient.AuthorizationHeader(creds, "POST", req.URL, params))
    return httpClient.Do(req)
}
We use sync.Once
to ensure our initialization code only gets run once despite the number of times we call makeRequest
. After calling the setupTwitterAuth
method, we create a new http.Client
using an http.Transport
that uses our custom dial
method. We then set the appropriate headers needed for authorization with Twitter by encoding the specified params
object that will contain the options we are querying for.
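The behaviour of sync.Once is easy to verify on its own. In this minimal sketch, lazySetup is a hypothetical type whose ensure method plays the role of makeRequest, and the counter stands in for the real setup work.

```go
package main

import (
	"fmt"
	"sync"
)

// lazySetup runs its setup function at most once, however many times
// ensure is called. The type and its names are illustrative only.
type lazySetup struct {
	once  sync.Once
	calls int // how many times the setup function has actually run
}

func (l *lazySetup) ensure() {
	l.once.Do(func() {
		l.calls++ // stands in for work such as reading credentials
	})
}

func main() {
	var setup lazySetup
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			setup.ensure()
		}()
	}
	wg.Wait()
	fmt.Println("setup ran", setup.calls, "time(s)")
}
```

Even with ten goroutines calling ensure concurrently, the setup function runs a single time, and every caller waits until it has finished.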
In order to load the polls, and therefore the options to search Twitter for, we need to connect to and query MongoDB. In main.go
, add the two functions dialdb
and closedb
:
var db *mgo.Session

func dialdb() error {
    var err error
    log.Println("dialing mongodb: localhost")
    db, err = mgo.Dial("localhost")
    return err
}

func closedb() {
    db.Close()
    log.Println("closed database connection")
}
These two functions will connect to and disconnect from the locally running MongoDB instance using the mgo
package, and store mgo.Session
(the database connection object) in a global variable called db
.
Assuming MongoDB is running and our code is able to connect, we need to load the poll objects and extract all the options from the documents, which we will then use to search Twitter. Add the following loadOptions function to main.go:
type poll struct {
    Options []string
}

func loadOptions() ([]string, error) {
    var options []string
    iter := db.DB("ballots").C("polls").Find(nil).Iter()
    var p poll
    for iter.Next(&p) {
        options = append(options, p.Options...)
    }
    iter.Close()
    return options, iter.Err()
}
Our poll document contains more than just Options
, but our program doesn't care about anything else, so there's no need for us to bloat our poll
struct. We use the db
variable to access the polls
collection from the ballots
database, and call the mgo
package's fluent Find
method, passing nil
(meaning no filtering).
A fluent interface (first coined by Eric Evans and Martin Fowler) refers to an API design that aims to make the code more readable by allowing you to chain together method calls. This is achieved by each method returning the context object itself, so that another method can be called directly afterwards. For example, mgo
allows you to write queries such as this:
query := col.Find(q).Sort("field").Limit(10).Skip(10)
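A fluent interface is straightforward to build in Go: each method mutates the receiver and returns it. The sketch below uses a hypothetical query type to show the shape; it is not mgo's Query type and performs no database work.

```go
package main

import "fmt"

// query is a hypothetical stand-in for a database query; it is not
// mgo's Query type. Each method records a setting and returns the same
// *query so that calls can be chained.
type query struct {
	sortField string
	limit     int
	skip      int
}

// find starts a new, empty query.
func find() *query {
	return &query{}
}

func (q *query) Sort(field string) *query {
	q.sortField = field
	return q
}

func (q *query) Limit(n int) *query {
	q.limit = n
	return q
}

func (q *query) Skip(n int) *query {
	q.skip = n
	return q
}

func main() {
	q := find().Sort("field").Limit(10).Skip(10)
	fmt.Printf("%+v\n", *q)
}
```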
We then get an iterator by calling the Iter
method, which allows us to access each poll one by one. This is a very memory-efficient way of reading the poll data, because it only ever uses a single poll
object. If we were to use the All
method instead, the amount of memory we'd use would depend on the number of polls we had in our database, which would be out of our control.
When we have a poll, we use the built-in append function to build up the options slice. Of course, with millions of polls in the database, this slice too would grow large and unwieldy. For that kind of scale, we would probably run multiple twittervotes programs, each dedicated to a portion of the poll data. A simple way to do this would be to break polls into groups based on the letters the titles begin with, such as group A-N and O-Z. A somewhat more sophisticated approach would be to add a field to the poll document grouping it up in a more controlled manner, perhaps based on the stats for the other groups so that we are able to balance the load across many twittervotes instances.
Finally, we close the iterator and clean up any used memory before returning the options and any errors that occurred while iterating (by calling the Err
method on the mgo.Iter
object).
Now we are able to load the options and make authorized requests to the Twitter API. We are thus ready to write the code that initiates the connection, and continuously reads from the stream until either we call our closeConn
method, or Twitter closes the connection for one reason or another. The structure contained in the stream is a complex one containing all kinds of information about the tweet—who made it and when, and even what links or mentions of users occur in the body (see Twitter's API documentation for more details). However, we are only interested in the tweet text itself so you need not worry about all the other noise; add the following structure to twitter.go
:
type tweet struct { Text string }
Using this new structure, in twitter.go, add the following readFromTwitter function that takes a send-only channel called votes; this is how this function will inform the rest of our program that it has noticed a vote on Twitter:
func readFromTwitter(votes chan<- string) {
    options, err := loadOptions()
    if err != nil {
        log.Println("failed to load options:", err)
        return
    }
    u, err := url.Parse("https://stream.twitter.com/1.1/statuses/filter.json")
    if err != nil {
        log.Println("creating filter request failed:", err)
        return
    }
    query := make(url.Values)
    query.Set("track", strings.Join(options, ","))
    req, err := http.NewRequest("POST", u.String(), strings.NewReader(query.Encode()))
    if err != nil {
        log.Println("creating filter request failed:", err)
        return
    }
    resp, err := makeRequest(req, query)
    if err != nil {
        log.Println("making request failed:", err)
        return
    }
    // Assign to the package-level reader (= rather than :=) so that
    // closeConn can close the response body.
    reader = resp.Body
    decoder := json.NewDecoder(reader)
    for {
        var tweet tweet
        if err := decoder.Decode(&tweet); err != nil {
            break
        }
        for _, option := range options {
            if strings.Contains(
                strings.ToLower(tweet.Text),
                strings.ToLower(option),
            ) {
                log.Println("vote:", option)
                votes <- option
            }
        }
    }
}
In the preceding code, after loading the options from all the polls data (by calling the loadOptions function), we use url.Parse to create a url.URL object describing the appropriate endpoint on Twitter. We build a url.Values object called query, and set the options as a comma-separated list. As per the API, we make a new POST request using the encoded url.Values object as the body, and pass it to makeRequest along with the query object itself. All being well, we make a new json.Decoder from the body of the response, and keep reading inside an infinite for loop by calling the Decode method. If there is an error (probably due to the connection being closed), we simply break the loop and exit the function. If there is a tweet to read, it will be decoded into the tweet variable, which will give us access to the Text property (the 140 characters of the tweet itself). We then iterate over all possible options, and if the tweet mentions one, we send that option on the votes channel. This technique also allows a tweet to contain many votes at the same time, something you may or may not decide to change based on the rules of the election.
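The matching step can be pulled out and tried without a Twitter connection. The following sketch uses a hypothetical matchOptions helper that applies the same case-insensitive substring test; note that substring matching means an option such as "win" would also match a tweet containing "winter".

```go
package main

import (
	"fmt"
	"strings"
)

// matchOptions returns every option that appears in text, ignoring
// case. It uses plain substring matching, so "win" also matches
// "winter".
func matchOptions(text string, options []string) []string {
	lower := strings.ToLower(text)
	var matched []string
	for _, option := range options {
		if strings.Contains(lower, strings.ToLower(option)) {
			matched = append(matched, option)
		}
	}
	return matched
}

func main() {
	options := []string{"happy", "sad", "fail", "win"}
	// One tweet can produce several votes.
	fmt.Println(matchOptions("So HAPPY about this win!", options))
}
```

If the rules of an election allowed only one vote per tweet, a helper like this could return after the first match instead of collecting them all.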
The votes
channel is
send-only (which means we cannot receive on it), since it is of the type chan<- string
. Think of the little "arrow" telling us which way messages will flow: either into the channel or out of it. This is a great way to express intent—it's clear that we never intend to read votes using our readFromTwitter
function; rather we will only send them on that channel.
Simply giving up whenever Decode returns an error doesn't provide a very robust solution. The Twitter API documentation states that the connection will drop from time to time, and clients should account for this when consuming the services. And remember, we are going to terminate the connection periodically too, so we need a way to reconnect once the connection is dropped.
A great use of channels in Go is to signal events between code running in different goroutines. We are going to see a real-world example of this when we write our next function.
The purpose of the function is to start a goroutine that continually calls the readFromTwitter
function (with the specified votes
channel to receive the votes on), until we signal that we want it to stop. And once it has stopped, we want to be notified through another signal channel. The return of the function will be a channel of struct{}
; a signal channel.
Signal channels have some interesting properties that are worth taking a closer look at. Firstly, the type sent down the channels is an empty struct{}
, instances of which actually take up zero bytes, since it has no fields. So struct{}{}
is a great memory-efficient option for signaling events. Some people use bool
types, which is also fine, although true
and false
both take up a byte of memory.
Head over to http://play.golang.org and try this out for yourself.
The size of a bool is 1:
fmt.Println(reflect.TypeOf(true).Size()) // prints 1
Whereas the size of struct{}{} is 0:
fmt.Println(reflect.TypeOf(struct{}{}).Size()) // prints 0
The signal channels also have a buffer size of 1, which means that sending a signal will not block, even if nothing has started reading from the channel yet.
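The effect of the buffer can be observed with a non-blocking send. In this small sketch, trySignal is a helper written for the illustration; it reports whether a signal could be sent immediately.

```go
package main

import "fmt"

// trySignal attempts to send on ch without blocking and reports
// whether the send went through.
func trySignal(ch chan struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	buffered := make(chan struct{}, 1)
	unbuffered := make(chan struct{})

	fmt.Println(trySignal(buffered))   // true: the buffer holds the signal
	fmt.Println(trySignal(buffered))   // false: the buffer is already full
	fmt.Println(trySignal(unbuffered)) // false: nothing is receiving
}
```

With a buffer of one, the goroutine that signals can carry on (or exit) straight away, and the receiver picks the signal up whenever it gets round to reading.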
We are going to employ two signal channels in our code, one that we pass into our function that tells our goroutine that it should stop, and another (provided by the function) that signals once stopping is complete.
In twitter.go
, add the following function:
func startTwitterStream(stopchan <-chan struct{}, votes chan<- string) <-chan struct{} {
    stoppedchan := make(chan struct{}, 1)
    go func() {
        // Signal that we have stopped when this goroutine exits.
        defer func() {
            stoppedchan <- struct{}{}
        }()
        for {
            select {
            case <-stopchan:
                log.Println("stopping Twitter...")
                return
            default:
                log.Println("Querying Twitter...")
                readFromTwitter(votes)
                log.Println(" (waiting)")
                time.Sleep(10 * time.Second) // wait before reconnecting
            }
        }
    }()
    return stoppedchan
}
In the preceding code, the first argument stopchan is a channel of type <-chan struct{}, a receive-only signal channel. It is this channel that code outside the function will signal on, which will tell our goroutine to stop. Remember that it is receive-only only inside this function; the actual channel itself will be capable of sending. The second argument is the votes channel on which votes will be sent. The return type of our function is also a signal channel of type <-chan struct{}: a receive-only channel that we will use to indicate that we have stopped.
These channels are necessary because our function triggers its own goroutine, and immediately returns, so without this, calling code would have no idea if the spawned code were still running or not.
The first thing we do in the startTwitterStream
function is make our stoppedchan
, and defer the sending of a struct{}{}
to indicate that we have finished when our function exits. Notice that stoppedchan
is a normal channel so even though it is returned as a receive-only, we will be able to send on it from within this function.
We then start an infinite for
loop in which we select from one of two channels. The first is the stopchan
(the first argument), which would indicate that it was time to stop, and return (thus triggering the deferred signaling on stoppedchan
). If that hasn't happened, we will call readFromTwitter
(passing in the votes
channel), which will go and load the options from the database and open the connection to Twitter.
When the Twitter connection dies, our code will return here where we sleep for ten seconds using the time.Sleep
function. This is to give the Twitter API a rest in case it closed the connection due to overuse. Once we've rested, we re-enter the loop and check again on the stopchan
channel to see if the calling code wants us to stop or not.
To make this flow clear, we are logging out key statements that will not only help us debug our code, but also let us peek into the inner workings of this somewhat complicated mechanism.
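The stop/stopped handshake is independent of Twitter, so it can be studied with a trivial job. The sketch below follows the same structure, with a hypothetical startWorker function and a work callback standing in for readFromTwitter; the timings are arbitrary.

```go
package main

import (
	"fmt"
	"time"
)

// startWorker calls work repeatedly in its own goroutine until a value
// arrives on stop, then signals on the returned channel.
func startWorker(stop <-chan struct{}, work func()) <-chan struct{} {
	stopped := make(chan struct{}, 1)
	go func() {
		defer func() { stopped <- struct{}{} }()
		for {
			select {
			case <-stop:
				return
			default:
				work()
			}
		}
	}()
	return stopped
}

func main() {
	stop := make(chan struct{}, 1)
	runs := 0
	stopped := startWorker(stop, func() {
		runs++
		time.Sleep(10 * time.Millisecond)
	})

	time.Sleep(50 * time.Millisecond)
	stop <- struct{}{} // ask the worker to stop
	<-stopped          // wait until it has
	fmt.Println("worker stopped after", runs, "runs")
}
```

Because the stop channel is only checked between calls to work, a stop request takes effect once the current call returns. That is why the chapter's program also closes the connection: it forces readFromTwitter to return so the loop can notice the signal.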
Once our code is successfully noticing votes on Twitter and sending them down the votes
channel, we need a way to publish them into an NSQ topic; after all, this is the point of the twittervotes
program.
We will write a function called publishVotes
that will take the votes
channel, this time of type <-chan string
(a receive-only channel) and publish each string that is received from it.
In our previous functions, the votes channel was of type chan<- string, but this time it's of the type <-chan string. You might think this is a mistake, or even that it means we cannot use the same channel for both, but you would be wrong. The channel we create later will be made with make(chan string), which is neither receive-only nor send-only, and can act in both cases. The reason for using the <- operator on a channel in arguments is to make clear the intent of what the channel will be used for; or, in the case where it is the return type, to prevent users from accidentally sending on channels intended for receiving or vice versa. The compiler will actually produce an error if they use such a channel incorrectly.
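A short sketch shows one ordinary channel being handed to two functions with opposite directional types. The send and receive functions are invented for this illustration.

```go
package main

import "fmt"

// send can only send on out; receiving from it would not compile.
func send(out chan<- string, values ...string) {
	for _, v := range values {
		out <- v
	}
}

// receive can only receive from in; sending on it would not compile.
func receive(in <-chan string, n int) []string {
	got := make([]string, 0, n)
	for i := 0; i < n; i++ {
		got = append(got, <-in)
	}
	return got
}

func main() {
	// A bidirectional channel converts implicitly to either direction.
	votes := make(chan string, 2)
	send(votes, "happy", "win")
	fmt.Println(receive(votes, 2))
}
```

Adding a line such as <-out inside send, or in <- "x" inside receive, makes the program fail to compile, which is the protection the directional types provide.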
Once the votes
channel is closed (this is how external code will tell our function to stop working), we will stop publishing and send a signal down the returned stop signal channel.
Add the publishVotes
function to main.go
:
func publishVotes(votes <-chan string) <-chan struct{} {
    stopchan := make(chan struct{}, 1)
    // Note: the error from NewProducer is discarded here.
    pub, _ := nsq.NewProducer("localhost:4150", nsq.NewConfig())
    go func() {
        for vote := range votes {
            pub.Publish("votes", []byte(vote)) // publish vote
        }
        log.Println("Publisher: Stopping")
        pub.Stop()
        log.Println("Publisher: Stopped")
        stopchan <- struct{}{}
    }()
    return stopchan
}
Again the first thing we do is to create the stopchan
, which we later return, this time not deferring the signaling but doing it inline by sending a struct{}{}
down stopchan
.
We then create an NSQ producer by calling NewProducer
and connecting to the default NSQ port on localhost
, using a default configuration. We start a goroutine, which uses another great built-in feature of the Go language that lets us continually pull values from a channel (in our case the votes
channel) just by doing a normal for…range
operation on it. Whenever the channel has no values, execution will be blocked until a value comes down the line. If the votes
channel is closed, the for
loop will exit.
To learn more about the power of channels in Go, it is highly recommended that you seek out blog posts and videos by John Graham-Cumming, in particular one entitled A Channel Compendium that he presented at Gophercon 2014 and which contains a brief history of channels, including their origin. (Interestingly, John was also the guy who successfully petitioned the British Government to officially apologize for its treatment of Alan Turing.)
When the loop exits (after the votes
channel is closed) the publisher is stopped, following which the stopchan
signal is sent.
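The range-until-closed pattern can be exercised without NSQ. In this sketch, consume is a hypothetical function with the same shape as publishVotes, and the handle callback takes the place of publishing.

```go
package main

import "fmt"

// consume hands every value from votes to handle in a new goroutine and
// signals on the returned channel once votes has been closed and
// drained.
func consume(votes <-chan string, handle func(string)) <-chan struct{} {
	done := make(chan struct{}, 1)
	go func() {
		for vote := range votes {
			handle(vote)
		}
		done <- struct{}{}
	}()
	return done
}

func main() {
	votes := make(chan string)
	done := consume(votes, func(v string) { fmt.Println("publishing", v) })
	votes <- "happy"
	votes <- "sad"
	close(votes) // ends the for...range loop
	<-done
	fmt.Println("publisher stopped")
}
```

Closing the channel is the only stop signal the consumer needs, so it must be the sending side that closes it, and only after it has finished sending.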
When our program is terminated, we want to do a few things before actually exiting; namely closing our connection to Twitter and stopping the NSQ publisher (which actually deregisters its interest in the queue). To achieve this, we have to override the default Ctrl + C behavior.
Add the following code inside the main
function:
var stoplock sync.Mutex
stop := false
stopChan := make(chan struct{}, 1)
signalChan := make(chan os.Signal, 1)
go func() {
    <-signalChan // block until SIGINT or SIGTERM arrives
    stoplock.Lock()
    stop = true
    stoplock.Unlock()
    log.Println("Stopping...")
    stopChan <- struct{}{}
    closeConn()
}()
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
Here we create a stop bool
with an associated sync.Mutex
so that we can access it from many goroutines at the same time. We then create two more signal channels, stopChan
and signalChan
, and use signal.Notify
to ask Go to send the signal down the signalChan
when someone tries to halt the program (either with the SIGINT
interrupt, or the SIGTERM
termination POSIX signals). The stopChan
is how we indicate that we want our processes to terminate, and we pass it as an argument to startTwitterStream
later.
We then run a goroutine that blocks waiting for the signal by trying to read from signalChan
; this is what the <-
operator does in this case (it's trying to read from the channel). Since we don't care about the type of signal, we don't bother capturing the object returned on the channel. Once a signal is received, we set stop
to true
, and close the connection. Only when one of the specified signals is sent will the rest of the goroutine code run, which is how we are able to perform teardown code before exiting the program.
Add the following piece of code (inside the main function) to open and defer the closing of the database connection:
if err := dialdb(); err != nil {
    log.Fatalln("failed to dial MongoDB:", err)
}
defer closedb()
Since the readFromTwitter
method reloads the options from the database each time, and because we want to keep our program updated without having to restart it, we are going to introduce one final goroutine. This goroutine will simply call closeConn
every minute, causing the connection to die, and cause readFromTwitter
to be called over again. Insert the following code at the bottom of the main
function to start all of these processes, and then wait for them to gracefully stop:
// start things
votes := make(chan string) // chan for votes
publisherStoppedChan := publishVotes(votes)
twitterStoppedChan := startTwitterStream(stopChan, votes)
go func() {
    for {
        time.Sleep(1 * time.Minute)
        closeConn()
        stoplock.Lock()
        if stop {
            stoplock.Unlock()
            break
        }
        stoplock.Unlock()
    }
}()
<-twitterStoppedChan
close(votes)
<-publisherStoppedChan
First we make the votes channel that we have been talking about throughout this section, which is a simple channel of string. Notice that it is neither a send (chan<-) nor a receive (<-chan) channel; in fact, making such channels makes little sense. We then call publishVotes
, passing in the votes
channel for it to receive from, and capturing the returned stop signal channel as publisherStoppedChan
. Similarly, we call startTwitterStream
passing in our stopChan
from the beginning of the main
function, and the votes
channel for it to send to, while capturing the resulting stop signal channel as twitterStoppedChan
.
We then start our refresher goroutine, which enters an infinite for loop that sleeps for a minute and then closes the connection via the call to closeConn. If the stop bool has been set to true (in that previous goroutine), we will break the loop and exit; otherwise we will loop around and wait another minute before closing the connection again. The use of the stoplock is important because we have two goroutines that might try to access the stop variable at the same time, and we want to avoid collisions.
Once the goroutine has started, we then block on the twitterStoppedChan
by attempting to read from it. When successful (which means the signal was sent on the stopChan
), we close the votes
channel which will cause the publisher's for…range
loop to exit, and the publisher itself to stop, after which the signal will be sent on the publisherStoppedChan
, which we wait for before exiting.
To make sure our program works, we need to do two things: first we need to create a poll in the database, and second, we need to peer inside the messaging queue to see if the messages are indeed being generated by twittervotes
.
In a terminal, run the mongo
command to open a database shell that allows us to interact with MongoDB. Then enter the following commands to add a test poll:
> use ballots
switched to db ballots
> db.polls.insert({"title":"Test poll","options":["happy","sad","fail","win"]})
The preceding commands add a new item to the polls
collection in the ballots
database. We are using some common words for options that are likely to be mentioned by people on Twitter so that we can observe real tweets being translated into messages. You might notice that our poll object is missing the results
field; this is fine since we are dealing with unstructured data where documents do not have to adhere to a strict schema. The counter
program we are going to write in the next section will add and maintain the results
data for us later.
Press Ctrl + C to exit the MongoDB shell and type the following command:
nsq_tail --topic="votes" --lookupd-http-address=localhost:4161
The nsq_tail
tool connects to the specified messaging queue topic and outputs any messages that it notices. This is where we will validate that our twittervotes
program is sending messages.
In a separate terminal window, let's build and run the twittervotes
program:
go build -o twittervotes
./twittervotes
Now switch back to the window running nsq_tail
and notice that messages are indeed being generated in response to live Twitter activity.