
Examples
********

dispy can be used to distribute standalone programs or Python program
fragments (functions, classes, modules) and files (programs, data) to
nodes and execute jobs in parallel. It supports various options to
handle rather comprehensive cases (such as fault tolerance, sharing
nodes among multiple programs simultaneously, using nodes in multiple
networks etc.); however, in common setups, the usage is simple, as
shown in the demonstrative examples below. Some of these examples are
also available in the 'examples' directory where dispy is installed,
whose path can be obtained with the program:

   import os, dispy
   print(os.path.join(os.path.dirname(dispy.__file__), 'examples'))

Short summary of examples (follow the links for details):

1. Command-line uses dispy as a command line tool to distribute and
   run a standalone program.

2. Python script is a simple Python program that distributes and
   runs a standalone program.

3. Canonical program distributes and executes a Python function
   with different parameters. The program also prints some of the
   attributes of each job after it finishes (such as job's result,
   start time).

4. Distributing objects shows how to send Python objects (class
   instances) from the client program to remote nodes to execute a
   Python function (which takes those objects as arguments).

5. In-memory processing is one use of the *setup* (and *cleanup*)
   feature to initialize nodes before executing jobs. In this case,
   *setup* is used to load data (in files transferred from the client)
   into global variables; each job then uses the data (read-only) in
   these variables for in-memory processing (i.e., it accesses data
   loaded by the *setup* function instead of reading data from a file
   in each job).

6. Updating globals shows how to use Python's *sharedctypes* to
   initialize (using the *setup* feature as above) a global variable
   and then update it in jobs so that all jobs on a node see the
   updated value.

7. Sending files to client illustrates how jobs can use
   *dispy_send_file* function to send files to client.

8. Callback processing uses the *callback* feature to process each
   job's result as soon as it finishes.

9. Efficient job submission uses the *callback* feature to submit
   just enough jobs to keep the scheduler using all CPUs in the
   cluster, instead of submitting all jobs at once, which can cause
   memory problems.

10. Port forwarding with SSH uses *ssh* to forward a port in order to
    use nodes in a remote network (in this case an Amazon EC2 cloud
    instance).

11. Cluster creation lists various ways to create clusters for
    different configurations.

12. MapReduce is a simple implementation of well-known map-reduce
    problem.


Command-Line
============

dispy can be used as a command line tool (for simple cases, such as
scheduling cron jobs); in this case the computation must be a
standalone program and the dependencies must be files:

   dispy.py -f /some/file1 -f file2 -a "arg11 arg12" -a "arg21 arg22" -a "arg3" /some/program

will distribute '/some/program' with dependencies '/some/file1' and
'file2' and then execute '/some/program' in parallel with 3 instances:
a) arg11 and arg12 (two arguments to the program), b) arg21 and arg22
(two arguments), and c) arg3 (one argument).


Python Script
=============

A simple client program that distributes a program (say,
'/path/to/program') and executes it with a sequence of numbers as
arguments is:

   import dispy
   cluster = dispy.JobCluster('/path/to/program')
   for i in range(50):
       cluster.submit(i)

The program '/path/to/program' on the client computer is transferred
to each of the nodes, so if the program is a binary program then all
the nodes should have the same architecture as the client.

In the cases above we assume that the programs execute and save the
computation results in a database, file system etc. If we are
interested in the exit status, output from each run etc., then we need
to collect each of the submitted jobs, from which the attributes of
interest can be retrieved, as done in the examples below.
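
For instance, a minimal sketch that collects the jobs and inspects
their attributes (here it is assumed that, for a standalone program, a
job's result is the program's exit status and its output is captured
in 'job.stdout' / 'job.stderr'):

   import dispy
   cluster = dispy.JobCluster('/path/to/program')
   jobs = []
   for i in range(50):
       job = cluster.submit(i)
       job.id = i
       jobs.append(job)
   for job in jobs:
       job()  # wait for this job to finish
       # for a standalone program, 'job.result' is assumed to hold its exit
       # status; anything it printed is in 'job.stdout' / 'job.stderr'
       print('job %s: exit status %s, output: %s' % (job.id, job.result, job.stdout))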


Canonical Program
=================

A canonical program that distributes the computation 'compute' (a
Python function) to nodes (running *dispynode (Server)* on a local
network), schedules jobs with the cluster, gets the jobs' results and
prints them is "sample.py":

   # function 'compute' is distributed and executed with arguments
   # supplied with 'cluster.submit' below
   def compute(n):
       import time, socket
       time.sleep(n)
       host = socket.gethostname()
       return (host, n)

   if __name__ == '__main__':
       # executed on client only; variables created below, including modules imported,
       # are not available in job computations
       import dispy, random
       # distribute 'compute' to nodes; 'compute' does not have any dependencies (needed from client)
       cluster = dispy.JobCluster(compute)
       # run 'compute' with 20 random numbers on available CPUs
       jobs = []
       for i in range(20):
           job = cluster.submit(random.randint(5,20))
           job.id = i # associate an ID to identify jobs (if needed later)
           jobs.append(job)
       # cluster.wait() # waits until all jobs finish
       for job in jobs:
           host, n = job() # waits for job to finish and returns results
           print('%s executed job %s at %s with %s' % (host, job.id, job.start_time, n))
           # other fields of 'job' that may be useful:
           # job.stdout, job.stderr, job.exception, job.ip_addr, job.end_time
       cluster.print_status()  # shows which nodes executed how many jobs etc.


Distributing Objects
====================

If the computation has any dependencies, such as classes, objects or
files, they can be specified with the 'depends' argument and dispy
will distribute them along with the computation.

Continuing with trivial but illustrative examples, the program
"obj_instances.py" below distributes a computation to be executed with
instances of a class:

   class C(object):
       def __init__(self, i, n):
           self.i = i
           self.n = n

       def show(self):
           print('%s: %.2f' % (self.i, self.n))

   def compute(obj):
       # obj is an instance of C
       import time
       time.sleep(obj.n)
       obj.show()  # the output is stored in job.stdout
       return obj.n

   if __name__ == '__main__':
       import random, dispy
       # 'compute' needs definition of class C, so include in 'depends'
       cluster = dispy.JobCluster(compute, depends=[C])
       jobs = []
       for i in range(10):
           c = C(i, random.uniform(1, 3)) # create object of C
           job = cluster.submit(c) # it is sent to a node for executing 'compute'
           job.id = c # store this object for later use
           jobs.append(job)
       for job in jobs:
           job() # wait for job to finish
           print('%s: %.2f / %s' % (job.id.i, job.result, job.stdout))

Note that class C is given in 'depends' so the code for it is
transferred to the nodes automatically and the objects created in the
client program work transparently in 'compute' on remote nodes. The
objects are serialized using Pickle and sent over the network to the
nodes, so the objects must be serializable. If they are not
serializable (e.g., they contain references to locks), then the class
must provide **__getstate__** and **__setstate__** methods; see Python
object serialization for details. In addition, the objects shouldn't
contain file descriptors, references to other objects not being
transferred etc., which are not valid on remote nodes.
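
For illustration, here is a minimal sketch of such a class (this is
the generic Python pickling pattern, not dispy-specific API; it
assumes the lock can simply be recreated on the remote node):

   import threading

   class D(object):
       def __init__(self, i, n):
           self.i = i
           self.n = n
           self.lock = threading.Lock()  # locks can't be pickled

       def __getstate__(self):
           state = self.__dict__.copy()
           del state['lock']  # drop the unpicklable attribute before serializing
           return state

       def __setstate__(self, state):
           self.__dict__.update(state)
           self.lock = threading.Lock()  # recreate the lock on the remote node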


In-memory Processing
====================

The *setup* and *cleanup* parameters to a cluster can be used to
initialize / de-initialize a node for running jobs of that
computation, e.g., to manipulate transferred files, read data into
memory so jobs can process data efficiently, or set/unset global
variables on that node.

In the example below, the *setup* function is used to read data in a
file transferred from the client into memory on each node; jobs that
are executed later use the data in memory (for in-memory processing)
instead of reading from the file in every job. This feature works on
Posix systems (Linux, OS X and other Unix variants) without
limitations - any data can be assigned to variables declared global in
*setup*, as the operating system's *fork* is used to create the child
process, which shares the address space of the parent process (where
the *setup* function is executed) with copy-on-write. This can be
used, for example, to read a large amount of data from file(s) so
computations (jobs) can directly access the data in memory, instead of
reading the same data from a file each time.

Under Windows, though, *fork* is not available, so the global
variables are serialized and passed to the child process (see
multiprocessing's Programming guidelines for Windows). Thus, for
example, modules can't be loaded in the global scope with *setup*
under Windows. Moreover, as each job runs with a copy of all the
global variables, initializing objects that require a lot of memory
may not be possible / efficient (compared to Posix systems where
objects are not copied). See asyncoro's distributed communicating
processes for an alternate approach that doesn't have these
limitations.

The *setup* function in the program "node_setup.py" below reads the
data in a file (transferred with *depends*) into a global variable.
The jobs compute checksums of that data in memory. The *cleanup*
function deletes the global variable:

   # executed on each node before any jobs are scheduled
   def setup(data_file):
       # read data in file to global variable
       global data, algorithms, hashlib

       import hashlib, sys  # 'sys' stays local; only names declared global above persist
       data = open(data_file).read()  # read file into memory; data_file can now be deleted
       if sys.version_info.major > 2:
           data = data.encode() # convert to bytes
           algorithms = list(hashlib.algorithms_guaranteed)
       else:
           algorithms = hashlib.algorithms
       # if running under Windows, modules can't be global, as they are not
       # serializable; instead, they must be loaded in 'compute' (jobs); under
       # Posix (Linux, OS X and other Unix variants), modules declared global in
       # 'setup' will be available in 'compute'

       # 'os' module is already available (loaded by dispynode)
       if os.name == 'nt':  # remove modules under Windows
           del hashlib
       return 0

   def cleanup():
       global data, algorithms, hashlib
       del data, algorithms
       if os.name != 'nt':
           del hashlib

   def compute(n):
       global hashlib
       if os.name == 'nt': # Under Windows modules must be loaded in jobs
           import hashlib
       # 'data' and 'algorithms' global variables are initialized in 'setup'
       alg = algorithms[n % len(algorithms)]
       csum = getattr(hashlib, alg)()
       csum.update(data)
       return (alg, csum.hexdigest())

   if __name__ == '__main__':
       import dispy, sys, functools
       # if no data file name is given, use this file as data file
       data_file = sys.argv[1] if len(sys.argv) > 1 else sys.argv[0]
       cluster = dispy.JobCluster(compute, depends=[data_file],
                                  setup=functools.partial(setup, data_file), cleanup=cleanup)
       jobs = []
       for n in range(10):
           job = cluster.submit(n)
           job.id = n
           jobs.append(job)

       for job in jobs:
           job()
           if job.status == dispy.DispyJob.Finished:
               print('%s: %s : %s' % (job.id, job.result[0], job.result[1]))
           else:
               print(job.exception)
       cluster.print_status()
       cluster.close()

In the above program, the 'hashlib' module is removed from the global
variables under Windows (and loaded in 'compute' instead), so the
program works under Windows. If running only under Posix (Linux, OS X,
etc.), then 'hashlib' can be kept as a global variable in 'setup' and
loading it in 'compute' is not needed.
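
A Posix-only variant of 'setup' and 'compute' can then be a bit
simpler; a minimal sketch (assuming Python 3 and no Windows nodes, and
skipping the variable-length 'shake' algorithms for simplicity):

   def setup(data_file):
       global data, algorithms, hashlib
       import hashlib
       # under Posix, 'fork' makes these globals (including the module)
       # available to jobs on this node
       data = open(data_file, 'rb').read()  # read as bytes for hashing
       algorithms = [a for a in hashlib.algorithms_guaranteed
                     if not a.startswith('shake')]  # shake_* needs a digest length
       return 0

   def compute(n):
       # 'data', 'algorithms' and 'hashlib' are globals created in 'setup'
       alg = algorithms[n % len(algorithms)]
       csum = getattr(hashlib, alg)()
       csum.update(data)
       return (alg, csum.hexdigest())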


Updating Globals
================

The example above creates global variables that can be read (but not
updated) in jobs.  *setup* and *cleanup* can also be used to create
global variables that can be updated by jobs on a node (but not by
jobs on other nodes) using the multiprocessing module's support for
sharing state between processes. The program "node_shvars.py" below
creates an integer shared variable that is updated by jobs running on
that node:

   def setup():
       import multiprocessing, multiprocessing.sharedctypes
       global shvar
       lock = multiprocessing.Lock()
       shvar = multiprocessing.sharedctypes.Value('i', 1, lock=lock)
       return 0

   def cleanup():
       global shvar
       del shvar

   def compute():
       import random
       r = random.randint(1, 10)
       global shvar
       shvar.value += r  # update 'shvar'; all jobs on this node will
                         # see updated value
       return shvar.value

   if __name__ == '__main__':
       import dispy
       cluster = dispy.JobCluster(compute, setup=setup, cleanup=cleanup)
       jobs = []
       for n in range(10):
           job = cluster.submit()
           job.id = n
           jobs.append(job)

       for job in jobs:
           job()
           if job.status != dispy.DispyJob.Finished:
               print('job %s failed: %s' % (job.id, job.exception))
           else:
               print('%s: %s' % (job.id, job.result))
       cluster.print_status()
       cluster.close()

See asyncoro's Distributed Communicating Processes for an alternate
approach, where setup can be used to initialize any data that can be
updated in computations without any limitations, even under Windows.
With asyncoro, computations have to be coroutines and the client has
to implement scheduling of coroutines.


Sending Files to Client
=======================

*dispy_send_file* (see Transferring Files) can be used to transfer
file(s) to the client. Assume that the computation creates files named
with the parameter given (in this case *n*), so different runs create
different files (otherwise, file(s) sent by one computation will
overwrite files sent by other computations). Such files can be sent to
the client with:

   def compute(n):
       import time
       time.sleep(n)
       # assume that computation saves data in file n.dat
       dispy_send_file(str(n) + '.dat') # send file to client
       # ... continue further computations
       return n

   if __name__ == '__main__':
       import dispy, random
       cluster = dispy.JobCluster(compute)
       jobs = []
       for i in range(20):
           job = cluster.submit(random.randint(5,20))
           job.id = i
           jobs.append(job)
       for job in jobs:
           job()
           print('job %s results in file %s' % (job.id, str(job.id) + '.dat'))

If the client needs to process the files as soon as they are
transferred, the Provisional/Intermediate Results feature along with a
callback can be used to notify the client.
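
A rough sketch of that approach follows (it assumes the computation
calls *dispy_provisional_result* to signal the client and that the job
status callback is then invoked with *ProvisionalResult* status):

   def compute(n):
       import time
       time.sleep(n)
       # assume that computation saves data in file n.dat
       dispy_send_file(str(n) + '.dat')  # send file to client
       dispy_provisional_result(str(n) + '.dat')  # tell client the file is ready
       # ... continue further computations
       return n

   def job_callback(job):  # executed at the client
       if job.status == dispy.DispyJob.ProvisionalResult:
           # 'job.result' here is the file name sent above; process it right away
           print('file %s is ready for processing' % job.result)

   if __name__ == '__main__':
       import dispy, random
       cluster = dispy.JobCluster(compute, callback=job_callback)
       jobs = [cluster.submit(random.randint(5, 20)) for i in range(20)]
       for job in jobs:
           job()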


Callback Processing
===================

The *submit_node* method and *cluster_status* callback can be used to
schedule jobs with full control over when / which node executes a job.
The program "job_scheduler.py" below schedules a job to compute the
sha1 checksum of a data file whenever a processor is available:

   # job computation runs at dispynode servers
   def compute(path):
       import hashlib, time, os
       csum = hashlib.sha1()
       with open(os.path.basename(path), 'rb') as fd:
           while True:
               data = fd.read(1024000)
               if not data:
                   break
               csum.update(data)
       time.sleep(5)
       return csum.hexdigest()

   # 'cluster_status' callback function. It is called by dispy (client)
   # to indicate node / job status changes. Here node initialization and
   # job done status are used to schedule jobs, so at most one job is
   # running on a node (even if a node has more than one processor). Data
   # files are assumed to be 'data000', 'data001' etc.
   def status_cb(status, node, job):
       if status == dispy.DispyJob.Finished:
           print('sha1sum for %s: %s' % (job.id, job.result))
       elif status == dispy.DispyJob.Terminated:
           print('sha1sum for %s failed: %s' % (job.id, job.exception))
       elif status == dispy.DispyNode.Initialized:
           print('node %s with %s CPUs available' % (node.ip_addr, node.avail_cpus))
       else:  # ignore other status messages
           return

       global submitted
       data_file = 'data%03d' % submitted
       if os.path.isfile(data_file):
           submitted += 1
           # 'node' and 'dispy_job_depends' are consumed by dispy;
           # 'compute' is called with only 'data_file' as argument(s)
           job = cluster.submit_node(node, data_file, dispy_job_depends=[data_file])
           job.id = data_file

   if __name__ == '__main__':
       import dispy, sys, os
       cluster = dispy.JobCluster(compute, cluster_status=status_cb)
       submitted = 0
       while True:
           try:
               cmd = sys.stdin.readline().strip().lower()
           except KeyboardInterrupt:
               break
           if cmd == 'quit' or cmd == 'exit':
               break

       cluster.wait()
       cluster.print_status()


Efficient Job Submission
========================

When a job is submitted to a cluster, the arguments are kept in the
DispyJob structure returned. These arguments are used to resubmit jobs
if a node fails (if the computation is reentrant); they can also be
used by the client while processing job results (e.g., inspect
arguments and process results accordingly). When a large number of
jobs is submitted, especially with large data in arguments, the memory
required to keep the arguments (at the client) may be an issue.

Note also that in many examples, jobs are kept in a list and processed
in sequence later, without removing jobs from the list. This is
acceptable when job arguments don't require too much memory, but not
suitable otherwise (as the arguments are kept even after jobs are
done). It may be better to keep jobs in a dictionary (using a unique
index that is also set as the job ID) and remove each job from this
dictionary as soon as it is done, or at least set its *args* and
*kwargs* attributes to **None**, as sketched below.
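
A minimal sketch of that pattern (the computation and job IDs are only
illustrative):

   def compute(n):  # executed on nodes
       return n * n

   if __name__ == '__main__':
       import dispy
       cluster = dispy.JobCluster(compute)
       pending = {}  # jobs keyed by a unique index, removed as soon as each is done
       for i in range(1000):
           job = cluster.submit(i)
           job.id = i
           pending[i] = job
       for i in list(pending):
           job = pending.pop(i)  # drop the reference so its arguments can be freed
           job()                 # wait for this job to finish
           print('job %s result: %s' % (i, job.result))
       cluster.close()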

The program "bounded_submit.py" below uses *callback* feature to keep
the scheduler pipeline busy but not submit all jobs at once. The
variables *lower_bound* and *upper_bound* control number of jobs that
are scheduled at any time. These can be adjusted as suggested, or
dynamically updated with either NodeAllocate or *cluster_status*
feature.:

   def compute(n):  # executed on nodes
       import time
       time.sleep(n)
       return n

   # dispy calls this function to indicate change in job status
   def job_callback(job): # executed at the client
       global pending_jobs, jobs_cond
       if (job.status == dispy.DispyJob.Finished  # most usual case
           or job.status in (dispy.DispyJob.Terminated, dispy.DispyJob.Cancelled,
                             dispy.DispyJob.Abandoned)):
           # 'pending_jobs' is shared between two threads, so access it with
           # 'jobs_cond' (see below)
           jobs_cond.acquire()
           if job.id: # job may have finished before 'main' assigned id
               pending_jobs.pop(job.id)
               # dispy.logger.info('job "%s" done with %s: %s', job.id, job.result, len(pending_jobs))
               if len(pending_jobs) <= lower_bound:
                   jobs_cond.notify()
           jobs_cond.release()

   if __name__ == '__main__':
       import dispy, threading, random

       # set lower and upper bounds as appropriate; assuming there are 30
       # processors in a cluster, bounds are set to 50 to 100
       lower_bound, upper_bound = 50, 100
       # use Condition variable to protect access to pending_jobs, as
       # 'job_callback' is executed in another thread
       jobs_cond = threading.Condition()
       cluster = dispy.JobCluster(compute, callback=job_callback)
       pending_jobs = {}
       # submit 1000 jobs
       i = 0
       while i < 1000:
           i += 1
           job = cluster.submit(random.uniform(3, 7))
           jobs_cond.acquire()
           job.id = i
           # there is a chance the job may have finished and job_callback called by
           # this time, so put it in 'pending_jobs' only if job is pending
           if job.status == dispy.DispyJob.Created or job.status == dispy.DispyJob.Running:
               pending_jobs[i] = job
               # dispy.logger.info('job "%s" submitted: %s', i, len(pending_jobs))
               if len(pending_jobs) >= upper_bound:
                   while len(pending_jobs) > lower_bound:
                       jobs_cond.wait()
           jobs_cond.release()

       cluster.wait()
       cluster.print_status()
       cluster.close()


Port Forwarding with SSH
========================

If the nodes are on a remote network and the client is behind a
firewall that can't be configured to allow ports (default 51347-51349)
from the nodes to the client, then *ssh* can be used for port
forwarding (and security). In this case Amazon EC2 instances are used
as remote nodes. To use this feature, use *ssh* to forward 51347 (the
default port used by the client, to which the nodes send information)
with "ssh -R 51347:localhost:51347 <remote-node>", and perhaps start
dispynode in that session. In the case of Amazon EC2, nodes can be
configured to use a public (external) IP address (in the range
54.x.x.x) in addition to the local IP address (in the range
172.x.x.x). dispynode should be started with the external IP address,
as in "dispynode.py -i 54.204.242.185", so the client and nodes can
communicate. The client should also set its IP address to localhost
(127.0.0.1) so the remote node connects to the client over the ssh
tunnel. "sshportfw.py" shows the configuration and steps:

   def compute(n): # function sent to remote nodes for execution
       import time
       time.sleep(n)
       return n

   if __name__ == '__main__':
       import dispy
       # list remote nodes (here Amazon EC2 instance with external IP 54.204.242.185)
       nodes = ['54.204.242.185']
       # use ssh to forward port 51347 for each node; e.g.,
       # 'ssh -R 51347:localhost:51347 54.204.242.185'

       # start dispynode on each node with 'dispynode.py -i 54.204.242.185' (so dispynode
       # uses external IP address instead of default local IP address)
       cluster = dispy.JobCluster(compute, nodes=nodes, ip_addr='127.0.0.1')
       jobs = []
       for i in range(1, 10):
           job = cluster.submit(i)
           jobs.append(job)
       for job in jobs:
           print('result: %s' % job())


Cluster Creation
================

Cluster creation can be customized for various use cases; some
examples are listed below (a combined sketch follows the list):

* "cluster = dispy.JobCluster(compute, depends=[ClassA, moduleB,
  'file1'])" distributes 'compute' along with ClassA (Python object),
  moduleB (Python object) and 'file1', a file on client computer.
  Presumably ClassA, moduleB and file1 are needed by 'compute'.

* "cluster = dispy.JobCluster(compute, nodes=['node20',
  '192.168.2.21', 'node24'])" sends computation to nodes 'node20',
  'node24' and node with IP address '192.168.2.21'.  These nodes could
  be in different networks, as explicit names / IP addresses are
  listed.

* If nodes are on remote network, then certain ports need to be
  forwarded as the nodes connect to the client to send status /
  results of jobs; see NAT/Firewall Forwarding. If port forwarding is
  not possible, then ssh tunneling can be used. To use this, ssh to
  each node with "ssh -R 51347:127.0.0.1:51347 node" (to possibly
  execute *dispynode (Server)* program on the node if not already
  running), then specify *ext_ip_addr=127.0.0.1,nodes=[node]* to
  JobCluster. If using more than one node, list them all in *nodes*.
  If client port 51347 is not usable, alternate port, say, 2345 can be
  forwarded with "ssh -R 2345:127.0.0.1:2345 node" (use JobCluster
  with *ext_ip_addr=127.0.0.1,port=2345,nodes=[node]*). See SSH Port
  Forwarding for more details.

* "cluster = dispy.JobCluster(compute, nodes=['192.168.2.*'])" sends
  computation to all nodes whose IP address starts with '192.168.2'.
  In this case, it is assumed that '192.168.2' is local network (since
  UDP broadcast is used to discover nodes in a network and
  broadcasting packets don't cross networks).

* "cluster = dispy.JobCluster(compute, nodes=['192.168.3.5',
  '192.168.3.22',"  "'172.16.11.22', 'node39', '192.168.2.*'])" sends
  computation to nodes with IP addresses '192.168.3.5',
  '192.168.3.22', '172.16.11.22' and node 'node39' (since explicit
  names / IP addresses are listed, they could be on different
  networks), all nodes whose IP address starts with '192.168.2' (local
  network).

* "cluster = dispy.JobCluster(compute, nodes=['192.168.3.5',
  '192.168.3.*', '192.168.2.*'])" In this case, dispy will send
  discovery messages to node with IP address '192.168.3.5'.  If this
  node is running 'dispynetrelay', then all the nodes on that network
  are eligible for executing this computation, as wildcard
  '192.168.3.*' matches IP addresses of those nodes.  In addition,
  computation is also sent to all nodes whose IP address starts with
  '192.168.2' (local network).

* "cluster = dispy.JobCluster(compute, nodes=['192.168.3.5',
  '192.168.8.20', '172.16.2.99', '*'])" In this case, dispy will send
  discovery messages to nodes with IP address '192.168.3.5',
  '192.168.8.20' and '172.16.2.99'. If these nodes all are running
  dispynetrelay, then all the nodes on those networks are eligible for
  executing this computation, as wildcard "*" matches IP addresses of
  those nodes. In addition, computation is also sent to all nodes on
  local network (since they also match wildcard "*" and discovery
  message is broadcast on local network).

* Assuming that 192.168.1.39 is the (private) IP address where dispy
  client is used, a.b.c.d is the (public) IP address of NAT
  firewall/gateway (that can be reached from outside) and dispynode is
  running at another public IP address e.f.g.h (so that a.b.c.d and
  e.f.g.h can communicate, but e.f.g.h can't communicate with
  192.168.1.39), "cluster = dispy.JobCluster(compute,
  ip_addr='192.168.1.39', ext_ip_addr='a.b.c.d', nodes=['e.f.g.h'])"
  would work if NAT firewall/gateway forwards TCP port 51347 to
  192.168.1.39.

* "cluster = dispy.JobCluster(compute, secret='super')" distributes
  'compute' to nodes that also use secret 'super' (i.e., nodes started
  with "dispynode.py -s super"). Note that secret is used only for
  establishing communication, but not used to encrypt programs or code
  for python objects. This can be useful to prevent other users from
  (inadvertantly) using the nodes. If encryption is needed, SSL can be
  used; see below.

* "cluster = dispy.JobCluster(compute, certfile='mycert',
  keyfile='mykey')" distributes 'compute' and encrypts all
  communication using SSL certificate stored in 'mycert' file and key
  stored in 'mykey' file. In this case, dispynode must also use same
  certificate and key; i.e., each dispynode must be invoked with
  "dispynode --certfile="mycert" --keyfile="mykey"'"

  If both certificate and key are stored in same file, say,
  'mycertkey', they are expected to be in certfile: "cluster =
  dispy.JobCluster(compute, certfile='mycertkey')"

* "cluster1 = dispy.JobCluster(compute1, nodes=['192.168.3.2',
  '192.168.3.5'])"  "cluster2 = dispy.JobCluster(compute2,
  nodes=['192.168.3.10', '192.168.3.11'])"  distribute 'compute1' to
  nodes 192.168.3.2 and 192.168.3.5, and 'compute2' to nodes
  192.168.3.10 and 192.168.3.11. With this setup, specific
  computations can be scheduled on certain node(s).
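
As a concrete illustration, a minimal client combining a few of these
options (the subnet pattern, secret and SSL file names here are only
assumptions for illustration; nodes must be started with matching
options):

   def compute(n):  # executed on nodes
       return n * 2

   if __name__ == '__main__':
       import dispy
       # restrict to a local subnet, require a shared secret and encrypt
       # communication with SSL
       cluster = dispy.JobCluster(compute, nodes=['192.168.2.*'], secret='super',
                                  certfile='mycert', keyfile='mykey')
       jobs = [cluster.submit(i) for i in range(10)]
       for job in jobs:
           print(job())
       cluster.close()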


MapReduce
=========

A simple version of word count example from MapReduce:

   # a version of word frequency example from mapreduce tutorial

   def mapper(doc):
       # input reader and map function are combined
       import os
       words = []
       with open(os.path.join('/tmp', doc)) as fd:
           for line in fd:
               words.extend((word.lower(), 1) for word in line.split() \
                            if len(word) > 3 and word.isalpha())
       return words

   def reducer(words):
       # we should generate sorted lists which are then merged,
       # but to keep things simple, we use dicts
       word_count = {}
       for word, count in words:
           if word not in word_count:
               word_count[word] = 0
           word_count[word] += count
       # print('reducer: %s to %s' % (len(words), len(word_count)))
       return word_count

   if __name__ == '__main__':
       import dispy, logging
       # assume nodes node1 and node2 have 'doc1', 'doc2' etc. on their
       # local storage, so no need to transfer them
       map_cluster = dispy.JobCluster(mapper, nodes=['node1', 'node2'], reentrant=True)
       # any node can work on reduce
       reduce_cluster = dispy.JobCluster(reducer, nodes=['*'], reentrant=True)
       map_jobs = []
       for f in ['doc1', 'doc2', 'doc3', 'doc4', 'doc5']:
           job = map_cluster.submit(f)
           map_jobs.append(job)
       reduce_jobs = []
       for map_job in map_jobs:
           words = map_job()
           if not words:
               print(map_job.exception)
               continue
           # simple partition
           n = 0
           while n < len(words):
               m = min(len(words) - n, 1000)
               reduce_job = reduce_cluster.submit(words[n:n+m])
               reduce_jobs.append(reduce_job)
               n += m
       # reduce
       word_count = {}
       for reduce_job in reduce_jobs:
           words = reduce_job()
           if not words:
               print(reduce_job.exception)
               continue
           for word, count in words.items():
               if word not in word_count:
                   word_count[word] = 0
               word_count[word] += count
       # sort words by frequency and print
       for word in sorted(word_count, key=lambda x: word_count[x], reverse=True):
           count = word_count[word]
           print(word, count)
       reduce_cluster.print_status()
