Now that we have discussed the design of Concierge in detail, let us implement Concierge based on these design points. We will discuss the implementation of api/query.go and Dockerfile in Chapter 8, Deploying Goophr. Let's look at the project structure & source code:
$ tree
.
└── goophr
    └── concierge
        ├── api
        │   ├── feeder.go
        │   ├── feeder_test.go
        │   └── query.go
        ├── common
        │   └── helpers.go
        ├── Dockerfile
        └── main.go

4 directories, 6 files
Now let's look at the source code for each of the files:
main.go:
// Command concierge starts the Goophr Concierge HTTP server.
package main

import (
	"net/http"
	"os"

	"github.com/last-ent/distributed-go/chapter6/goophr/concierge/api"
	"github.com/last-ent/distributed-go/chapter6/goophr/concierge/common"
)

// main registers the feeder API handler, starts the feeder goroutines and
// then serves HTTP on port 8080 until the server stops.
func main() {
	common.Log("Adding API handlers...")
	http.HandleFunc("/api/feeder", api.FeedHandler)

	common.Log("Starting feeder...")
	api.StartFeederSystem()

	common.Log("Starting Goophr Concierge server on port :8080...")
	// ListenAndServe only returns when the server cannot run (for example
	// when the port is already in use). Report the reason and exit with a
	// non-zero status instead of terminating silently.
	if err := http.ListenAndServe(":8080", nil); err != nil {
		common.Warn("Goophr Concierge server stopped: " + err.Error())
		os.Exit(1)
	}
}
common/helpers.go:
// Package common holds logging and token helpers shared by Concierge.
package common

import (
	"fmt"
	"log"
	"regexp"
	"strings"
)

// Log is used for simple logging to console.
func Log(msg string) {
	log.Println("INFO - ", msg)
}

// Warn is used to log warning messages to console.
func Warn(msg string) {
	log.Println("---------------------------")
	log.Println(fmt.Sprintf("WARN: %s", msg))
	log.Println("---------------------------")
}

// punctuations matches a run of Unicode punctuation characters at the very
// start or the very end of a token. Punctuation inside a token (for example
// the apostrophe in "don't") is left untouched.
var punctuations = regexp.MustCompile(`^\p{P}+|\p{P}+$`)

// List of stop words that we want to ignore in our index.
var stopWords = []string{
	"a", "about", "above", "after", "again", "against", "all", "am", "an", "and",
	"any", "are", "aren't", "as", "at", "be", "because", "been", "before", "being",
	"below", "between", "both", "but", "by", "can't", "cannot", "could", "couldn't",
	"did", "didn't", "do", "does", "doesn't", "doing", "don't", "down", "during",
	"each", "few", "for", "from", "further", "had", "hadn't", "has", "hasn't",
	"have", "haven't", "having", "he", "he'd", "he'll", "he's", "her", "here",
	"here's", "hers", "herself", "him", "himself", "his", "how", "how's", "i",
	"i'd", "i'll", "i'm", "i've", "if", "in", "into", "is", "isn't", "it", "it's",
	"its", "itself", "let's", "me", "more", "most", "mustn't", "my", "myself", "no",
	"nor", "not", "of", "off", "on", "once", "only", "or", "other", "ought", "our",
	"ours", "ourselves", "out", "over", "own", "same", "shan't", "she", "she'd",
	"she'll", "she's", "should", "shouldn't", "so", "some", "such", "than", "that",
	"that's", "the", "their", "theirs", "them", "themselves", "then", "there",
	"there's", "these", "they", "they'd", "they'll", "they're", "they've", "this",
	"those", "through", "to", "too", "under", "until", "up", "very", "was",
	"wasn't", "we", "we'd", "we'll", "we're", "we've", "were", "weren't", "what",
	"what's", "when", "when's", "where", "where's", "which", "while", "who",
	"who's", "whom", "why", "why's", "with", "won't", "would", "wouldn't", "you",
	"you'd", "you'll", "you're", "you've", "your", "yours", "yourself",
	"yourselves"}

// stopWordSet is stopWords as a set, so that every token lookup is a single
// map access instead of a scan over the whole list.
var stopWordSet = func() map[string]struct{} {
	set := make(map[string]struct{}, len(stopWords))
	for _, word := range stopWords {
		set[word] = struct{}{}
	}
	return set
}()

// SimplifyToken is responsible for normalizing a string token and
// also checks whether the token should be indexed or not.
//
// The token is stripped of leading and trailing punctuation and lower-cased.
// It returns the normalized token and true when the token should be indexed.
// It returns "" and false when the normalized token is a stop word or when
// nothing is left after stripping punctuation.
func SimplifyToken(token string) (string, bool) {
	simpleToken := strings.ToLower(punctuations.ReplaceAllString(token, ""))

	// A token made only of punctuation (e.g. "--") normalizes to the empty
	// string, which carries no meaning in an index.
	if simpleToken == "" {
		return "", false
	}

	if _, isStopWord := stopWordSet[simpleToken]; isStopWord {
		return "", false
	}

	return simpleToken, true
}
api/feeder.go:
// Package api contains the HTTP handlers and the goroutine pipeline that
// Concierge uses to accept documents and turn them into index tokens.
package api

import (
	"crypto/sha1"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"strings"
	"time"

	"github.com/last-ent/distributed-go/chapter6/goophr/concierge/common"
)

// payload is the JSON request body decoded by FeedHandler. URL is the
// location the document is downloaded from and Title is its display title.
type payload struct {
	URL   string `json:"url"`
	Title string `json:"title"`
}

// document is a downloaded document together with its generated ID.
// Doc holds the full text and is excluded from JSON output.
type document struct {
	Doc   string `json:"-"`
	Title string `json:"title"`
	DocID string `json:"DocID"`
}

// token is a single normalized word ready to be added to the index.
// Line holds the text of the line the token came from and is excluded from
// JSON output. LIndex is the index of that line within the document and
// Index is the position of the token among the indexed tokens of the line.
type token struct {
	Line   string `json:"-"`
	Token  string `json:"token"`
	Title  string `json:"title"`
	DocID  string `json:"doc_id"`
	LIndex int    `json:"line_index"`
	Index  int    `json:"token_index"`
}

// dMsg asks docStore for the document with the given DocID; the result is
// sent back on Ch.
type dMsg struct {
	DocID string
	Ch    chan document
}

// lMsg asks lineStore for line LIndex of document DocID; the result is sent
// back on Ch.
type lMsg struct {
	LIndex int
	DocID  string
	Ch     chan string
}

// lMeta is a single line of a document, sent to lineStore for storage.
type lMeta struct {
	LIndex int
	DocID  string
	Line   string
}

// dAllMsg asks docStore for every stored document; the result is sent back
// on Ch.
type dAllMsg struct {
	Ch chan []document
}

// done signals all listening goroutines to stop.
var done chan bool

// dGetCh is used to retrieve a single document from store.
var dGetCh chan dMsg

// lGetCh is used to retrieve a single line from store.
var lGetCh chan lMsg

// lStoreCh is used to put a line into store.
var lStoreCh chan lMeta

// iAddCh is used to add token to index (Librarian).
var iAddCh chan token

// dStoreCh is used to put a document into store.
var dStoreCh chan document

// dProcessCh is used to process a document and convert it to tokens.
var dProcessCh chan document

// dGetAllCh is used to retrieve all documents in store.
var dGetAllCh chan dAllMsg

// pProcessCh is used to process the /feeder's payload and start the indexing process.
var pProcessCh chan payload

// StartFeederSystem initializes all channels and starts all goroutines.
// We are using a standard function instead of `init()`
// because we don't want the channels & goroutines to be initialized during testing.
// Unless explicitly required by a particular test.
func StartFeederSystem() { done = make(chan bool) dGetCh = make(chan dMsg, 8) dGetAllCh = make(chan dAllMsg) iAddCh = make(chan token, 8) pProcessCh = make(chan payload, 8) dStoreCh = make(chan document, 8) dProcessCh = make(chan document, 8) lGetCh = make(chan lMsg) lStoreCh = make(chan lMeta, 8) for i := 0; i < 4; i++ { go indexAdder(iAddCh, done) go docProcessor(pProcessCh, dStoreCh, dProcessCh, done) go indexProcessor(dProcessCh, lStoreCh, iAddCh, done) } go docStore(dStoreCh, dGetCh, dGetAllCh, done) go lineStore(lStoreCh, lGetCh, done) } // indexAdder adds token to index (Librarian). func indexAdder(ch chan token, done chan bool) { for { select { case tok := <-ch: fmt.Println("adding to librarian:", tok.Token) case <-done: common.Log("Exiting indexAdder.") return } } } // lineStore maintains a catalog of all lines for all documents being indexed. func lineStore(ch chan lMeta, callback chan lMsg, done chan bool) { store := map[string]string{} for { select { case line := <-ch: id := fmt.Sprintf("%s-%d", line.DocID, line.LIndex) store[id] = line.Line case ch := <-callback: line := "" id := fmt.Sprintf("%s-%d", ch.DocID, ch.LIndex) if l, exists := store[id]; exists { line = l } ch.Ch <- line case <-done: common.Log("Exiting docStore.") return } } } // indexProcessor is responsible for converting a document into tokens for indexing. 
func indexProcessor(ch chan document, lStoreCh chan lMeta, iAddCh chan token, done chan bool) { for { select { case doc := <-ch: docLines := strings.Split(doc.Doc, " ") lin := 0 for _, line := range docLines { if strings.TrimSpace(line) == "" { continue } lStoreCh <- lMeta{ LIndex: lin, Line: line, DocID: doc.DocID, } index := 0 words := strings.Fields(line) for _, word := range words { if tok, valid := common.SimplifyToken(word); valid { iAddCh <- token{ Token: tok, LIndex: lin, Line: line, Index: index, DocID: doc.DocID, Title: doc.Title, } index++ } } lin++ } case <-done: common.Log("Exiting indexProcessor.") return } } } // docStore maintains a catalog of all documents being indexed. func docStore(add chan document, get chan dMsg, dGetAllCh chan dAllMsg, done chan bool) { store := map[string]document{} for { select { case doc := <-add: store[doc.DocID] = doc case m := <-get: m.Ch <- store[m.DocID] case ch := <-dGetAllCh: docs := []document{} for _, doc := range store { docs = append(docs, doc) } ch.Ch <- docs case <-done: common.Log("Exiting docStore.") return } } } // docProcessor processes new document payloads. func docProcessor(in chan payload, dStoreCh chan document, dProcessCh chan document, done chan bool) { for { select { case newDoc := <-in: var err error doc := "" if doc, err = getFile(newDoc.URL); err != nil { common.Warn(err.Error()) continue } titleID := getTitleHash(newDoc.Title) msg := document{ Doc: doc, DocID: titleID, Title: newDoc.Title, } dStoreCh <- msg dProcessCh <- msg case <-done: common.Log("Exiting docProcessor.") return } } } // getTitleHash returns a new hash ID everytime it is called. // Based on: https://gobyexample.com/sha1-hashes
func getTitleHash(title string) string {
hash := sha1.New() title = strings.ToLower(title) str := fmt.Sprintf("%s-%s", time.Now(), title) hash.Write([]byte(str)) hByte := hash.Sum(nil) return fmt.Sprintf("%x", hByte) } // getFile returns file content after retrieving it from URL. func getFile(URL string) (string, error) { var res *http.Response var err error if res, err = http.Get(URL); err != nil { errMsg := fmt.Errorf("Unable to retrieve URL: %s. Error: %s", URL, err) return "", errMsg } if res.StatusCode > 200 { errMsg := fmt.Errorf("Unable to retrieve URL: %s. Status Code: %d", URL, res.StatusCode) return "", errMsg } body, err := ioutil.ReadAll(res.Body) defer res.Body.Close() if err != nil { errMsg := fmt.Errorf("Error while reading response: URL: %s. Error: %s", URL, res.StatusCode, err.Error()) return "", errMsg } return string(body), nil } // FeedHandler start processing the payload which contains the file to index. func FeedHandler(w http.ResponseWriter, r *http.Request) { if r.Method == "GET" { ch := make(chan []document) dGetAllCh <- dAllMsg{Ch: ch} docs := <-ch close(ch) if serializedPayload, err := json.Marshal(docs); err == nil { w.Write(serializedPayload) } else { common.Warn("Unable to serialize all docs: " + err.Error()) w.WriteHeader(http.StatusInternalServerError) w.Write([]byte('{"code": 500, "msg": "Error occurred while trying to retrieve documents."}')) } return } else if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) w.Write([]byte('{"code": 405, "msg": "Method Not Allowed."}')) return } decoder := json.NewDecoder(r.Body) defer r.Body.Close() var newDoc payload decoder.Decode(&newDoc) pProcessCh <- newDoc w.Write([]byte('{"code": 200, "msg": "Request is being processed."}')) }
api/feeder_test.go:
package api

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"testing"
)

// TestGetTitleHash verifies that hashes differ between distinct titles and
// between repeated calls with the same title, and that a hash equals itself.
func TestGetTitleHash(t *testing.T) {
	first := getTitleHash("A-Title")
	other := getTitleHash("Diff Title")
	repeat := getTitleHash("A-Title")

	cases := []struct {
		name     string
		hashes   []string
		expected bool
	}{
		{"Different Titles", []string{first, other}, false},
		{"Duplicate Titles", []string{first, repeat}, false},
		{"Same hashes", []string{other, other}, true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			if actual := tc.hashes[0] == tc.hashes[1]; actual != tc.expected {
				t.Error(actual, tc.expected, tc.hashes)
			}
		})
	}
}

// TestGetFile verifies that getFile returns the body served by a local test
// server unchanged.
func TestGetFile(t *testing.T) {
	doc := "Server returned text!"

	handler := func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte(doc))
	}
	testServer := httptest.NewServer(http.HandlerFunc(handler))
	defer testServer.Close()

	rDoc, err := getFile(testServer.URL)
	if err != nil {
		t.Error("Error while retrieving document", err)
	}

	if rDoc != doc {
		t.Error(doc, "!=", rDoc)
	}
}

// TestIndexProcessor feeds a one-line document to indexProcessor and verifies
// that the tokens arrive normalized, in order and with consecutive indices.
func TestIndexProcessor(t *testing.T) {
	docCh := make(chan document, 1)
	lineCh := make(chan lMeta, 1)
	tokenCh := make(chan token, 3)
	done := make(chan bool)

	go indexProcessor(docCh, lineCh, tokenCh, done)

	docCh <- document{
		DocID: "a-hash",
		Title: "a-title",
		Doc:   "Golang Programming rocks!",
	}

	expected := []string{"golang", "programming", "rocks"}
	for i, tc := range expected {
		name := fmt.Sprintf("Testing if '%s' is returned. at index: %d", tc, i)
		t.Run(name, func(t *testing.T) {
			tok := <-tokenCh

			if tok.Token != tc {
				t.Error(tok.Token, "!=", tc)
			}
			if tok.Index != i {
				t.Error(tok.Index, "!=", i)
			}
		})
	}

	close(done)
}