Counting votes

The second program we are going to implement is the counter tool, which will be responsible for watching out for votes in NSQ, counting them, and keeping MongoDB up to date with the latest numbers.

Create a new folder called counter alongside twittervotes, and add the following code to a new main.go file:

package main
import (
  "flag"
  "fmt"
  "os"
)
var fatalErr error
func fatal(e error) {
  fmt.Println(e)
  flag.PrintDefaults()
  fatalErr = e
}
func main() {
  defer func() {
    if fatalErr != nil {
      os.Exit(1)
    }
  }()
}

Normally, when we encounter an error in our code, we call log.Fatal or os.Exit, which terminates the program immediately. Exiting with a non-zero exit code is important because it tells the operating system that something went wrong and that we didn't complete our task successfully. The problem with this approach is that any deferred functions we have scheduled (and therefore any teardown code we need to run) won't get a chance to execute.

The pattern employed in the preceding code snippet lets us call the fatal function to record that an error occurred. Note that only when our main function exits will the deferred function run, which in turn calls os.Exit(1) to exit the program with an exit code of 1. Because the deferred statements are run in LIFO (last in, first out) order, the first function we defer will be the last function to be executed, which is why the first thing we do in the main function is to defer the exiting code. This allows us to be sure that other functions we defer will be called before the program exits. We'll use this feature to ensure our database connection gets closed regardless of any errors.
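The ordering described above can be checked with a small standalone sketch. It is not part of the counter program: the run function is a hypothetical stand-in for main, and instead of calling os.Exit it records which exit code would be used so that the order of the deferred calls can be inspected.

```go
package main

import (
	"errors"
	"fmt"
)

// run mimics the body of main. It returns the order in which the deferred
// functions ran and the exit code the program would have used.
func run(fail bool) (order []string, exitCode int) {
	var fatalErr error

	// Deferred first, so it runs last (stands in for the os.Exit(1) check).
	defer func() {
		order = append(order, "exit check")
		if fatalErr != nil {
			exitCode = 1
		}
	}()

	// Deferred second, so it runs first (stands in for teardown code
	// such as closing a database session).
	defer func() {
		order = append(order, "cleanup")
	}()

	if fail {
		// Equivalent to calling fatal(err) and then returning.
		fatalErr = errors.New("something went wrong")
	}
	return
}

func main() {
	order, code := run(true)
	fmt.Println(order, code)
}
```

Because the exit check is deferred first, the cleanup step always runs before it, whether or not an error was recorded.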

Connecting to the database

The best time to think about cleaning up resources, such as database connections, is immediately after you have successfully obtained the resource; Go's defer keyword makes this easy. At the bottom of the main function, add the following code:

log.Println("Connecting to database...")
db, err := mgo.Dial("localhost")
if err != nil {
  fatal(err)
  return
}
defer func() {
  log.Println("Closing database connection...")
  db.Close()
}()
pollData := db.DB("ballots").C("polls")

This code uses the familiar mgo.Dial function to open a session to the locally running MongoDB instance and immediately defers a function that closes the session. We can be sure that this code will run before our previously deferred function containing the exit code (because deferred functions run in the reverse of the order in which they were deferred). Therefore, as long as main returns normally, we know that the database session will be closed properly before the program exits.

Tip

The log statements are optional, but will help us see what's going on when we run and exit our program.

At the end of the snippet, we use the mgo fluent API to keep a reference to the ballots.polls data collection in the pollData variable, which we will use later to make queries.

Consuming messages in NSQ

In order to count the votes, we need to consume the messages on the votes topic in NSQ, and we'll need a place to store them. Add the following variables to the main function:

var counts map[string]int
var countsLock sync.Mutex

A map and a lock (sync.Mutex) are a common combination in Go: multiple goroutines will be accessing the same map, and we need to avoid corrupting it by modifying or reading it from more than one goroutine at the same time.
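As a minimal sketch of why the lock matters, the following standalone program has several goroutines increment the same map entry. The countConcurrently function and its parameters are made up for this illustration; only the map-plus-mutex combination comes from the counter program.

```go
package main

import (
	"fmt"
	"sync"
)

// countConcurrently starts `workers` goroutines that each record
// `perWorker` votes for the same option, guarding the shared map
// with a mutex. It returns the final tally for that option.
func countConcurrently(workers, perWorker int) int {
	var (
		counts     = make(map[string]int)
		countsLock sync.Mutex
		wg         sync.WaitGroup
	)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perWorker; j++ {
				countsLock.Lock()
				counts["happy"]++
				countsLock.Unlock()
			}
		}()
	}
	// Wait for every worker to finish before reading the result.
	wg.Wait()
	return counts["happy"]
}

func main() {
	fmt.Println(countConcurrently(8, 1000))
}
```

Without the Lock and Unlock calls, concurrent writes to the map would be a data race, and the Go runtime may abort the program when it detects them.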

Add the following code to the main function:

log.Println("Connecting to nsq...")
q, err := nsq.NewConsumer("votes", "counter", nsq.NewConfig())
if err != nil {
  fatal(err)
  return
}

The NewConsumer function allows us to set up an object that will listen on the votes NSQ topic, so when twittervotes publishes a vote on that topic, we can handle it in this program. If NewConsumer returns an error, we'll use our fatal function to record it and return.

Next we are going to add the code that handles messages (votes) from NSQ:

q.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
  countsLock.Lock()
  defer countsLock.Unlock()
  if counts == nil {
    counts = make(map[string]int)
  }
  vote := string(m.Body)
  counts[vote]++
  return nil
}))

We call the AddHandler method on nsq.Consumer and pass it a function that will be called for every message received on the votes topic.

When a vote comes in, the first thing we do is lock the countsLock mutex. Next we defer the unlocking of the mutex for when the function exits. This allows us to be sure that while our handler function is running, we are the only ones allowed to modify the map; others will have to wait until our function exits before they can use it. A call to the Lock method blocks while another goroutine holds the lock, and only continues once the lock is released by a call to Unlock. This is why it's vital that every Lock call has an Unlock counterpart; otherwise we will deadlock our program.

Every time we receive a vote, we check if counts is nil and make a new map if it is, because once the database has been updated with the latest results, we want to reset everything and start at zero. Finally we increase the int value by one for the given key, and return nil indicating no errors.
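The lazy initialization and reset cycle can be sketched in isolation as follows. The voteCounter type and its record and flush methods are hypothetical names introduced for this example; the counter program keeps the map and the mutex as plain variables in main instead.

```go
package main

import (
	"fmt"
	"sync"
)

// voteCounter bundles the map and the mutex that protects it.
type voteCounter struct {
	mu     sync.Mutex
	counts map[string]int
}

// record adds one vote, creating the map on first use. Writing to a
// nil map would panic, which is why the nil check is needed.
func (c *voteCounter) record(vote string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.counts == nil {
		c.counts = make(map[string]int)
	}
	c.counts[vote]++
}

// flush returns the votes gathered since the last flush and resets
// the counter to nil so that counting starts again from zero.
func (c *voteCounter) flush() map[string]int {
	c.mu.Lock()
	defer c.mu.Unlock()
	snapshot := c.counts
	c.counts = nil
	return snapshot
}

func main() {
	var c voteCounter
	c.record("happy")
	c.record("happy")
	c.record("sad")
	fmt.Println(c.flush())      // votes since start
	fmt.Println(len(c.flush())) // nothing new since the last flush
}
```

Setting the map to nil rather than deleting keys one by one keeps the reset cheap, at the cost of the nil check on every vote.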

Although we have created our NSQ consumer, and added our handler function, we still need to connect to the NSQ service, which we will do by adding the following code:

if err := q.ConnectToNSQLookupd("localhost:4161"); err != nil {
  fatal(err)
  return
}

It is important to note that we are actually connecting to the HTTP port of the nsqlookupd instance, rather than NSQ instances; this abstraction means that our program doesn't need to know where the messages are coming from in order to consume them. If we fail to connect to the server (for instance if we forget to start it), we'll get an error, which we report to our fatal function before immediately returning.

Keeping the database updated

Our code will listen out for votes, and keep a map of the results in memory, but that information is so far trapped inside our program. Next, we need to add the code that will periodically push the results to the database:

log.Println("Waiting for votes on nsq...")
var updater *time.Timer
updater = time.AfterFunc(updateDuration, func() {
  countsLock.Lock()
  defer countsLock.Unlock()
  if len(counts) == 0 {
    log.Println("No new votes, skipping database update")
  } else {
    log.Println("Updating database...")
    log.Println(counts)
    ok := true
    for option, count := range counts {
      sel := bson.M{"options": bson.M{"$in": []string{option}}}
      up := bson.M{"$inc": bson.M{"results." + option: count}}
      if _, err := pollData.UpdateAll(sel, up); err != nil {
        log.Println("failed to update:", err)
        ok = false
      }
    }
    if ok {
      log.Println("Finished updating database...")
      counts = nil // reset counts
    }
  }
  updater.Reset(updateDuration)
})

The time.AfterFunc function calls the function after the specified duration in a goroutine of its own. At the end we call Reset, which starts the process again; this allows us to schedule our update code to run at regular intervals.

When our update function runs, the first thing we do is lock the countsLock, and defer its unlocking. We then check to see if there are any values in the counts map. If there aren't, we just log that we're skipping the update and wait for next time.

If there are some votes, we iterate over the counts map pulling out the option and number of votes (since the last update), and use some MongoDB magic to update the results.

Note

MongoDB stores BSON (short for Binary JSON) documents internally, which are easier to traverse than normal JSON documents; this is why the mgo package comes with the mgo/bson encoding package. When using mgo, we will often use bson types, such as the bson.M map, to describe concepts for MongoDB.

We first create the selector for our update operation using the bson.M shortcut type, which is a named map[string]interface{} type. The selector we create will look something like this:

{
  "options": {
    "$in": ["happy"]
  }
}

In MongoDB, the preceding BSON specifies that we want to select polls where "happy" is one of the items in the options array.

Next, we use the same technique to generate the update operation, which looks something like this:

{
  "$inc": {
    "results.happy": 3
  }
}

In MongoDB, the preceding BSON specifies that we want to increase the results.happy field by 3. If there is no results map in the poll, one will be created, and if there is no happy key inside results, 0 will be assumed.
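To see the shape of both documents without a running database, the sketch below builds them with a local M type and prints them as JSON using only the standard library. M is a stand-in for bson.M defined just for this example, and selectorFor, updateFor, and toJSON are hypothetical helper names; the real program passes bson.M values straight to mgo.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// M stands in for bson.M in this sketch so that the example needs
// no third-party packages.
type M map[string]interface{}

// selectorFor matches polls whose options array contains option.
func selectorFor(option string) M {
	return M{"options": M{"$in": []string{option}}}
}

// updateFor increments the results.<option> field by count.
func updateFor(option string, count int) M {
	return M{"$inc": M{"results." + option: count}}
}

// toJSON renders a document as compact JSON for inspection.
func toJSON(v interface{}) string {
	b, err := json.Marshal(v)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	fmt.Println(toJSON(selectorFor("happy")))
	fmt.Println(toJSON(updateFor("happy", 3)))
}
```

Printing the documents this way is a quick check that the nested keys, especially the dotted results.happy path, come out as intended before they are sent to MongoDB.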

We then call the UpdateAll method on our pollData collection to issue the command to the database, which will in turn update every poll that matches the selector (contrast this with the Update method, which will update only one). If something goes wrong, we report it and set the ok Boolean to false. If all goes well, we set the counts map to nil, since we want to reset the counter.

We are going to specify the updateDuration as a constant at the top of the file, which will make it easy for us to change when we are testing our program. Add the following code above the main function:

const updateDuration = 1 * time.Second

Responding to Ctrl + C

The last thing to do before our program is ready is to make sure our main function waits for operations to complete before exiting, like we did in our twittervotes program. Add the following code at the end of the main function:

termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
for {
  select {
  case <-termChan:
    updater.Stop()
    q.Stop()
  case <-q.StopChan:
    // finished
    return
  }
}

Here we have employed a slightly different tactic than before. We trap the termination event, which will cause a signal to be sent on termChan when we hit Ctrl + C. Next we start an infinite loop, inside which we use Go's select statement to run code when we receive something on either termChan or the StopChan of the consumer.

In fact, the termChan signal will always arrive first, in response to pressing Ctrl + C, at which point we stop the updater timer and ask the consumer to stop listening for votes. Execution then re-enters the loop and blocks until the consumer reports that it has indeed stopped by signaling on its StopChan. When that happens, we're done and we return from main, at which point our deferred functions run and, if you remember, tidy up the database session.
