Agents by themselves are pretty useful. However, an agent function sometimes needs to coordinate state beyond the agent's own data. If we still want to use the agent task-queuing and concurrency framework in that case, we'll need both agents and the STM. We use send or send-off to coordinate the agent's state, and we combine that with dosync, ref-set, alter, or commute inside the agent function to coordinate with the other state.
This combination brings simplicity to complex state and data coordination problems, which is a huge help in managing the complexity of a data processing and analysis system.
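As a minimal sketch of the general pattern, here is an agent action that updates a ref inside dosync and then returns the agent's own new state. The names total, worker, and add-to-total are hypothetical and not part of the recipe:

```clojure
;; Shared state that lives outside any single agent.
(def total (ref 0))

;; The agent's own state: how many amounts it has processed.
(def worker (agent 0))

(defn add-to-total
  "Agent action: adds amount to the shared ref inside a transaction,
  then returns the agent's new state (its processed count)."
  [processed amount]
  (dosync (alter total + amount))
  (inc processed))

(send worker add-to-total 10)
(send worker add-to-total 5)
;; Block until both queued actions have run.
(await worker)

[@total @worker]
;=> [15 2]
```

The agent serializes its own actions, while the transaction keeps the ref consistent with anything else that touches it.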
For this recipe, we'll look at the same problem as in the Managing program complexity with agents recipe, but we'll structure it a little differently: the final result will be stored in a shared reference, and the agents will update it as they go.
We'll need the same dependencies as we did for Managing program complexity with agents, and we'll reuse two definitions from that recipe: the data-files value and the read-file-amounts function.
For this recipe, we need to define a few functions to work through a queue of input chunks and then block until all of the processing is complete:
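We'll reuse read-file-amounts from the previous recipe. However, we'll wrap it in another function that takes its output and uses commute to update the shared counts hash map with the amounts it has just read:

```clojure
(defn read-update-amounts [m filename count-ref]
  ;; Total this one file from an empty map, outside the transaction:
  ;; I/O shouldn't run inside dosync, because transactions may be retried.
  (let [file-amounts (read-file-amounts {} filename)]
    (dosync
      ;; merge-with + is order-independent, so commute is safe here.
      (commute count-ref #(merge-with + % file-amounts)))
    ;; Leave the agent's own state unchanged; the running totals live
    ;; only in count-ref, so nothing is counted twice.
    m))
```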
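The choice of commute rests on merge-with + being order-independent: however the STM interleaves the updates, the merged totals come out the same. This small sketch uses hypothetical partial totals and plain futures instead of agents so that it runs on its own; shared, add-totals!, and partials are illustrative names:

```clojure
;; One shared map of totals.
(def shared (ref {}))

(defn add-totals! [partial]
  (dosync
    ;; commute lets concurrent transactions apply this update in any
    ;; order; merge-with + gives the same result regardless.
    (commute shared #(merge-with + % partial))))

(def partials [{"a" 1 "b" 2} {"a" 10} {"b" 5 "c" 7} {"c" 1}])

;; Run each update on its own thread, then wait for all of them.
(run! deref (mapv #(future (add-totals! %)) partials))

(= @shared {"a" 11 "b" 7 "c" 8})
;=> true
```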
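Finally, we'll define the main function. This creates the agents and the shared reference, sends tasks to the agents, and waits for the results before returning them:

```clojure
(defn main [data-files agent-count]
  (let [counts (ref {})
        ;; Each agent starts with an empty map as its own state.
        agents (map agent (repeat agent-count {}))]
    ;; Hand the files to the agents round-robin. map is lazy, so dorun
    ;; forces the sends to happen now. (send uses a fixed-size thread
    ;; pool; send-off is the usual choice for blocking I/O.)
    (dorun
      (map #(send %1 read-update-amounts %2 counts)
           (cycle agents)
           data-files))
    ;; Block until every agent has finished its queued tasks.
    (doseq [a agents] (await a))
    @counts))
```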
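Two details of main are easy to miss: mapping over (cycle agents) and data-files pairs the files with the agents round-robin and stops when the files run out, and because map is lazy, nothing is sent until dorun realizes the sequence. This sketch shows both with placeholder keywords and an atom counter; all of the names are hypothetical:

```clojure
;; Placeholders standing in for agents and input files.
(def agent-names [:agent-0 :agent-1 :agent-2])
(def files ["a.csv" "b.csv" "c.csv" "d.csv" "e.csv"])

;; map stops at its shortest input, so the infinite cycle is fine.
;; assignments holds:
;; ([:agent-0 "a.csv"] [:agent-1 "b.csv"] [:agent-2 "c.csv"]
;;  [:agent-0 "d.csv"] [:agent-1 "e.csv"])
(def assignments (map vector (cycle agent-names) files))

;; map is lazy: the side effect doesn't run until the seq is realized.
(def dispatched (atom 0))
(def pending (map (fn [_] (swap! dispatched inc)) files))
@dispatched ;=> 0
(dorun pending)
@dispatched ;=> 5
```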
Using this code looks just as we'd expect:
```
user=> (def amounts (main data-files 8))
user=> (take 5 amounts)
(["" 106549] ["|N00032245|" 22750] ["|N00027812|" 150] ["|N00030973|" 9900] ["|N00005656|" 11598514])
```
This solution uses agents to handle the work and the STM to manage the shared data structure. The main function first assigns the input files to the agents, cycling through the agents in turn. Each agent then reads each file it's given and totals the contribution amounts for each candidate. It takes those totals and uses the STM to update the shared counts.
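To check that the agents and the shared ref together produce the right totals, the following self-contained sketch runs the same pattern over hypothetical in-memory data and compares the result with a single-threaded merge-with. The names fake-files, fake-file-amounts, and run-agents are illustrative stand-ins for data-files, read-file-amounts, and main. In this sketch each file is totaled outside the transaction and each agent's own state is left unchanged, so the ref holds the only running totals:

```clojure
;; Hypothetical input: each "file" maps to [candidate amount] pairs.
(def fake-files
  {"f0" [["|A|" 100] ["|B|" 50]]
   "f1" [["|A|" 25] ["|C|" 10]]
   "f2" [["|B|" 5]]
   "f3" [["|C|" 1] ["|A|" 2]]})

(defn fake-file-amounts
  "Hypothetical stand-in for read-file-amounts: totals one file."
  [filename]
  (reduce (fn [acc [cand amount]] (update acc cand (fnil + 0) amount))
          {}
          (get fake-files filename)))

(defn read-update-amounts [m filename count-ref]
  (let [file-amounts (fake-file-amounts filename)]
    (dosync (commute count-ref #(merge-with + % file-amounts)))
    m))

(defn run-agents [files agent-count]
  (let [counts (ref {})
        agents (vec (repeatedly agent-count #(agent {})))]
    (dorun (map #(send %1 read-update-amounts %2 counts)
                (cycle agents)
                files))
    (apply await agents)
    @counts))

(def files (vec (keys fake-files)))

;; With 3 agents and 4 files, one agent handles two files.
(= (run-agents files 3)
   (apply merge-with + (map fake-file-amounts files)))
;=> true
```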