Dask Home Cluster

imagen


Recientemente realicé un estudio en el que era necesario procesar varios cientos de miles de conjuntos de datos de entrada. Para cada conjunto: realice algunos cálculos, recopile los resultados de todos los cálculos y seleccione el "mejor" de acuerdo con algunos criterios. En esencia, este es el busto de fuerza bruta. Lo mismo sucede al seleccionar los parámetros de los modelos ML usando GridSearch.


Sin embargo, desde algún punto en adelante, el tamaño de los cálculos puede ser demasiado grande para una computadora, incluso si lo ejecuta en varios procesos usando joblib. O, más precisamente, se vuelve demasiado largo para un experimentador impaciente.


Y dado que en un apartamento moderno ahora puede encontrar más de una computadora "descargada", y la tarea es claramente adecuada para la concurrencia masiva: es hora de ensamblar su clúster doméstico y ejecutar tales tareas en él.


La biblioteca Dask ( https://dask.org/ ) es perfecta para construir un "grupo de inicio" . Es fácil de instalar y no exigente en los nodos, lo que reduce considerablemente el "nivel de entrada" en la computación en clúster.


Para configurar su clúster, necesita en todas las computadoras:


  • instalar intérprete de python
  • dask
  • (scheduler) (worker)

, , — , .



(https://docs.dask.org/) . , .


python


Dask pickle, , python.
3.6 3.7 , . 3.8 - pickle.


" ", , , .


Dask


Dask pip conda


pip install dask distributed bokeh

dask, bokeh , , "-" dask dashboard.
. .


gcc, :


  • MacOS xcode
  • docker image docker-worker, "" , python:3.6-slim-buster . , python:3.6.

dask


- . . — .


$ dask-scheduler

- , .


$ dask-worker schedulerhost:8786 --nprocs 4 --nthreads 1 --memory-limit 1GB --death-timeout 120 -name MyWorker --local-directory /tmp/

  • nprocs / nthreads — , , . GIL -, - , numpy. .
  • memory-limit — , . — - , . , .
  • death-timeout — , - , . -. , , .
  • name — -, . , "" -.
  • local-directory — ,

- Windows


, dask-worker . , , dask-worker .


" " . NSSM (https://www.nssm.cc/).


NSSM, , , . , , - . NSSM .


NSSM . " "


Firewall


firewall: -.


, , -, — . , — . , , .


- . , .


Dask



:


from dask.distributed import Client

client = Client('scheduler_host:port')

— "" , .



, , . pandas, numpy, scikit-learn, tensorflow.
, .


, ? — pip


def install_packages():
    try:
        import sys, subprocess
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'mypackage'])
        return (0)
    except:
        return (1)

from dask.distributed import Client

client = Client('scheduler:8786')
client.run(install_packages)

, , . . "" , , .



, , — .
, , Dask .


. Client upload_file(). , .


- , zip.


from dask.distributed import Client 
import numpy as np
from my_module import foo
from my_package import bar 

def zoo(x)
  return (x**2 + 2*x + 1)

x = np.random.rand(1000000)

client = Client('scheduler:8786')

#        . 
#     
r3 = client.map(zoo, x) 

#  foo  bar     ,
#            
client.upload_file('my_module.py')
client.upload_file('my_package.zip')

#    
r1 = client.map(foo, x)
r2 = client.map(bar, x) 

joblib


joblib . joblib — :


joblib


from joblib import Parallel, delayed
...
res = Parallel(n_jobs=-1)(delayed(my_proc)(c, ref_data) for c in candidates)

joblib + dask


# 
from joblib import Parallel, delayed, parallel_backend
from dask.distributed import Client
...
client = Client('scheduler:8786')

with parallel_backend('dask'): #  ""    
  res = Parallel(n_jobs=-1)(delayed(my_proc)(c, ref_data) for c in candidates)

, , . , — :



16 , .



— 10-20 , 200.


, - .


# 
from joblib import Parallel, delayed, parallel_backend
from dask.distributed import Client
...
client = Client('scheduler:8786')

with parallel_backend('dask', scatter = [ref_data]):
  res = Parallel(n_jobs=-1, batch_size=<N>, pre_dispatch='3*n_jobs')(delayed(my_proc)(c, ref_data) for c in candidates)

batch_size. — , , .
pre_dispatch.



, .



  • — . , .
  • — ( )

3.5-4 , . : , - , , .


, batch_size pre_dispatch . 8-10 .


, , - (, , ), scatter. .


, .


GridSearchCV


scikit-learn joblib , — dask


:


...

lr = LogisticRegression(C=1, solver="liblinear", penalty='l1', max_iter=300)

grid = {"C": 10.0 ** np.arange(-2, 3)}

cv = GridSearchCV(lr, param_grid=grid, n_jobs=-1, cv=3, 
                  scoring='f1_weighted', 
                  verbose=True, return_train_score=True )

client = Client('scheduler:8786')

with joblib.parallel_backend('dask'):
    cv.fit(x1, y)

clf = cv.best_estimator_
print("Best params:", cv.best_params_)
print("Best score:", cv.best_score_)

:


Fitting 3 folds for each of 5 candidates, totalling 15 fits
[Parallel(n_jobs=-1)]: Using backend DaskDistributedBackend with 12 concurrent workers.
[Parallel(n_jobs=-1)]: Done   8 out of  15 | elapsed:  2.0min remaining:  1.7min
[Parallel(n_jobs=-1)]: Done  15 out of  15 | elapsed: 16.1min finished
/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/sklearn/linear_model/_logistic.py:1539: UserWarning: 'n_jobs' > 1 does not have any effect when 'solver' is set to 'liblinear'. Got 'n_jobs' = 16.
  " = {}.".format(effective_n_jobs(self.n_jobs)))
Best params: {'C': 10.0}
Best score: 0.9748830491726451

dask. -.
— .


, ( ) — . — .




La biblioteca Dask es una gran herramienta para escalar para una clase específica de tareas. Incluso si usa solo el dask.distributed básico y deja de lado las extensiones especializadas dask.dataframe, dask.array, dask.ml, puede acelerar significativamente los experimentos. En algunos casos, se puede lograr una aceleración casi lineal de los cálculos.


Y todo esto se basa en lo que ya tienes en casa, y se usa para mirar videos, desplazarte por un sinfín de noticias o juegos. ¡Usa estos recursos al máximo!


All Articles