The second program we are going to implement is the
counter
tool, which will be responsible for watching out for votes in NSQ, counting them, and keeping MongoDB up to date with the latest numbers.
Create a new folder called counter
alongside twittervotes
, and add the following code to a new main.go
file:
package main import ( "flag" "fmt" "os" ) var fatalErr error func fatal(e error) { fmt.Println(e) flag.PrintDefaults() fatalErr = e } func main() { defer func() { if fatalErr != nil { os.Exit(1) } }() }
Normally when we encounter an error in our code, we use a call like log.Fatal
or os.Exit
, which immediately terminates the program. Exiting the program with a non-zero exit code is important, because it is our way of telling the operating system that something went wrong, and we didn't complete our task successfully. The problem with the normal approach is that any deferred functions we have scheduled (and therefore any tear down code we need to run), won't get a chance to execute.
The pattern employed in the preceding code snippet lets us call the fatal
function to record that an error occurred. Note that only when our main function exits will the deferred function run, which in turn calls os.Exit(1)
to exit the program with an exit code of 1
. Because the deferred statements are run in LIFO (last in, first out) order, the first function we defer will be the last function to be executed, which is why the first thing we do in the main
function is to defer the exiting code. This allows us to be sure that other functions we defer will be called before the program exits. We'll use this feature to ensure our database connection gets closed regardless of any errors.
The best time to think about cleaning up resources, such as database connections, is immediately after you have successfully obtained the resource; Go's defer
keyword makes this easy. At the bottom of the main function, add the following code:
log.Println("Connecting to database...") db, err := mgo.Dial("localhost") if err != nil { fatal(err) return } defer func() { log.Println("Closing database connection...") db.Close() }() pollData := db.DB("ballots").C("polls")
This code uses the familiar mgo.Dial
method to open a session to the locally running MongoDB instance and immediately defers a function that closes the session. We can be sure that this code will run before our previously deferred statement containing the exit code (because deferred functions are run in the reverse order in which they were called). Therefore, whatever happens in our program, we know that the database session will definitely and properly close.
At the end of the snippet, we use the mgo
fluent API to keep a reference of the ballots.polls
data collection in the pollData
variable, which we will use later to make queries.
In order to count the votes, we need to consume the messages on the votes
topic in NSQ, and we'll need a place to store them. Add the following variables to the main
function:
var counts map[string]int var countsLock sync.Mutex
A map and a lock (sync.Mutex
) is a common combination in Go, because we will have multiple goroutines trying to access the same map and we need to avoid corrupting it by trying to modify or read it at the same time.
Add the following code to the main
function:
log.Println("Connecting to nsq...") q, err := nsq.NewConsumer("votes", "counter", nsq.NewConfig()) if err != nil { fatal(err) return }
The NewConsumer
function allows us to set up an object that will listen on the votes
NSQ topic, so when twittervotes
publishes a vote on that topic, we can handle it in this program. If NewConsumer
returns an error, we'll use our fatal
function to record it and return.
Next we are going to add the code that handles messages (votes) from NSQ:
q.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error { countsLock.Lock() defer countsLock.Unlock() if counts == nil { counts = make(map[string]int) } vote := string(m.Body) counts[vote]++ return nil }))
We call the AddHandler
method on nsq.Consumer
and pass it a function that will be called for every message received on the votes
topic.
When a vote comes in, the first thing we do is lock the countsLock
mutex. Next we defer the unlocking of the mutex for when the function exits. This allows us to be sure that while NewConsumer
is running, we are the only ones allowed to modify the map; others will have to wait until our function exits before they can use it. Calls to the Lock
method block execution while the lock is in place, and it only continues when the lock is released by a call to Unlock
. This is why it's vital that every Lock
call has an Unlock
counterpart, otherwise we will deadlock our program.
Every time we receive a vote, we check if counts
is nil
and make a new map if it is, because once the database has been updated with the latest results, we want to reset everything and start at zero. Finally we increase the int
value by one for the given key, and return nil
indicating no errors.
Although we have created our NSQ consumer, and added our handler function, we still need to connect to the NSQ service, which we will do by adding the following code:
if err := q.ConnectToNSQLookupd("localhost:4161"); err != nil { fatal(err) return }
It is important to note that we are actually connecting to the HTTP port of the nsqlookupd
instance, rather than NSQ instances; this abstraction means that our program doesn't need to know where the messages are coming from in order to consume them. If we fail to connect to the server (for instance if we forget to start it), we'll get an error, which we report to our fatal function before immediately returning.
Our code will listen out for votes, and keep a map of the results in memory, but that information is so far trapped inside our program. Next, we need to add the code that will periodically push the results to the database:
log.Println("Waiting for votes on nsq...") var updater *time.Timer updater = time.AfterFunc(updateDuration, func() { countsLock.Lock() defer countsLock.Unlock() if len(counts) == 0 { log.Println("No new votes, skipping database update") } else { log.Println("Updating database...") log.Println(counts) ok := true for option, count := range counts { sel := bson.M{"options": bson.M{"$in": []string{option}}} up := bson.M{"$inc": bson.M{"results." + option: count}} if _, err := pollData.UpdateAll(sel, up); err != nil { log.Println("failed to update:", err) ok = false } } if ok { log.Println("Finished updating database...") counts = nil // reset counts } } updater.Reset(updateDuration) })
The time.AfterFunc
function calls the function after the specified duration in a goroutine of its own. At the end we call Reset
, which starts the process again; this allows us to schedule our update code to run at regular intervals.
When our update function runs, the first thing we do is lock the countsLock
, and defer its unlocking. We then check to see if there are any values in the counts map. If there aren't, we just log that we're skipping the update and wait for next time.
If there are some votes, we iterate over the counts
map pulling out the option and number of votes (since the last update), and use some MongoDB magic to update the results.
We first create the selector for our update operation using the bson.M
shortcut type, which is similar to creating map[string]interface{}
types. The selector we create will look something like this:
{ "options": { "$in": ["happy"] } }
In MongoDB, the preceding BSON specifies that we want to select polls where "happy"
is one of the items in the options
array.
Next, we use the same technique to generate the update operation, which looks something like this:
{ "$inc": { "results.happy": 3 } }
In MongoDB, the preceding BSON specifies that we want to increase the results.happy
field by 3. If there is no results
map in the poll, one will be created, and if there is no happy
key inside results
, 0
will be assumed.
We then call the UpdateAll
method on our pollsData
query to issue the command to the database, which will in turn update every poll that matches the selector (contrast this to the Update
method, which will update only one). If something goes wrong, we report it and set the ok
Boolean to false. If all goes well, we set the counts
map to nil, since we want to reset the counter.
We are going to specify the updateDuration
as a constant at the top of the file, which will make it easy for us to change when we are testing our program. Add the following code above the main
function:
const updateDuration = 1 * time.Second
The last thing to do before our program is ready is to make sure our main
function waits for operations to complete before exiting, like we did in our twittervotes
program. Add the following code at the end of the main
function:
termChan := make(chan os.Signal, 1) signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) for { select { case <-termChan: updater.Stop() q.Stop() case <-q.StopChan: // finished return } }
Here we have employed a slightly different tactic than before. We trap the termination event, which will cause a signal to go down termChan
when we hit Ctrl + C. Next we start an infinite loop, inside which we use Go's select
structure to allow us to run code if we receive something on either termChan
, or the StopChan
of the consumer.
In fact, we will only ever get a termChan
signal first in response to a Ctrl+C
-press, at which point we stop the updater
timer and ask the consumer to stop listening for votes. Execution then re-enters the loop and will block until the consumer reports that it has indeed stopped by signaling on its StopChan
. When that happens, we're done and we exit, at which point our deferred statement runs, which, if you remember, tidies up the database session.