When we use the STM, we're coordinating and maintaining consistency between several values, all of which keep changing. Sometimes, however, we'll also want to stay consistent with references that a transaction only reads and never changes, and which therefore wouldn't otherwise be included in the transaction. We can signal that the STM should include these references in the transaction by using the ensure function.
This helps simplify the data processing system by keeping its data structures synchronized and consistent. The ensure function gives us more control over what gets managed by the STM.
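As a minimal sketch of what ensure buys us (a hypothetical frozen?/balance example, not part of the recipe): a transaction that only reads frozen? can still demand that no other transaction changes it before this one commits:

```clojure
(def frozen? (ref false))
(def balance (ref 100))

;; `ensure` returns the current value of frozen? AND makes that read
;; part of the transaction, so a concurrent transaction can't flip
;; frozen? between our read and our commit. A plain @frozen? inside
;; dosync would not give us that guarantee.
(defn withdraw! [amount]
  (dosync
    (when-not (ensure frozen?)
      (alter balance - amount))))

(withdraw! 30)
@balance
;; => 70
```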
For this recipe, we'll use a slightly contrived example. We'll process a set of text files and compute the frequency of a term as well as the total number of words. We'll do this concurrently, and we'll be able to watch the results get updated as we progress.
For the set of text files, we'll use the Brown corpus. Constructed in the 1960s, this was one of the first digital collections of texts (or corpora) assembled for linguists to use to study language. At that time, its size (one million words) was huge. Today, similar corpora contain 100 million words or more.
We'll need to include the clojure.string library and have easy access to the File class:
(require '[clojure.string :as string])
(import '[java.io File])
We'll also need to download the Brown corpus, which is available at http://www.nltk.org/nltk_data/. You can use any large collection of texts, but the Brown corpus lists each word's part of speech in the file, so we'll need to parse it specially. If you use a different corpus, you can just change the tokenize-brown function, as explained in the next section, to work with your texts.
For this recipe, we'll go from preprocessing the data to performing the counts in parallel and looking at the results.
First, we'll gather the input files to process:

(def input-files
  (filter #(.isFile %) (file-seq (File. "./data/brown"))))
Next, we'll define the references that hold the state of the computation. finished will indicate whether processing is done or not, total-docs and total-words will keep running totals, freqs will map the tokens to their frequencies as a whole, and running-report is an agent that contains the current state of the report for the term we're interested in:

(def finished (ref false))
(def total-docs (ref 0))
(def total-words (ref 0))
(def freqs (ref {}))
(def running-report
  (agent {:term nil, :frequency 0, :ratio 0.0}))
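If you rerun the computation in the same REPL, these refs keep their old values. A small convenience helper (not part of the recipe, shown only as a sketch; the refs are repeated here so the snippet stands alone) can reset them between runs:

```clojure
;; Refs repeated from the recipe so this snippet is self-contained.
(def finished (ref false))
(def total-docs (ref 0))
(def total-words (ref 0))
(def freqs (ref {}))

;; Hypothetical helper: put all of the state refs back to their
;; starting values inside a single transaction.
(defn reset-state! []
  (dosync
    (ref-set finished false)
    (ref-set total-docs 0)
    (ref-set total-words 0)
    (ref-set freqs {})))

(dosync (ref-set total-words 42))
(reset-state!)
@total-words
;; => 0
```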
Each file in the corpus tags every word with its part of speech, as in this line:

The/at Fulton/np-tl County/nn-tl Grand/jj-tl Jury/nn-tl said/vbd Friday/nr an/at investigation/nn of/in Atlanta's/np$ recent/jj primary/nn election/nn produced/vbd ``/`` no/at evidence/nn ''/'' that/cs any/dti irregularities/nns took/vbd place/nn ./.
We're not interested in the parts of speech, so our tokenizer will remove them and convert each token to a lowercase keyword:
(defn tokenize-brown [input-str]
  (->> (string/split input-str #"\s+")
       (map #(first (string/split % #"/" 2)))
       (filter #(> (count %) 0))
       (map string/lower-case)
       (map keyword)))
We'll also need a small helper to add a token to a frequency map:

(defn accum-freq [m token]
  (assoc m token (inc (m token 0))))
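To see these two helpers at work, here's a small, self-contained sketch (the definitions are repeated so the snippet runs on its own; the sample line is made up in the Brown tagged format):

```clojure
(require '[clojure.string :as string])

;; Definitions repeated from the recipe so this snippet is self-contained.
(defn tokenize-brown [input-str]
  (->> (string/split input-str #"\s+")
       (map #(first (string/split % #"/" 2)))
       (filter #(> (count %) 0))
       (map string/lower-case)
       (map keyword)))

(defn accum-freq [m token]
  (assoc m token (inc (m token 0))))

;; Tokenize a made-up tagged line, then fold the tokens into a
;; frequency map.
(def tokens (tokenize-brown "The/at jury/nn said/vbd the/at jury/nn ./."))
tokens
;; => (:the :jury :said :the :jury :.)
(reduce accum-freq {} tokens)
;; => {:the 2, :jury 2, :said 1, :. 1}
```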
The heart of the processing is compute-file, which does the primary processing for each file. It also uses send-off to safely queue the next task for this agent:

(defn compute-file [fs]
  (dosync
    (if-let [[s & ss] (seq fs)]
      (let [tokens (tokenize-brown (slurp s))
            tc (count tokens)
            fq (reduce accum-freq {} tokens)]
        (commute total-docs inc)
        (commute total-words #(+ tc %))
        (commute freqs #(merge-with + % fq))
        (send-off *agent* compute-file)
        ss)
      (do
        (alter finished (constantly true))
        '()))))
The compute-report function generates the report and keeps queuing itself while compute-file is still running:

(defn compute-report [{term :term, :as report}]
  (dosync
    (when-not @finished
      (send *agent* compute-report))
    (let [term-freq (term (ensure freqs) 0)
          tc (ensure total-words)
          r (if (zero? tc)
              nil
              (float (/ term-freq tc)))]
      (assoc report :frequency term-freq :ratio r))))
Finally, compute-frequencies gets the entire thing started:

(defn compute-frequencies [inputs term]
  (let [a (agent inputs)]
    (send running-report #(assoc % :term term))
    (send running-report compute-report)
    (send-off a compute-file)))
To use this, we call compute-frequencies with the inputs and a term, and then we poll finished and running-report to see how processing is going:

user=> (compute-frequencies input-files :committee)
#<Agent@1830f455: (…)>
user=> [@finished @running-report]
[false {:frequency 79, :ratio 6.933839E-4, :term :committee}]
user=> [@finished @running-report]
[false {:frequency 105, :ratio 2.5916903E-4, :term :committee}]
user=> [@finished @running-report]
[false {:frequency 164, :ratio 1.845714E-4, :term :committee}]
user=> [@finished @running-report]
[true {:frequency 168, :ratio 1.4468178E-4, :term :committee}]
We can see from the ratio of the frequency of committee to the total word count that initially the word committee occurred relatively often (0.07 percent, which is approximately the frequency of other common words in the overall corpus). However, by the end of processing, its frequency had settled down to about 0.014 percent of the total number of words, which is closer to what we would expect.
In this recipe, compute-frequencies triggers everything. It creates a new agent that processes the input files one by one, updating most of the references in the compute-file function.
The compute-report function handles updating the running report. It bases that report on the frequency map and the total word count, but it doesn't change either of them. To keep everything synchronized, it calls ensure on both. Otherwise, there's a chance that the count of total words comes from one set of documents and the term frequency from another set. This isn't likely given that only one agent is updating those values, but if we decided to have more than one agent processing the files, it would be a real possibility. To generate a report for a new term without reading all of the files again, we can define this function:
(defn get-report [term]
  (send running-report #(assoc % :term term))
  (send running-report compute-report)
  (await running-report)
  @running-report)