Parallel Word Count With Julia, an Interesting New Language for Technical Computing

Julia is an interesting new language focused on scientific computing, which “launched” to the community at large a few weeks ago. I ran across the launch post via a link on Hacker News (where there is actually a very good discussion) and was immediately impressed. In this post I’ll provide a little background about the language and walk through a parallelized “word count” example I wrote to exercise the language’s built-in parallelism.

A little background

To quote from the launch post:

We want a language that’s open source, with a liberal license. We want the speed of C with the dynamism of Ruby. We want a language that’s homoiconic, with true macros like Lisp, but with obvious, familiar mathematical notation like Matlab. We want something as usable for general programming as Python, as easy for statistics as R, as natural for string processing as Perl, as powerful for linear algebra as Matlab, as good at gluing programs together as the shell. Something that is dirt simple to learn, yet keeps the most serious hackers happy. We want it interactive and we want it compiled.

Which I find refreshingly ambitious, because in my own programming I have a somewhat split personality. For general programming or exploratory data analysis, my tool of choice is generally Python (or to a lesser extent Ruby and R); but as an HPC guy, my prejudice is that these tools just won’t cut it for big datasets or large simulations. That’s the point at which I usually break out Fortran, cursing at myself as I port algorithms and wishing that NumPy were just a little bit faster.

(That dramatic phrasing is not quite fair to Python; I’ve recently been playing a bit with Cython, which does perform quite nicely and is getting a lot of attention in the SciPy community. But we’re talking about Julia. ;-) )

Julia uses an LLVM-based JIT compiler and some clever language design to achieve very good performance on their (admittedly selective) benchmarks, and they’ve definitely achieved the goal of a scientist-friendly syntax: it looks very much like Matlab. For example, the random matrix statistics benchmark looks like this:

function randmatstat(t)
    n = 5
    v = zeros(t)
    w = zeros(t)
    for i=1:t
        a = randn(n, n)
        b = randn(n, n)
        c = randn(n, n)
        d = randn(n, n)
        P = [a b c d]
        Q = [a b; c d]
        v[i] = trace((P.'*P)^4)
        w[i] = trace((Q.'*Q)^4)
    end
    return (std(v)/mean(v), std(w)/mean(w))
end

(s1, s2) = randmatstat(1000)
@assert 0.5 < s1 < 1.0 && 0.5 < s2 < 1.0
@timeit randmatstat(1000) "rand_mat_stat"

Pretty easy to follow if you’ve ever done any Matlab (or even if you haven’t). The language also has good built-in parallel programming support, and it can call libraries written in C and Fortran directly. If you want to play with Julia without building it, there’s a web-based REPL up at http://tryjulia.xvm.mit.edu/
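The C interop in particular is pleasantly lightweight. Following the style of the examples in the manual, calling into the C standard library looks something like this (a minimal sketch: ccall takes the symbol, the return type, a tuple of argument types, and then the arguments):

# Call libc's clock(3); no wrapper code or build step required.
t = ccall(:clock, Int32, ())

No bindings to generate, no glue layer to compile — the call is made directly from the REPL or a script.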

Word Count example

Julia provides some very easy built-in mechanisms for parallelizing your code, so to learn a bit of Julia and test out the language I decided to write a simple map-reduce “word count” program in the vein of the classic Hadoop example. The code that follows can also be found on GitHub.

Map function:

# "Map" function.
# Takes a string. Returns a HashTable with the number of times each word 
# appears in that string.
function wordcount(text)
    words=split(text,(' ','\n','\t','-','.',',',':','_','"',';','!'),false)
    counts=HashTable()
    for w = words
        counts[w]=get(counts,w,0)+1
    end
    return counts
end
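As a quick sanity check of the map step, a hypothetical REPL session might look like this (the exact printed form of a HashTable may differ from version to version, so I show the result as a comment):

julia> wordcount("the quick fox jumps over the lazy dog")
# => a HashTable mapping "the" => 2 and each remaining word => 1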

Reduce function:

# "Reduce" function.
# Takes a collection of HashTables in the format returned by wordcount()
# Returns a HashTable in which words that appear in multiple inputs
# have their totals added together.
function wcreduce(wcs)
    counts=HashTable()
    for c = wcs
        for (k,v)=c
            counts[k] = get(counts,k,0)+v
        end
    end
    return counts
end
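The reduce step then merges those per-chunk tables, summing counts for words that appear in more than one chunk. Again a hypothetical session, using the same `{}` cell syntax as the code above:

julia> c = wcreduce({wordcount("to be or"), wordcount("not to be")})
julia> c["to"]   # "to" appears once in each chunk, so counts sum
# => 2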

Parallel word count on a string:

# Splits input string into nprocs() equal-sized chunks (last one rounds up), 
# and @spawns wordcount() for each chunk to run in parallel. Then fetch()s
# results and performs wcreduce().
# Limitations: splitting the string and reduction step are single-threaded.
function parallel_wordcount(text)
    lines=split(text,'\n',false)
    np=nprocs()
    unitsize=ceil(length(lines)/np)
    wcounts={}
    rrefs={}
    # spawn procs
    for i=1:np
        first=unitsize*(i-1)+1
        last=unitsize*i
        if last>length(lines)
            last=length(lines)
        end
        subtext=join(lines[int(first):int(last)],"\n")
        push(rrefs, @spawn wordcount( subtext ) )
    end
    # fetch results
    while length(rrefs)>0
        push(wcounts,fetch(pop(rrefs)))
    end
    # reduce
    count=wcreduce(wcounts)
    return count
end

… on a group of files:

# Takes the name of a result file, and a list of input file names.
# Combines the contents of all files, then performs a parallel_wordcount
# on the resulting string. Writes the results to result_file.
# Limitation: Performs all file IO single-threaded.
function wordcount_files(result_file,input_file_names...)
    text=""
    for f = input_file_names
        fh=open(f)
        text=join( {text,readall(fh)}, "\n" )
        close(fh)
    end
    wc=parallel_wordcount(text)
    fo=open(result_file,"w")
    for (k,v) = wc
        with_output_stream(fo,println,k,"=",v)
    end
end

To run this example, you can start Julia with 4 worker processes and run the word count with:

$ julia -p 4
julia> @everywhere load("wordcount.jl")
julia> wordcount_files("output.txt","input1.txt","input2.txt","input3.txt")

To run across multiple compute nodes (which must have Julia installed in the same location, or share a filesystem), you would use a machine file with one entry per core. Note that you need SSH access to each of these machines from the account you run Julia under.

$ julia --machinefile file

where the machine file contains one hostname per core:

  node01
  node01
  node02
  node02
  ...

The limitation here is that the reduce function is still single-threaded. Hadoop has a built-in “partitioner” which can efficiently sort the key-value pairs coming out of the Mappers and distribute them to the Reducers by key, but I couldn’t easily come up with anything similar in Julia that would perform well with the HashTable data structure. (Granted, I did not spend a lot of time on it.)
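One cheap improvement — a sketch I haven’t benchmarked, reusing the wcreduce() function and the old-style `{}` cell syntax from above — would be a pairwise reduction: merge the hash tables two at a time with @spawn, so each round halves the number of tables instead of merging everything serially on one process.

# Sketch: pairwise-parallel reduce. Pop tables off locally (so the pops
# happen on this process, not in the spawned closure), merge pairs
# remotely, and repeat until one table remains.
function pairwise_wcreduce(wcs)
    while length(wcs) > 1
        refs = {}
        while length(wcs) > 1
            a = pop(wcs)
            b = pop(wcs)
            push(refs, @spawn wcreduce({a, b}))
        end
        for r = refs
            push(wcs, fetch(r))
        end
    end
    return wcs[1]
end

This still ships every table over the wire, so it’s no substitute for a real partitioner, but it does spread the merge work across the workers.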

Also, I managed the parallelization manually using @spawn and fetch(), partly because I was using HashTables here. But Julia does provide some more succinct parallelization macros (from the manual):

# flip a coin 200000000 times
nheads = @parallel (+) for i=1:200000000
  randbit()
end

# apply a function f to each 2d-slice of a 3d distributed array
B = darray(eltype(A), size(A), 3)
for i = 1:size(A,3)
    @spawnat owner(B,i) B[:,:,i] = f(A[:,:,i])
end

Overall impressions

Generally I’m pretty impressed with Julia. The syntax is easy to understand, the built-in parallelization mechanisms are simple to use, and the language is clearly in touch with its target market of scientists and engineers.

It does show that this is a language still in development: getting the parallel word count working actually required some mails to the dev list and a fix in the HashTable serialization code, and there are a fair number of rough edges. But development is very active: the mailing list and the GitHub repository are both busy, and the language has evolved a lot just in the couple of weeks it took me to get around to writing this post.

From an HPC perspective, I’m very happy that Julia takes parallelization seriously and already provides a mechanism for running on multiple compute nodes. There’s a little bit of SGE integration in the multiprocessing library as well, which will hopefully improve. However, inter-node communication uses serialized objects over SSH, which is not exactly my first choice for high-performance networking. I’d be much happier if Julia used MPI as a backend for its parallelization, even if it isn’t exposed in the language, so it could take advantage of RDMA and all the optimization that has already been done in that layer.

Questions, comments, interesting anecdotes? Tweet to me at @ajdecon, or send me an email at ajdecon@ajdecon.org.