#!/usr/bin/env bash
#
# PPSS, the Parallel Processing Shell Script
# 
# Copyright (c) 2010, Louwrentius
#
# This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    See <http://www.gnu.org/licenses/>
#    for a copy of the GNU General Public License
#
# "Patches or other contributions are always welcome!"
#

#
# Handling control-c for a clean shutdown.
#
# Ctrl-C is routed to kill_process, which writes $KILL_KEY into the FIFO
# named by $FIFO (both are defined further down in this file).
trap 'kill_process' SIGINT 

# Name and version printed by the usage banners and the run header.
SCRIPT_NAME="Distributed Parallel Processing Shell Script"
SCRIPT_VERSION="2.97"

#
# The first argument to this script can be a mode. When it matches one of
# the words in MODES it is stored in MODE and shifted off the positional
# parameters; otherwise MODE is left unset and nothing is shifted.
#
MODES="node start config stop pause continue deploy status erase kill"
for x in $MODES
do
    if [[ "$x" == "$1" ]] 
    then
        MODE="$1"
        shift
        break
    fi
done

#
# The working directory of PPSS can be set with
# export PPSS_DIR=/path/to/workingdir
# An unset or empty PPSS_DIR falls back to the relative path "ppss_dir".
#
if [[ -z "$PPSS_DIR" ]] 
then
    PPSS_DIR="ppss_dir"
fi

#
# Global state and default settings.
# Paths that embed $PPSS_DIR or $PPSS_HOME_DIR are expanded right here, with
# the values those variables hold at this point in the script.
#
CONFIG=""                                                 # Config file name, set by --config | -C.
HOSTNAME="$(hostname)"
ARCH="$(uname)"                                           # Used to pick per-OS tools (md5, stat, date).
PPSS_HOME_DIR="ppss-home"
SOURCED="$0"                                              # Inspected by are_we_sourced.

PID="$$"
PAUSE_SIGNAL="$PPSS_HOME_DIR/$PPSS_DIR/pause_signal"      # Pause processing if this file is present.
PAUSE_DELAY="60"                                          # Seconds to sleep between checks while paused.
STOP_SIGNAL="$PPSS_HOME_DIR/$PPSS_DIR/stop_signal"        # Stop processing if this file is present.
GLOBAL_COUNTER=1
LISTOFITEMS="$PPSS_DIR/INPUT_FILE-$PID"
JOB_LOG_DIR="$PPSS_DIR/job_log"                           # Directory containing log files of processed items.
LOGFILE="$PPSS_DIR/ppss-log-$PID.txt"                     # General PPSS log file. Contains lots of info.
FAILED_ITEMS_COUNTER=0
# QUIET may be preset in the environment; default to "0" (not quiet).
if [[ -z "$QUIET" ]] 
then
    QUIET="0"
fi
STOP="0"                                                  # STOP job. Set to "1" by check_for_interrupt.
MAX_DELAY="0"                                             # MAX DELAY between jobs (--delay | -D).
MAX_LOCK_DELAY="9"                                        # NOTE(review): not used in this part of the file.
PERCENT="0"
LISTENER_PID=""
IFS_BACKUP="$IFS"
CPUINFO="/proc/cpuinfo"
PROCESSORS=""
START_KEY="start-$RANDOM$RANDOM$RANDOM$RANDOM"           # If this key is received by listener, start a new process
FAIL_KEY="fail-$RANDOM$RANDOM$RANDOM$RANDOM"            # if this key is received by listener, increase error count
KILL_KEY="kill-$RANDOM$RANDOM$RANDOM$RANDOM"            # This is a signal to stop immediately and kill PPSS and
                                                        # all running processes. Sent by kill_process on Ctrl-C.
QUEUE=""
INOTIFY=""                                              # Set to 0 or 1 by detect_inotify / --disable-inotify.
TRAVERSAL="1"                                           # Set to "0" by --no-traversal | -r.
START_PPSS=""
STOP_PPSS=""
SIZE_OF_INPUT=""
LOCAL_LOCKING="1"
LIST_OF_PROCESSED_ITEMS="$PPSS_DIR/LIST_OF_PROCESSED_ITEMS"
PROCESSED_ITEMS=""
UNPROCESSED_ITEMS=""
FAILED_ITEMS="$PPSS_DIR/LIST_OF_FAILED_ITEMS"
ACTIVE_WORKERS="0"
DAEMON_POLLING_INTERVAL="10"                            # Overridden by --interval.
STAT=""                                                 # stat command line, chosen by set_stat.
DAEMON_FILE_AGE="4"
ENABLE_INPUT_LOCK="0"                                   # Set to 1 by --enable-input-lock.
PROCESSING_TIME=""
NODE_ID="NODE_ID"
USE_MD5="0"                                             # Set to "1" by --md5 | -M.
RANDOMIZE="0"                                           # Set to 1 by --randomize | -R.

SSH_SERVER=""                           # Remote server or 'master'.
SSH_KEY=""                              # SSH key for ssh account.
SSH_KNOWN_HOSTS=""
SSH_SOCKET="$PPSS_DIR/ppss_ssh_socket-$$"          # Multiplex multiple SSH connections over 1 master.
# Options for multiplexed ssh calls. Expanded unquoted by exec_cmd, so the
# embedded whitespace only separates the individual options.
SSH_OPTS="-o BatchMode=yes -o ControlPath=$SSH_SOCKET \
                           -o GlobalKnownHostsFile=./known_hosts \
                           -o ControlMaster=auto \
                           -o Cipher=blowfish \
                           -o ConnectTimeout=10 "

# Same options without connection multiplexing (exec_cmd with NOMP=1).
SSH_OPTS_NOMP="-o BatchMode=yes -o GlobalKnownHostsFile=./known_hosts \
                           -o Cipher=blowfish \
                           -o ConnectTimeout=10 "
                                        # Blowfish is faster but still secure. 
                                        # NOTE(review): newer OpenSSH releases reportedly dropped
                                        # blowfish and the 'Cipher' keyword - confirm against the
                                        # ssh version in use.
SSH_MASTER_PID=""                       # When non-empty, cleanup kills this PID.

ITEM_LOCK_DIR="$PPSS_DIR/PPSS_ITEM_LOCK_DIR"      # Remote directory on master used for item locking.
PPSS_LOCAL_TMPDIR="$PPSS_DIR/PPSS_LOCAL_TMPDIR"   # Local directory on slave for local processing.
PPSS_LOCAL_OUTPUT="$PPSS_DIR/PPSS_LOCAL_OUTPUT"   # Local directory on slave for local output.
DOWNLOAD_TO_NODE="0"                   # Transfer item to slave via (s)cp.
UPLOAD_TO_SERVER="0"                   # Transfer output back to server via (s)cp.
SECURE_COPY="1"                         # If set, use SCP, Otherwise, use cp.
REMOTE_OUTPUT_DIR=""                    # Remote directory to which output must be uploaded.
SCRIPT=""                               # Custom user script that is executed by ppss.
ITEM_ESCAPED=""
DISABLE_SKIPPING=0
PPSS_NODE_STATUS="$PPSS_DIR/NODE_STATUS"
NODE_STATUS_FILE="$PPSS_NODE_STATUS/$HOSTNAME-status.txt"
DAEMON=0                                # Set to "1" by --daemon.
EMAIL=""                                # Set by --email | -e.

REGISTER=""                               # For STACK
STACK=""
TMP_STACK=""

showusage_short () {

    #
    # Print the condensed usage banner: a synopsis of the stand-alone
    # options followed by three example invocations.
    #
    printf '%s\n' \
        "" \
        "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION" \
        "" \
        "usage: $0 [[ -d <sourcedir> | -f <sourcefile> ]]  [[ -c '<command> \"\$ITEM\"' ]] " \
        "                 [[ -C <configfile> ]] [[ -j ]] [[ -l <logfile> ]] [[ -p <# jobs> ]] " \
        "                 [[ -q ]] [[ -D <delay> ]] [[ -h ]] [[ --help ]] [[ -r ]] [[ --daemon ]] " \
        "" \
        "Examples:" \
        "                 $0 -d /dir/with/some/files -c 'gzip '" \
        "                 $0 -d /dir/with/some/files -c 'cp \"\$ITEM\" /tmp' -p 2" \
        "                 $0 -f <file> -c 'wget -q -P /destination/directory \"\$ITEM\"' -p 10" \
        ""
}

showusage_normal () {

    #
    # Help shown for -h: intro text, the basic option list and one example.
    #
    local section
    for section in showusage_head showusage_basic showusage_basic_example
    do
        "$section"
    done
}

showusage_long () {

    #
    # Help shown for --help: extended intro with the mode list, the basic
    # option list, then the options for distributed execution.
    #
    local section
    for section in showusage_extended_head showusage_basic showusage_extended_body
    do
        "$section"
    done
}

showusage_head () {

    #
    # Print the banner and the short introduction used by the -h help.
    #
    printf '%s\n' \
        "" \
        "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION" \
        "" \
        "PPSS is a Bash shell script that executes commands in parallel on a set" \
        "of items, such as files in a directory, or lines in a file. The purpose" \
        "of PPSS is to make it simple to benefit from multiple CPUs or CPU cores." \
        "" \
        "This short summary only discusses options for stand-alone mode. For a" \
        "full listing of all options, run PPSS with the options --help" \
        ""
}

showusage_basic () {

    #
    # Print the option reference for stand-alone mode. Shared by the -h and
    # the --help output.
    #
    echo "Usage $0 [[ options ]] "
    echo
    echo -e "--command | -c     Command to execute. Syntax: '<command> ' including the single quotes."
    echo -e "                   Example: -c 'ls -alh '. It is also possible to specify where an item "
    echo -e "                   must be inserted: 'cp \"\$ITEM\" /somedir'."
    echo 
    echo -e "--sourcedir | -d   Directory that contains files that must be processed. Individual files" 
    echo -e "                   are fed as an argument to the command that has been specified with -c." 
    echo 
    echo -e "--sourcefile | -f  Each single line of the supplied file will be fed as an item to the"
    echo -e "                   command that has been specified with -c. Read input from stdin with"
    echo -e "                   -f -"
    echo 
    echo -e "--config | -C      If the mode is config, a config file with the specified name will be"
    # The sentence continues on the next line, so it ends in a comma here
    # (a stray '.' outside the quotes used to print "modes." mid-sentence).
    echo -e "                   generated based on all the options specified. In the other modes,"
    echo -e "                   this option will result in PPSS reading the config file and start"
    echo -e "                   processing items based on the settings of this file."
    echo
    echo -e "--disable-ht | -j  Disable hyper threading. Is enabled by default."
    echo 
    echo -e "--log | -l         Sets the name of the log file. The default is ppss-log.txt."
    echo
    echo -e "--processes | -p   Start the specified number of processes. Ignore the number of available"
    echo -e "                   CPUs."
    echo
    echo -e "--quiet | -q       Shows no output except for a progress indication using percents."
    echo
    echo -e "--delay | -D       Adds an initial random delay to the start of all parallel jobs to spread"
    echo -e "                   the load. The delay (seconds) is only used at the start of all 'threads'."
    echo
    echo -e "--daemon           Daemon mode. Do not exit after items are processed, but keep looking "
    echo -e "                   for new items and process them. Read the manual how to use this!"
    echo -e "                   See --help for important additional options regarding daemon mode."
    echo
    echo -e "--disable-inotify  Linux users can use real-time inotify filesystem events when using"
    echo -e "                   daemon mode. Requires inotify-tools. Enabled by default if available."
    echo -e "                   Automatically disabled if NFS is used as the daemon source dir."
    echo
    echo -e "--no-traversal|-r  By default, PPSS uses the regular 'find' command to list all files"
    echo -e "                   within the directory specified by the -d option. If you do not wish"
    echo -e "                   for PPSS to process files in sub directories, use this option."
    echo -e "                   Only files within the specified directory will be processed. Any"
    echo -e "                   subdirectories will then be ignored."
    echo 
    echo -e "--email | -e       PPSS sends an e-mail if PPSS has finished. It is also used if processing"
    echo -e "                   of an item has failed (configurable, see -h). "
    echo
    echo -e "--debug            Enable debugging output to the |P|P|S|S| log file."
    echo 
    echo -e "--help             Extended help, including options for distributed mode."
}

showusage_basic_example () {

    #
    # Print one worked example and a pointer to the extended help.
    #
    local -a example_lines=(
        ""
        "Example: encoding some wav files to mp3 using lame:"
        ""
        "$0 -d /path/to/wavfiles -c 'lame '"
        ""
        "Extended usage: use --help"
        ""
    )
    local text

    for text in "${example_lines[@]}"
    do
        echo -e "$text"
    done
}

showusage_extended_head () {

    #
    # Print the banner, the MODE synopsis and the list of modes that opens
    # the --help output.
    #
    printf '%s\n' \
        "" \
        "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION" \
        "" \
        "PPSS is a Bash shell script that executes commands in parallel on a set  " \
        "of items, such as files in a directory, or lines in a file." \
        "" \
        "Usage: $0 [[ MODE ]] [[ options ]] " \
        "" \
        "Modes are optional and mainly used for running in distributed mode. Modes are:" \
        "" \
        " config       Generate a config file based on the supplied option parameters." \
        " deploy       Deploy PPSS and related files on the specified nodes." \
        " erase        Erase PPSS and related files from the specified nodes." \
        "" \
        " start        Starting PPSS on nodes." \
        " pause        Pausing PPSS on all nodes." \
        " stop         Stopping PPSS on all nodes." \
        " continue     Continuing PPSS on all nodes." \
        " node         Running PPSS as a node, requires additional options."
}

showusage_extended_body () {

    #
    # Print the option reference for distributed mode, followed by example
    # invocations. Reads PPSS_HOME_DIR to show the default of --homedir.
    #
    echo 
    echo -e "The following options are used for distributed execution of PPSS."
    echo 
    echo -e "--master | -m      Specifies the SSH server that is used for communication between nodes."
    echo -e "                   Using SSH, file locks are created, informing other nodes that an item "
    echo -e "                   is locked. If items are files that must be processed, they must reside" 
    echo -e "                   on this host. SCP is used to transfer files from this host to nodes"
    echo -e "                   for local processing."
    echo
    echo -e "--node | -n        File containing a list of nodes that act as PPSS clients. One IP / DNS"
    echo -e "                   name per line."
    echo
    echo -e "--key | -k         The SSH key that a node uses to connect to the master."
    echo
    echo -e "--known-hosts | -K The file that contains the server public key. Can often be found on  "
    echo -e "                   hosts that already once connected to the server. See the file "
    echo -e "                   ~/.ssh/known_hosts or else, manually connect once and check this file."
    echo
    echo -e "--user | -u        The SSH user name that is used by the node when logging in into the"
    echo -e "                   master SSH server."
    echo 
    # --script used to be listed a second time further down; this is now the
    # only entry for it.
    echo -e "--script | -S      Specifies the script/program that must be copied to the nodes for"
    echo -e "                   execution through PPSS. Only used in the deploy mode."
    echo -e "                   This option should be specified if necessary when generating a config."
    echo
    echo -e "--download         This option specifies that an item will be downloaded by the node"
    echo -e "                   from the server or share to the local node for processing."
    echo 
    echo -e "--upload           This option specifies that the output file will be copied back to"
    echo -e "                   the server, the --outputdir option is mandatory."
    echo 
    echo -e "--no-scp | -b      Do not use scp for downloading items. Use cp instead. Assumes that a"
    echo -e "                   network file system (NFS/SMB) is mounted under a local mount point."
    echo 
    echo -e "--outputdir | -o   Directory on server where processed files are put. If the result of "
    echo -e "                   encoding a wav file is an mp3 file, the mp3 file is put in the "
    echo -e "                   directory specified with this option."
    echo 
    echo -e "--homedir | -H     Directory in which PPSS is installed on the node."
    echo -e "                   Default is '$PPSS_HOME_DIR'."
    echo
    echo -e "--randomize | -R   Randomise which items to process by the client in distributed mode."
    echo -e "                   This makes sure that with many nodes, it is prevented that some"
    echo -e "                   clients spend all their time trying to get a lock on an item."
    echo
    echo -e "Example: encoding some wav files to mp3 using lame:"
    echo 
    echo -e "$0 -c 'lame ' -d /path/to/wavfiles -j " 
    echo 
    echo -e "Running PPSS based on a configuration file."
    echo
    echo -e "$0 -C config.cfg"
    echo 
    echo -e "Generating a configuration file. Wavs are converted to mp3. SCP is used for data transfer."
    echo
    echo -e "$0 config -C ppss-config.cfg -d /some/dir -o output --download --upload -K known_hosts \\"
    echo -e "-k ppss-key.dsa -n nodes.txt -m 10.0.0.100 \\"
    echo -e "-c 'lame --quiet \"\$ITEM\" -o \"\$OUTPUT_DIR/\$OUTPUT_FILE\".mp3' "
    echo 
    echo -e "Running PPSS on a client as part of a cluster."
    echo 
    echo -e "$0 node -d /somedir -c 'cp \"\$ITEM\" /some/destination' -m 10.0.0.50 -u ppss -k ppss-key.key"
    echo    
}

kill_process () {

    #
    # SIGINT handler: append the kill key as one line to the file named by
    # $FIFO.
    #
    printf '%s\n' "$KILL_KEY" >> "$FIFO"
}

exec_cmd () { 

    #
    # Run a command string locally, or on $SSH_SERVER when that is set.
    #
    # $1 - command line to run
    # $2 - empty: use the multiplexed ssh options ($SSH_OPTS);
    #      "1":   use the non-multiplexed options ($SSH_OPTS_NOMP)
    #
    # Sets the globals STATUS, CMD and NOMP. Returns the exit status of the
    # command that was run.
    #
    STATUS=""
    CMD="$1"
    NOMP="$2"

    # On FreeBSD the command is prefixed with 'bash'.
    [[ "$ARCH" == "FreeBSD" ]] && CMD="bash $CMD"

    if [[ -z "$SSH_SERVER" ]]
    then
        # No master configured: evaluate the command in this shell.
        eval "$CMD"
        STATUS=$?
        log DEBUG "LOCAL EXEC - status is $STATUS"
        return $STATUS
    fi

    # The ssh arguments are left unquoted on purpose: $SSH_OPTS and $SSH_KEY
    # each hold several words that must be split into separate arguments.
    case "$NOMP" in
        "")
            ssh $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER $CMD
            STATUS=$?
            ;;
        "1")
            ssh $SSH_OPTS_NOMP $SSH_KEY $USER@$SSH_SERVER $CMD
            STATUS=$?
            ;;
    esac
    return $STATUS
}

does_file_exist () {

    #
    # Check whether an item exists by running 'ls -1' on it through
    # exec_cmd, so the same check works for local and remote items.
    #
    # $1 - path to check (inserted into the command line as-is)
    #
    # Sets the globals FILE and RES (the captured ls output).
    # Returns 0 when ls succeeded, 1 otherwise.
    #
    FILE="$1"

    if RES=$(exec_cmd "ls -1 $FILE" 2>&1)
    then
        return 0
    fi
    return 1
}

check_for_interrupt () {

    #
    # PPSS can be interrupted with a stop or pause signal file.
    #
    # Stop signal present:  status STOPPED, STOP="1", returns 1.
    # Pause signal present: status PAUSED, sleep $PAUSE_DELAY seconds and
    #                       check both signals again.
    # Neither present:      status RUNNING, returns the status of that
    #                       set_status call.
    #
    while true
    do
        if does_file_exist "$STOP_SIGNAL"
        then
            set_status "STOPPED" "$FAILED_ITEMS_COUNTER"
            log INFO "STOPPING job. Stop signal found."
            STOP="1"
            return 1
        fi

        if ! does_file_exist "$PAUSE_SIGNAL"
        then
            set_status "RUNNING" "$FAILED_ITEMS_COUNTER"
            return
        fi

        set_status "PAUSED" "$FAILED_ITEMS_COUNTER"
        log INFO "PAUSE: sleeping for $PAUSE_DELAY SECONDS."
        sleep $PAUSE_DELAY
    done
}

cleanup () {

    #
    # Remove temporary files and stop the ssh master process.
    #
    # $1 - optional text appended to the debug log line
    #
    # The working directory $PPSS_DIR is removed only when MODE is one of
    # the known modes other than "node".
    #
    log DEBUG "$FUNCNAME - Cleaning up all temp files and processes. $1"

    for x in $MODES
    do
        if [[ "$x" == "$MODE" && "$x" != "node" ]]
        then
            rm -rf "$PPSS_DIR"
        fi
    done

    [[ ! -e "$FIFO" ]] || rm "$FIFO"
    [[ ! -e "$FIFO_LISTENER" ]] || rm "$FIFO_LISTENER"
    [[ ! -e "$SSH_SOCKET" ]] || rm -rf "$SSH_SOCKET"

    if [[ -n "$SSH_MASTER_PID" ]]
    then
        kill "$SSH_MASTER_PID"
    fi
}

add_var_to_config () {

    #
    # In config mode, append a NAME=VALUE line to the config file $CONFIG.
    # Does nothing in any other mode.
    #
    # $1 - variable name
    # $2 - value
    #
    # Sets the globals VAR and VALUE.
    #
    if [[ "$MODE" == "config" ]] 
    then

        VAR="$1"
        VALUE="$2"

        # printf writes the value verbatim (echo -e would interpret backslash
        # sequences inside a command string), and the quoted redirect keeps
        # config paths that contain spaces working.
        printf '%s=%s\n' "$VAR" "$VALUE" >> "$CONFIG"
    fi
}

is_var_empty () {

    #
    # Abort when a required option value is missing.
    #
    # $1 - the value to check
    #
    # When $1 is empty: print the short usage text, run cleanup and exit
    # the script with status 1. Otherwise do nothing.
    #
    if [[ -z "$1" ]] 
    then
        # Was 'showusage_', which is not a defined function.
        showusage_short
        cleanup
        exit 1
    fi
}

detect_source_dir_nfs_exported () {
    
    #
    # Test whether $SRC_DIR, or one of its parent directories, is the mount
    # point of a network file system. inotify does not play well with NFS,
    # so the caller disables it in that case.
    #
    # Mount points are taken from the lines of 'mount' output that contain
    # a colon (host:/export on /mountpoint ...).
    #
    # Sets the globals NFS, NFS_MOUNTED_FS and DIRECTORY.
    # Returns 1 when a matching mount point was found, 0 otherwise.
    #
    # NOTE: a relative SRC_DIR is compared as given; it is not resolved to
    # an absolute path first.
    #
    local mount_point

    NFS=0
    NFS_MOUNTED_FS=$(mount | grep ":" | cut -d ":" -f 2 | awk '{ print $3 }')

    for mount_point in $NFS_MOUNTED_FS
    do
        # Drop one trailing slash so "/mnt/nfs/" still matches "/mnt/nfs".
        DIRECTORY="${SRC_DIR%/}"

        #
        # Walk up from the source directory until the top ("/" or ".") is
        # reached. The previous test used the glob [/.]*, which matches
        # every absolute path and therefore stopped before comparing
        # anything.
        #
        while [[ "$DIRECTORY" != "/" && "$DIRECTORY" != "." ]]
        do
            if [[ "$mount_point" == "$DIRECTORY" ]] 
            then
                NFS=1
                break 2
            fi
            DIRECTORY=$(dirname "$DIRECTORY")
        done
    done

    if [[ "$NFS" == "1" ]] 
    then
        return 1
    else
        return 0
    fi 
}

detect_inotify () {

    #
    # Decide whether inotify is used. INOTIFY becomes 1 only when
    # /usr/bin/inotifywait exists, INOTIFY has not been set to "0", and
    # detect_source_dir_nfs_exported returns success. Otherwise it is 0.
    #
    if [[ ! -e /usr/bin/inotifywait || "$INOTIFY" == "0" ]] || ! detect_source_dir_nfs_exported
    then
        INOTIFY=0
    else
        INOTIFY=1
    fi
}

process_arguments () {

    #
    # Process any command-line options that are specified.
    #
    # Arguments: the option list (presumably the script's arguments after
    # the optional MODE word has been shifted off - confirm at the caller).
    #
    # Every option sets its global variable and, in config mode, is also
    # recorded in the config file through add_var_to_config. Prints usage
    # and exits with status 1 on unknown options, on missing option values
    # and when no valid source (-d / -f) has been given.
    #
    # Options that take a value verify it with is_var_empty. Without that
    # check, a value option given as the very last argument made 'shift 2'
    # fail without shifting, and the loop below never ended.
    #

    if [[ "$#" == "0" ]] 
    then
        showusage_short
        exit 1
    fi

    while [[ $# -gt 0 ]] 
    do
        case $1 in

            --config|-C )
                CONFIG="$2"
                is_var_empty "$CONFIG" 

                if [[ "$MODE" == "config" ]] 
                then
                    # Generating a config: never silently clobber an old one.
                    if [[ -e "$CONFIG" ]] 
                    then
                        echo "Do want to overwrite existing config file? [y/n]"
                        read yn
                        if [[ "$yn" == "y" ]] || [[ "$yn" == "yes" ]] 
                        then
                            rm "$CONFIG"
                        else
                            echo "Aborting..."
                            cleanup
                            exit 1
                        fi 
                    fi
                else
                    # Any other mode: load the settings from the file.
                    source "$CONFIG"
                fi

                if [[ ! -z "$SSH_KEY" ]] 
                then
                    SSH_KEY="-i $SSH_KEY"
                fi

                # ssh is pointed at ./known_hosts (see SSH_OPTS); create it
                # from the configured file when it is not there yet.
                if [[ ! -e "./known_hosts" ]] 
                then
                    if [[ -e "$SSH_KNOWN_HOSTS" ]] 
                    then
                        if [[ "$SSH_KNOWN_HOSTS" != "known_hosts" ]] 
                        then
                            cat "$SSH_KNOWN_HOSTS" > ./known_hosts
                        fi
                    else
                        echo "File $SSH_KNOWN_HOSTS does not exist."
                        exit 1
                    fi
                fi
                shift 2 ;;

            --working-dir|-w )
                is_var_empty "$2"
                PPSS_DIR="$2"
                add_var_to_config PPSS_DIR "$PPSS_DIR"
                shift 2 ;; 
            --node|-n ) 
                is_var_empty "$2"
                NODES_FILE="$2"
                add_var_to_config NODES_FILE "$NODES_FILE"
                shift 2 ;; 
            --sourcefile|-f )
                INPUT_FILE="$2"
                is_var_empty "$INPUT_FILE" 
                add_var_to_config INPUT_FILE "$INPUT_FILE"
                shift 2 ;;
            --sourcedir|-d ) 
                SRC_DIR="$2"
                is_var_empty "$SRC_DIR" 
                add_var_to_config SRC_DIR "$SRC_DIR"
                shift 2 ;; 
            --delay|-D )
                is_var_empty "$2"
                MAX_DELAY="$2"
                add_var_to_config MAX_DELAY "$MAX_DELAY"
                shift 2 ;;
            --disable-inotify )
                INOTIFY=0
                add_var_to_config INOTIFY "$INOTIFY"
                shift 1 ;;
            --enable-input-lock )
                ENABLE_INPUT_LOCK=1
                add_var_to_config ENABLE_INPUT_LOCK "$ENABLE_INPUT_LOCK"
                shift 1 ;;
            --daemon )
                DAEMON="1"
                QUIET="1"
                detect_inotify
                add_var_to_config DAEMON "$DAEMON"
                add_var_to_config QUIET "$QUIET"
                add_var_to_config INOTIFY "$INOTIFY"
                shift 1 ;;
            --interval ) 
                is_var_empty "$2"
                DAEMON_POLLING_INTERVAL="$2"
                add_var_to_config DAEMON_POLLING_INTERVAL "$DAEMON_POLLING_INTERVAL"
                shift 2 ;;
            --file-age )
                is_var_empty "$2"
                # The value was validated but never stored before.
                DAEMON_FILE_AGE="$2"
                add_var_to_config DAEMON_FILE_AGE "$DAEMON_FILE_AGE"
                shift 2 ;;
            --email|-e )
                is_var_empty "$2"
                EMAIL="$2"
                add_var_to_config EMAIL "$EMAIL"
                shift 2 ;;
            --command|-c ) 
                COMMAND="$2"
                is_var_empty "$COMMAND"
                if [[ "$MODE" == "config" ]] 
                then
                    # Single quotes keep the command literal when the
                    # config file is sourced later.
                    COMMAND=\'$COMMAND\'
                    add_var_to_config COMMAND "$COMMAND"
                fi
                shift 2 ;;

            -h )
                showusage_normal
                exit 1 ;;
            --help )
                showusage_long
                exit 1 ;;
            --homedir|-H )
                is_var_empty "$2"
                PPSS_HOME_DIR="$2"
                # Recorded under its own name; it used to be written as
                # PPSS_DIR, overwriting the working directory setting.
                add_var_to_config PPSS_HOME_DIR "$PPSS_HOME_DIR"
                shift 2 ;; 
            --disable-ht|-j )
                HYPERTHREADING=no
                add_var_to_config HYPERTHREADING $HYPERTHREADING
                shift 1 ;;
            --log|-l )
                is_var_empty "$2"
                LOGFILE="$2"
                add_var_to_config LOGFILE "$LOGFILE"
                shift 2 ;;
            --no-traversal|-r )
                TRAVERSAL="0"
                # Recorded as TRAVERSAL; it used to be written as LOGFILE.
                add_var_to_config TRAVERSAL "$TRAVERSAL"
                shift 1 ;;
            --workingdir|-w ) 
                # NOTE: -w is already claimed by --working-dir above, so
                # only the long form reaches this branch.
                is_var_empty "$2"
                WORKINGDIR="$2"
                add_var_to_config WORKINGDIR "$WORKINGDIR"
                shift 2 ;;
            --md5|-M ) 
                USE_MD5="1"
                add_var_to_config USE_MD5 "$USE_MD5"
                shift 1 ;;
            --key|-k )
                SSH_KEY="$2"
                is_var_empty "$SSH_KEY"
                add_var_to_config SSH_KEY "$SSH_KEY"
                if [[ ! -z "$SSH_KEY" ]] 
                then
                    SSH_KEY="-i $SSH_KEY"
                fi
                shift 2 ;;
            --known-hosts | -K ) 
                is_var_empty "$2"
                SSH_KNOWN_HOSTS="$2"
                add_var_to_config SSH_KNOWN_HOSTS "$SSH_KNOWN_HOSTS"
                shift 2 ;;

            --no-scp |-b )
                SECURE_COPY=0
                add_var_to_config SECURE_COPY "$SECURE_COPY"
                shift 1 ;;
            --randomize |-R )
                RANDOMIZE=1
                add_var_to_config RANDOMIZE "$RANDOMIZE"
                shift 1 ;;
            --outputdir|-o )
                is_var_empty "$2"
                REMOTE_OUTPUT_DIR="$2"
                add_var_to_config REMOTE_OUTPUT_DIR "$REMOTE_OUTPUT_DIR"
                shift 2 ;;
            --processes|-p )
                is_var_empty "$2" 
                MAX_NO_OF_RUNNING_JOBS="$2"
                add_var_to_config MAX_NO_OF_RUNNING_JOBS "$MAX_NO_OF_RUNNING_JOBS" 
                shift 2 ;;
            --master|-m ) 
                is_var_empty "$2"
                SSH_SERVER="$2"
                add_var_to_config SSH_SERVER "$SSH_SERVER"
                shift 2 ;;
            --script|-S )
                is_var_empty "$2"
                SCRIPT="$2"
                add_var_to_config SCRIPT "$SCRIPT"
                shift 2 ;;
            --debug )
                PPSS_DEBUG="1"    
                add_var_to_config PPSS_DEBUG "$PPSS_DEBUG"
                shift 1 ;;
            --download )
                DOWNLOAD_TO_NODE="1"    
                add_var_to_config DOWNLOAD_TO_NODE "$DOWNLOAD_TO_NODE"
                shift 1 ;;
            --upload )
                # -o must appear before --upload on the command line.
                if [[ -z "$REMOTE_OUTPUT_DIR" ]] 
                then
                    echo "ERROR: no server-side output directory specified with -o"
                    exit 1
                fi
                UPLOAD_TO_SERVER="1"    
                add_var_to_config UPLOAD_TO_SERVER "$UPLOAD_TO_SERVER"
                shift 1 ;;
            --quiet|-q )    
                QUIET="1"
                add_var_to_config QUIET "$QUIET"
                shift 1 ;;
            --user|-u )
                is_var_empty "$2"
                USER="$2"
                add_var_to_config USER "$USER"
                shift 2 ;;
            --version|-v )
                echo ""
                echo "$SCRIPT_NAME version $SCRIPT_VERSION"
                echo ""
                exit 0 ;;
            * )

                showusage_short
                echo
                echo "Unknown option $1 "
                echo
                exit 1 ;;
        esac
    done

    #
    # Sanity checks on the combination of options.
    #
    if [[ -z "$SRC_DIR" && -z "$INPUT_FILE" ]] 
    then
        showusage_short
        echo
        echo "No source file or directory specified with -f or -d."
        exit 1
    fi

    if [[ ! -e "$SRC_DIR" && -z "$MODE" && -z "$INPUT_FILE" ]]
    then
        showusage_short
        echo
        echo "Source directory $SRC_DIR does not exist."
        exit 1
    fi
    
    if [[ "$SRC_DIR" == "." ]] 
    then
        echo 
        echo "|P|P|S|S| is not designed to process items from within the directory"
        echo "it is being run. PPSS will start to process its own files from"
        echo "its working directory $PPSS_DIR which is probably not wat you"
        echo "want. Are you sure you want to continue?"
        echo 
        read YN
        if [[ "$YN" != "y" && "$YN" != "Y" ]]
        then
            exit 1
        fi
    fi

    if [[ "$DAEMON" == "1" && -z "$SRC_DIR" ]]
    then
        showusage_short
        echo
        echo "Daemon monitors a specified directory (with the -d option) for files to process."
        echo "Read the on-line manual for more information."
        exit 1
    fi 
}

display_header () {

    #
    # Send the start-up banner (name, version, hostname) through
    # log as DSPLY messages, one call per line.
    #
    local banner_line

    for banner_line in \
        "" \
        "=========================================================" \
        "                       |P|P|S|S|                         " \
        "$SCRIPT_NAME vers. $SCRIPT_VERSION" \
        "=========================================================" \
        "Hostname:\t\t$HOSTNAME" \
        "---------------------------------------------------------"
    do
        log DSPLY "$banner_line"
    done
}

create_working_directory () {

    #
    # Create the working directory $PPSS_DIR, including missing parent
    # directories, unless something with that name already exists.
    #
    [[ -e "$PPSS_DIR" ]] || mkdir -p "$PPSS_DIR"
}

expand_str () {

    #
    # Print $1 padded on the right with spaces until it is at least
    # $TYPE_LENGTH characters long. Longer strings are printed unchanged.
    #
    # Sets the globals STR, LENGTH and SPACE.
    #
    STR=$1
    LENGTH=$TYPE_LENGTH
    SPACE=" "

    until [[ "${#STR}" -ge "$LENGTH" ]] 
    do
        STR+="$SPACE"
    done

    echo "$STR"
}

are_we_sourced () {

    #
    # Tell whether this file was sourced rather than executed, judged by
    # the file-name part of $SOURCED (the value of $0 at start-up).
    #
    # Sets the global RES to that file name.
    # Returns 1 when the name is exactly "ppss" (executed directly),
    # 0 otherwise (treated as sourced).
    #
    # The name is cut out with parameter expansion. The former unquoted
    # 'basename $SOURCED' split paths containing spaces into several
    # arguments and failed on values such as "-bash".
    #
    RES="${SOURCED##*/}"
    
    if [[ "$RES" == "ppss" ]] 
    then
        return 1
    else
        return 0
    fi 
}

get_time_in_seconds () {

    #
    # Print the current time in seconds. Sets the global THE_TIME.
    #
    case "$ARCH" in
        "SunOS")
            #
            # Dirty hack because this ancient operating system does not
            # support +%s: take the value from the time call that shows up
            # in the truss trace of date.
            #
            THE_TIME=$(truss /usr/bin/date 2>&1 | grep ^time | awk '{ print $3 }')
            ;;
        *)
            THE_TIME="$(date +%s)"
            ;;
    esac

    echo "$THE_TIME"
}

set_md5 () {

    #
    # Select the MD5 command for this operating system and verify that it
    # can be run.
    #
    # Sets the global MD5. For an $ARCH that is not listed, MD5 is left as
    # it was.
    # Returns 0 when the command ran successfully, 1 (after logging an
    # error) when it did not.
    #
    case $ARCH in
        "Darwin"|"FreeBSD") MD5=md5 ;;
        "SunOS")            MD5="digest -a md5" ;; 
        "Linux")            MD5=md5sum ;;
    esac

    #
    # $MD5 is unquoted on purpose: it may hold a command plus arguments.
    # The old check '[[ ! "$?" ]]' tested a non-empty string and was never
    # true, and it called 'LOG', which is not a defined function.
    #
    if ! echo "test" | $MD5 > /dev/null 2>&1
    then
        log ERROR "ERROR - PPSS requires $MD5. It may not be within the path or installed."
        return 1
    fi

    return 0 
}

set_stat () {

    #
    # Select the stat command line for this operating system and verify
    # that it works. Only done in daemon mode without inotify
    # (DAEMON == 1 and INOTIFY == 0); otherwise nothing is changed.
    #
    # Sets the global STAT. For an $ARCH that is not listed, STAT is left
    # as it was.
    # Returns 0 when no check was needed or the command worked, 1 (after
    # logging an error) when it failed.
    #
    if [[ "$DAEMON" != "1" || "$INOTIFY" != "0" ]] 
    then
        return 0
    fi

    case $ARCH in
        "Darwin"|"FreeBSD") STAT="stat -f%m" ;;
        "SunOS")            STAT="gstat -c%Y" ;;
        "Linux")            STAT="stat -c%Y" ;;
    esac

    #
    # $STAT is unquoted on purpose: it holds a command plus its format
    # option. The old check '[[ ! "$?" ]]' tested a non-empty string and
    # was never true, and it called 'LOG', which is not a defined function.
    #
    if ! $STAT . >> /dev/null 2>&1
    then
        log ERROR "ERROR - PPSS daemon mode requires stat. It may not be within the path or installed."
        return 1
    fi

    return 0 
}

log () {

    #
    # Write a message to the log file and, depending on its type, to the
    # screen.
    #
    # $1 - message type: DSPLY, ERROR, WARN, INFO, DEBUG or PRCNT
    # $2 - message text (backslash escapes are interpreted by echo -e)
    #
    # Log file ($LOGFILE):
    #   every type when PPSS_DEBUG is set and not "0"; otherwise only
    #   INFO, ERROR, WARN and DSPLY.
    # Screen:
    #   ERROR        - always
    #   DSPLY, WARN  - only when QUIET is "0"
    #   PRCNT        - always, written over the current line with '\r'
    #
    # Sets the globals TYPE, MESG, TYPE_LENGTH, TYPE_EXP, DATE, PREFIX,
    # PREFIX_SMALL, ECHO_MSG and LOG_MSG.
    #
    TYPE="$1"
    MESG="$2"
    TYPE_LENGTH=5 

    #
    # Performance hack. Don't go through all the code if not required.
    # A DEBUG message is dropped when debugging is off, whether PPSS_DEBUG
    # is "0" or empty (it would not be written anywhere in either case).
    #
    if [[ "$TYPE" == "DEBUG" && ( -z "$PPSS_DEBUG" || "$PPSS_DEBUG" == "0" ) ]] 
    then
        return 
    fi

    TYPE_EXP=$(expand_str "$TYPE")

    DATE=$(date +%b\ %d\ %H:%M:%S)
    PREFIX="$DATE: ${TYPE_EXP:0:$TYPE_LENGTH}"
    PREFIX_SMALL="$DATE: "

    if [[ "$TYPE" != "ERROR" ]] 
    then
        ECHO_MSG="$PREFIX_SMALL $MESG"
    else
        ECHO_MSG="$PREFIX_SMALL [ERROR] $MESG"
    fi
    LOG_MSG="$PREFIX $MESG"

    if [[ ! -z "$PPSS_DEBUG" && "$PPSS_DEBUG" != "0" ]]
    then
        echo -e "$LOG_MSG" >> "$LOGFILE"

    elif [[ "$TYPE" == "INFO" || "$TYPE" == "ERROR" || "$TYPE" == "WARN" || "$TYPE" == "DSPLY" ]]
    then
        echo -e "$LOG_MSG" >> "$LOGFILE"
    fi

    #
    # Inside [[ ]] the && operator binds tighter than ||, so the former
    # single test 'DSPLY || ERROR || WARN && QUIET == 0' applied the quiet
    # check to WARN only and printed DSPLY messages in quiet mode too.
    # The grouping is now explicit.
    #
    if [[ "$TYPE" == "ERROR" ]]
    then
        echo -e "$ECHO_MSG"

    elif [[ ( "$TYPE" == "DSPLY" || "$TYPE" == "WARN" ) && "$QUIET" == "0" ]]
    then
        echo -e "$ECHO_MSG"
    fi

    if [[ "$TYPE" == "PRCNT" ]] 
    then
       echo -en "\r$ECHO_MSG"
       #echo "$ECHO_MSG" # for debugging.
    fi
}

init_vars () {

    #
    # Prepares the run-time environment: records the start time, selects
    # the md5 and stat commands, creates the two FIFOs and opens them on
    # file descriptors 42 and 43, reports the CPU type, determines the
    # number of parallel jobs when none was given, and makes sure the
    # working directories exist (through exec_cmd when SSH_SERVER is set,
    # locally with mkdir otherwise).
    #

    #
    # Get start time to measure how long PPSS has been running.
    #
    START_PPSS=$(get_time_in_seconds)

    #
    # Check if MD5(SUM) is present on the system.
    #
    set_md5
    #
    # Check if stat is present and works on the system if daemon mode is enabled.
    #
    set_stat

    #
    # Is PPSS run as a daemon? Then use input locking, which is not required otherwise.
    #
    if [[ "$DAEMON" == "1" ]] 
    then
        INPUT_LOCK="$SRC_DIR/INPUT_LOCK"
    fi

    #
    # For some strange reason, this value differs between operating systems
    # due to different behaviour of the ps utility across operating systems.
    #
    if [[ "$ARCH" == "Darwin" ]] 
    then
        MIN_JOBS=4
    elif [[ "$ARCH" == "Linux" ]] 
    then
        MIN_JOBS=3
    fi

    FIFO="$PPSS_DIR"/ppss-fifo-$RANDOM-$RANDOM
    FIFO_LISTENER="$PPSS_DIR"/ppss-fifo-listener-$RANDOM-$RANDOM

    if [[ ! -e "$FIFO" ]] 
    then    
        mkfifo -m 600 "$FIFO"
    fi

    if [[ ! -e "$FIFO_LISTENER" ]] 
    then    
        mkfifo -m 600 "$FIFO_LISTENER"
    fi

    #
    # Keep both FIFOs open read/write for the life time of this shell.
    #
    exec 42<> "$FIFO"
    exec 43<> "$FIFO_LISTENER"

    set_status "RUNNING" 

    if [[ -e "$CPUINFO" ]] 
    then
        CPU=$(grep 'model name' "$CPUINFO" | cut -d ":" -f 2 | sed -e 's/^ //g' | sort | uniq)
        log DSPLY "CPU: $CPU"
    elif [[ "$ARCH" == "Darwin" ]] 
    then
        MODEL=$(system_profiler SPHardwareDataType | grep "Processor Name" | cut -d ":" -f 2)
        SPEED=$(system_profiler SPHardwareDataType | grep "Processor Speed" | cut -d ":" -f 2)
        log DSPLY "CPU: $MODEL $SPEED"
    elif [[ "$ARCH" == "SunOS" ]] 
    then
        CPU=$(/usr/sbin/psrinfo -v | grep MHz | cut -d " " -f 4,8 | awk '{ printf ("Processor architecture: %s @ %s MHz.\n", $1,$2) }' | head -n 1)
        log DSPLY "$CPU"
    else
        log DSPLY "CPU: Cannot determine. Provide a patch for your arch!"
        log DSPLY "Arch is $ARCH"
    fi 

    if [[ -z "$MAX_NO_OF_RUNNING_JOBS" ]] 
    then 
        get_no_of_cpus $HYPERTHREADING
    fi

    if [[ ! -z "$SSH_SERVER" ]] 
    then
        does_file_exist "$PPSS_HOME_DIR/$JOB_LOG_DIR"
        if [[ ! "$?" == "0" ]]  
        then
            # The message used to expand the misspelled \$JOB_lOG_DIR (always empty).
            log DEBUG "Remote Job log directory $PPSS_HOME_DIR/$JOB_LOG_DIR does not exist. Creating."
            exec_cmd "mkdir $PPSS_HOME_DIR/$JOB_LOG_DIR"
        fi
    fi

    if [[ ! -e "$JOB_LOG_DIR" ]] 
    then
        mkdir -p "$JOB_LOG_DIR"
    fi

    if [[ ! -z "$SSH_SERVER" ]] 
    then
        ITEM_LOCK_DIR="$PPSS_HOME_DIR/$ITEM_LOCK_DIR"
    fi
    
    does_file_exist "$ITEM_LOCK_DIR"
    if [[ ! "$?" == "0" ]] 
    then
        if [[ ! -z "$SSH_SERVER" ]] 
        then
            log DEBUG "Creating remote item lock dir."
        else
            log DEBUG "Creating local item lock dir."
        fi
        #
        # Test the command itself; the former '[[ ! "$?" ]]' was never true,
        # so a failure to create the directory went unreported.
        #
        if ! exec_cmd "mkdir $ITEM_LOCK_DIR"
        then
            log DEBUG "Failed to create item lock dir."
        fi
    fi

    if [[ ! -z "$SSH_SERVER" ]] 
    then
        does_file_exist "$REMOTE_OUTPUT_DIR"
        if [[ ! "$?" == "0" ]] 
        then
            log DEBUG "Remote output dir $REMOTE_OUTPUT_DIR does not exist."
            exec_cmd "mkdir $REMOTE_OUTPUT_DIR"
        fi
    fi

    if [[ ! -e "$PPSS_LOCAL_TMPDIR" ]] 
    then
        mkdir "$PPSS_LOCAL_TMPDIR"
    fi

    if [[ ! -e "$PPSS_LOCAL_OUTPUT" ]] 
    then
        mkdir "$PPSS_LOCAL_OUTPUT"
    fi

    if [[ ! -e "$PPSS_NODE_STATUS" ]] 
    then
        mkdir -p "$PPSS_NODE_STATUS"
    fi

}

upload_status () {

    #
    # Copies the node status file to the status directory on the server
    # with scp and logs (as DEBUG) whether that worked. When the status
    # file does not exist nothing is copied.
    #
    if [[ ! -e "$NODE_STATUS_FILE" ]]
    then
        log DEBUG "Status file not found thus not uploaded."
        return
    fi

    if scp -q -o GlobalKnownHostsFile=./known_hosts -i ppss-key.dsa $NODE_STATUS_FILE $USER@$SSH_SERVER:$PPSS_HOME_DIR/$PPSS_NODE_STATUS/
    then
        log DEBUG "Uploaded status to server ok."
    else
        log DEBUG "Uploaded status to server failed."
    fi
}

set_status () {

    #
    # Writes the status line of this node to NODE_STATUS_FILE and uploads
    # it. Does nothing when no SSH_SERVER is configured.
    #
    # Arguments: $1 - status text
    #            $2 - number of failed items (0 when omitted)
    #
    # The line written is: node id, host name, status, number of lines in
    # the list of processed items, number of failed items.
    #
    if [[ -z "$SSH_SERVER" ]]
    then
        return 0
    fi

    STATUS="$1"
    FAILED="${2:-0}"

    NO_PROCESSED="0"
    if [[ -e "$LIST_OF_PROCESSED_ITEMS" ]]
    then
        NO_PROCESSED=$(wc -l "$LIST_OF_PROCESSED_ITEMS" | awk '{ print $1 }' )
    fi

    NODE=$(cat $PPSS_DIR/$NODE_ID)

    echo "$NODE $HOSTNAME $STATUS $NO_PROCESSED $FAILED" > "$NODE_STATUS_FILE"
    upload_status
}

check_status () {

    #
    # Aborts PPSS when a previous step failed.
    #
    # Arguments: $1 - exit status to inspect
    #            $2 - name of the calling function
    #            $3 - message to display on failure
    #
    # A status of "0" is ignored. Anything else is displayed, the node
    # status is set to ERROR, cleanup is run and the script exits with
    # that status.
    #
    ERROR="$1"
    FUNCTION="$2"
    MESSAGE="$3"

    if [[ "$ERROR" == "0" ]]
    then
        return 0
    fi

    log DSPLY "$FUNCTION - $MESSAGE"
    set_status ERROR 
    cleanup 
    exit "$ERROR"
}

erase_ppss () {

    #
    # Removes the PPSS home directory from every node listed in NODES_FILE,
    # after the user has confirmed with "yes" or "YES" on stdin. Any other
    # answer aborts.
    #
    echo "Are you realy sure you want to erase PPSS from all nodes!? (YES/NO)"
    read -r YN

    if [[ "$YN" == "yes" ]] || [[ "$YN" == "YES" ]] 
    then
        for NODE in $(cat "$NODES_FILE")
        do
            #
            # The control socket is named after the node, so the SSH
            # options have to be built here, once NODE is known. Building
            # them before the loop used an empty or stale node name.
            #
            SSH_SOCKET="ppss_ssh_socket-$NODE"

            SSH_OPTS_NODE="-o BatchMode=yes -o ControlPath=$SSH_SOCKET \
                           -o GlobalKnownHostsFile=./known_hosts \
                           -o ControlMaster=auto \
                           -o Cipher=blowfish \
                           -o ConnectTimeout=5 "

            log DSPLY "Erasing PPSS homedir $PPSS_HOME_DIR from node $NODE."
            # SSH_KEY and SSH_OPTS_NODE hold several words and must be split.
            ssh -q $SSH_KEY $SSH_OPTS_NODE $USER@$NODE "rm -rf $PPSS_HOME_DIR"
        done
    else
        log DSPLY "Aborting.."
    fi
}

stack_push_tmp () {

    #
    # Appends $1 as a new last line of TMP_STACK.
    #
    TMP1="$1"

    case "$TMP_STACK" in
        "") TMP_STACK="$TMP1" ;;
        *)  TMP_STACK="$TMP_STACK"$'\n'"$TMP1" ;;
    esac
}   

stack_push () {

    #
    # Puts $1 on top of STACK, i.e. in front of the lines already there.
    #
    line="$1"

    if [[ -n "$STACK" ]]
    then
        STACK="$line"$'\n'"$STACK"
        return 0
    fi

    STACK="$line"
}   

unprocessed_stack_push () {

    #
    # Puts $1 in front of the lines already held in UNPROCESSED_ITEMS.
    #
    # The emptiness test used to look at PROCESSED_ITEMS: with that list
    # empty every push overwrote the previous entries, and with it filled
    # the first push left a trailing empty line.
    #
    line="$1"

    if [[ -z "$UNPROCESSED_ITEMS" ]] 
    then
        UNPROCESSED_ITEMS="$line"
    else
        UNPROCESSED_ITEMS="$line"$'\n'"$UNPROCESSED_ITEMS"
    fi
}

processed_stack_push () {

    #
    # Puts $1 in front of the lines already held in PROCESSED_ITEMS.
    #
    line="$1"

    case "$PROCESSED_ITEMS" in
        "")
            PROCESSED_ITEMS="$line"
            ;;
        *)
            printf -v PROCESSED_ITEMS '%s\n%s' "$line" "$PROCESSED_ITEMS"
            ;;
    esac
}

stack_pop () {

    #
    # Takes the first line off STACK and stores it in REGISTER.
    #
    # Globals: STACK (read and shortened), REGISTER (written),
    #          TMP_STACK (set to the remaining stack, as before).
    # Returns: 0 when an entry was popped, 1 when the stack was empty.
    #
    # Entries are separated by newlines only. The former 'for x in $STACK'
    # also split on spaces and expanded wildcards, so an entry containing
    # a space came back in pieces. Empty lines are still skipped.
    #
    local rest="$STACK"
    local nl=$'\n'

    REGISTER=""
    while [[ -n "$rest" && -z "$REGISTER" ]]
    do
        REGISTER="${rest%%$nl*}"
        if [[ "$rest" == *$nl* ]]
        then
            rest="${rest#*$nl}"
        else
            rest=""
        fi
    done

    STACK="$rest"
    TMP_STACK="$STACK"

    if [[ -z "$REGISTER" ]] 
    then
        return 1
    else
        return 0
    fi
}     

is_screen_installed () {

    #
    # Checks over SSH whether 'screen' can be started on node $1.
    # The check is skipped (success) when DISABLE_SCREEN_TEST is "1".
    #
    # Returns: 1 when the remote command fails, which may also be caused
    #          by an SSH problem; success otherwise.
    #
    [[ "$DISABLE_SCREEN_TEST" == "1" ]] && return 0

    NODE="$1"

    if ssh -q $SSH_OPTS_NODE $SSH_KEY $USER@$NODE "screen -m -D -S test ls" > /dev/null 2>&1
    then
        log DEBUG "'Screen' is installed on node $NODE."
    else
        log ERROR "The 'Screen' command may not be installed on node $NODE."
        log ERROR "Or some other SSH related error occurred." 
        return 1
    fi
}

deploy () {

    #
    # Installs PPSS on node $1: creates the remote directories, stores the
    # node id and copies this script, the SSH key, the configuration file,
    # known_hosts and (when set) the user script and the input file.
    # All transfers share one SSH master connection that is started here
    # when its control socket does not exist yet.
    #
    NODE="$1"

    SSH_SOCKET="ppss_ssh_socket-$NODE"

    SSH_OPTS_NODE="-o BatchMode=yes -o ControlPath=$SSH_SOCKET \
                           -o GlobalKnownHostsFile=./known_hosts \
                           -o ControlMaster=auto \
                           -o Cipher=blowfish \
                           -o ConnectTimeout=5 "

    SSH_OPTS_SLAVE="-o BatchMode=yes -o ControlPath=$SSH_SOCKET \
                           -o GlobalKnownHostsFile=./known_hosts \
                           -o ControlMaster=no \
                           -o Cipher=blowfish \
                           -o ConnectTimeout=5 "

    ERROR=0
    SSH_PID=""

    #
    # Remembers in ERROR that at least one step returned a non-zero status.
    #
    set_error () {

        if [[ ! "$1" == "0" ]] 
        then
            ERROR=1
        fi
    }

    if [[ ! -e "$SSH_SOCKET" ]] 
    then
        ssh -q -N $SSH_OPTS_NODE $SSH_KEY $USER@$NODE &
        SSH_PID=$!
    fi

    is_screen_installed "$NODE"

    KEY=$(echo $SSH_KEY | cut -d " " -f 2) 

    # NOTE(review): the last mkdir creates a directory literally named
    # ITEM_LOCK_DIR; possibly \$ITEM_LOCK_DIR was meant - confirm.
    ssh -q  $SSH_OPTS_SLAVE $SSH_KEY $USER@$NODE "cd ~ && mkdir -p $PPSS_HOME_DIR && mkdir -p $PPSS_HOME_DIR/$JOB_LOG_DIR && mkdir -p $PPSS_HOME_DIR/ITEM_LOCK_DIR >> /dev/null 2>&1" 
    set_error $?
    ssh -q $SSH_OPTS_SLAVE $SSH_KEY $USER@$NODE "cd ~ && cd $PPSS_HOME_DIR && cd $PPSS_DIR && echo $NODE > $NODE_ID"
    set_error $?
    scp -q $SSH_OPTS_SLAVE $SSH_KEY $0 $USER@$NODE:~/$PPSS_HOME_DIR
    set_error $?
    scp -q $SSH_OPTS_SLAVE $SSH_KEY $KEY $USER@$NODE:~/$PPSS_HOME_DIR
    set_error $?
    scp -q $SSH_OPTS_SLAVE $SSH_KEY $CONFIG $USER@$NODE:~/$PPSS_HOME_DIR
    set_error $?
    scp -q $SSH_OPTS_SLAVE $SSH_KEY known_hosts $USER@$NODE:~/$PPSS_HOME_DIR
    set_error $?

    if [[ ! -z "$SCRIPT" ]] 
    then
        scp -q $SSH_OPTS_SLAVE $SSH_KEY $SCRIPT $USER@$NODE:~/$PPSS_HOME_DIR
        set_error $?
    fi

    if [[ ! -z "$INPUT_FILE" ]] 
    then
        scp -q $SSH_OPTS_SLAVE $SSH_KEY $INPUT_FILE $USER@$NODE:~/$PPSS_HOME_DIR
        set_error $?
    fi

    if [[ "$ERROR" == "0" ]] 
    then
        log DSPLY  "PPSS installed on node $NODE."
    else
        log DSPLY  "PPSS failed to install on $NODE."
    fi

    #
    # Only stop the master connection when this call started one. When the
    # socket already existed SSH_PID is empty and 'kill' would be run
    # without a process id.
    #
    if [[ -n "$SSH_PID" ]]
    then
        kill "$SSH_PID"
    fi
}

deploy_ppss () {

    #
    # Deploys PPSS to every node listed in NODES_FILE, one background
    # 'deploy' per node with a short pause between launches.
    #
    # The nodes file, the private SSH key (second word of SSH_KEY) and,
    # when SCRIPT is set, the script must exist; otherwise an error is
    # logged, the status is set to ERROR, cleanup is run and PPSS exits 1.
    #
    if [[ -z "$NODES_FILE" || ! -e "$NODES_FILE" ]]
    then
        log ERROR "No file containing list of nodes missing / not specified."
        set_status ERROR
        cleanup 
        exit 1
    fi

    exec_cmd "mkdir -p $PPSS_HOME_DIR/$PPSS_NODE_STATUS"

    KEY=$(echo $SSH_KEY | cut -d " " -f 2) 
    if [[ -z "$KEY" || ! -e "$KEY" ]]
    then
        log ERROR "Private SSH key $KEY not found."
        set_status "ERROR"
        cleanup 
        exit 1
    fi

    if [[ -n "$SCRIPT" && ! -e "$SCRIPT" ]]
    then
        log ERROR "Script $SCRIPT not found."
        set_status "ERROR"
        cleanup
        exit 1
    fi

    #
    # Pause between two launches; SunOS gets a whole second.
    #
    local launch_pause
    case "$ARCH" in
        SunOS) launch_pause=1 ;;
        *)     launch_pause=0.1 ;;
    esac

    INSTALLED_ON_SSH_SERVER=0
    for NODE in $(cat $NODES_FILE) 
    do
        deploy "$NODE" &
        sleep "$launch_pause"
        if [[ "$NODE" == "$SSH_SERVER" ]]
        then
            INSTALLED_ON_SSH_SERVER=1
        fi
    done

    case "$INSTALLED_ON_SSH_SERVER" in
        0) log DEBUG "SSH SERVER $SSH_SERVER is not a node." ;;
        *) log DEBUG "SSH SERVER $SSH_SERVER is also a node." ;;
    esac
}

start_ppss_on_node () {

    #
    # Starts PPSS in node mode on node $1, inside a detached screen
    # session named PPSS. An error is logged when the ssh command fails.
    #
    NODE="$1"
    log DSPLY "Starting PPSS on node $NODE."

    if ! ssh $SSH_KEY $USER@$NODE -o ConnectTimeout=5 -o GlobalKnownHostsFile=./known_hosts "cd $PPSS_HOME_DIR ; screen -d -m -S PPSS ~/$PPSS_HOME_DIR/$0 node --config ~/$PPSS_HOME_DIR/$CONFIG"
    then
        log ERROR  "|P|P|S|S| failed to start on node $NODE."
    fi
}

init_ssh_server_socket () {

    #
    # Makes sure the directory that will hold the SSH control socket
    # exists. Nothing is done when the socket itself is already present.
    #
    # SSH_SOCKET is quoted for dirname: unquoted, a path with a space was
    # split into several names and the wrong directory was created.
    #
    if [[ ! -e "$SSH_SOCKET" ]] 
    then
        DIR=$(dirname "$SSH_SOCKET")
        mkdir -p "$DIR"
    fi
}

test_server () {

    #
    # Tests that the remote server works as expected and opens the SSH
    # master connection to it. Without SSH_SERVER PPSS runs stand-alone
    # and only a debug message is logged.
    #
    if [[ -z "$SSH_SERVER" ]]
    then
        log DEBUG "No remote server specified, assuming stand-alone mode."
        return
    fi

    init_ssh_server_socket

    #
    # A trivial remote command shows whether the server is reachable;
    # check_status aborts PPSS when it failed.
    #
    exec_cmd "date >> /dev/null"
    check_status "$?" "$FUNCNAME" "Server $SSH_SERVER could not be reached"

    ssh -N -M $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER &
    SSH_MASTER_PID="$!"
    log DEBUG "SSH Master pid is $SSH_MASTER_PID"
    log INFO "Connected to server: $SSH_SERVER"

    if ! does_file_exist "$PPSS_HOME_DIR/$PPSS_DIR"
    then
        log DEBUG "Remote PPSS home directory $PPSS_HOME_DIR/$PPSS_DIR does not exist. Creating."
        exec_cmd "mkdir -p $PPSS_HOME_DIR/$PPSS_DIR"
    fi
}

get_no_of_cpus () {

    #
    # Determines how many jobs to run in parallel and stores the result
    # in MAX_NO_OF_RUNNING_JOBS.
    #
    # Arguments: $1 - "yes" (default) to count logical processors,
    #                 "no" to count physical cores only.
    #
    # When no number can be obtained an error is logged, the status is
    # set to ERROR and PPSS exits with status 1.
    #

    # Use hyperthreading or not?
    HPT=$1
    NUMBER=""

    if [[ -z "$HPT" ]] 
    then
        HPT=yes
    fi

    #
    # Aborts through check_status when the command that produced the
    # number reported a failure.
    #
    got_cpu_info () {

    ERROR="$1"
    check_status "$ERROR" "$FUNCNAME" "cannot determine number of cpu cores. Specify with -p." 

    }

    if [[ "$HPT" == "yes" ]] 
    then
        if [[ "$ARCH" == "Linux" ]] 
        then
            NUMBER=$(grep -c ^processor "$CPUINFO")
            got_cpu_info "$?"

        elif [[ "$ARCH" == "Darwin" ]] 
        then
            NUMBER=$(sysctl -a hw | grep -w logicalcpu | awk '{ print $2 }')
            got_cpu_info "$?"

        elif [[ "$ARCH" == "FreeBSD" ]] 
        then
            NUMBER=$(sysctl hw.ncpu | awk '{ print $2 }')
            got_cpu_info "$?"

        elif [[ "$ARCH" == "SunOS" ]] 
        then
            NUMBER=$(psrinfo | grep -c on-line)
            got_cpu_info "$?"
        else
            if [[ -e "$CPUINFO" ]] 
            then
                NUMBER=$(grep -c ^processor "$CPUINFO")
                got_cpu_info "$?"
            fi 
        fi

        if [[ ! -z "$NUMBER" ]] 
        then
            log DSPLY "Found $NUMBER logic processors."
        fi

    elif [[ "$HPT" == "no" ]] 
    then
        log DSPLY "Hyperthreading is disabled."

        if [[ "$ARCH" == "Linux" ]] 
        then
            #
            # The grep results are tested directly. The former
            # '[[ "$?" ]]' tests were always true, so both fallbacks
            # below were unreachable and a cpuinfo without 'physical id'
            # or 'core id' lines gave a job count of 0.
            #
            if grep -q 'physical id' "$CPUINFO"
            then
                PHYSICAL=$(grep 'physical id' "$CPUINFO" | sort | uniq | wc -l)
                if [[ "$PHYSICAL" == "1" ]] 
                then
                    log DSPLY "Found $PHYSICAL physical CPU."
                else
                    log DSPLY "Found $PHYSICAL physical CPUs."
                fi

                if grep -q 'core id' "$CPUINFO"
                then
                    log DEBUG "Starting job only for each physical core on all physical CPU(s)."
                    NUMBER=$(grep 'core id' "$CPUINFO" | sort | uniq | wc -l) 
                    log DSPLY "Found $NUMBER physical cores."
                else
                    log DSPLY "Single core processor(s) detected."
                    log DSPLY "Starting job for each physical CPU."
                    NUMBER=$PHYSICAL
                fi
            else
                log INFO "No 'physical id' section found in $CPUINFO, typical for older cpus."            
                NUMBER=$(grep -c ^processor "$CPUINFO")
                got_cpu_info "$?"
            fi
        elif [[ "$ARCH" == "Darwin" ]] 
        then
            NUMBER=$(sysctl -a hw | grep -w physicalcpu | awk '{ print $2 }')
            got_cpu_info "$?"
        elif [[ "$ARCH" == "FreeBSD" ]] 
        then
            NUMBER=$(sysctl hw.ncpu | awk '{ print $2 }')
            got_cpu_info "$?"
        else
            NUMBER=$(cat "$CPUINFO" | grep "cpu cores" | cut -d ":" -f 2 | uniq | sed -e 's/ //g')
            got_cpu_info "$?"
        fi

    fi

    if [[ ! -z "$NUMBER" ]] 
    then
        MAX_NO_OF_RUNNING_JOBS=$NUMBER
    else
        log ERROR "Number of CPUs not obtained."
        log ERROR "Please specify manually with -p."
        set_status "ERROR"
        exit 1
    fi
}

random_delay () {

    #
    # Sleeps for a random number of seconds below the bound given as $1.
    #
    # Arguments: $1 - upper bound in seconds (non-negative whole number);
    #                 the delay lies between 0 and bound - 1.
    #
    # A missing or non-numeric bound is logged as an error and ends PPSS
    # with status 1. A bound of 0 means no delay: the modulo used to fail
    # with a division by zero there and left a sleep of up to 32767 s.
    #
    ARGS="$1"

    if [[ -z "$ARGS" ]] 
    then
        log ERROR "$FUNCNAME Function random delay, no argument specified."
        set_status "ERROR"
        exit 1
    fi

    if [[ ! "$ARGS" =~ ^[0-9]+$ ]]
    then
        log ERROR "$FUNCNAME Function random delay, argument '$ARGS' is not a number."
        set_status "ERROR"
        exit 1
    fi

    # 10# forces base ten, so a bound such as "08" is not read as octal.
    if (( 10#$ARGS == 0 ))
    then
        return 0
    fi

    NUMBER=$(( RANDOM % 10#$ARGS ))
    sleep "$NUMBER"
}

escape_item () {

    #
    # Stores in ITEM_ESCAPED a copy of $1 in which every space, single
    # quote, |, &, ;, >, <, ( and ) is preceded by one backslash.
    # Any other character, the backslash included, is left as it is.
    #
    TMP="$1"

    #
    # One substitution over a bracket expression; '&' in the replacement
    # stands for the matched character and '\\' for a literal backslash.
    #
    ITEM_ESCAPED=$(echo "$TMP" | sed -e "s/[ '|&;<>()]/\\\\&/g")
}

download_item () {

    #
    # Copies item $1 from the source location into PPSS_LOCAL_TMPDIR.
    #
    # Returns 1 immediately when downloading is disabled or the item has
    # already been marked virtual. Otherwise the existence of the item is
    # checked; an item that does not exist is marked virtual (VIRTUAL=1)
    # and not transferred. With TRAVERSAL the item is a full path and is
    # stored in a sub directory named after its hash (HASH, LOCAL_DIR).
    #
    if [[ "$DOWNLOAD_TO_NODE" != "1" || "$VIRTUAL" == "1" ]]
    then
        return 1
    fi

    ITEM="$1"
    VIRTUAL="0"
    ERR_STATE="0"
    HASH=""

    escape_item "$ITEM"
    case "$TRAVERSAL" in
        1)
            does_file_exist "$ITEM_ESCAPED"
            ERR_STATE="$?"
            DOWNLOAD_ITEM="$ITEM"
            HASH=$(echo "$DOWNLOAD_ITEM" | $MD5 | awk '{ print $1 }')
            LOCAL_DIR="$HASH"
            ;;
        *)
            does_file_exist "$SRC_DIR/$ITEM_ESCAPED"
            ERR_STATE="$?"
            DOWNLOAD_ITEM="$SRC_DIR/$ITEM"
            ;;
    esac

    if [[ "$ERR_STATE" != "0" ]]
    then
        log DEBUG "$FUNCNAME Remote item $ITEM does NOT exist"
        VIRTUAL=1
        log DEBUG "No transfer of item $ITEM to local workpath."
        return
    fi

    log DEBUG "$FUNCNAME Remote item $ITEM exists"
    VIRTUAL=0

    log DEBUG "Transfering item $ITEM from source to local disk."
    if [[ "$SECURE_COPY" == "1" && -n "$SSH_SERVER" ]]
    then
        #
        # Remote source: fetch with scp, using the escaped path.
        #
        escape_item "$DOWNLOAD_ITEM" 
        if [[ "$TRAVERSAL" == "1" ]]
        then
            mkdir -p "$PPSS_LOCAL_TMPDIR/$LOCAL_DIR"
            log DEBUG "$SSH_SERVER:$ITEM_ESCAPED $PPSS_LOCAL_TMPDIR/$LOCAL_DIR"
            scp -q $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER:"$ITEM_ESCAPED" ./$PPSS_LOCAL_TMPDIR/"$LOCAL_DIR"
        else
            log DEBUG "$SSH_SERVER:$ITEM_ESCAPED $PPSS_LOCAL_TMPDIR"
            scp -q $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER:"$ITEM_ESCAPED" $PPSS_LOCAL_TMPDIR 
        fi
        # $? is the status of the scp that ran in the branch above.
        log DEBUG "Exit code of remote download transfer is $?"
    else
        #
        # Local source: a plain cp is sufficient.
        #
        if [[ "$TRAVERSAL" == "1" ]]
        then
            mkdir -p "$PPSS_LOCAL_TMPDIR/$LOCAL_DIR"
            cp "$DOWNLOAD_ITEM" "$PPSS_LOCAL_TMPDIR/$LOCAL_DIR" 
        else
            cp "$DOWNLOAD_ITEM" "$PPSS_LOCAL_TMPDIR/" 
        fi
        # $? is the status of the cp that ran in the branch above.
        log DEBUG "Exit code of local transfer is $?"
    fi
}

upload_item () {

    #
    # Copies the contents of a local output directory to the output
    # directory on the server.
    #
    # Arguments: $1 - local directory holding the files to upload
    #            $2 - sub directory below REMOTE_OUTPUT_DIR, only used
    #                 when TRAVERSAL is "1"
    #
    # Returns 1 when uploading is disabled or no local directory is given.
    # After a successful scp the local directory is removed.
    #
    if [[ ! "$UPLOAD_TO_SERVER" == "1" ]] 
    then
        log DEBUG "Upload to server is disabled."
        return 1
    fi
    log DEBUG "Upload from node to server - $1 to $2"

    LOCAL_SRC_DIR="$1" # the local source directory where the file resides
    REMOTE_DEST_DIR="$2" # to recreate the directory structure
    DESTINATION=""

    #
    # An empty source would turn the copy into '/*' and the removal
    # below into 'rm -rf ./'.
    #
    if [[ -z "$LOCAL_SRC_DIR" ]]
    then
        log ERROR "Upload requested without a local source directory."
        return 1
    fi

    if [[ "$TRAVERSAL" == "1" ]] 
    then 
        log DEBUG "Recursive copy is enabled."
        TMP_DESTINATION="$REMOTE_OUTPUT_DIR/$REMOTE_DEST_DIR" 
    else
        log DEBUG "Recursive copy is disabled."
        TMP_DESTINATION="$REMOTE_OUTPUT_DIR"
    fi

    exec_cmd "mkdir -p \"$TMP_DESTINATION\""
    escape_item "$TMP_DESTINATION"
    DESTINATION="$ITEM_ESCAPED"

    log DEBUG "Destination == $DESTINATION"

    log DEBUG "Uploading item $LOCAL_SRC_DIR."
    if [[ "$SECURE_COPY" == "1" ]] 
    then
        log DEBUG "Secure copy is enabled."
        log DEBUG "Copy contents of $LOCAL_SRC_DIR to server at $DESTINATION"
        # The directory is quoted, the trailing glob is not.
        scp $SSH_OPTS $SSH_KEY "$LOCAL_SRC_DIR"/* $USER@$SSH_SERVER:"$DESTINATION" 
        ERROR="$?"
        #
        # The messages report DESTINATION; they used to expand
        # \$DIR_ESCAPED, which is never set in this function.
        #
        if [[ ! "$ERROR" == "0" ]] 
        then
            log ERROR "Uploading of $LOCAL_SRC_DIR via SCP to $DESTINATION failed."
        else
            log DEBUG "Upload of item $LOCAL_SRC_DIR to $DESTINATION success" 
            rm -rf ./"$LOCAL_SRC_DIR"
        fi
    else    
        log DEBUG "Secure copy is disabled - using regular cp."
        cp "$LOCAL_SRC_DIR"/* "$TMP_DESTINATION"
        if [[ ! "$?" == "0" ]] 
        then
            log DEBUG "ERROR - uploading of $LOCAL_SRC_DIR via CP to $DESTINATION failed."
        fi
    fi
}

lock_item () {

    #
    # Tries to lock item $1 by creating a directory for it below
    # ITEM_LOCK_DIR. The directory name is the MD5 hash of the item when
    # USE_MD5 is "1"; otherwise the item with every non-alphanumeric
    # character replaced by an underscore.
    #
    # Returns: the status of the mkdir command (0 = lock obtained).
    #
    # In daemon mode, there is no risk that processes try to process
    # the same item. Therefore, locking is not required.
    #
    if [[ "$INOTIFY" == "1" && "$DAEMON" == "1" ]]
    then
        return 0
    fi

    ITEM="$1"

    case "$USE_MD5" in
        1) LOCK_FILE_NAME=$(echo "$ITEM" | $MD5 | awk '{ print $1 }') ;;
        *) LOCK_FILE_NAME=$(echo "$ITEM" | sed 's/[^[:alnum:]]/_/g') ;;
    esac

    ITEM_LOCK_FILE="$ITEM_LOCK_DIR/$LOCK_FILE_NAME"
    log DEBUG "Locking item $ITEM_LOCK_FILE"

    exec_cmd "mkdir $ITEM_LOCK_FILE >> /dev/null 2>&1"
    ERROR="$?"
    return "$ERROR"
}

get_input_lock () {

    #
    # Obtains the input lock by creating the directory INPUT_LOCK through
    # exec_cmd. While the directory already exists the attempt is
    # repeated every 5 seconds.
    #
    # The result of mkdir is now really tested; the former '[[ "$?" ]]'
    # was always true, so the lock was reported as obtained even while
    # somebody else held it.
    #

    #
    # No lock location configured: there is nothing to wait for. Without
    # this guard the loop below could never end, because mkdir without
    # an operand always fails.
    #
    if [[ -z "$INPUT_LOCK" ]]
    then
        return 0
    fi

    while true
    do 
        if exec_cmd "mkdir $INPUT_LOCK >> /dev/null 2>&1 " 
        then
            log DEBUG "Input lock is obtained..."
            break
        else
            log DEBUG "Input lock is present...sleeping.."
            sleep 5
        fi
    done
}

release_input_lock () {

    #
    # Releases the input lock by removing the directory INPUT_LOCK
    # through exec_cmd.
    #
    # Returns: 0 when the removal command succeeded (or no lock location
    #          is configured), 1 when it failed. The former '[[ "$?" ]]'
    #          test was always true, so a failure was never reported.
    #
    if [[ -z "$INPUT_LOCK" ]]
    then
        return 0
    fi

    if exec_cmd "rm -rf $INPUT_LOCK"
    then
        log DEBUG "Input lock was released..."
        return 0
    else
        log ERROR "Input lock was already gone...this should never happen..."
        return 1
    fi
}

list_all_input_items () {

    #
    # Prints every line of the file LISTOFITEMS on stdout, unchanged.
    #
    # 'read -r' with an empty IFS keeps backslashes and surrounding
    # blanks, printf prints lines such as "-n" literally, and a last
    # line without a trailing newline is no longer dropped.
    #
    local entry

    while IFS= read -r entry || [[ -n "$entry" ]]
    do
        printf '%s\n' "$entry"
    done < "$LISTOFITEMS"
}

remove_processed_items_from_input_file () {

    # 
    # Removes from the file LISTOFITEMS every line that also occurs in
    # the file LIST_OF_PROCESSED_ITEMS. The remaining items are written
    # back in sorted order.
    #
    # Returns 1 without changing anything when there are no processed
    # items, when MODE is "status" or when the item list is missing or
    # empty; otherwise the status of the final mv.
    #

    if [[ -e "$LIST_OF_PROCESSED_ITEMS" ]] 
    then
        PROCESSED_ITEMS=$(cat "$LIST_OF_PROCESSED_ITEMS")
    fi

    log DEBUG "Running $FUNCNAME"

    if [[ -z "$PROCESSED_ITEMS" ]] 
    then
        log DEBUG "Variable processed_items is empty."
        return 1
    fi

    if [[ "$MODE" == "status" ]] 
    then 
        log DEBUG "Mode is status."
        return 1
    fi

    if [[ ! -e "$LISTOFITEMS" ]] 
    then
        echo "$LISTOFITEMS does not exist!"
        return 1
    else
        #
        # Read through stdin so wc prints the number only; with a file
        # operand it prints "N name", which never equalled "0".
        #
        SIZE=$(wc -l < "$LISTOFITEMS")
        if [[ ! -s "$LISTOFITEMS" ]]
        then
            echo "$LISTOFITEMS exists but is empty."
            return 1
        fi
    fi

    INPUTFILES=$(list_all_input_items)

    OUT_FILE=$PPSS_DIR/out-file-$RANDOM$RANDOM$RANDOM$RANDOM
    LIST_SORTED=$PPSS_DIR/list-sorted-$RANDOM$RANDOM$RANDOM$RANDOM
    PROCESSED_SORTED=$PPSS_DIR/processed-sorted-$RANDOM$RANDOM$RANDOM$RANDOM
    sort "$LISTOFITEMS" > "$LIST_SORTED"
    sort "$LIST_OF_PROCESSED_ITEMS" > "$PROCESSED_SORTED"

    #
    # -23 keeps only the lines unique to the item list. The former -3
    # also printed, tab-indented, processed items that are not in the
    # list, which then ended up in the input as new items.
    #
    comm -23 "$LIST_SORTED" "$PROCESSED_SORTED" > "$OUT_FILE"

    local rc
    mv "$OUT_FILE" "$LISTOFITEMS"
    rc="$?"

    # The sorted copies are only needed for comm.
    rm -f "$LIST_SORTED" "$PROCESSED_SORTED"

    return "$rc"
}

get_all_items () {

    #
    # Builds the file LISTOFITEMS with one item per line and stores the
    # number of items in SIZE_OF_INPUT.
    #
    # Without INPUT_FILE the items are the non-directory entries of
    # SRC_DIR, listed through exec_cmd when SSH_SERVER is set and with a
    # local find otherwise; TRAVERSAL="1" includes sub directories.
    # With INPUT_FILE the file is copied, "-" means read from stdin.
    # The list is shuffled when RANDOMIZE is "1", and items that were
    # processed before are removed from it.
    #

    if [[ "$DAEMON" == "1" && "$INOTIFY" == "0" ]]  && [[ "$ENABLE_INPUT_LOCK" == "1" ]] 
    then
        get_input_lock
    fi

    GLOBAL_COUNTER=1
    
    if [[ -e "$LISTOFITEMS" ]] 
    then
        rm "$LISTOFITEMS"
    fi

    count=0

    #
    # Non-recursive listings use '-mindepth 1 -maxdepth 1'. The former
    # '-depth 1' is a BSD find extension; GNU find takes no argument for
    # -depth and aborts with an error.
    #
    if [[ -z "$INPUT_FILE" ]] 
    then
        if [[ ! -z "$SSH_SERVER" ]] # Are we running stand-alone or as a node?"
        then
            if [[ "$TRAVERSAL" == "1" ]] 
            then
                exec_cmd "find $SRC_DIR/ ! -type d" > "$LISTOFITEMS"
                check_status "$?" "$FUNCNAME" "Could not list files within remote source directory."
            else
                log DEBUG "Recursion is disabled."
                exec_cmd "find $SRC_DIR/ -mindepth 1 -maxdepth 1 ! -type d" > "$LISTOFITEMS"
                check_status "$?" "$FUNCNAME" "Could not list files within remote source directory."
            fi

        else 
            if [[ -e "$SRC_DIR" ]] 
            then
                if [[ "$TRAVERSAL" == "1" ]] 
                then
                    log DEBUG "Recursion is enabled."
                    find "$SRC_DIR"/ ! -type d >> "$LISTOFITEMS"
                    check_status "$?" "$FUNCNAME" "Could not list files within local source directory."
                else
                    log DEBUG "Recursion is disabled."
                    find "$SRC_DIR"/ -mindepth 1 -maxdepth 1 ! -type d >> "$LISTOFITEMS"
                    check_status "$?" "$FUNCNAME" "Could not list files within local source directory."
                fi
                if [[ ! -e "$LISTOFITEMS" ]] 
                then
                    log ERROR "Local input file is not created, something is wrong. Bug?"
                    set_status "ERROR"
                    cleanup
                    exit 1
                fi
                
            else
                ITEMS=""
            fi
        fi
    else  # Using an input file as the source of our items or STDIN.
        if [[ ! -z "$SSH_SERVER" ]] # Are we running stand-alone or as a slave?"
        then
            log DEBUG "Running as node, input file has been pushed (hopefully)."
        fi
        if [[ ! -e "$INPUT_FILE" && ! "$INPUT_FILE" == "-" ]] 
        then
            log ERROR "Input file $INPUT_FILE does not exist."
            set_status "ERROR"
            cleanup
            exit 1
        fi
    
        if [[ ! "$INPUT_FILE" == "-" ]] 
        then
            cp "$INPUT_FILE" "$LISTOFITEMS"
            check_status "$?" "$FUNCNAME" "Copy of input file failed!"
        else
            log DEBUG "Reading from stdin.."
            while read LINE
            do
                echo "$LINE" >> "$LISTOFITEMS"
            done
        fi

        if [[ ! -e "$LISTOFITEMS" ]] 
        then
            log ERROR "Input is empty."
            infanticide
            terminate_listener 
            cleanup
        fi
    fi

    if [[ "$RANDOMIZE" == "1" && "$MODE" != "status" ]] 
    then
        log DEBUG "Randomizing input file."
        TMP_FILE="$PPSS_DIR/TMP-$RANDOM$RANDOM.txt"
        #
        # Every item gets a random number in front, the lines are sorted
        # and the number is stripped again. The items are read line by
        # line: the former 'for i in $(cat ...)' expanded wildcard
        # characters inside items against the current directory.
        #
        local entry
        while IFS= read -r entry || [[ -n "$entry" ]]
        do
            if [[ -n "$entry" ]]
            then
                echo "$RANDOM $entry"
            fi
        done < "$LISTOFITEMS" | sort | sed -E 's/^[0-9]+ //' > "$TMP_FILE"
        mv "$TMP_FILE" "$LISTOFITEMS"
    else
        log DEBUG "Randomisation of input file disabled."
    fi
 

    remove_processed_items_from_input_file

    if [[ "$DAEMON" == "1" ]] 
    then
        release_input_lock
    fi

    SIZE_OF_INPUT=$(wc -l "$LISTOFITEMS" | awk '{ print $1 }')

    if [[ "$SIZE_OF_INPUT" -le "0" && "$DAEMON" == "0" ]] 
    then
        log ERROR "Source file/dir seems to be empty."
        set_status "STOPPED"
        cleanup 
        exit 1
    fi
}

are_all_items_locked () {

    #
    # Compares the number of entries in ITEM_LOCK_DIR with the number of
    # items given as $1.
    #
    # Returns: 0 when at least $1 entries exist, 1 otherwise.
    #
    SIZE="$1"
    NUMBER=$(exec_cmd "ls -1 $ITEM_LOCK_DIR | wc -l")
    log DEBUG "$NUMBER of $SIZE items are locked."

    [[ "$NUMBER" -ge "$SIZE" ]] && return 0

    return 1
}

get_item () {

    #
    # Selects the next item to process and stores it in ITEM.
    #
    # Lines of LISTOFITEMS are visited in order by means of
    # GLOBAL_COUNTER. Empty lines are skipped. When SSH_SERVER is set or
    # LOCAL_LOCKING is "1" an item is only handed out after it could be
    # locked; items locked by somebody else are skipped.
    #
    # Returns: 0 when ITEM holds a new item, 1 when there is nothing
    #          (more) to do or a stop was requested.
    #
    # Skipping is done in a loop. It used to be a recursive call per
    # skipped item, which nests one level deeper for every locked or
    # empty line. The list file name is quoted as well, so a path with
    # a space no longer makes sed fail.
    #
    while true
    do
        check_for_interrupt 

        if [[ "$STOP" == "1" ]] 
        then
            log DEBUG "Found stop signal."
            return 1
        fi

        #
        # Return error if list size is empty.
        #
        if [[ -z "$SIZE_OF_INPUT" ]] 
        then
            log DEBUG "Got no size of input..."
            return 1
        fi

        #
        # Return error if the list is empty.
        #
        if [[ "$SIZE_OF_INPUT" -le "0" ]] 
        then
            return 1
        fi

        #
        # Check if all items have been processed.
        #
        if [[ "$GLOBAL_COUNTER" -gt "$SIZE_OF_INPUT" ]] 
        then
            log DEBUG "Counter $GLOBAL_COUNTER is greater than sizeof input $SIZE_OF_INPUT."
            return 1
        fi

        #
        # Quit if all items have been locked.
        #
        if are_all_items_locked "$SIZE_OF_INPUT"
        then
            log DEBUG "All items have been locked."
            return 1
        else
            log DEBUG "There are still unlocked items."
        fi

        ITEM="$(sed -n "${GLOBAL_COUNTER}p" "$LISTOFITEMS")"
        ((GLOBAL_COUNTER++))

        if [[ -z "$ITEM" ]] 
        then
            log DEBUG "Item was emtpy..."
            continue
        fi

        #
        # Stand-alone without local locking: the item is ours.
        #
        if [[ -z "$SSH_SERVER" && "$LOCAL_LOCKING" != "1" ]]
        then
            return 0
        fi

        lock_item "$ITEM"
        LOCK="$?"
        if [[ "$LOCK" == "0" ]] 
        then
            log DEBUG "Got lock on $ITEM"
            return 0
        fi

        log DEBUG "Item $ITEM is locked."
    done
}

start_new_worker () {

    #
    # Kicks the listener to start a worker process by writing START_KEY
    # to the FIFO. Nothing is written when the script has been sourced.
    #
    # Returns: the status of the write, or 0 when nothing was written.
    #
    if are_we_sourced
    then
        return 0
    fi

    echo "$START_KEY" >> "$FIFO"
}

stop-ppss () {

    #
    # Records the stop time in STOP_PPSS, lets 'elapsed' work out the
    # time passed since START_PPSS and displays the resulting
    # PROCESSING_TIME text.
    #
    STOP_PPSS=$(get_time_in_seconds)
    elapsed "$START_PPSS" "$STOP_PPSS"
    log DSPLY "$PROCESSING_TIME"
}

elapsed () {

    #
    # Works out the time between two points in time and stores it, as a
    # ready-to-display text, in PROCESSING_TIME.
    #
    # Arguments: $1 - start time in seconds
    #            $2 - end time in seconds
    #
    # The intermediate values are left in ELAPSED, HOURS, REMAINDER,
    # MINS and SECS.
    #
    BEFORE="$1"
    AFTER="$2"

    ELAPSED=$(( AFTER - BEFORE ))

    HOURS=$(( ELAPSED / 3600 ))
    REMAINDER=$(( ELAPSED % 3600 ))

    MINS=$(( REMAINDER / 60 ))
    SECS=$(( REMAINDER % 60 ))

    printf -v PROCESSING_TIME "Total processing time (hh:mm:ss): %02d:%02d:%02d" "$HOURS" "$MINS" "$SECS"
}

mail_on_error () {

    #
    # Mails the log file of a failed item to EMAIL when MAIL_ON_ERROR
    # is "1".
    #
    # Arguments: $1 - the item that failed
    #            $2 - the log file to send as the mail body
    #
    # The log file name is kept in a local variable. It used to be
    # assigned to LOGFILE, the global that 'log' writes to, so every
    # later log message ended up in the item's log file.
    #
    ITEM="$1"
    local item_log="$2"

    if [[ "$MAIL_ON_ERROR" == "1" ]] 
    then
        if mail -s "$HOSTNAME - PPSS: procesing failed for item." "$EMAIL" < "$item_log"
        then
            log DEBUG "Error mail sent."
        else
            log ERROR "Sending of error email failed."
        fi
    fi
}


add_item_to_failed_list () {

    #
    # Appends item $1 as a new line to the file FAILED_ITEMS. The write
    # is done between obtaining and releasing the input lock.
    #
    get_input_lock

    ITEM="$1"
    echo "$ITEM" >> "$FAILED_ITEMS" 

    release_input_lock
}

commando () {

    #
    # Process one item with the user supplied command ($COMMAND) and
    # then call start_new_worker.
    #
    # This function starts a chain reaction of events: run_command
    # starts it in the background for an item, it executes the command
    # on that item and, when finished, calls start_new_worker, which is
    # how the next item ends up being processed. By having several of
    # these chains active at once, parallel processing is achieved.
    #
    # Arguments: $1 - the item (a file name, a path, or a plain string).
    #
    # Globals written (among others): ITEM, ORIG_ITEM, VIRTUAL,
    # OUTPUT_DIR, OUTPUT_FILE, ITEM_LOG_FILE, ERROR.
    #
    ERR_STATE=0
    VIRTUAL=0

    #
    # This code tests if the item exists (is physical or virtual).
    # Example: a file is physical, a URL is virtual.
    #
    ITEM="$1"
    ORIG_ITEM="$ITEM"

    #log DEBUG "$FUNCNAME is processing item $ITEM"

    if [[ "$TRAVERSAL" == "1" ]] 
    then
        escape_item "$ITEM"
        does_file_exist "$ITEM_ESCAPED" # The item contains the full path.
        ERR_STATE="$?"
    else
        escape_item "$ITEM"
        does_file_exist "$SRC_DIR/$ITEM_ESCAPED" # The item is only the file name itself.
        ERR_STATE="$?"
    fi

    #
    # If recursion is used, a file name of an item may not be unique.
    # The same filename can be used for files in different directories.
    # Therefore, the output directory must reflect the original directory
    # structure. If recursion is not used, this is not necessary.
    #
    # Furthermore, if the item is not a real world object but a string of
    # some sort, the item is considered virtual. No recursion.
    #
    if [[ "$ERR_STATE" == "0" ]] 
    then
        VIRTUAL="0"
        if [[ "$TRAVERSAL" == "1" ]] 
        then
            # The sed expression is quoted so that a source directory
            # containing spaces or glob characters is passed as one script.
            ITEM_DIR_NAME=$(dirname "$ITEM" | sed "s:$SRC_DIR::g")
            ITEM_BASE_NAME=$(basename "$ITEM")
            HASH=$(echo "$ITEM" | $MD5 | awk '{ print $1 }')
            if [[ "$UPLOAD_TO_SERVER" == "1" ]] 
            then
                OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$HASH"
            else
                if [[ -z "$REMOTE_OUTPUT_DIR" ]] 
                then
                    OUTPUT_DIR="$PPSS_LOCAL_OUTPUT"
                else
                    OUTPUT_DIR="$REMOTE_OUTPUT_DIR/$ITEM_DIR_NAME"
                fi
            fi
        else
            ITEM_DIR_NAME="$SRC_DIR"
            ITEM_BASE_NAME="$ITEM"
            escape_item "$ITEM_BASE_NAME"
            if [[ "$UPLOAD_TO_SERVER" == "1" ]] 
            then
                OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_ESCAPED"
            else
                OUTPUT_DIR="$REMOTE_OUTPUT_DIR"
            fi
        fi
    else
        VIRTUAL="1"
        ITEM_DIR_NAME=""
        ITEM_BASE_NAME="$ITEM"
        escape_item "$ITEM_BASE_NAME"
        if [[ "$UPLOAD_TO_SERVER" == "1" ]] 
        then
            OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_ESCAPED"
        else
            if [[ -z "$REMOTE_OUTPUT_DIR" ]] 
            then
                OUTPUT_DIR="$PPSS_LOCAL_OUTPUT"
            else
                OUTPUT_DIR="$REMOTE_OUTPUT_DIR"
            fi
        fi
    fi

    OUTPUT_FILE="$ITEM_BASE_NAME"

    #
    # Decide if an item must be transferred from server to the node,
    # or be processed in-place (NFS / SMB mount?)
    #
    if [[ "$DOWNLOAD_TO_NODE" == "0" ]] 
    then
        if [[ "$VIRTUAL" == "1" ]] 
        then
            log DEBUG "Item is virtual, thus not downloading."
        else
            log DEBUG "Using item straight from the server, no copy."
            if [[ "$TRAVERSAL" == "0" ]] 
            then
                # Without traversal the item is a bare file name.
                ITEM="$SRC_DIR/$ITEM"
            fi
        fi
    else
        download_item "$ITEM"
        if [[ "$TRAVERSAL" == "1" ]] 
        then
            ITEM="$PPSS_LOCAL_TMPDIR/$HASH/$ITEM_BASE_NAME"
        else
            ITEM="$PPSS_LOCAL_TMPDIR/$ITEM_BASE_NAME"
        fi
    fi
    
    #
    # Determine the name of the log file containing the output of the
    # command: either a hash of the item or the item with every
    # non-alphanumeric character replaced by an underscore.
    #
    if [[ "$USE_MD5" == "1" ]] 
    then
        LOG_FILE_NAME=$(echo "$ITEM" | $MD5 | awk '{ print $1 }')
    else
        # Quoted so the bracket expression is never glob-expanded.
        LOG_FILE_NAME=$(echo "$ITEM" | sed 's/[^[:alnum:]]/_/g')
    fi
    ITEM_LOG_FILE="$JOB_LOG_DIR/$LOG_FILE_NAME"

    #
    # An existing item log file means the item was processed before.
    #
    if [[ -e "$ITEM_LOG_FILE" && "$DISABLE_SKIPPING" == "0" ]] 
    then
        log DEBUG "Item is already processed, skipping..."  
        start_new_worker
        return 0
    fi

    #
    # Create the local output directory.
    #
    if [[ ! -z "$OUTPUT_DIR" ]] 
    then
        log DEBUG "Creating local output dir $OUTPUT_DIR"
        mkdir -p "$OUTPUT_DIR"
    fi    
    
    
    ERROR=""
    #
    # Some formatting of item log files. 
    #
    DATE=$(date +%b\ %d\ %H:%M:%S)
    echo "===== PPSS Item Log File =====" > "$ITEM_LOG_FILE"
    echo -e "Host:\t\t$HOSTNAME" >> "$ITEM_LOG_FILE"
    echo -e "Process:\t$PID" >> "$ITEM_LOG_FILE"
    echo -e "Item:\t\t$ITEM" >> "$ITEM_LOG_FILE"
    echo -e "Start date:\t$DATE" >> "$ITEM_LOG_FILE"
    echo -e "" >> "$ITEM_LOG_FILE"
    
#-->#
#--># The actual execution of the command as specified by
#--># the -c option.
#-->#

    # Check for "$ITEM" or "${ITEM" in the command line.
    # ${ITEM} allows the usage of string operations.
    # If the command does not reference the item itself, the item is
    # appended to the command as its last argument.
    #
    # NOTE(review): $COMMAND is passed to eval; it is assumed to come
    # from the operator (-c option) and to be trusted.

    BEFORE=$(get_time_in_seconds)
    echo "$COMMAND" | grep -E -i '\$\{?ITEM' >> /dev/null 2>&1
    RETVAL="$?"
    if [[ "$RETVAL" == "0" ]] 
    then 
        eval "$COMMAND" >> "$ITEM_LOG_FILE" 2>&1
        ERROR="$?"
        MYPID="$!"
    else
        eval '$COMMAND"$ITEM" >> "$ITEM_LOG_FILE" 2>&1'
        ERROR="$?"
        MYPID="$!"
    fi
    AFTER=$(get_time_in_seconds)

    echo -e "" >> "$ITEM_LOG_FILE"

    # Some error logging. Success or fail.
    if [[ ! "$ERROR" == "0" ]] 
    then
       mail_on_error "$ITEM" "$ITEM_LOG_FILE"
       log DEBUG "Processing Item $ITEM failed."
       # Tell the listener that an item failed.
       echo "$FAIL_KEY" >> "$FIFO" 
       echo -e "Status:\t\tFAILURE" >> "$ITEM_LOG_FILE"
       add_item_to_failed_list "$ORIG_ITEM"
    else
       echo -e "Status:\t\tSUCCESS" >> "$ITEM_LOG_FILE"
    fi

    #
    # If part of a cluster, remove the downloaded item after 
    # it has been processed so as not to fill up disk space.
    #
    if [[ "$DOWNLOAD_TO_NODE" == "1" ]] 
    then
        if [[ -e "$ITEM" ]] 
        then
            rm -rf "$ITEM"
        else        
            log DEBUG "There is no local file to remove.. strange..."
        fi

    fi

    #
    # Upload the output file back to the server.
    #

    if [[ "$UPLOAD_TO_SERVER" == "1" ]] 
    then
        log DEBUG "Upload $OUTPUT_DIR and ITEM DIR NAME IS $ITEM_DIR_NAME"
        if [[ "$ITEM_DIR_NAME" == "." ]] 
        then
            ITEM_DIR_NAME=""
        fi
        upload_item "$OUTPUT_DIR" "$ITEM_DIR_NAME" # local output directory / original item dir path
    else
        log DEBUG "Uploading disabled."
    fi

    #
    # Add the processing time to the item log file and upload the
    # log file to the server (only if an SSH server is configured).
    #

    elapsed "$BEFORE" "$AFTER" 
    echo "$PROCESSING_TIME" >> "$ITEM_LOG_FILE"
    
    echo -e "" >> "$ITEM_LOG_FILE"

    if [[ ! -z "$SSH_SERVER" ]] 
    then
        # SSH_OPTS and SSH_KEY are left unquoted on purpose: they may
        # hold several words (options) that must be split.
        scp -q $SSH_OPTS $SSH_KEY "$ITEM_LOG_FILE" $USER@$SSH_SERVER:$PPSS_HOME_DIR/$JOB_LOG_DIR
        # Compare the exit status against zero; testing "$?" as a plain
        # string is always true and would hide every failure.
        if [[ ! "$?" == "0" ]] 
        then
            log DEBUG "Uploading of item log file failed."
        fi
    fi

    start_new_worker
}

infanticide () {
    
    log DEBUG "Running $FUNCNAME"

    #
    # Kill processes related to the master process ($PID), so that no
    # child keeps running after the parent has died. Called by
    # listen_for_job when the kill key arrives.
    #
    # Every line of the process list that mentions $PID is a candidate.
    # A candidate is killed unless it is the master process itself, a
    # direct child of the master (the listener), or a child of PID 1.
    #
    # Both grep patterns are quoted: an unquoted [0-9] is a glob and
    # would be replaced by any single-digit file name in the current
    # directory.
    #
    PROCLIST=$(ps a -o pid,pgid,ppid,command | grep '[0-9]' | grep "$PID" | grep -v -i grep)

    # Read the list line by line on a separate file descriptor, so
    # that IFS does not have to be changed and the commands inside the
    # loop can not consume the list through stdin.
    while IFS= read -r x <&3
    do
        if [[ -z "$x" ]] 
        then
            continue
        fi
        MYPPID=$(echo "$x" | awk '{ print $3 }')
        MYPID=$(echo "$x" | awk '{ print $1 }')
        if [[ ! "$MYPPID" == "$PID" && ! "$MYPPID" == "1" ]] 
        then
            if [[ ! "$MYPID" == "$PID" ]] 
            then
                log DEBUG "Killing process $MYPID"
                kill "$MYPID" >> /dev/null 2>&1
            else
                log DEBUG "Not killing master process..$MYPID.." 
            fi
        else
                log DEBUG "Not killing listener process. $MYPID.."
        fi
    done 3<<< "$PROCLIST"

}

run_command () {

    #
    # Start a background worker (commando) for an item, provided that
    # fewer than MAX_NO_OF_RUNNING_JOBS workers are active.
    #
    # Arguments: $1 - the item to process (optional). Without an
    #                 argument the item is taken from the stack.
    #
    INPUT="$1"
    
    log DEBUG "Current active workers is $ACTIVE_WORKERS"

    if [[ "$ACTIVE_WORKERS" -lt "$MAX_NO_OF_RUNNING_JOBS" ]] 
    then
        # No item supplied: fall back to whatever stack_pop leaves in REGISTER.
        if [[ -z "$INPUT" ]] 
        then
            stack_pop
            INPUT="$REGISTER"
        fi

        log INFO "Now processing $INPUT"

        if [[ -z "$INPUT" || -d "$INPUT" ]] 
        then
            log DEBUG "Item is a directory or is empty."
        else
            # Run the worker in the background, detach it from the
            # job table and keep track of its process id.
            commando "$INPUT" &
            MYPID="$!"
            disown
            PIDS+=" $MYPID"
            ACTIVE_WORKERS=$((ACTIVE_WORKERS + 1))
            log DEBUG "Increasing active workers to $ACTIVE_WORKERS"
            echo "$INPUT" >> "$LIST_OF_PROCESSED_ITEMS"
        fi
        return 0
    else
        log DEBUG "Maximum number of workers are bussy, no more additional workers..."
    fi
}

display_jobs_remaining () {

    #
    # Report how many workers are still active. Nothing is shown
    # unless QUIET is "0".
    #
    # The messages are padded with trailing spaces; this padding is
    # part of the message and must be preserved.
    #
    if [[ "$QUIET" != "0" ]] 
    then
        return 0
    fi

    if [[ "$ACTIVE_WORKERS" == "1" ]] 
    then
        log PRCNT "One job is remaining.                                      "
    else
        log PRCNT "$((ACTIVE_WORKERS)) jobs are remaining.                                  "
    fi
}

show_eta () {
    
    #
    # Estimate and display the time at which all items will have been
    # processed. The estimate is only refreshed when GLOBAL_COUNTER is
    # a multiple of five and nothing is shown when QUIET is "1".
    #
    CURRENT_PROCESSED=$((GLOBAL_COUNTER-MAX_NO_OF_RUNNING_JOBS))
    TOTAL="$SIZE_OF_INPUT"
    START_TIME=$START_PPSS
    NOW=$(get_time_in_seconds)
    MODULO=$((GLOBAL_COUNTER % 5 ))

    if [[ "$QUIET" == "1" ]] 
    then
        return 0
    fi

    # No estimate is possible before the first batch has completed.
    if [[ "$CURRENT_PROCESSED" -le "0" ]] 
    then
        return 0
    fi

    if [[ "$MODULO" != "0" ]] 
    then
        return 0
    fi

    RUNNING_TIME=$((NOW-START_TIME))
    if [[ "$RUNNING_TIME" -gt "0" && "$CURRENT_PROCESSED" != "0" && "$CURRENT_PROCESSED" -gt "$MAX_NO_OF_RUNNING_JOBS" ]] 
    then
        TIME_PER_ITEM=$(( RUNNING_TIME / ( CURRENT_PROCESSED - MAX_NO_OF_RUNNING_JOBS ) ))
        log DEBUG "Time per item is $TIME_PER_ITEM seconds."
        TOTAL_TIME=$(( (TIME_PER_ITEM * SIZE_OF_INPUT) + TIME_PER_ITEM ))
        TOTAL_TIME_IN_SECONDS=$((START_TIME+TOTAL_TIME))

        # date takes an epoch timestamp via -r on Darwin and via -d @ elsewhere.
        case "$ARCH" in
            Darwin )
                DATE=$(date -r $TOTAL_TIME_IN_SECONDS)
                ;;
            * )
                DATE=$(date -d @$TOTAL_TIME_IN_SECONDS)
                ;;
        esac

        echo
        log DSPLY "ETA: $DATE"
        # Move the cursor two lines up again.
        echo -en "\033[2A"
    fi
}

display_progress () {

    #
    # Show how far processing has come. Does nothing in daemon mode
    # (DAEMON other than "0").
    #
    # Globals written: SIZE_OF_INPUT, GC, PERCENT and, once everything
    # has been processed, FINISHED.
    #
    if [[ "$DAEMON" != "0" ]] 
    then
        return 0
    fi

    SIZE_OF_INPUT=$(wc -l "$LISTOFITEMS" | awk '{ print $1 }')
    if [[ -z "$SIZE_OF_INPUT" ]] 
    then
        # The list of items could not be read.
        SIZE_OF_INPUT=0
    fi

    # The counter may run ahead of the list; never report more than
    # the total number of items.
    GC=0
    if [[ ! "$GLOBAL_COUNTER" -gt "$SIZE_OF_INPUT" ]] 
    then
        GC="$GLOBAL_COUNTER"
    else
        GC="$SIZE_OF_INPUT"
    fi

    # Guard against a division by zero: with an empty list there is
    # nothing left to do, which is reported as 100 percent.
    if [[ "$SIZE_OF_INPUT" -gt "0" ]] 
    then
        PERCENT=$((100 * GC / SIZE_OF_INPUT))
    else
        PERCENT=100
    fi

    if [[ ! "$ACTIVE_WORKERS" == "0" ]] # && [[ "$FINISHED" == "0" ]] 
    then
        if [[ "$QUIET" == "0" ]] 
        then
            log PRCNT "$PERCENT% complete. Processed $GC of $SIZE_OF_INPUT. Failed $FAILED_ITEMS_COUNTER/$SIZE_OF_INPUT." 
            show_eta
        else
            # Quiet mode: only a compact percentage on a single line.
            echo -en "\r$PERCENT% --"
        fi

        if [[ "$PERCENT" == "100" ]] 
        then
            if [[ "$QUIET" == "1" ]] 
            then
                echo
            fi
            FINISHED=1
        fi
    fi
}

terminate_listener () {

    #
    # Wrap up after the listener loop has ended: stop the SSH master
    # process (if any), set the status to STOPPED, print the closing
    # messages, optionally send a notification mail and finally write
    # the number of failed items to FIFO_LISTENER.
    #
    # Arguments: $1 - the number of failed items.
    #
    GLOBAL_FAILED_COUNTER="$1"

    log DEBUG "Running $FUNCNAME"

    if [[ -z "$SSH_MASTER_PID" ]] 
    then
        log DEBUG "SSH master PID is empty."
    else
        kill "$SSH_MASTER_PID" 
    fi 

    set_status "STOPPED" "$GLOBAL_FAILED_COUNTER"
    log DEBUG "Listener stopped."

    echo
    stop-ppss
    # The number of failed items is only mentioned if processing did
    # not reach 100 percent.
    if [[ "$PERCENT" != "100" ]] 
    then
        log DSPLY "$FAILED_ITEMS_COUNTER failed items."
    fi
    log DSPLY "Finished. Consult $JOB_LOG_DIR for job output."

    if [[ "$QUIET" == "1" ]] 
    then
        echo 
    fi

    if [[ -n "$EMAIL" ]]  
    then
        if ! echo "|P|P|S|S| job finished." | mail -s "$HOSTNAME - PPSS has finished." "$EMAIL"
        then 
            log ERROR "Sending os status mail failed."
        fi
    fi

    echo "$GLOBAL_FAILED_COUNTER" >> "$FIFO_LISTENER"

}

inotify_listener () {

    #
    # Watch SRC_DIR recursively with inotifywait and write the path of
    # every closed file to the FIFO. Directories are ignored.
    #
    inotifywait "$SRC_DIR" -m -r -e close -q --format '%w%f' | while read -r changed_path
    do
        if [[ -d "$changed_path" ]] 
        then
            continue
        fi
        echo "$changed_path" > "$FIFO"
    done
}

is_item_unprocessed () {

    #
    # Check whether an item is absent from the PROCESSED_ITEMS list.
    #
    # Arguments: $1 - the item to look for.
    # Returns:   0 if the item is not in the list,
    #            1 if it is in the list or if no argument was given.
    #
    VAR="$1"
    STATUS=0

    if [[ -z "$VAR" ]] 
    then
        log DEBUG "$FUNCNAME: something is wrong, no argument received."
        return 1
    fi

    # PROCESSED_ITEMS is expanded unquoted: it is a whitespace
    # separated list of items.
    for x in $PROCESSED_ITEMS
    do
        [[ "$x" != "$VAR" ]] || STATUS=1
    done

    log DEBUG "Is item $VAR unprocessed: $STATUS"
    
    return $STATUS
}

is_item_file_and_unmodified () {

    #
    # Decide whether an item may be processed: an existing file must
    # be older than DAEMON_FILE_AGE.
    #
    # Arguments: $1 - the item.
    # Returns:   0 if the item does not exist as a file or is old
    #            enough, 1 if the file is too young.
    #
    ITEM="$1"

    if [[ ! -e "$ITEM" ]] 
    then
        log DEBUG "$FUNCNAME: file does not exist."
        return 0
    fi

    # The age is the difference between now and the value printed by
    # the $STAT command for the file.
    NOW=$(date +%s)
    FILEDATE=$($STAT "$ITEM")
    ELAPSED="$(expr $NOW - $FILEDATE)"

    if [[ "$ELAPSED" -gt "$DAEMON_FILE_AGE" ]] 
    then
        log DEBUG "$FUNCNAME File $ITEM is aged $ELAPSED"
        return 0
    fi

    log DEBUG "$FUNCNAME File $ITEM too young $ELAPSED"
    return 1
}

process_item_as_daemon () {

    #
    # Daemon mode: hand an unprocessed item to the FIFO once
    # is_item_file_and_unmodified accepts it; otherwise put it on the
    # stack. Items that were already processed are ignored.
    #
    # Arguments: $1 - the item.
    #
    ITEM="$1"

    is_item_unprocessed "$ITEM" || return 0

    if ! is_item_file_and_unmodified "$ITEM"
    then
        stack_push "$ITEM"
    else
        echo "$ITEM" >> "$FIFO"
        processed_stack_push "$ITEM"
    fi
}

daemon_listener () {

    #
    # Polling loop for daemon mode. Never returns.
    #
    # Each round collects all items, offers every one of them to
    # process_item_as_daemon, does the same for everything on the
    # stack and then sleeps for DAEMON_POLLING_INTERVAL.
    #
    for (( ; ; ))
    do
        get_all_items

        while get_item
        do
            process_item_as_daemon "$ITEM"
        done

        while stack_pop
        do 
            process_item_as_daemon "$REGISTER"  
        done

        sleep "$DAEMON_POLLING_INTERVAL"
    done
}

start_daemon_listener () {

    #
    # Run daemon_listener as a detached background process and add
    # its process id to PIDS.
    #
    daemon_listener &
    MYPID="$!"
    disown
    PIDS+=" $MYPID"
}

start_inotify_listener () {

    #
    # Reset the number of active workers to zero, run inotify_listener
    # as a detached background process and add its process id to PIDS.
    #
    ACTIVE_WORKERS=0

    inotify_listener &
    MYPID="$!"
    disown
    PIDS+=" $MYPID"
}

start_as_daemon () {

    #
    # In daemon mode (DAEMON is "1"), start the listener that feeds
    # new items: the inotify based one if INOTIFY is "1", the polling
    # one otherwise. Outside daemon mode nothing is started.
    #
    if [[ "$DAEMON" != "1" ]] 
    then
        log DEBUG "Daemon mode disabled."
        return
    fi

    log DEBUG "Daemon mode enabled."

    case "$INOTIFY" in
        1 )
            log INFO "Linux inotify enabled."
            start_inotify_listener
            ;;
        * )
            start_daemon_listener
            log INFO "Linux inotify disabled."
            ;;
    esac
}

decrease_active_workers () {

    #
    # Lower the number of active workers by one, but never below zero.
    #
    if [[ "$ACTIVE_WORKERS" -gt "0" ]] 
    then
        ACTIVE_WORKERS=$((ACTIVE_WORKERS - 1))
    fi
}

listen_for_job () {

    #
    # The listener: reads events from file descriptor 42, one per
    # line, until there is nothing left to do and then calls
    # terminate_listener with the number of failed items.
    #
    # An event is one of:
    #   START_KEY - a worker asks for work (or has finished).
    #   FAIL_KEY  - an item failed to process.
    #   KILL_KEY  - kill all related processes and stop listening.
    #   any other line is treated as an item to be processed.
    #
    FINISHED=0
    ACTIVE_WORKERS="$MAX_NO_OF_RUNNING_JOBS"
    PIDS=""
    log DEBUG "Listener started."

    start_as_daemon

    # 'IFS= read -r' keeps the event exactly as it was written:
    # an item may contain backslashes or leading/trailing whitespace.
    while IFS= read -r event <& 42
    do
        log INFO "Current active workers is $ACTIVE_WORKERS"

        if [[ "$event" == "$START_KEY" ]] 
        then
            decrease_active_workers

            log DEBUG "Got a 'start-key' event"

            if [[ "$DAEMON" == "0" ]] 
            then
                if get_item
                then
                    log DEBUG "Got an item, running command..."
                    run_command "$ITEM"
                else
                    log DEBUG "No more new items..."
                    # Only stop listening once the last worker is done.
                    if [[ "$ACTIVE_WORKERS" == "0" ]] 
                    then
                        display_progress 
                        break
                    else
                        display_jobs_remaining
                    fi
                fi
            else
                log DEBUG "Daemon mode: a worker finished..."
                run_command 
            fi
        elif [[ "$event" == "$FAIL_KEY" ]] 
        then
            ((FAILED_ITEMS_COUNTER++))
            log DEBUG "An item failed to process. $FAILED_ITEMS_COUNTER"
        elif [[ "$event" == "$KILL_KEY" ]] 
        then
            infanticide
            break
        else
            log DEBUG "Event is an item."
            # Queue the item; run_command takes it from the stack.
            stack_push "$event"
            run_command
        fi

        display_progress 

        set_status "RUNNING" "$FAILED_ITEMS_COUNTER"

    done

    terminate_listener "$FAILED_ITEMS_COUNTER"
}

start_all_workers () {

    #
    # Announce and start MAX_NO_OF_RUNNING_JOBS workers, optionally
    # with a random delay (MAX_DELAY) after each one. In daemon mode
    # with inotify enabled no workers are started here.
    #
    case "$MAX_NO_OF_RUNNING_JOBS" in
        1 )
            log DSPLY "Starting one (1) single worker."
            ;;
        * )
            log DSPLY "Starting $MAX_NO_OF_RUNNING_JOBS parallel workers."
            ;;
    esac

    if [[ "$DAEMON" == "0" ]] 
    then
        log DSPLY "---------------------------------------------------------"
    elif [[ "$INOTIFY" == "1" ]] 
    then
        return 0
    fi

    i=0
    while [[ "$i" -lt "$MAX_NO_OF_RUNNING_JOBS" ]] 
    do
        start_new_worker
        log DEBUG "Starting worker $i"
        i=$((i + 1))

        if [[ "$MAX_DELAY" != "0" ]] 
        then
            random_delay "$MAX_DELAY"
        fi
    done
}

get_status_of_nodes () {

    #
    # Fetch the status files of all nodes from the server, display one
    # line per node and a line with the totals.
    #
    # Every status line is expected to hold, separated by whitespace:
    # IP address, host name, status, number of processed items and
    # number of failed items. Missing counters are treated as zero.
    #
    # Arguments: $1 - file used to store the fetched status lines.
    #                 It is removed again once the overview is shown.
    # Globals:   PROCESSED is added to (the caller initialises it),
    #            FAILED is reset and added to.
    # Returns:   1 if the status files could not be fetched.
    #
    RESULT_FILE="$1"
    FAILED=0

    # SSH_OPTS and SSH_KEY are left unquoted on purpose: they may hold
    # several words. The glob is quoted so that it is expanded on the
    # server instead of locally.
    ssh -q $SSH_OPTS $SSH_KEY "$USER@$SSH_SERVER" cat "$PPSS_HOME_DIR/$PPSS_NODE_STATUS/*" > "$RESULT_FILE" 2>&1
    if [[ ! "$?" == "0" ]] 
    then
        log DSPLY "|P|P|S|S| has not been started yet on nodes."
        return 1
    fi 

    # Read the file line by line on a separate file descriptor. IFS is
    # only changed for the read itself, so the caller's IFS is left
    # untouched. The extra test handles a last line without a newline.
    while IFS=$' \t' read -r IP HOST STATUS RES FAIL _ <&3 || [[ -n "$IP" ]] 
    do
        if [[ -z "$IP" ]] 
        then
            continue
        fi
        if [[ -z "$RES" ]] 
        then
            RES="0"
        fi
        if [[ -z "$FAIL" ]] 
        then
            FAIL="0"
        fi
        PROCESSED=$((PROCESSED+RES))
        FAILED=$((FAILED+FAIL))
        LINE=$(echo "$IP $HOST $RES $FAIL $STATUS" | awk '{ printf ("%-16s %-16s % 8s %6s %7s\n",$1,$2,$3,$4,$5) }')
        log DSPLY "$LINE"
    done 3< "$RESULT_FILE"

    log DSPLY "---------------------------------------------------------"
    LINE=$(echo "$PROCESSED" "$FAILED" | awk '{ printf ("Total processed/failed: %18s %6s \n",$1,$2) }')
    log DSPLY "$LINE"

    rm "$RESULT_FILE"

}

show_status () {

    #
    # Display the overall progress (percentage of items processed),
    # the number of nodes and items, followed by the status of each
    # node as reported by get_status_of_nodes.
    #
    # The configuration file ($CONFIG) is sourced first. All file
    # names are quoted so that paths containing spaces work.
    #
    . "$CONFIG"

    if [[ ! -z "$SSH_KEY" ]] 
    then
        SSH_KEY="-i $SSH_KEY"
    fi
    get_all_items

    ITEMS=$(wc -l "$LISTOFITEMS" | awk '{ print $1 }')
    
    if [[ ! -z "$ITEMS" && ! "$ITEMS" == "0" ]] 
    then
        PROCESSED=$(exec_cmd "ls -1 $PPSS_HOME_DIR/$ITEM_LOCK_DIR 2>/dev/null | wc -l" 1) 2>&1 >> /dev/null
        check_status "$?" "Could not get number of processed items."
        # An empty result counts as zero processed items.
        TMP_STATUS=$((100 * ${PROCESSED:-0} / ITEMS))
        log DSPLY "Status:\t\t$TMP_STATUS percent complete."
    else
        log DSPLY "Status: UNKNOWN - is PPSS deployed on nodes?"
    fi

    if [[ ! -z "$NODES_FILE" ]] 
    then
        TMP_NO=$(cat "$NODES_FILE" | wc -l)
        log DSPLY "Nodes:\t $TMP_NO"
    fi
    log DSPLY "Items:\t\t$ITEMS"

    log DSPLY "---------------------------------------------------------"
    HEADER=$(echo IP-address Hostname Processed Failed Status | awk '{ printf ("%-16s %-15s % 2s %2s %2s\n",$1,$2,$3,$4,$5) }')  
    log DSPLY "$HEADER"
    log DSPLY "---------------------------------------------------------"
    PROCESSED=0

    # NOTE(review): the literal string "RESULT_FILE" is passed, so the
    # node status is stored in a file with that name in the current
    # directory - confirm that this is intended.
    get_status_of_nodes "RESULT_FILE"

}

main () {
    
    #
    # Entry point: perform the action that belongs to the selected
    # mode ($MODE). Without a (known) mode, PPSS runs stand-alone:
    # the listener is started in the background, followed by the
    # workers.
    #
    case "$MODE" in
              node )
                    create_working_directory
                    test_server
                    init_vars
                    get_all_items
                    # NOTE(review): the redirection after the '&' is a
                    # separate, empty command; it does not redirect the
                    # output of the listener.
                    listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & 2>&1 >> /dev/null
                    LISTENER_PID=$!
                    start_all_workers
                    ;;
             start )
                    # This option only starts all nodes.
                    LOGFILE=/dev/null
                    display_header
                    if [[ ! -e "$NODES_FILE" ]] 
                    then
                        # Report the file that was actually tested.
                        log ERROR "File $NODES_FILE with list of nodes does not exist."
                        set_status "STOPPED"
                        cleanup
                        exit 1
                    else
                        # One node per word; every node is started in
                        # the background.
                        for NODE in $(cat "$NODES_FILE")
                        do
                            start_ppss_on_node "$NODE" &
                        done
                    fi
                    cleanup
                    ;;
        config )
                    LOGFILE=/dev/null
                    display_header
                    log DSPLY "Generating configuration file $CONFIG"
                    add_var_to_config PPSS_LOCAL_TMPDIR "$PPSS_LOCAL_TMPDIR"
                    add_var_to_config PPSS_LOCAL_OUTPUT "$PPSS_LOCAL_OUTPUT"
                    cleanup
                    ;;

        stop )
                    LOGFILE=/dev/null
                    display_header
                    log DSPLY "Stopping PPSS on all nodes."
                    test_server
                    exec_cmd "touch $STOP_SIGNAL"
                    cleanup
                    ;;
        pause )
                    LOGFILE=/dev/null
                    display_header
                    log DSPLY "Pausing PPSS on all nodes."
                    exec_cmd "touch $PAUSE_SIGNAL"
                    cleanup
                    ;;
        continue )
                    # Remove whichever signal files are present.
                    LOGFILE=/dev/null
                    display_header
                    if does_file_exist "$STOP_SIGNAL"
                    then
                        log DSPLY "Continuing processing, please use $0 start to start PPSS on al nodes."
                        exec_cmd "rm -f $STOP_SIGNAL"
                    fi
                    if does_file_exist "$PAUSE_SIGNAL"
                    then
                        log DSPLY "Continuing PPSS on all nodes."
                        exec_cmd "rm -f $PAUSE_SIGNAL"
                    fi
                    cleanup
                    ;;
        deploy )
                    # Start with a fresh deployment log.
                    LOGFILE=ppss-deploy.txt
                    if [[ -e "$LOGFILE" ]] 
                    then
                        rm "$LOGFILE"
                    fi
            
                    init_ssh_server_socket
                    display_header
                    log DSPLY "Deploying PPSS on nodes. See ppss-deploy.txt for details."
                    deploy_ppss
                    wait
                    cleanup
                    ;;
        status )
                    LOGFILE=/dev/null
                    display_header
                    test_server
                    show_status
                    cleanup
                    ;;
        erase )
                    LOGFILE=/dev/null
                    display_header
                    log DSPLY "Erasing PPSS from all nodes."
                    erase_ppss
                    cleanup
                    ;;
        kill )
                    # Kill every bash process of this user whose
                    # process listing mentions ppss.
                    LOGFILE=/dev/null
                    for x in $(ps ux | grep ppss | grep -v grep | grep bash | awk '{ print $2 }')
                    do          
                         kill "$x"
                    done
                    cleanup
                    ;;

        * )
                    create_working_directory
                    display_header
                    init_vars
                    get_all_items
                    # NOTE(review): see the remark in the 'node' mode
                    # about the redirection after the '&'.
                    listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & 2>&1 >> /dev/null
                    LISTENER_PID=$!
                    start_all_workers
                    ;;
    esac
}

#
# PPSS can be sourced. This is mainly for testing purposes (unit tests).
#
if ! are_we_sourced
then

    #
    # Step one: handle all command-line arguments.
    # 
    process_arguments "$@"

    #
    # Step two: call main, which sets the whole framework in motion.
    # This only happens if the file is not sourced.
    #
    main

    #
    # Step three: if the listener FIFO exists, wait for one line on
    # file descriptor 43, clean up and exit with that line as the
    # exit status.
    #
    #wait 
    if [[ -e "$FIFO_LISTENER" ]] && read event <& 43
    then
        cleanup
        exit "$event"
    fi
fi
