Agents build on the STM, and each agent acts a lot like a reference. References let you coordinate changes to multiple pieces of state, but if you're only updating a single piece of state, an agent is a good fit. You use agents by sending them messages (functions that manipulate the agent's state). These run in a thread pool, although each agent processes only one message at a time.
We create agents with the agent function, and we send messages to them with send and send-off. Whatever the function returns is the agent's new state value. This figure illustrates this process:
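The same cycle can also be sketched in code. This is a minimal illustration rather than part of the recipe; the counter agent and the functions sent to it are made up for the example.

```clojure
;; A hypothetical agent holding a counter, starting at 0.
(def counter (agent 0))

;; Each send queues a function. The agent applies it to its current
;; state, and the return value becomes the new state.
(send counter inc)     ; state becomes 1
(send counter + 10)    ; extra arguments follow the state: (+ 1 10)

;; send returns immediately, so wait for the queued functions to
;; finish before reading the value.
(await counter)
@counter               ; 11

;; send-off has the same shape; it is intended for functions that may
;; block, such as I/O.
(send-off counter (fn [n] (Thread/sleep 10) (* n 2)))
(await counter)
@counter               ; 22
```

Messages sent to one agent from the same thread are applied in the order in which they were sent, which is why the final value is predictable here.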
For this recipe, we'll again solve the same problem we did in the last recipe, Managing program complexity with STM.
We will include the same references in the project.clj file and the same requirements in the REPL as we did in the Managing program complexity with STM recipe.
For this recipe, I'm going to use the U.S. political campaign finance data from Open Secrets (http://www.opensecrets.org/). You have to register with the site, but once you do that, the data is free to download. Once you've logged in, look for the Bulk Data link. For this, I downloaded the cycles tables for the Campaign Finance Data. I unzipped them to the data/campaign-fin directory. For this recipe, we'll focus on the Political Action Committee (PAC) data. In this case, we'll just find the total amount of campaign contributions per candidate.
We'll use several utility functions from the last recipe: lazy-read-csv and ->int.
To use agents, we just need to add a few functions to the ones from the last recipe:
The first two are get-cid and get-amount. These take a row from the data file and return the data fields that we're interested in:
    (defn get-cid [row] (nth row 3))
    (defn get-amount [row] (->int (nth row 4)))
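    ;; Pair a row's candidate ID with its amount.
    (defn get-cid-amount [row]
      [(get-cid row) (get-amount row)])
    ;; Add amount to the running total stored under cid in the map m.
    (defn add-amount-by [m cid amount]
      (assoc m cid (+ amount (get m cid 0))))
    ;; Fold every row of one data file into the map m.
    (defn read-file-amounts [m filename]
      (reduce #(add-amount-by %1 (first %2) (second %2))
              m
              (map get-cid-amount (lazy-read-csv filename))))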
We also add force-val. It takes an agent and uses await to block until all of the messages currently in its queue have been processed. Then it dereferences the agent. This function will allow us to thread a series of operations on the agents:
    (defn force-val [a]
      (await a)
      @a)
    (defn main [data-files agent-count]
      (let [agents (map agent (repeat agent-count {}))]
        (dorun
          (map #(send %1 read-file-amounts %2)
               (cycle agents)
               data-files))
        (apply merge-with + (map force-val agents))))
And we can see this in action:
    user=> (def data-files
             ["data/campaign-fin/pacs90.txt"
              "data/campaign-fin/pacs92.txt"
              "data/campaign-fin/pacs94.txt"
              "data/campaign-fin/pacs96.txt"
              "data/campaign-fin/pacs98.txt"
              "data/campaign-fin/pacs00.txt"
              "data/campaign-fin/pacs02.txt"
              "data/campaign-fin/pacs04.txt"
              "data/campaign-fin/pacs06.txt"
              "data/campaign-fin/pacs08.txt"
              "data/campaign-fin/pacs10.txt"])
    user=> (def contribs (main data-files 5))
    user=> (contribs "|N00026349|")
    280
    user=> (contribs "|N00001845|")
    134121
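Because the Open Secrets files require registration, it can help to try the aggregation logic on a few made-up rows first. The sketch below repeats the helper functions from this recipe and runs them over an in-memory vector instead of a file. The sample rows and candidate IDs are invented for illustration, and the ->int shown here is only a stand-in that assumes the original utility parses a string into an integer.

```clojure
;; Stand-in for the ->int utility from the previous recipe (assumed
;; here to parse a string into an integer).
(defn ->int [s] (Integer/parseInt s))

;; Helper functions as defined in this recipe.
(defn get-cid [row] (nth row 3))
(defn get-amount [row] (->int (nth row 4)))
(defn get-cid-amount [row] [(get-cid row) (get-amount row)])
(defn add-amount-by [m cid amount]
  (assoc m cid (+ amount (get m cid 0))))

;; Made-up rows. Only position 3 (candidate ID) and position 4
;; (amount) matter to the helpers.
(def sample-rows
  [["x" "x" "x" "cand-a" "100"]
   ["x" "x" "x" "cand-b" "250"]
   ["x" "x" "x" "cand-a" "50"]])

;; The same reduction that read-file-amounts performs, minus the file.
(def totals
  (reduce (fn [m [cid amount]] (add-amount-by m cid amount))
          {}
          (map get-cid-amount sample-rows)))

totals   ; {"cand-a" 150, "cand-b" 250}
```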
Except for force-val, all of the agent-related code is in main. Let's walk through the lines that are of interest:
We create the agents, each starting with an empty map:
    (let [agents (map agent (repeat agent-count {}))]
We send each agent a file to process with the read-file-amounts function. We cycle through the agents until all of the files are assigned to an agent:
    (dorun (map #(send %1 read-file-amounts %2) (cycle agents) data-files))
We wait for each agent to finish with await, and we dereference each to get its value (both of these take place inside force-val). Once we have the data from each agent, we merge them all together into one hashmap:
    (apply merge-with + (map force-val agents))))
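The distribute-and-merge pattern walked through above can be tried without any data files. In the following sketch, add-pairs is a hypothetical stand-in for read-file-amounts, and the three batches are made-up data standing in for three files; force-val is the function from this recipe.

```clojure
;; Hypothetical stand-in for read-file-amounts: instead of reading a
;; file, it folds a vector of [cid amount] pairs into the agent's map.
(defn add-pairs [m pairs]
  (reduce (fn [acc [cid amount]]
            (assoc acc cid (+ amount (get acc cid 0))))
          m
          pairs))

(defn force-val [a]
  (await a)
  @a)

;; Three made-up batches standing in for three data files.
(def batches
  [[["cand-a" 100] ["cand-b" 20]]
   [["cand-a" 5]]
   [["cand-b" 30] ["cand-c" 7]]])

;; Two agents, each starting with an empty map.
(def agents (map agent (repeat 2 {})))

;; (cycle agents) repeats the agents endlessly, and map stops when the
;; batches run out. Batch 1 goes to the first agent, batch 2 to the
;; second, and batch 3 back to the first.
(dorun (map #(send %1 add-pairs %2) (cycle agents) batches))

;; Wait for every agent, then sum the totals that share a key.
(def merged (apply merge-with + (map force-val agents)))

merged   ; {"cand-a" 105, "cand-b" 50, "cand-c" 7}
```

Each agent only ever updates its own map, so no coordination is needed until the final merge-with, which adds together the amounts that different agents accumulated for the same candidate.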