
Examples
********

dispy can be used to distribute standalone programs or Python program
fragments (functions, classes, modules) and files (data) to nodes and
execute jobs in parallel. It supports various options to handle
complex cases (such as fault tolerance, sharing nodes among multiple
programs simultaneously, using nodes in multiple networks etc.);
however, in common setups, the usage is simple, as shown in the
examples below.

1. dispy can be used as a command line tool (for simple cases, such
   as scheduling cron jobs); in this case the computations should only
   be programs and dependencies should only be files:

      dispy.py -f /some/file1 -f file2 -a "arg11 arg12" -a "arg21 arg22" -a "arg3" /some/program

   will distribute '/some/program' with dependencies '/some/file1' and
   'file2' and then execute '/some/program' in parallel with 3
   instances: a) arg11 and arg12 (two arguments to the program), b)
   arg21 and arg22 (two arguments), and c) arg3 (one argument).

2. A simple client program that distributes a program (say,
   '/path/to/program') to all the nodes in a local network running
   *dispynode (Server)* and executes it with a sequence of numbers is:

      import dispy
      cluster = dispy.JobCluster('/path/to/program')
      for i in range(50):
          cluster.submit(i)

   The program '/path/to/program' on the client computer is
   transferred to each of the nodes, so if the program is a binary
   program then all the nodes should have the same architecture as the
   client.

   In the cases above we assume that the programs execute and save the
   computation results in a database, file system etc. If we are
   interested in the exit status, output from each run etc., then we
   need to keep each of the submitted jobs, from which the attributes
   of interest can be retrieved, as done in the example below.

3. A canonical cluster that distributes computation 'compute'
   (Python function) to nodes (running *dispynode (Server)* on a local
   network), schedules jobs with the cluster, gets jobs' results and
   prints them is:

      # function 'compute' is distributed and executed with arguments
      # supplied with 'cluster.submit' below
      def compute(n):
          import time, socket
          time.sleep(n)
          host = socket.gethostname()
          return (host, n)

      if __name__ == '__main__':
          # executed on client only; variables created below, including modules imported,
          # are not available in job computations
          import dispy, random
          cluster = dispy.JobCluster(compute)
          jobs = []
          for i in range(20):
              job = cluster.submit(random.randint(5,20))
              job.id = i # associate an ID to identify jobs (if needed later)
              jobs.append(job)
          # cluster.wait() # waits until all jobs finish
          for job in jobs:
              host, n = job() # waits for job to finish and returns results
              print('%s executed job %s at %s with %s' % (host, job.id, job.start_time, n))
              # other fields of 'job' that may be useful:
              # job.stdout, job.stderr, job.exception, job.ip_addr, job.end_time
          cluster.stats()  # shows which nodes executed how many jobs etc.

4. If the computation has any dependencies, such as classes,
   objects or files, they can be specified with the 'depends' argument
   and dispy will distribute them along with the computation.

   Continuing with trivial but illustrative examples, the program
   below distributes a computation to be executed with instances of a
   class:

      class C(object):
          def __init__(self, i, n):
              self.i = i
              self.n = n

          def show(self):
              print('%s: %.2f' % (self.i, self.n))

      def compute(obj):
          # obj is an instance of C
          import time
          time.sleep(obj.n)
          obj.show()  # the output is stored in job.stdout
          return obj.n

      if __name__ == '__main__':
          import random, dispy
          cluster = dispy.JobCluster(compute, depends=[C])
          jobs = []
          for i in range(10):
              c = C(i, random.uniform(1, 3)) # create object of C
              job = cluster.submit(c) # it is sent to a node for executing 'compute'
              job.id = c # store this object for later use
              jobs.append(job)
          for job in jobs:
              job() # wait for job to finish
              print('%s: %.2f / %s' % (job.id.i, job.result, job.stdout))

   Note that class C is given in 'depends' so the code for it is
   transferred to the nodes automatically and the objects created in
   the client program work transparently in 'compute' on remote nodes.
   The objects are serialized using pickle and sent over to the nodes,
   so the objects must be serializable. If they are not serializable
   (e.g., they contain references to locks), then the class must
   provide **__getstate__** and **__setstate__** methods; see Python
   object serialization for details. In addition, the objects
   shouldn't contain file descriptors, references to other objects not
   being transferred etc., which are not valid on remote nodes.

5. *setup* and *cleanup* parameters to cluster can be used to
   initialize/de-initialize a node for running jobs of that
   computation, e.g., to manipulate transferred files, read data into
   memory so jobs can process data efficiently, set/unset global
   variables on that node.

   This feature works with Posix systems (Linux, OS X and other Unix
   variants) without limitations - any data can be assigned to
   variables declared global in *setup*, as the operating system's
   *fork* is used to create the child process, which shares the
   address space of the parent process (where the *setup* function is
   executed) with copy-on-write. This feature can be used, for
   example, to read a large amount of data in file(s) so computations
   (jobs) can directly access the data in memory, instead of reading
   the same data from file each time.

   Under Windows, though, *fork* is not available, so the global
   variables are serialized and passed to the child process (see
   multiprocessing's Programming guidelines for Windows). Thus, for
   example, modules can't be loaded in global scope with *setup* under
   Windows. Moreover, as each job runs with a copy of all the global
   variables, initializing objects that require a lot of memory may
   not be possible / efficient (compared to Posix systems where
   objects are not copied).

   In the example below, data in a given file (or the program itself
   if no data file is given) is transferred to the nodes. The *setup*
   function reads that file into a global variable. The jobs compute
   a checksum of that data in memory (for in-memory processing).  The
   *cleanup* function deletes the global variables:

      # executed on each node before any jobs are scheduled
      def setup(data_file):
          # read data in file to global variable
          global data, algorithms
          import hashlib, sys  # 'sys' is needed for the version check below
          data = open(data_file).read()
          if sys.version_info.major >= 3:
              data = data.encode()
              algorithms = list(hashlib.algorithms_guaranteed)
          else:
              algorithms = list(hashlib.algorithms)
          # it should return 0 to indicate successful initialization
          return 0

      # executed on each node when computation is closed
      def cleanup():
          global data, algorithms
          del data, algorithms

      # job computation
      def compute(n):
          import hashlib
          # 'data' and 'algorithms' global variables are initialized in 'setup'
          alg = algorithms[n % len(algorithms)]
          csum = getattr(hashlib, alg)()
          csum.update(data)
          return (alg, csum.hexdigest())

      if __name__ == '__main__':
          import dispy, sys, functools
          data_file = sys.argv[1] if len(sys.argv) > 1 else sys.argv[0]
          cluster = dispy.JobCluster(compute, depends=[data_file],
                                     setup=functools.partial(setup, data_file), cleanup=cleanup)
          jobs = []
          for n in range(10):
              job = cluster.submit(n)
              job.id = n
              jobs.append(job)

          for job in jobs:
              job()
              if job.status == dispy.DispyJob.Finished:
                  print('%s: %s : %s' % (job.id, job.result[0], job.result[1]))
              else:
                  print(job.exception)
          cluster.stats()
          cluster.close()

   In the above program, the 'hashlib' module is not a global variable
   (it is loaded in 'compute' instead), so the program works under
   Windows. If running under Posix (Linux, OS X, etc.), then 'hashlib'
   can be declared a global variable in 'setup', and loading it in
   'compute' is then not needed.

6. The example above creates global variables that can be read, but
   not updated, in jobs.  *setup* and *cleanup* can also be used to
   create global variables that can be updated in jobs on a node (but
   not by jobs on other nodes) using the multiprocessing module's
   Sharing state between processes. The program below creates an
   integer shared variable that is updated by jobs running on that
   node:

      def setup():
          import multiprocessing, multiprocessing.sharedctypes
          global shvar
          lock = multiprocessing.Lock()
          shvar = multiprocessing.sharedctypes.Value('i', 1, lock=lock)
          return 0

      def cleanup():
          global shvar
          del shvar

      def compute():
          import random
          r = random.randint(1, 10)
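          global shvar
          # '+=' on a shared Value is a read followed by a write, so hold
          # its lock to keep concurrent jobs from losing updates
          with shvar.get_lock():
              shvar.value += r  # update 'shvar'; all jobs on this node will
                                # see updated value
              return shvar.value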

      if __name__ == '__main__':
          import dispy
          cluster = dispy.JobCluster(compute, setup=setup, cleanup=cleanup)
          jobs = []
          for n in range(10):
              job = cluster.submit()
              job.id = n
              jobs.append(job)

          for job in jobs:
              job()
              if job.status != dispy.DispyJob.Finished:
                  print('job %s failed: %s' % (job.id, job.exception))
              else:
                  print('%s: %s' % (job.id, job.result))
          cluster.stats()
          cluster.close()

   See asyncoro's Distributed Communicating Processes for an alternate
   approach, where setup can be used to initialize any data that can
   be updated in computations without any limitations, even under
   Windows. With asyncoro, computations have to be coroutines and
   client has to implement scheduling coroutines.

7. *dispy_send_file* can be used to transfer file(s) to the client.
   Assume that the computation creates files named with the parameter
   given (in this case *n*) so different runs create different files
   (otherwise, file(s) sent by one computation will overwrite files
   sent by other computations). Such files can be sent to the client
   with:

      def compute(n):
          import time
          time.sleep(n)
          # assume that computation saves data in file n.dat
          dispy_send_file(str(n) + '.dat') # send file to client
          # ... continue further computations
          return n

      if __name__ == '__main__':
          import dispy, random
          cluster = dispy.JobCluster(compute)
          jobs = []
          for i in range(20):
              job = cluster.submit(random.randint(5,20))
              job.id = i
              jobs.append(job)
          for job in jobs:
              job()
              print('job %s results in file %s' % (job.id, str(job.id) + '.dat'))

   If the client needs to process the files as soon as they are
   transferred, the Provisional/Intermediate Results feature along
   with a callback can be used to notify the client.

8. The *submit_node* method and *cluster_status* callback can be used
   to schedule jobs with full control over when / which node executes
   a job.  The program below schedules a job to compute the sha1
   checksum of a data file whenever a node becomes available:

      # job computation runs at dispynode servers
      def compute(path):
          import hashlib, time, os
          csum = hashlib.sha1()
          with open(os.path.basename(path), 'rb') as fd:
              while True:
                  data = fd.read(1024000)
                  if not data:
                      break
                  csum.update(data)
          time.sleep(5)
          return csum.hexdigest()

      # 'cluster_status' callback function. It is called by dispy (client)
      # to indicate node / job status changes. Here node initialization and
      # job done status are used to schedule jobs, so at most one job is
      # running on a node (even if a node has more than one processor). Data
      # files are assumed to be 'data000', 'data001' etc.
      def status_cb(status, node, job):
          if status == dispy.DispyJob.Finished:
              print('sha1sum for %s: %s' % (job.id, job.result))
          elif status == dispy.DispyJob.Terminated:
              print('sha1sum for %s failed: %s' % (job.id, job.exception))
          elif status == dispy.DispyNode.Initialized:
              print('node %s with %s CPUs available' % (node.ip_addr, node.avail_cpus))
          else:  # ignore other status messages
              return

          global submitted
          data_file = 'data%03d' % submitted
          if os.path.isfile(data_file):
              submitted += 1
              # 'node' and 'dispy_job_depends' are consumed by dispy;
              # 'compute' is called with only 'data_file' as argument(s)
              job = cluster.submit_node(node, data_file, dispy_job_depends=[data_file])
              job.id = data_file

      if __name__ == '__main__':
          import dispy, sys, os
          cluster = dispy.JobCluster(compute, cluster_status=status_cb)
          submitted = 0
          while True:
              try:
                  cmd = sys.stdin.readline().strip().lower()
              except KeyboardInterrupt:
                  break
              if cmd == 'quit' or cmd == 'exit':
                  break

          cluster.wait()
          cluster.stats()
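
Example 4 above notes that objects that are not serializable (for
instance, objects holding locks) need **__getstate__** and
**__setstate__** methods. The following is a minimal sketch of that
idea using only the standard library; the class 'Tally' and the helper
'round_trip' are hypothetical names made up for this illustration and
are not part of dispy. The round trip through pickle stands in for an
object being sent to a node.

```python
import pickle
import threading


class Tally(object):
    """Hypothetical class holding a lock, which pickle cannot serialize."""

    def __init__(self, name, value=0):
        self.name = name
        self.value = value
        self.lock = threading.Lock()

    def __getstate__(self):
        # copy the instance dictionary and drop the unpicklable lock
        state = self.__dict__.copy()
        del state['lock']
        return state

    def __setstate__(self, state):
        # restore the saved attributes and create a fresh lock on the
        # receiving side
        self.__dict__.update(state)
        self.lock = threading.Lock()


def round_trip(obj):
    # serialize and deserialize, similar to what happens when an object
    # is submitted as a job argument
    return pickle.loads(pickle.dumps(obj))
```

Without the two methods, 'pickle.dumps' raises an error for such an
object, because a lock cannot be pickled. The lock created in
'__setstate__' is a new, independent lock; it does not synchronize
with the original object on the client.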

Cluster creation can be customized for various use cases; some
examples are:

* "cluster = dispy.JobCluster(compute, depends=[ClassA, moduleB,
  'file1'])" distributes 'compute' along with ClassA (Python object),
  moduleB (Python object) and 'file1', a file on client computer.
  Presumably ClassA, moduleB and file1 are needed by 'compute'.

* "cluster = dispy.JobCluster(compute, nodes=['node20',
  '192.168.2.21', 'node24'])" sends computation to nodes 'node20',
  'node24' and node with IP address '192.168.2.21'.  These nodes could
  be in different networks, as explicit names / IP addresses are
  listed.

* If nodes are on a remote network, then certain ports need to be
  forwarded as the nodes connect to the client to send status /
  results of jobs; see NAT/Firewall Forwarding. If port forwarding is
  not possible, then ssh tunneling can be used. To use this, ssh to
  each node with "ssh -R 51347:127.0.0.1:51347 node" (and, if
  *dispynode (Server)* is not already running on the node, start it
  there), then specify *ext_ip_addr=127.0.0.1,nodes=[node]* to
  JobCluster. If using more than one node, list them all in *nodes*.
  If client port 51347 is not usable, an alternate port, say, 2345 can
  be forwarded with "ssh -R 2345:127.0.0.1:2345 node" (use JobCluster
  with *ext_ip_addr=127.0.0.1,port=2345,nodes=[node]*). See SSH Port
  Forwarding for more details.

* "cluster = dispy.JobCluster(compute, nodes=['192.168.2.*'])" sends
  computation to all nodes whose IP address starts with '192.168.2'.
  In this case, it is assumed that '192.168.2' is the local network
  (since UDP broadcast is used to discover nodes in a network and
  broadcast packets don't cross networks).

* "cluster = dispy.JobCluster(compute, nodes=['192.168.3.5',
  '192.168.3.22', '172.16.11.22', 'node39', '192.168.2.*'])" sends
  computation to nodes with IP addresses '192.168.3.5',
  '192.168.3.22', '172.16.11.22' and node 'node39' (since explicit
  names / IP addresses are listed, they could be on different
  networks), as well as all nodes whose IP address starts with
  '192.168.2' (local network).

* "cluster = dispy.JobCluster(compute, nodes=['192.168.3.5',
  '192.168.3.*', '192.168.2.*'])" In this case, dispy will send
  discovery messages to node with IP address '192.168.3.5'. If this
  node is running 'dispynetrelay', then all the nodes on that network
  are eligible for executing this computation, as wildcard
  '192.168.3.*' matches IP addresses of those nodes. In addition,
  computation is also sent to all nodes whose IP address starts with
  '192.168.2' (local network).

* "cluster = dispy.JobCluster(compute, nodes=['192.168.3.5',
  '192.168.8.20', '172.16.2.99', '*'])" In this case, dispy will send
  discovery messages to nodes with IP address '192.168.3.5',
  '192.168.8.20' and '172.16.2.99'. If these nodes all are running
  dispynetrelay, then all the nodes on those networks are eligible for
  executing this computation, as wildcard "*" matches IP addresses of
  those nodes. In addition, computation is also sent to all nodes on
  local network (since they also match wildcard "*" and discovery
  message is broadcast on local network).

* Assuming that 192.168.1.39 is the (private) IP address where dispy
  client is used, a.b.c.d is the (public) IP address of NAT
  firewall/gateway (that can be reached from outside) and dispynode is
  running at another public IP address e.f.g.h (so that a.b.c.d and
  e.f.g.h can communicate, but e.f.g.h can't communicate with
  192.168.1.39), "cluster = dispy.JobCluster(compute,
  ip_addr='192.168.1.39', ext_ip_addr='a.b.c.d', nodes=['e.f.g.h'])"
  would work if NAT firewall/gateway forwards UDP and TCP ports 51347
  to 192.168.1.39.

* "cluster = dispy.JobCluster(compute, secret='super')" distributes
  'compute' to nodes that also use secret 'super' (i.e., nodes started
  with "dispynode.py -s super"). Note that the secret is used only for
  establishing communication, but not used to encrypt programs or code
  for Python objects. This can be useful to prevent other users from
  (inadvertently) using the nodes. If encryption is needed, SSL can be
  used; see below.

* "cluster = dispy.JobCluster(compute, certfile='mycert',
  keyfile='mykey')" distributes 'compute' and encrypts all
  communication using the SSL certificate stored in the 'mycert' file
  and the key stored in the 'mykey' file. In this case, dispynode must
  also use the same certificate and key; i.e., each dispynode must be
  invoked with "dispynode --certfile=mycert --keyfile=mykey".

  If both certificate and key are stored in same file, say,
  'mycertkey', they are expected to be in certfile: "cluster =
  dispy.JobCluster(compute, certfile='mycertkey')"

* "cluster1 = dispy.JobCluster(compute1, nodes=['192.168.3.2',
  '192.168.3.5'])" and "cluster2 = dispy.JobCluster(compute2,
  nodes=['192.168.3.10', '192.168.3.11'])" distribute 'compute1' to
  nodes 192.168.3.2 and 192.168.3.5, and 'compute2' to nodes
  192.168.3.10 and 192.168.3.11. With this setup, specific
  computations can be scheduled on certain node(s).
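
The wildcard entries in *nodes* described above can be pictured as
shell-style patterns that are compared against node IP addresses. The
sketch below only illustrates that matching idea with the standard
library's 'fnmatch' module; it is an assumption made for illustration,
not dispy's actual implementation, and the function name 'eligible' is
hypothetical.

```python
import fnmatch


def eligible(ip_addr, patterns):
    """Return True if 'ip_addr' matches any shell-style pattern in
    'patterns' (e.g., '192.168.2.*' or '*')."""
    return any(fnmatch.fnmatchcase(ip_addr, pattern) for pattern in patterns)


# same form as the 'nodes' lists used in the examples above
nodes = ['192.168.3.5', '192.168.3.*', '192.168.2.*']
```

With this list, 'eligible("192.168.3.77", nodes)' is true because of
the '192.168.3.*' entry, whereas an address such as '172.16.11.22' is
matched only if it is listed explicitly or if '*' is given. Matching
alone does not make a node reachable: as noted above, nodes outside
the local network still have to be discovered, either by listing an
address explicitly or through a node running dispynetrelay.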

A simple version of the word count example from MapReduce:

   # a version of word frequency example from mapreduce tutorial

   def mapper(doc):
       # input reader and map function are combined
       import os
       words = []
       with open(os.path.join('/tmp', doc)) as fd:
           for line in fd:
               words.extend((word.lower(), 1) for word in line.split() \
                            if len(word) > 3 and word.isalpha())
       return words

   def reducer(words):
       # we should generate sorted lists which are then merged,
       # but to keep things simple, we use dicts
       word_count = {}
       for word, count in words:
           if word not in word_count:
               word_count[word] = 0
           word_count[word] += count
       # print('reducer: %s to %s' % (len(words), len(word_count)))
       return word_count

   if __name__ == '__main__':
       import dispy, logging
       # assume nodes node1 and node2 have 'doc1', 'doc2' etc. on their
       # local storage, so no need to transfer them
       map_cluster = dispy.JobCluster(mapper, nodes=['node1', 'node2'], reentrant=True)
       # any node can work on reduce
       reduce_cluster = dispy.JobCluster(reducer, nodes=['*'], reentrant=True)
       map_jobs = []
       for f in ['doc1', 'doc2', 'doc3', 'doc4', 'doc5']:
           job = map_cluster.submit(f)
           map_jobs.append(job)
       reduce_jobs = []
       for map_job in map_jobs:
           words = map_job()
           if not words:
               print(map_job.exception)
               continue
           # simple partition
           n = 0
           while n < len(words):
               m = min(len(words) - n, 1000)
               reduce_job = reduce_cluster.submit(words[n:n+m])
               reduce_jobs.append(reduce_job)
               n += m
       # reduce
       word_count = {}
       for reduce_job in reduce_jobs:
           words = reduce_job()
           if not words:
               print(reduce_job.exception)
               continue
           for word, count in words.items():  # 'iteritems' exists only in Python 2
               if word not in word_count:
                   word_count[word] = 0
               word_count[word] += count
       # sort words by frequency and print
       for word in sorted(word_count, key=lambda x: word_count[x], reverse=True):
           count = word_count[word]
           print(word, count)
       reduce_cluster.stats()
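
The map, partition and reduce steps of the program above can be tried
without any nodes. The following is a minimal local sketch, assuming
the input is given as a list of lines instead of files under '/tmp';
the names 'map_words', 'reduce_words', 'partition' and 'word_count'
are hypothetical helpers for this illustration and 'collections.Counter'
replaces the hand-written dictionaries.

```python
import collections


def map_words(lines):
    # same filter as 'mapper' above: purely alphabetic words longer
    # than 3 characters, lowercased, each paired with a count of 1
    return [(word.lower(), 1) for line in lines for word in line.split()
            if len(word) > 3 and word.isalpha()]


def reduce_words(pairs):
    # sum the counts for each word in one partition
    counts = collections.Counter()
    for word, count in pairs:
        counts[word] += count
    return counts


def partition(items, size=1000):
    # split 'items' into consecutive chunks of at most 'size' elements
    for n in range(0, len(items), size):
        yield items[n:n + size]


def word_count(lines, size=1000):
    # reduce each partition separately, then merge the partial counts
    total = collections.Counter()
    for chunk in partition(map_words(lines), size):
        total.update(reduce_words(chunk))
    return total
```

For example, 'word_count(["the quick brown quick", "Quick fox jumps"])'
counts 'quick' three times and 'brown' and 'jumps' once each; 'the'
and 'fox' are dropped by the length filter. In the distributed
version, each call to 'map_words' and 'reduce_words' would correspond
to a job submitted to the map or reduce cluster.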
