Metadata-Version: 2.1
Name: daskcheck
Version: 0.0.16
Summary: Automatically created environment for python package
Home-page: http://gitlab.com/jaromrax/daskcheck
Author: jaromrax
Author-email: jaromrax@gmail.com
License: GPL2
Platform: UNKNOWN
Description-Content-Type: text/markdown

Project daskcheck
=================

Tools for simpler use of **dask**. The Dask scheduler address is defined
in the `~/.dask_server` file; its content is just an IP address.
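As an illustration of the address file convention, here is a minimal sketch of reading `~/.dask_server` and turning it into a scheduler URL. The function name `read_scheduler_address` is hypothetical and not part of `daskcheck`; port 8786 is taken from the scheduler scripts in the Appendix.

```python
from pathlib import Path

DEFAULT_PORT = 8786  # port used by the run_scheduler / run_worker scripts


def read_scheduler_address(path="~/.dask_server", port=DEFAULT_PORT):
    """Return 'tcp://<ip>:<port>' built from the one-line address file.

    Hypothetical helper: daskcheck may read the file differently.
    """
    ip = Path(path).expanduser().read_text().strip()
    if not ip:
        raise ValueError(f"{path} is empty, expected an IP address")
    return f"tcp://{ip}:{port}"
```

A file containing `10.10.0.2` would give `tcp://10.10.0.2:8786`, which is the form of address a worker connects to.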

*Work is in progress...*

The idea
--------

1.  properly define the *core function* (*xcorefunc* in the example)
    with **THE proper return**
2.  `daskcheck` then takes care of:
    -   sending the task to the scheduler (but this is natural)
    -   uploading a (python) single-file module to the workers (via .upload)
    -   sending the parameters
    -   collecting the results and saving them to a **local json file**
    -   *in future*: parsing the local **json** file
    -   *in future*: sending local (bash) scripts to workers... (?)
    -   *in future*: managing (worker's) folders with data output
        (if the output is too large)...
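The return convention and the local JSON log can be sketched without a cluster. The example below assumes the core function returns an `(order, payload)` pair, as `xcorefunc` does in the Appendix; the helper `collect_and_save` and the layout of the log (a mapping from order to payload) are assumptions for illustration, not the actual `daskcheck` implementation.

```python
import datetime as dt
import json
import platform
import time


def xcorefunc(order, param):
    """Core function: return (order, payload) with a JSON-serializable payload."""
    start = time.perf_counter()
    value = param * param  # placeholder for the real work
    elapsed = time.perf_counter() - start
    return order, [platform.node(), value, f"{elapsed:.1f} s"]


def collect_and_save(results, prefix="dask_results_log"):
    """Store (order, payload) pairs in a timestamped JSON file; return its name.

    Hypothetical helper; the assumed log layout is {order: payload}.
    """
    stamp = dt.datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{prefix}_{stamp}.json"
    log = {str(order): payload for order, payload in results}
    with open(filename, "w") as fp:
        json.dump(log, fp, sort_keys=True, indent="\t")
    return filename
```

Locally, `collect_and_save([xcorefunc(i, p) for i, p in enumerate([11, 12, 13])])` writes one log file; on a cluster the list of results would come from the scheduler instead of a plain loop.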

Files in the repo
-----------------

The repository is becoming a bit messy, so for reference:

  ------------------------------------- ----------------------------------------
  batch_for_worker                      X BATCH TEST in remote \~/sand
  bin_daskcheck.py                      X nothing will be main script
  config.py                             X module configs
  conv_json2spectra.py                  exo_dask output conversion to spectra
  daskcheck                             FOLDER
  daskcheck.py                          module OPERATIONS
  dask_results_log_20230510_142235.json log file
  docextr.py                            X attempt to autocreate
  exo_dask.py                           ???work on exogam
  f.py                                  ??resulting autogenerate
  OldLogs                               Previous files
  README.md                             MD
  README.org                            this file
  run_scheduler                         RUN SCHEDULER
  run_syncversions                      ??KEEP dasksched-workers in sync
  run_worker                            RUN WORKER
  setup.py                              setup python
  singlemod.py                          MODULE for import
  singlexec.py                          XOBSOLETE - remote exec BASH
  unitname.py                           generic module
  version.py                            version is here
  ------------------------------------- ----------------------------------------

WAITING Installation of daskcheck
---------------------------------

``` {.bash org-language="sh"}
pip install daskcheck
```

Installation of dask 2023
-------------------------

See <https://docs.dask.org/en/stable/install.html>

``` {.bash org-language="sh"}
pip install "dask[complete]"
```

Launching dask scheduler/workers
--------------------------------

-   *Pay attention to correct/compatible libraries on different workers*
-   *open ports 8786 and 8787 on the scheduler and a wide range of ports
    on the workers...*
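Whether the scheduler ports are reachable can be checked from any machine before starting workers. The following is a minimal sketch using only the standard library; the helper name `port_is_open` is hypothetical, and `127.0.0.1` is just a placeholder for your scheduler address.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False


if __name__ == "__main__":
    # 8786 is the scheduler port, 8787 the web interface port.
    for port in (8786, 8787):
        state = "open" if port_is_open("127.0.0.1", port) else "closed"
        print(f"port {port}: {state}")
```

A successful connection only shows that something is listening on the port; it does not verify that the listener is a dask scheduler.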

### Testing basics on localhost - the most simple in-terminal way

-   `run_scheduler`
    -   you should see it running
-   `dask worker 127.0.0.1:8786 --nworkers 1 --nthreads 1`
    -   at this stage do not use `run_worker`, because it looks for the
        server in `~/.dask_server`
    -   expect a message like
        `Starting established connection to tcp://127.0.0.1:8786`
-   `./daskcheck.py loc`
    -   shows the local function output; this should work in any case
-   `./daskcheck.py test -s 127.0.0.1`
    -   overrides the server with 127.0.0.1 to check that everything works

### Set your server on worker(s)

Run `nano ~/.dask_server` and enter 10.10.0.2, or whatever address your
server has.

### Test basics on network

-   go to server and run `run_scheduler`
-   go to worker and run `run_worker`
-   CHECK LIBRARIES and upgrade what is needed e.g.:
    -   `pip3 install tornado --upgrade`
-   TEST with `./daskcheck.py test` from worker

### Launching worker from cmdline with *local* scheduler

``` {.bash org-language="sh"}
dask     worker 127.0.0.1:8786 --nworkers 5 --nthreads 1
```

Testing dask
------------

A purely local run of `get_cpu_info`:

``` {.bash org-language="sh"}
./daskcheck.py loc
```

This runs `get_cpu_info` 40 times on the cluster (the scheduler and
workers must be running):

``` {.bash org-language="sh"}
./daskcheck.py net
```

DOING Run single-file - (python) module ~~OR (bash) batch~~
-----------------------------------------------------------

`./daskcheck.py file` - the `file` command is followed by a filename and
then the parameters:

``` {.bash org-language="sh"}
# python function with main - example
./daskcheck.py file singlemod.py  11..33
```
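To make the parameter notation concrete, here is a sketch of how a spec such as `11..33` could be expanded into a list of values. It assumes that `A..B` means an inclusive integer range; the function name `expand_range` is hypothetical, and the actual behavior of `daskcheck.prepare_params` may differ.

```python
def expand_range(spec):
    """Expand 'A..B' into [A, A+1, ..., B]; a plain number gives a one-item list.

    Assumption: the upper bound is inclusive. This is an illustration,
    not the daskcheck implementation.
    """
    if ".." in spec:
        first, last = spec.split("..", 1)
        return list(range(int(first), int(last) + 1))
    return [int(spec)]
```

Under this assumption, `expand_range("11..33")` yields 23 values, one task per value.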

How is that done?

-   `importlib` is used to get the module's `main`,
-   `singlemod.py` is uploaded to the scheduler,
-   the working directory is changed to `/dask_sendbox`, and
-   the `tell()` and `main()` functions of `singlemod.py` are run:
    -   ~~singlemod.py - writes a file to (worker's) `~/dask_sendbox`~~
    -   ~~deprecated: singlexec.py - launches `./runme` - if not present
        in (worker's) `~/dask_sendbox`, it crashes~~
    -   ~~batch_for_worker - bash script, writes file to (worker's)
        `~/dask_sendbox`~~
        -   ~~Previously, `batch_for_worker` LOAD and RUN was hardwired
            in the `singlexec.py`.~~

`singlemod.py` - whatever it returns goes to the JSON and CSV output.
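The `importlib` step can be sketched as follows. The helper names `load_single_file_module` and `run_main` are hypothetical, and the `main(order, param)` signature is an assumption made by analogy with `xcorefunc`. On a cluster the file would first have to be shipped to the workers, for example with `Client.upload_file` from `dask.distributed`; that step is left out so the sketch runs locally.

```python
import importlib.util
from pathlib import Path


def load_single_file_module(path):
    """Import a standalone .py file and return the module object."""
    path = Path(path)
    spec = importlib.util.spec_from_file_location(path.stem, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the file's top-level code
    return module


def run_main(path, order, param):
    """Load the file and call its main(order, param).

    Assumption: the module exposes main(order, param).
    """
    module = load_single_file_module(path)
    return module.main(order, param)
```

Whatever `main` returns here is what would end up in the result log, so it should be JSON-serializable.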

Monitoring dask
---------------

Keep port 8787 open on the scheduler and open it in a browser:

    xdg-open http://localhost:8787

[TODO]{.todo .TODO} Recollecting the data from json {#recollection-the-data-from-json}
---------------------------------------------------

*to recover...*

[TODO]{.todo .TODO} Python native check ?what? {#python-native-check-what}
----------------------------------------------

It must be checked that python works too - as before

DEVELOPMENT
===========

-   restart scheduler after updating `daskcheck`
-   check for the open port when running `run_worker` \[ok\]

Appendix
========

run_worker - environment needed and command
-------------------------------------------

``` {.bash org-language="sh"}
#!/bin/bash

# PYTHON bin exports
export PATH=$PATH:$HOME/.local/bin

# ROOT exports
export PYTHONPATH=$HOME/root/lib/
export ROOTSYS=$HOME/root
export PATH=$ROOTSYS/bin:~/bin:$PATH
export LD_LIBRARY_PATH=$ROOTSYS/lib:$ROOTSYS/lib/root:$LD_LIBRARY_PATH

source $HOME/root/bin/thisroot.sh

# CONFIG FOR WORKER
export DISPLAY=:0
export DS=$HOME/.dask_server  # get IP of the scheduler
export DSER=`cat $DS`
export HOST=`hostname`
export workers=2 # how many cores to run
export PORT=8786
cd /tmp

if [ -f  "$DS" ]; then
    echo ... OK $DS exists
else
    echo ... NO $DS exists
    sleep 5
    echo ...
    exit 1
fi


echo ... I am on $HOST and trying to connect to /$DSER/ one thread per worker
dask worker ${DSER}:${PORT}      --nworkers $workers --nthreads 1

```

run_scheduler - script
----------------------

``` {.bash org-language="sh"}
#!/bin/bash


#dask scheduler --port 8786
export PATH=$PATH:$HOME/.local/bin

export PORT=8786
export HOST=`hostname`

cd /tmp

if [ "$HOST" = "core6a" ]; then
    echo ... starting scheduler
    dask scheduler   --port ${PORT}   #  --bokeh-port 8787
fi
echo ... scheduler ended or not launched at all
sleep 5
exit 0
```

Run a (python) function from python code
----------------------------------------

*This must be updated...*

`exo_dask.py` Contains a working (in the past) example, using `root`.

This is (or should be) a python code that uses `daskcheck` for sending a
function.

It is evidently crippled for the moment...

``` {.python}
from daskcheck import daskcheck

from fire import Fire
import time
import platform
import datetime as dt
import json

def main( parlist ):
    """
    Initiated by Fire. If one parameter, runs locally with local xcorefunc
    """
    parameters = daskcheck.prepare_params( parlist )

    if isinstance(parameters, (list, tuple)):
        # A list or tuple of parameters is worth sending to the cluster.
        print("i... viable for DASK ....")
        daskcheck.submit( daskcheck.get_cpu_info , parameters)
    else:
        print("i... running only locally")
        my_results = xcorefunc( 1 , parameters ) # order = 1, just arbitrary number
        # Write LOG file.
        now = dt.datetime.now()
        stamp = now.strftime("%Y%m%d_%H%M%S")
        with open(f"dask_results_log_{stamp}.json", "w") as fp:
            json.dump( my_results , fp, sort_keys=True, indent='\t', separators=(',', ': '))
    return

def xcorefunc( order, param):
    """
    CORE function to be sent to the dask scheduler.

    :param order: order number of the call
    :param param: parameter to be sent
    """
    import ROOT # *TRICK* I need to import here to avoid breaking pickle
    start_time = time.perf_counter()

    return order, [platform.node(),  f"{time.perf_counter() - start_time:.1f} s" ]


if __name__=="__main__":
    Fire(main)

```


