Now we need to focus on our file listener. You may recall this is the part of the application that will accept client connections from our web server and our backup application and announce any changes to files.
The basic flow of this part is as follows:

- Watch our shared directory for any file changes
- Accept connections from our web server and our backup application
- Broadcast a message describing each change to every connected client

All three happen concurrently, and the first and the third can happen without any connections in the pool, although we assume there will be a connection that is always on with both our web server and our backup application.
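The three concurrent roles above can be sketched with plain goroutines before we bring in the real machinery. This is a minimal, self-contained illustration using placeholder work and a shared channel (the names here are hypothetical); the real implementation later in this section uses fsnotify events and a net.Listener instead:

```go
package main

import (
	"fmt"
	"sync"
)

// runRoles launches one goroutine per role, collects a status line
// from each on a buffered channel, and returns them once all have
// finished. Each goroutine stands in for one of the three concurrent
// loops: watching files, accepting connections, broadcasting changes.
func runRoles(roles []string) []string {
	results := make(chan string, len(roles))
	var wg sync.WaitGroup
	for _, r := range roles {
		wg.Add(1)
		go func(role string) {
			defer wg.Done()
			results <- role + " running" // stand-in for real work
		}(r)
	}
	wg.Wait()
	close(results)
	var out []string
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	out := runRoles([]string{"watch files", "accept connections", "broadcast changes"})
	fmt.Println(len(out), "roles ran concurrently")
}
```

Note that the completion order of the goroutines is nondeterministic, which is exactly why the real file listener must tolerate broadcasts occurring before any client has connected.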
Another critical role the file listener will fulfill is analyzing the directory on first load and reconciling it with our data store in Couchbase. Since the Go Couchbase library handles the get, update, and add operations, we won't need any custom views. In the following code, we'll examine the file listener process and show how we listen on a folder for changes:
package main

import (
	"crypto/md5"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"strings"
	"time"

	"github.com/couchbaselabs/go-couchbase"
	"github.com/howeyc/fsnotify"
)

var listenFolder = "mnt/sharedir"

type Client struct {
	ID         int
	Connection *net.Conn
}
Here, we've declared our shared folder as well as a connecting Client struct. In this application, a Client is either a web listener or a backup listener, and we'll pass messages in one direction using the following JSON-encoded structure:
type File struct {
	Hash             string `json:"hash"`
	Name             string `json:"file_name"`
	Created          int64  `json:"created"`
	CreatedUser      int    `json:"created_user"`
	LastModified     int64  `json:"last_modified"`
	LastModifiedUser int    `json:"last_modified_user"`
	Revisions        int    `json:"revisions"`
	Version          int    `json:"version"`
}
If this looks familiar, it could be because it's also the example document format we set up initially.
If you're not familiar with the syntactical sugar shown earlier, these are known as struct tags. A tag is simply additional metadata attached to a struct field that can be read at runtime for key/value lookups via the reflect package. In this case, they map our struct fields to JSON field names. Note that for the encoding/json package to honor them, tags must be enclosed in backticks and written in the `json:"name"` form.
Let's first look at our overall Message struct:
type Message struct {
	Hash     string `json:"hash"`
	Action   string `json:"action"`
	Location string `json:"location"`
	Name     string `json:"name"`
	Version  int    `json:"version"`
}
We compartmentalize our file into a message, which alerts our other two processes of changes:
func generateHash(name string) string {
	hash := md5.New()
	io.WriteString(hash, name)
	hashString := hex.EncodeToString(hash.Sum(nil))
	return hashString
}
This is a somewhat unreliable way to generate a hash reference to a file, because the key is derived from the filename alone: if a file is renamed, its hash changes and we lose track of the record. However, it allows us to keep track of files that are created, deleted, or modified under a stable name.
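The rename caveat is easy to demonstrate. The hash is deterministic for a given name, but any change to the name produces a different key:

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"io"
)

// generateHash, exactly as used by the file listener: the key is
// derived from the filename alone, so a rename produces a new key.
func generateHash(name string) string {
	hash := md5.New()
	io.WriteString(hash, name)
	return hex.EncodeToString(hash.Sum(nil))
}

func main() {
	fmt.Println(generateHash("report.txt") == generateHash("report.txt"))  // true: deterministic
	fmt.Println(generateHash("report.txt") == generateHash("report2.txt")) // false: a rename changes the key
}
```

A more robust scheme might hash the file's contents instead of its name, at the cost of reading the file on every event.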
Here is the broadcast message that goes to all existing connections. We pass along our JSON-encoded Message struct with the current version, the current location, and the hash for reference. Our other servers will then react accordingly:
func alertServers(hash string, name string, action string, location string, version int) {
	msg := Message{Hash: hash, Action: action, Location: location, Name: name, Version: version}
	msgJSON, _ := json.Marshal(msg)
	fmt.Println(string(msgJSON))
	for i := range Clients {
		fmt.Println("Sending to clients")
		fmt.Fprintln(*Clients[i].Connection, string(msgJSON))
	}
}
Our backup server will create a copy of that file with the .[VERSION] extension in the backup folder.
Our web server will simply alert the user via our web interface that the file has changed:
func startServer(listener net.Listener) {
	for {
		conn, err := listener.Accept()
		if err != nil {
			fmt.Println("Connection error", err)
			continue
		}
		// Note: every client gets ID 1 here; a real implementation
		// would assign unique IDs.
		currentClient := Client{ID: 1, Connection: &conn}
		Clients = append(Clients, currentClient)
		for i := range Clients {
			fmt.Println("Client", Clients[i].ID)
		}
	}
}
Does this code look familiar? We've taken our chat server's Client handler and brought it over here nearly intact:
func removeFile(name string, bucket *couchbase.Bucket) {
	bucket.Delete(generateHash(name))
}
The removeFile function does one thing only: it removes the file's record from our Couchbase data store. As it's reactive, we don't need to do anything on the file-server side because the file is already deleted. Also, there's no need to delete any backups, as this is what allows us to recover. Next, let's look at our function that updates an existing file:
func updateExistingFile(name string, bucket *couchbase.Bucket) int {
	fmt.Println(name, "updated")
	hashString := generateHash(name)
	thisFile := Files[hashString]
	thisFile.Hash = hashString
	thisFile.Name = name
	thisFile.Version = thisFile.Version + 1
	thisFile.LastModified = time.Now().Unix()
	Files[hashString] = thisFile
	bucket.Set(hashString, 0, Files[hashString])
	return thisFile.Version
}
This function essentially overwrites any values in Couchbase with new ones, copying an existing File struct, incrementing its Version, and updating the LastModified timestamp:
func evalFile(event *fsnotify.FileEvent, bucket *couchbase.Bucket) {
	fmt.Println(event.Name, "changed")
	fileComponents := strings.Split(event.Name, "\\")
	fileComponentSize := len(fileComponents)
	trueFileName := fileComponents[fileComponentSize-1]
	hashString := generateHash(trueFileName)
	if event.IsCreate() {
		updateFile(trueFileName, bucket)
		alertServers(hashString, event.Name, "CREATE", event.Name, 0)
	}
	if event.IsDelete() {
		removeFile(trueFileName, bucket)
		alertServers(hashString, event.Name, "DELETE", event.Name, 0)
	}
	if event.IsModify() {
		newVersion := updateExistingFile(trueFileName, bucket)
		fmt.Println(newVersion)
		alertServers(hashString, trueFileName, "MODIFY", event.Name, newVersion)
	}
	if event.IsRename() {
		// Renames are intentionally left unhandled.
	}
}
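The rename branch is left empty in evalFile. Because our keys are derived from filenames, one hedged approach (helper and type names here are hypothetical) is to treat a rename as a delete of the old key followed by a re-add under the new one. The map below is a stand-in for the Couchbase bucket so the sketch is self-contained:

```go
package main

import "fmt"

// store is a stand-in for the Couchbase bucket, keyed by filename.
type store map[string]string

// handleRename moves a record from its old filename key to the new
// one, preserving its contents. If the old key is absent, it does
// nothing.
func handleRename(s store, oldName, newName string) {
	if doc, ok := s[oldName]; ok {
		delete(s, oldName) // remove the record keyed by the old name
		s[newName] = doc   // re-add it under the new name
	}
}

func main() {
	s := store{"old.txt": "file-metadata"}
	handleRename(s, "old.txt", "new.txt")
	fmt.Println(len(s), s["new.txt"]) // 1 file-metadata
}
```

In practice you would also need to broadcast a corresponding message so the web and backup servers can follow the rename.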
Here, we react to any changes to the filesystem in our watched directory. We aren't reacting to renames, but you could handle those as well. Here's how we'd approach the general updateFile function:
func updateFile(name string, bucket *couchbase.Bucket) {
	thisFile := File{}
	hashString := generateHash(name)
	thisFile.Hash = hashString
	thisFile.Name = name
	thisFile.Created = time.Now().Unix()
	thisFile.CreatedUser = 0
	thisFile.LastModified = time.Now().Unix()
	thisFile.LastModifiedUser = 0
	thisFile.Revisions = 0
	thisFile.Version = 1
	Files[hashString] = thisFile
	checkFile := File{}
	err := bucket.Get(hashString, &checkFile)
	if err != nil {
		fmt.Println("New File Added", name)
		bucket.Set(hashString, 0, thisFile)
	}
}
When it comes to checking for existing records against Couchbase, we check whether a hash exists in our Couchbase bucket. If it doesn't, we create it. If it does, we do nothing. To handle shutdowns more robustly, we should also ingest existing records into our application. The code for doing this is as follows:
var Clients []Client
var Files map[string]File

func main() {
	Files = make(map[string]File)
	endScript := make(chan bool)

	couchbaseClient, err := couchbase.Connect("http://localhost:8091/")
	if err != nil {
		fmt.Println("Error connecting to Couchbase", err)
	}
	pool, err := couchbaseClient.GetPool("default")
	if err != nil {
		fmt.Println("Error getting pool", err)
	}
	bucket, err := pool.GetBucket("file_manager")
	if err != nil {
		fmt.Println("Error getting bucket", err)
	}

	// Reconcile the directory with our data store on first load.
	files, _ := ioutil.ReadDir(listenFolder)
	for _, file := range files {
		updateFile(file.Name(), bucket)
	}

	dirSpy, err := fsnotify.NewWatcher()
	if err != nil {
		fmt.Println("Could not create watcher!", err)
	}
	defer dirSpy.Close()

	listener, err := net.Listen("tcp", ":9000")
	if err != nil {
		fmt.Println("Could not start server!", err)
	}

	go func() {
		for {
			select {
			case ev := <-dirSpy.Event:
				evalFile(ev, bucket)
			case err := <-dirSpy.Error:
				fmt.Println("error:", err)
			}
		}
	}()
	err = dirSpy.Watch(listenFolder)
	if err != nil {
		fmt.Println("Could not watch folder!", err)
	}
	// Run the accept loop in its own goroutine so the channel
	// receive below can keep main alive.
	go startServer(listener)
	<-endScript
}
Finally, main() handles setting up our connections and goroutines, including the file watcher, our TCP server, and the Couchbase connection.
Now, let's look at another step in the whole process where we will automatically create backups of our modified files.