Another tool that Clojure provides for working with agents is watchers. These are just functions that get a chance to peek at the agent's data. This happens after the validators have successfully run and the new data is set as the agent's state. Because of the way it's handled, the state may have changed again since then, but watchers give you the chance to look at the data and track it separately.
This can help us keep an eye on the data as it's being processed. We can use it to log progress, sample the data for manual validation, or a number of other tasks.
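As a minimal sketch of the logging use case, a watcher can record each state change of an agent in an atom. The names progress-log, log-change, and worker are made up for illustration and are not part of this recipe. The watcher compares the old and new states because a watch can also fire for an action that leaves the state unchanged.

```clojure
;; Hypothetical names: progress-log, log-change, and worker are
;; illustrative only and do not come from the recipe.
(def progress-log (atom []))

(defn log-change
  "Watch function: records the old and new state of the watched agent,
  skipping notifications where the state did not change."
  [watch-key watched old-state new-state]
  (when (not= old-state new-state)
    (swap! progress-log conj {:key watch-key
                              :old old-state
                              :new new-state})))

(def worker (agent 0))
(add-watch worker :logger log-change)

(send worker inc)
(send worker + 10)

;; Block until the actions sent so far have run.
(await worker)

(mapv :new @progress-log)
;; → [1 11]
```

Because the watcher only reads the values it is handed, it never needs to dereference the agent itself.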
We'll need these dependencies:
(require '[clojure.java.io :as io]
         '[clojure.data.csv :as csv]
         '[clojure.string :as str])
(import '[java.lang Thread])
Also, we'll use the data files from the Managing program complexity with agents recipe, along with data-files, the binding to the list of those files.
From the Managing program complexity with STM recipe, we'll use the lazy-read-csv and ->int functions.
In this recipe, we'll add a watcher to keep track of the number of rows that are converted, and we'll add a flag that lets us know when processing is finished:
First, we'll define row-ints, a data binding to indicate which fields need to be converted to integers. Then, we'll define try->int, which will normalize the input and attempt to convert it to an integer. If it fails, it will return the original value. Finally, coerce-row-ints will take a row, attempt to convert the fields at the given indices to integers, and send the results to the next agent:

(def row-ints [0 4])

(defn try->int [v]
  (try
    ;; Replace pipe characters with spaces, trim, then parse.
    (->int (str/trim (str/replace v \| \space)))
    (catch Exception ex
      v)))

(defn coerce-row-ints [_ row indices sink]
  (let [cast-row (->> indices
                      (mapcat #(vector % (try->int (nth row %))))
                      (apply assoc row))]
    (send sink conj cast-row)
    cast-row))
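To see what the coercion step does to a single row, here is a small sketch that runs without agents. The ->int defined here is only a stand-in (the real one comes from the Managing program complexity with STM recipe), and sample-row is invented data.

```clojure
(require '[clojure.string :as str])

;; Assumption: a stand-in for the ->int function from the earlier recipe.
(defn ->int [s] (Long/parseLong s))

(def row-ints [0 4])

(defn try->int [v]
  (try
    (->int (str/trim (str/replace v \| \space)))
    (catch Exception _
      v)))

;; Hypothetical sample row; fields 0 and 4 should become integers.
(def sample-row ["42|" "Alabama" "x" "y" " 7 " "n/a"])

;; The same pipeline used inside coerce-row-ints, minus the send.
(def cast-row
  (->> row-ints
       (mapcat #(vector % (try->int (nth sample-row %))))
       (apply assoc sample-row)))

cast-row
;; → [42 "Alabama" "x" "y" 7 "n/a"]

;; A value that cannot be parsed comes back unchanged.
(try->int "n/a")
;; → "n/a"
```

Note that the row has to be a vector for assoc to accept integer indices.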
Next, we'll define read-row. This is mostly similar to the read-row that we saw in the Maintaining data consistency with validators recipe. Note the done parameter, which is set to true once the input is exhausted:

(defn read-row [rows caster sink done]
  (if-let [[item & items] (seq rows)]
    (do
      (send caster coerce-row-ints item row-ints sink)
      (send *agent* read-row caster sink done)
      items)
    (do
      (dosync (commute done (constantly true)))
      '())))
The watcher function increments the counter whenever the agent's new state is not nil:

(defn watch-caster [counter watch-key watch-agent old-state new-state]
  (when-not (nil? new-state)
    (dosync (commute counter inc))))
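We'll also define wait-for-it, which polls a reference, sleeping between checks, until its value is truthy:

(defn wait-for-it [sleep-for ref-var]
  (loop []
    (when-not @ref-var
      (Thread/sleep sleep-for)
      (recur))))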
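The polling loop can be tried on its own. In this sketch, the finished flag and the future that flips it are hypothetical stand-ins for the reader agent that sets done.

```clojure
(defn wait-for-it [sleep-for ref-var]
  (loop []
    (when-not @ref-var
      (Thread/sleep sleep-for)
      (recur))))

;; Hypothetical flag standing in for the done reference.
(def finished (ref false))

;; Simulate background work that finishes after roughly 100 ms.
(future
  (Thread/sleep 100)
  (dosync (ref-set finished true)))

;; Check every 10 ms; returns nil once the flag is true.
(wait-for-it 10 finished)

@finished
;; → true
```

A shorter sleep interval reacts sooner but wakes the waiting thread more often.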
Finally, watch-processing creates the agents and references, attaches the watcher, and starts the processing. Compare it with agent-ints in the Maintaining data consistency with validators recipe, which is very similar:

(defn watch-processing [input-files]
  (let [reader (agent (seque (mapcat lazy-read-csv input-files)))
        caster (agent nil)
        sink (agent [])
        counter (ref 0)
        done (ref false)]
    (add-watch caster :counter (partial watch-caster counter))
    (send reader read-row caster sink done)
    (wait-for-it 250 done)
    {:results @sink
     :count-watcher @counter}))
user=> (:count-watcher (watch-processing (take 2 data-files)))
118095
This time, instead of associating a validator with the agent that coerces the integers, we called add-watch on it. Each time the agent is updated, the watch function is called with four parameters: a key, the agent, the old state, and the new state. Our watch function also takes a counter reference as its first parameter, which we supply by partially applying the function when we call add-watch.
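The partial application pattern can be sketched in isolation. The names count-updates, updates, and worker below are hypothetical. Unlike watch-caster, this variation counts only notifications where the state changed, so the no-op action that await sends is not counted.

```clojure
;; Hypothetical names for illustration; this is a variation on
;; watch-caster, not the recipe's own function.
(defn count-updates
  "Takes a counter ref first, followed by the four standard watch arguments."
  [counter watch-key watched old-state new-state]
  (when (not= old-state new-state)
    (dosync (commute counter inc))))

(def updates (ref 0))
(def worker (agent nil))

;; partial fixes the counter, leaving a function of the four
;; arguments that add-watch expects.
(add-watch worker :counter (partial count-updates updates))

;; Five actions, each setting a distinct new state: 0, 1, 2, 3, 4.
(doseq [n (range 5)]
  (send worker (constantly n)))
(await worker)

@updates
;; → 5
```

Passing the counter in this way keeps the watch function free of global state, so the same function can serve several agents with separate counters.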
Once everything has been created, watch-processing just sends the input agent the first message, and then it waits for the processing to finish.