September 10, 2016

Core Async and Transducers

Callbacks got me into a mess in the last post. Here's how I used core.async and transducers to get out of it.

Here's a function that gets a directory listing and delivers it on a core.async channel:

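(ns my.fs  ; hypothetical namespace name
  (:require [cljs.core.async :refer [chan close! put! <! >!]]
            [clojure.string :as str])
  (:require-macros [cljs.core.async.macros :refer [go go-loop]]))

(def fs (js/require "fs"))  ; Node's fs module

(defn dir-async
  "Place the full path of each directory entry on a channel and return
  the channel. The channel is closed once every entry has been put."
  [fpath]
  (let [out (chan)]
    (.readdir fs
              fpath
              (fn [err files]
                ;; On error files is undefined, so (first files) is nil
                ;; and the channel is simply closed.
                (go-loop [files files]
                  (if-let [f (first files)]
                    (let [fpath (str (str/replace fpath #"/$" "") "/" f)]
                      (>! out fpath)          ; parks until a consumer takes it
                      (recur (rest files)))
                    (close! out)))))
    out))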

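That function creates a channel and, once fs.readdir calls back, puts the full path of each entry onto it, closing the channel after the last one. Because the channel is unbuffered, each put parks until a consumer takes the previous path.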
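The put-and-park behavior is easy to check without touching the file system. Below is a JVM Clojure sketch in which `names->chan` (a hypothetical helper, not from the post) stands in for `dir-async`, with a hard-coded vector of names playing the part of the array that `fs.readdir` passes to its callback. The core.async calls are real; everything except the blocking `<!!` also exists in ClojureScript.

```clojure
(require '[clojure.core.async :as a]
         '[clojure.string :as str])

(defn names->chan
  "Hypothetical stand-in for dir-async: joins each name onto dir, puts the
  full path on an unbuffered channel, then closes the channel."
  [dir names]
  (let [out (a/chan)]
    (a/go-loop [names names]
      (if-let [n (first names)]
        (do
          ;; Parks until a consumer takes the value (unbuffered channel).
          (a/>! out (str (str/replace dir #"/$" "") "/" n))
          (recur (rest names)))
        ;; No names left: closing tells consumers the listing is complete.
        (a/close! out)))
    out))

;; a/into drains the channel until it closes and delivers one vector.
(a/<!! (a/into [] (names->chan "/tmp/" ["a" "b"])))
;; => ["/tmp/a" "/tmp/b"]
```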

Here's a similar core.async channel wrapper around fs.stat:

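(defn stat-async
  "Place the stat object for fpath on a channel, then close the channel.
  If fs.stat reports an error, the channel is closed without a value."
  [fpath]
  (let [out (chan)]
    (.stat fs
           fpath
           (fn [err stat]
             ;; put! rejects nil, and fs.stat supplies no stat on error.
             (when-not err
               (put! out stat))
             (close! out)))
    out))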
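The same one-value-then-close idea works for any callback API. This JVM Clojure sketch wraps `fake-stat`, a hypothetical callback function that stands in for `fs.stat` and runs on another thread via `a/thread`; `stat-chan` is likewise a made-up name. It uses a buffer of 1 so the callback's `put!` completes at once, and on error it closes the channel without a value, because core.async channels cannot carry `nil`.

```clojure
(require '[clojure.core.async :as a])

(defn fake-stat
  "Hypothetical callback-style API standing in for fs.stat. Calls cb with
  [err result] from another thread, like a Node callback."
  [path cb]
  (a/thread
    (if (= path "missing")
      (cb "ENOENT" nil)
      (cb nil {:path path :directory? (= \/ (last path))}))))

(defn stat-chan
  "Wraps fake-stat in a one-shot channel: the result (if any), then close."
  [path]
  (let [out (a/chan 1)]            ; buffer of 1 so put! completes immediately
    (fake-stat path
               (fn [err result]
                 ;; Channels cannot carry nil, so only put on success.
                 (when (and (nil? err) (some? result))
                   (a/put! out result))
                 (a/close! out)))
    out))

(a/<!! (stat-chan "/tmp/"))    ;; => {:path "/tmp/", :directory? true}
(a/<!! (stat-chan "missing"))  ;; => nil (closed without a value)
```

A consumer therefore sees either one value, or `nil` straight away, and can treat `nil` as "no result".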

Remember what a pain it was in the last post to combine these two operations with all the nested callbacks? Well, check this out. With channels, composing them is just a matter of chaining: take the channel returned from dir-async and hand it to a function that reads each path from it, calls stat-async, and puts the result on an out channel. That function simply waits for file paths to arrive on its in channel and forwards the results as they come back.

The next function accepts any channel that delivers file paths. Here I intend to pass in the channel returned from dir-async, but any source of file paths will do:

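(defn stat-filterer
  "Place [fpath stat] on channel and return channel"
  [in]
  (let [out (chan)]
    (go-loop []
      (if-let [fpath (<! in)]
        (let [stat (<! (stat-async fpath))]
          ;; Skip entries whose stat failed (stat-async delivered nil).
          (when stat
            (>! out [fpath stat]))
          (recur))
        ;; Input closed: close the output so downstream consumers finish.
        (close! out)))
    out))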

Beautiful. The out channel from that snippet now delivers a [<filepath> <stat object>] pair for each entry in the directory.
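Stripped of the Node specifics, `stat-filterer` is a general stage: take a value, wait on an asynchronous lookup for it, emit the pair. The JVM Clojure sketch below shows that shape with a hypothetical `lookup-chan` standing in for `stat-async`; `pair-stage` is also an illustrative name. It assumes a core.async version that provides `to-chan!`.

```clojure
(require '[clojure.core.async :as a])

(defn lookup-chan
  "Hypothetical async lookup: returns a channel that delivers (count s)."
  [s]
  (a/go (count s)))

(defn pair-stage
  "Same shape as stat-filterer: for each x taken from in, wait on (f x)
  and put [x result] on the returned channel; close it when in closes."
  [in f]
  (let [out (a/chan)]
    (a/go-loop []
      (if-some [x (a/<! in)]
        (let [result (a/<! (f x))]
          (a/>! out [x result])
          (recur))
        (a/close! out)))
    out))

;; to-chan! puts each item of the collection on a channel, then closes it.
(a/<!! (a/into [] (pair-stage (a/to-chan! ["a" "bb"]) lookup-chan)))
;; => [["a" 1] ["bb" 2]]
```

`if-some` is used instead of `if-let` so that a `false` value would not end the loop; for file-path strings the two behave the same.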

And it gets even better. We can pass a transducer when creating the out channel, so every value put on the channel passes through the transducer before any consumer sees it.

Note that when passing a transducer function to the chan function, we also need to specify a buffer.
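Here is a small JVM Clojure sketch of a transducer riding on a channel, assuming a core.async version that provides `onto-chan!`; `xform` and `run-xform` are illustrative names.

```clojure
(require '[clojure.core.async :as a])

;; The transducer runs as values are put: (filter even?) drops odd numbers
;; before they reach the buffer, and (map inc) rewrites the ones that pass.
(def xform (comp (filter even?) (map inc)))

(defn run-xform
  "Puts xs on a buffered channel carrying xform and collects what comes out."
  [xs]
  ;; A buffer (here 10) is mandatory alongside a transducer: core.async
  ;; asserts that one is supplied, since the transducer's output is
  ;; written into that buffer.
  (let [c (a/chan 10 xform)]
    (a/onto-chan! c xs)        ; puts every item, then closes c
    (a/<!! (a/into [] c))))

(run-xform [1 2 3 4])  ;; => [3 5]
```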

Here's a small update to the snippet above that accepts an optional transducer called xform:

(defn stat-filterer
  "Place [fpath stat] on channel and return channel. Optional
  transducer can be supplied"
  [in & [xform]]
  ;; A transducer on a channel requires a buffer, hence (chan 10 xform).
  (let [out (if xform (chan 10 xform) (chan))]
    (go-loop []
      (if-let [fpath (<! in)]
        (let [stat (<! (stat-async fpath))]
          (when stat                      ; skip entries fs.stat could not read
            (>! out [fpath stat]))
          (recur))
        (close! out)))
    out))

Brilliant! Now we can filter the results to our heart's content. Here's an example that keeps only the directories:

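;; fs aliases the namespace defining dir-async and stat-filterer;
;; ca aliases cljs.core.async.
(let [xform (filter (fn [[_ stat]] (.isDirectory stat)))
      c1    (fs/dir-async "/")
      c2    (fs/stat-filterer c1 xform)
      c3    (ca/into [] c2)]
  ;; ca/into delivers one vector after c2 closes; print the directory paths.
  (go (println (map first (<! c3)))))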

Once c2 closes, c3 delivers a single vector containing the [fpath stat] pair for every directory directly under /.
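The filtering transducer knows nothing about channels, so the same value also works on an ordinary collection. In this JVM Clojure sketch, plain maps with a hypothetical `:directory?` key stand in for Node stat objects (so the predicate calls `:directory?` instead of `.isDirectory`), and the entries are made up. It assumes a core.async version that provides `onto-chan!`.

```clojure
(require '[clojure.core.async :as a])

;; Maps stand in for Node stat objects (hypothetical :directory? key).
(def dirs-only (filter (fn [[_ stat]] (:directory? stat))))

(def entries [["/usr" {:directory? true}]
              ["/notes.txt" {:directory? false}]
              ["/etc" {:directory? true}]])

;; Applied to an ordinary vector...
(into [] dirs-only entries)
;; => [["/usr" {:directory? true}] ["/etc" {:directory? true}]]

;; ...and, unchanged, to a channel.
(let [c (a/chan 10 dirs-only)]
  (a/onto-chan! c entries)
  (a/<!! (a/into [] c)))
;; => [["/usr" {:directory? true}] ["/etc" {:directory? true}]]
```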

Here's how I wrote a unit test for this:

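;; Assumes deftest, testing, is and async are referred from cljs.test.
(deftest directories-only-test   ; test name is illustrative
  (async done
    (let [xform (filter (fn [[_ stat]] (.isDirectory stat)))
          c1    (fs/dir-async "/")
          c2    (fs/stat-filterer c1 xform)
          c3    (ca/into [] c2)]
      (go (let [dirs (<! c3)]
            (testing "directories-only-with-async"
              (is (not (empty? dirs))))
            ;; Tell cljs.test the asynchronous test has finished.
            (done))))))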

Better still, core.async provides pipeline functions, such as pipeline and pipeline-async, that can make all of this code even more succinct.
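As one example, `pipeline-async` can replace the hand-written `go-loop` in `stat-filterer`: it takes values from an input channel, calls a function with each value and a fresh result channel, keeps up to `n` of those calls in flight, emits results in input order, and by default closes the output when the input closes. The JVM Clojure sketch below uses a stand-in `fake-stat-chan` (hypothetical; it treats paths without a dot as directories) in place of `stat-async`, and `stat-pairs` is also an illustrative name. The parallelism of 4 is arbitrary, and `to-chan!` assumes a recent core.async.

```clojure
(require '[clojure.core.async :as a])

(defn fake-stat-chan
  "Hypothetical stand-in for stat-async: one-shot channel with a map."
  [path]
  (a/go {:directory? (not (re-find #"\." path))}))

(defn stat-pairs
  "stat-filterer rewritten with pipeline-async: up to 4 lookups in flight,
  results emitted in input order, out closed when in closes."
  [in xform]
  (let [out (a/chan 10 xform)]
    (a/pipeline-async 4 out
                      (fn [path result]
                        ;; Return at once; put any result, then close result.
                        (a/go
                          (when-some [stat (a/<! (fake-stat-chan path))]
                            (a/>! result [path stat]))
                          (a/close! result)))
                      in)
    out))

(def dirs-only (filter (fn [[_ stat]] (:directory? stat))))

(a/<!! (a/into [] (stat-pairs (a/to-chan! ["/usr" "/a.txt" "/etc"]) dirs-only)))
;; => [["/usr" {:directory? true}] ["/etc" {:directory? true}]]
```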

If you'd like to learn about transducers, I found Tim Baldridge's screencasts here to be helpful.

Happy asynchronous transducing.

Tags: clojure clojurescript