Earlier in this book, we talked about concurrency patterns and a bit about workers. We even brought the workers concept into play in the previous chapter, when we were building our logging systems.
Strictly speaking, "worker" is a fairly generic and ambiguous concept, not just in Go, but in programming in general. In some languages it's an object or instantiated class; in others it's a concurrent actor; in functional languages it may simply be a function whose result is passed along to another function.
If we go back to the preface, we will see that we have literally used the Go gopher as an example of a worker. In short, a worker is something more complex than a single function call or programmatic action that will perform a task one or more times.
So why are we talking about it now? When we build our channels, we are creating a mechanism for doing work. When we have a struct or an interface, we're combining methods and values in a single place, and then doing work using that object as both a mechanism for the work and a place to store information about that work.
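As a minimal sketch of that idea (the Worker type and its names here are hypothetical, not part of the application we build below), a struct can act as both the mechanism for the work and the record of it:

```go
package main

import "fmt"

// Worker is a hypothetical example: it stores state about its task
// (Name, Completed) and owns the channel it uses to report results.
type Worker struct {
	Name      string
	Completed int
	Results   chan string
}

// Do performs one unit of work and reports on the worker's own channel.
func (w *Worker) Do(task string) {
	w.Completed++
	w.Results <- w.Name + " finished " + task
}

func main() {
	w := &Worker{Name: "gopher", Results: make(chan string, 1)}
	go w.Do("task-1")
	fmt.Println(<-w.Results)
	fmt.Println("completed so far:", w.Completed)
}
```

The channel receive in main() establishes a happens-before relationship with the goroutine's write to Completed, so reading the counter afterward is safe.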
This is particularly useful in application design, as we're able to delegate various elements of an application's functionality to distinct and well-defined workers. Consider, for example, a server pinging application that has specific pieces doing specific things in a self-contained, compartmentalized manner.
We'll attempt to check for server availability via the HTTP package, check the status code and errors, and back off if we find problems with any particular server. You can probably see where this is going—this is the most basic approach to load balancing. But an important design consideration is the way in which we manage our channels.
We'll have a master channel, where all important global transactions should be accumulated and evaluated, but each individual server will also have its own channels for handling tasks that are important only to that individual struct.
The design in the following code can be considered a rudimentary pipeline, roughly akin to the producer/consumer model we discussed in the previous chapters:
package main

import (
	"fmt"
	"net/http"
	"time"
)

const INIT_DELAY = 3000
const MAX_DELAY = 60000
const MAX_RETRIES = 4
const DELAY_INCREMENT = 5000
The preceding code gives the configuration part of the application, setting the scope for how frequently to check servers, the maximum amount of time for backing off, and the maximum number of retries before giving up entirely. The DELAY_INCREMENT value represents how much time we will add to our server-checking interval each time we discover a problem. Let's take a look at how to create a server in the following section:
var Servers []Server

type Server struct {
	Name        string
	URI         string
	LastChecked time.Time
	Status      bool
	StatusCode  int
	Delay       int
	Retries     int
	Channel     chan bool
}
The preceding struct is our basic server, which contains its current status, the last time it was checked, the present delay between checks, its retry count, and its own channel for evaluating statuses and establishing the new status and updated retry delay. Next, the method that performs the actual check:
func (s *Server) checkServerStatus(sc chan *Server) {
	var previousStatus string
	if s.Status {
		previousStatus = "OK"
	} else {
		previousStatus = "down"
	}
	fmt.Println("Checking Server", s.Name)
	fmt.Println("  Server was", previousStatus, "on last check at", s.LastChecked)
	response, err := http.Get(s.URI)
	if err != nil {
		fmt.Println("  Error:", err)
		s.Status = false
		s.StatusCode = 0
	} else {
		fmt.Println(response.Status)
		s.StatusCode = response.StatusCode
		s.Status = true
		// Close the body to release the connection; we only need the status.
		response.Body.Close()
	}
	s.LastChecked = time.Now()
	sc <- s
}
The checkServerStatus() method is the meat and potatoes of our application. We pass all of our servers through this method in the main() function via our cycleServers() loop, after which the cycle sustains itself: each check schedules the next one.
If our Status is set to true, we send the state to the console as OK (otherwise down), and we set our Server status code with s.StatusCode as either the HTTP code or 0 if there was a network or other error.
Finally, we set the last-checked time of the Server to Now() and pass the Server through the serverChan channel. In the following code, we'll demonstrate how we rotate through our available servers:
func cycleServers(sc chan *Server) {
	for i := 0; i < len(Servers); i++ {
		Servers[i].Channel = make(chan bool)
		go Servers[i].updateDelay(sc)
		go Servers[i].checkServerStatus(sc)
	}
}
This is our initial loop, called from main(). It simply iterates over our available servers, starting each server's listening goroutine and sending the first checkServerStatus() request.
It's worth noting two things here: first, the channel created for each Server never actually dies; instead, the application simply stops checking that server. That's fine for all practical purposes here, but if we have thousands and thousands of servers to check, we're wasting resources on what essentially amounts to an unclosed channel and a slice element that is never removed. Later, we'll broach the concept of manually killing goroutines, something we've so far only been able to do through abstraction, by stopping the communication channel. Let's now take a look at the following code that controls a server's status and its next steps:
func (s *Server) updateDelay(sc chan *Server) {
	for {
		select {
		// Note: no default case here, so the select blocks until a
		// status arrives rather than busy-spinning on the CPU.
		case msg := <-s.Channel:
			if msg == false {
				s.Delay = s.Delay + DELAY_INCREMENT
				s.Retries++
				if s.Delay > MAX_DELAY {
					s.Delay = MAX_DELAY
				}
			} else {
				s.Delay = INIT_DELAY
			}
			newDuration := time.Duration(s.Delay)
			if s.Retries <= MAX_RETRIES {
				fmt.Println("  Will check server again")
				time.Sleep(newDuration * time.Millisecond)
				s.checkServerStatus(sc)
			} else {
				fmt.Println("  Server not reachable after", MAX_RETRIES, "retries")
			}
		}
	}
}
This is where each Server listens for changes in its status, as reported by checkServerStatus(). When any given Server struct receives a message that a change in status has been reported via our initial loop, it will evaluate that message and act accordingly.

If the Status is set to false, we know that the server was inaccessible for some reason, and the Server reference itself will add a delay before the next check. If it's set to true, the server was accessible and the delay will be set or reset to the default value of INIT_DELAY.
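The backoff arithmetic described above can be checked in isolation. The following standalone sketch (nextDelay is a hypothetical helper; the constants match the ones in our program) shows the delay growing linearly with each failure and being capped at the maximum:

```go
package main

import "fmt"

const (
	INIT_DELAY      = 3000  // first delay between checks, in milliseconds
	MAX_DELAY       = 60000 // the delay is never allowed to exceed this
	MAX_RETRIES     = 4     // give up after this many failed checks
	DELAY_INCREMENT = 5000  // added to the delay after each failure
)

// nextDelay applies the backoff rule from updateDelay: grow linearly
// by DELAY_INCREMENT, but never exceed MAX_DELAY.
func nextDelay(delay int) int {
	delay += DELAY_INCREMENT
	if delay > MAX_DELAY {
		delay = MAX_DELAY
	}
	return delay
}

func main() {
	delay := INIT_DELAY
	for retry := 1; retry <= MAX_RETRIES; retry++ {
		delay = nextDelay(delay)
		fmt.Printf("retry %d: next check in %d ms\n", retry, delay)
	}
}
```

With these constants, the delays after successive failures are 8000, 13000, 18000, and 23000 ms; the MAX_DELAY cap would only come into play after the twelfth consecutive failure.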
Finally, the goroutine sleeps for that duration before reinvoking the checkServerStatus() method on itself, passing along the serverChan reference from the initial goroutine loop in the main() function:
func main() {
	endChan := make(chan bool)
	serverChan := make(chan *Server)

	Servers = []Server{
		{Name: "Google", URI: "http://www.google.com", Status: true, Delay: INIT_DELAY},
		{Name: "Yahoo", URI: "http://www.yahoo.com", Status: true, Delay: INIT_DELAY},
		{Name: "Bad Amazon", URI: "http://amazon.zom", Status: true, Delay: INIT_DELAY},
	}
One quick note here: in our slice of Servers, we intentionally introduced a typo in the last element. You'll notice amazon.zom, which will provoke an HTTP error in the checkServerStatus() method. The following code starts the server cycle and then relays each reported status back to that server's own channel:
	go cycleServers(serverChan)

	for {
		select {
		case currentServer := <-serverChan:
			// Relay the reported status to the server's own channel so
			// that updateDelay can adjust its backoff accordingly.
			currentServer.Channel <- currentServer.Status
		}
	}

	<-endChan // never reached; the loop above runs forever
}
The following is an example of the output with the typo included:
Checking Server Google
  Server was OK on last check at 0001-01-01 00:00:00 +0000 UTC
200 OK
  Will check server again
Checking Server Yahoo
  Server was OK on last check at 0001-01-01 00:00:00 +0000 UTC
200 OK
  Will check server again
Checking Server Amazon
  Server was OK on last check at 0001-01-01 00:00:00 +0000 UTC
  Error: Get http://amazon.zom: dial tcp: GetAddrInfoW: No such host is known.
  Will check server again
Checking Server Google
  Server was OK on last check at 2014-04-23 12:49:45.6575639 -0400 EDT
We'll be taking the preceding code for one last spin through some concurrency patterns later in this chapter, turning it into something a bit more practical.