The Concierge source code

Now that we have discussed the design of Concierge in detail, let us implement it based on these design points. The implementation of api/query.go and the Dockerfile will be discussed in Chapter 8, Deploying Goophr. Let's look at the project structure and source code:

$ tree 
. 
└── goophr 
    └── concierge 
        ├── api 
        │   ├── feeder.go 
        │   ├── feeder_test.go 
        │   └── query.go 
        ├── common 
        │   └── helpers.go 
        ├── Dockerfile 
        └── main.go 
 
4 directories, 6 files 

Now let's look at the source code for each of the files:

main.go:

package main 
 
import ( 
    "net/http" 
 
    "github.com/last-ent/distributed-go/chapter6/goophr/concierge/api" 
    "github.com/last-ent/distributed-go/chapter6/goophr/concierge/common" 
) 
 
func main() { 
    common.Log("Adding API handlers...") 
    http.HandleFunc("/api/feeder", api.FeedHandler) 
 
    common.Log("Starting feeder...") 
    api.StartFeederSystem() 
 
    common.Log("Starting Goophr Concierge server on port :8080...") 
    http.ListenAndServe(":8080", nil) 
} 
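
Once the server is running, documents can be fed to it over HTTP. The following is a minimal client sketch; the document URL and title in the payload are made-up examples, and the server is assumed to be listening on localhost:8080:

package main 
 
import ( 
    "bytes" 
    "fmt" 
    "net/http" 
) 
 
func main() { 
    // Hypothetical document to index; any reachable text file URL will do. 
    body := bytes.NewBufferString(`{"url": "http://localhost:9000/book.txt", "title": "A Book"}`) 
 
    res, err := http.Post("http://localhost:8080/api/feeder", "application/json", body) 
    if err != nil { 
        panic(err) 
    } 
    defer res.Body.Close() 
    fmt.Println("Status:", res.Status) 
} 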

common/helpers.go:

package common 
 
import ( 
    "fmt" 
    "log" 
    "regexp" 
    "strings" 
) 
 
// Log is used for simple logging to console. 
func Log(msg string) { 
    log.Println("INFO - ", msg) 
} 
 
// Warn is used to log warning messages to console. 
func Warn(msg string) { 
    log.Println("---------------------------") 
    log.Println(fmt.Sprintf("WARN: %s", msg)) 
    log.Println("---------------------------") 
} 
 
var punctuations = regexp.MustCompile(`^\p{P}+|\p{P}+$`) 
 
// List of stop words that we want to ignore in our index. 
var stopWords = []string{ 
    "a", "about", "above", "after", "again", "against", "all", "am", "an", "and", "any", "are", "aren't", "as", "at", 
    "be", "because", "been", "before", "being", "below", "between", "both", "but", "by", "can't", "cannot", "could", 
    "couldn't", "did", "didn't", "do", "does", "doesn't", "doing", "don't", "down", "during", "each", "few", "for", 
    "from", "further", "had", "hadn't", "has", "hasn't", "have", "haven't", "having", "he", "he'd", "he'll", "he's", 
    "her", "here", "here's", "hers", "herself", "him", "himself", "his", "how", "how's", "i", "i'd", "i'll", "i'm", 
    "i've", "if", "in", "into", "is", "isn't", "it", "it's", "its", "itself", "let's", "me", "more", "most", "mustn't", 
    "my", "myself", "no", "nor", "not", "of", "off", "on", "once", "only", "or", "other", "ought", "our", "ours", 
    "ourselves", "out", "over", "own", "same", "shan't", "she", "she'd", "she'll", "she's", "should", "shouldn't", 
    "so", "some", "such", "than", "that", "that's", "the", "their", "theirs", "them", "themselves", "then", "there", 
    "there's", "these", "they", "they'd", "they'll", "they're", "they've", "this", "those", "through", "to", "too", 
    "under", "until", "up", "very", "was", "wasn't", "we", "we'd", "we'll", "we're", "we've", "were", "weren't", "what", 
    "what's", "when", "when's", "where", "where's", "which", "while", "who", "who's", "whom", "why", "why's", "with", 
    "won't", "would", "wouldn't", "you", "you'd", "you'll", "you're", "you've", "your", "yours", "yourself", "yourselves"} 
 
// SimplifyToken is responsible for normalizing a string token and 
// also checks whether the token should be indexed or not. 
func SimplifyToken(token string) (string, bool) { 
    simpleToken := strings.ToLower(punctuations.ReplaceAllString(token, "")) 
 
    for _, stopWord := range stopWords { 
        if stopWord == simpleToken { 
            return "", false 
        } 
    } 
 
    return simpleToken, true 
} 
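
To make the behavior of SimplifyToken concrete, here is a small sketch (the example words are arbitrary): leading and trailing punctuation is stripped, the token is lowercased, and stop words are rejected.

package main 
 
import ( 
    "fmt" 
 
    "github.com/last-ent/distributed-go/chapter6/goophr/concierge/common" 
) 
 
func main() { 
    for _, word := range []string{"Hello,", "the", "World!"} { 
        tok, valid := common.SimplifyToken(word) 
        fmt.Printf("%q -> %q (index: %t)\n", word, tok, valid) 
    } 
    // Output: 
    // "Hello," -> "hello" (index: true) 
    // "the" -> "" (index: false) 
    // "World!" -> "world" (index: true) 
} 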

api/feeder.go:

package api 
 
import ( 
    "crypto/sha1" 
    "encoding/json" 
    "fmt" 
    "io/ioutil" 
    "net/http" 
    "strings" 
    "time" 
 
    "github.com/last-ent/distributed-go/chapter6/goophr/concierge/common" 
) 
 
type payload struct { 
    URL   string `json:"url"` 
    Title string `json:"title"` 
} 
 
type document struct { 
    Doc   string `json:"-"` 
    Title string `json:"title"` 
    DocID string `json:"DocID"` 
} 
 
type token struct { 
    Line   string `json:"-"` 
    Token  string `json:"token"` 
    Title  string `json:"title"` 
    DocID  string `json:"doc_id"` 
    LIndex int    `json:"line_index"` 
    Index  int    `json:"token_index"` 
} 
 
type dMsg struct { 
    DocID string 
    Ch    chan document 
} 
 
type lMsg struct { 
    LIndex int 
    DocID  string 
    Ch     chan string 
} 
 
type lMeta struct { 
    LIndex int 
    DocID  string 
    Line   string 
} 
 
type dAllMsg struct { 
    Ch chan []document 
} 
 
// done signals all listening goroutines to stop. 
var done chan bool 
 
// dGetCh is used to retrieve a single document from store. 
var dGetCh chan dMsg 
 
// lGetCh is used to retrieve a single line from store. 
var lGetCh chan lMsg 
 
// lStoreCh is used to put a line into store. 
var lStoreCh chan lMeta 
 
// iAddCh is used to add a token to the index (Librarian). 
var iAddCh chan token 
 
// dStoreCh is used to put a document into store. 
var dStoreCh chan document 
 
// dProcessCh is used to process a document and convert it to tokens. 
var dProcessCh chan document 
 
// dGetAllCh is used to retrieve all documents in store. 
var dGetAllCh chan dAllMsg 
 
// pProcessCh is used to process the /feeder's payload and start the indexing process. 
var pProcessCh chan payload 
 
// StartFeederSystem initializes all channels and starts all goroutines. 
// We are using a standard function instead of init() 
// because we don't want the channels & goroutines to be initialized during testing, 
// unless explicitly required by a particular test. 
func StartFeederSystem() { 
    done = make(chan bool) 
 
    dGetCh = make(chan dMsg, 8) 
    dGetAllCh = make(chan dAllMsg) 
 
    iAddCh = make(chan token, 8) 
    pProcessCh = make(chan payload, 8) 
 
    dStoreCh = make(chan document, 8) 
    dProcessCh = make(chan document, 8) 
    lGetCh = make(chan lMsg) 
    lStoreCh = make(chan lMeta, 8) 
 
    for i := 0; i < 4; i++ { 
        go indexAdder(iAddCh, done) 
        go docProcessor(pProcessCh, dStoreCh, dProcessCh, done) 
        go indexProcessor(dProcessCh, lStoreCh, iAddCh, done) 
    } 
 
    go docStore(dStoreCh, dGetCh, dGetAllCh, done) 
    go lineStore(lStoreCh, lGetCh, done) 
} 
 
// indexAdder adds a token to the index (Librarian). 
func indexAdder(ch chan token, done chan bool) { 
    for { 
        select { 
        case tok := <-ch: 
            fmt.Println("adding to librarian:", tok.Token) 
        case <-done: 
            common.Log("Exiting indexAdder.") 
            return 
        } 
    } 
} 
 
// lineStore maintains a catalog of all lines for all documents being indexed. 
func lineStore(ch chan lMeta, callback chan lMsg, done chan bool) { 
    store := map[string]string{} 
    for { 
        select { 
        case line := <-ch: 
            id := fmt.Sprintf("%s-%d", line.DocID, line.LIndex) 
            store[id] = line.Line 
        case ch := <-callback: 
            line := "" 
            id := fmt.Sprintf("%s-%d", ch.DocID, ch.LIndex) 
            if l, exists := store[id]; exists { 
                line = l 
            } 
            ch.Ch <- line 
        case <-done: 
            common.Log("Exiting lineStore.") 
            return 
        } 
    } 
} 
 
// indexProcessor is responsible for converting a document into tokens for indexing. 
func indexProcessor(ch chan document, lStoreCh chan lMeta, iAddCh chan token, done chan bool) { 
    for { 
        select { 
        case doc := <-ch: 
            docLines := strings.Split(doc.Doc, "\n") 
 
            lin := 0 
            for _, line := range docLines { 
                if strings.TrimSpace(line) == "" { 
                    continue 
                } 
 
                lStoreCh <- lMeta{ 
                    LIndex: lin, 
                    Line:   line, 
                    DocID:  doc.DocID, 
                } 
 
                index := 0 
                words := strings.Fields(line) 
                for _, word := range words { 
                    if tok, valid := common.SimplifyToken(word); valid { 
                        iAddCh <- token{ 
                            Token:  tok, 
                            LIndex: lin, 
                            Line:   line, 
                            Index:  index, 
                            DocID:  doc.DocID, 
                            Title:  doc.Title, 
                        } 
                        index++ 
                    } 
                } 
                lin++ 
            } 
        case <-done: 
            common.Log("Exiting indexProcessor.") 
            return 
        } 
    } 
} 
 
// docStore maintains a catalog of all documents being indexed. 
func docStore(add chan document, get chan dMsg, dGetAllCh chan dAllMsg, done chan bool) { 
    store := map[string]document{} 
    for { 
        select { 
        case doc := <-add: 
            store[doc.DocID] = doc 
        case m := <-get: 
            m.Ch <- store[m.DocID] 
        case ch := <-dGetAllCh: 
            docs := []document{} 
            for _, doc := range store { 
                docs = append(docs, doc) 
            } 
            ch.Ch <- docs 
        case <-done: 
            common.Log("Exiting docStore.") 
            return 
        } 
    } 
} 
 
// docProcessor processes new document payloads. 
func docProcessor(in chan payload, dStoreCh chan document, dProcessCh chan document, done chan bool) { 
    for { 
        select { 
        case newDoc := <-in: 
            var err error 
            doc := "" 
 
            if doc, err = getFile(newDoc.URL); err != nil { 
                common.Warn(err.Error()) 
                continue 
            } 
 
            titleID := getTitleHash(newDoc.Title) 
            msg := document{ 
                Doc:   doc, 
                DocID: titleID, 
                Title: newDoc.Title, 
            } 
 
            dStoreCh <- msg 
            dProcessCh <- msg 
        case <-done: 
            common.Log("Exiting docProcessor.") 
            return 
        } 
    } 
} 
 
// getTitleHash returns a new hash ID every time it is called. 
// Based on: https://gobyexample.com/sha1-hashes 
func getTitleHash(title string) string { 
    hash := sha1.New() 
    title = strings.ToLower(title) 
 
    str := fmt.Sprintf("%s-%s", time.Now(), title) 
    hash.Write([]byte(str)) 
 
    hByte := hash.Sum(nil) 
 
    return fmt.Sprintf("%x", hByte) 
} 
 
// getFile returns file content after retrieving it from URL. 
func getFile(URL string) (string, error) { 
    var res *http.Response 
    var err error 
 
    if res, err = http.Get(URL); err != nil { 
        errMsg := fmt.Errorf("Unable to retrieve URL: %s. Error: %s", URL, err) 
        return "", errMsg 
    } 
    if res.StatusCode > 200 { 
        errMsg := fmt.Errorf("Unable to retrieve URL: %s. Status Code: %d", URL, res.StatusCode) 
        return "", errMsg 
    } 
 
    body, err := ioutil.ReadAll(res.Body) 
    defer res.Body.Close() 
 
    if err != nil { 
        errMsg := fmt.Errorf("Error while reading response: URL: %s. Error: %s", URL, err.Error()) 
        return "", errMsg 
    } 
 
    return string(body), nil 
} 
 
// FeedHandler starts processing the payload which contains the file to index. 
func FeedHandler(w http.ResponseWriter, r *http.Request) { 
    if r.Method == "GET" { 
        ch := make(chan []document) 
        dGetAllCh <- dAllMsg{Ch: ch} 
        docs := <-ch 
        close(ch) 
 
        if serializedPayload, err := json.Marshal(docs); err == nil { 
            w.Write(serializedPayload) 
        } else { 
            common.Warn("Unable to serialize all docs: " + err.Error()) 
            w.WriteHeader(http.StatusInternalServerError) 
            w.Write([]byte(`{"code": 500, "msg": "Error occurred while trying to retrieve documents."}`)) 
        } 
        return 
    } else if r.Method != "POST" { 
        w.WriteHeader(http.StatusMethodNotAllowed) 
        w.Write([]byte(`{"code": 405, "msg": "Method Not Allowed."}`)) 
        return 
    } 
 
    decoder := json.NewDecoder(r.Body) 
    defer r.Body.Close() 
 
    var newDoc payload 
    decoder.Decode(&newDoc) 
    pProcessCh <- newDoc 
 
    w.Write([]byte(`{"code": 200, "msg": "Request is being processed."}`)) 
} 

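Note the pattern shared by docStore and lineStore: a caller that wants data sends a message carrying a reply channel, and the store goroutine answers on that channel, so the map is only ever touched by one goroutine. A minimal sketch of a document lookup using this pattern (getDocByID is a hypothetical helper, and StartFeederSystem is assumed to have been called):

// getDocByID is a hypothetical helper illustrating the 
// request/response-over-channel pattern used by docStore. 
func getDocByID(docID string) document { 
    ch := make(chan document) 
    // docStore receives the message and replies on the embedded channel. 
    dGetCh <- dMsg{DocID: docID, Ch: ch} 
    doc := <-ch 
    close(ch) 
    return doc 
} 
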
api/feeder_test.go:

package api 
 
import ( 
    "fmt" 
    "net/http" 
    "net/http/httptest" 
    "testing" 
) 
 
func TestGetTitleHash(t *testing.T) { 
 
    h1 := getTitleHash("A-Title") 
    h2 := getTitleHash("Diff Title") 
    hDup := getTitleHash("A-Title") 
 
    for _, tc := range []struct { 
        name     string 
        hashes   []string 
        expected bool 
    }{ 
        {"Different Titles", []string{h1, h2}, false}, 
        {"Duplicate Titles", []string{h1, hDup}, false}, 
        {"Same hashes", []string{h2, h2}, true}, 
    } { 
        t.Run(tc.name, func(t *testing.T) { 
            actual := tc.hashes[0] == tc.hashes[1] 
            if actual != tc.expected { 
                t.Error(actual, tc.expected, tc.hashes) 
            } 
        }) 
    } 
} 
 
func TestGetFile(t *testing.T) { 
    doc := "Server returned text!" 
    testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 
        w.Write([]byte(doc)) 
    })) 
    defer testServer.Close() 
 
    rDoc, err := getFile(testServer.URL) 
    if err != nil { 
        t.Error("Error while retrieving document", err) 
    } 
    if doc != rDoc { 
        t.Error(doc, "!=", rDoc) 
    } 
} 
 
func TestIndexProcessor(t *testing.T) { 
    ch1 := make(chan document, 1) 
    ch2 := make(chan lMeta, 1) 
    ch3 := make(chan token, 3) 
    done := make(chan bool) 
 
    go indexProcessor(ch1, ch2, ch3, done) 
 
    ch1 <- document{ 
        DocID: "a-hash", 
        Title: "a-title", 
        Doc:   "Golang Programming rocks!", 
    } 
 
    for i, tc := range []string{ 
        "golang", "programming", "rocks", 
    } { 
        t.Run(fmt.Sprintf("Testing if '%s' is returned at index: %d", tc, i), func(t *testing.T) { 
            tok := <-ch3 
            if tok.Token != tc { 
                t.Error(tok.Token, "!=", tc) 
            } 
            if tok.Index != i { 
                t.Error(tok.Index, "!=", i) 
            } 
        }) 
    } 
    close(done) 
 
} 
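
Note that the "Duplicate Titles" case in TestGetTitleHash expects false: since getTitleHash mixes time.Now() into the hash, two calls with the same title still produce different IDs. The store goroutines can be tested in the same style as the tests above; below is a hypothetical extra test for docStore (not part of the original suite):

func TestDocStore(t *testing.T) { 
    // Unbuffered add channel guarantees the store has consumed 
    // the document before we issue the lookup. 
    add := make(chan document) 
    get := make(chan dMsg) 
    getAll := make(chan dAllMsg) 
    done := make(chan bool) 
 
    go docStore(add, get, getAll, done) 
 
    doc := document{DocID: "a-hash", Title: "a-title", Doc: "some text"} 
    add <- doc 
 
    ch := make(chan document) 
    get <- dMsg{DocID: "a-hash", Ch: ch} 
    if got := <-ch; got.Title != doc.Title { 
        t.Error(got.Title, "!=", doc.Title) 
    } 
    close(done) 
} 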