Cluster Rumah Dask

gambar


Baru-baru ini saya melakukan penelitian di mana perlu memproses beberapa ratus ribu set data input. Untuk setiap set - lakukan beberapa perhitungan, kumpulkan hasil dari semua perhitungan bersama dan pilih "terbaik" sesuai dengan beberapa kriteria. Intinya, ini adalah bruteforce bust. Hal yang sama terjadi ketika memilih parameter yang menggunakan model ML GridSearch.


Namun, sejak beberapa saat, ukuran perhitungan mungkin menjadi terlalu besar untuk satu komputer, bahkan jika Anda menjalankannya dalam beberapa proses menggunakan joblib. Atau, lebih tepatnya, menjadi terlalu lama bagi seorang eksperimen yang tidak sabar.


Dan karena di apartemen modern sekarang Anda dapat menemukan lebih dari satu komputer "underloaded", dan tugasnya jelas cocok untuk konkurensi massal - sekarang saatnya untuk merakit cluster rumah Anda dan menjalankan tugas seperti itu di atasnya.


Perpustakaan Dask ( https://dask.org/ ) sangat cocok untuk membangun "cluster rumah" . Mudah untuk menginstal dan tidak menuntut pada node, yang secara serius menurunkan "entry level" dalam komputasi cluster.


Untuk mengkonfigurasi kluster Anda, Anda perlu di semua komputer:


  • instal juru bahasa python
  • dask
  • (scheduler) (worker)

, , β€” , .



(https://docs.dask.org/) . , .


python


Dask pickle, , python.
3.6 3.7 , . 3.8 - pickle.


" ", , , .


Dask


Dask pip conda


pip install dask distributed bokeh

dask, bokeh , , "-" dask dashboard.
. .


gcc, :


  • MacOS xcode
  • docker image docker-worker, "" , python:3.6-slim-buster . , python:3.6.

dask


- . . β€” .


$ dask-scheduler

- , .


$ dask-worker schedulerhost:8786 --nprocs 4 --nthreads 1 --memory-limit 1GB --death-timeout 120 -name MyWorker --local-directory /tmp/

  • nprocs / nthreads β€” , , . GIL -, - , numpy. .
  • memory-limit β€” , . β€” - , . , .
  • death-timeout β€” , - , . -. , , .
  • name β€” -, . , "" -.
  • local-directory β€” ,

- Windows


, dask-worker . , , dask-worker .


" " . NSSM (https://www.nssm.cc/).


NSSM, , , . , , - . NSSM .


NSSM . " "


Firewall


firewall: -.


, , -, β€” . , β€” . , , .


- . , .


Dask



:


from dask.distributed import Client

client = Client('scheduler_host:port')

β€” "" , .



, , . pandas, numpy, scikit-learn, tensorflow.
, .


, ? β€” pip


def install_packages():
    try:
        import sys, subprocess
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'mypackage'])
        return (0)
    except:
        return (1)

from dask.distributed import Client

client = Client('scheduler:8786')
client.run(install_packages)

, , . . "" , , .



, , β€” .
, , Dask .


. Client upload_file(). , .


- , zip.


from dask.distributed import Client 
import numpy as np
from my_module import foo
from my_package import bar 

def zoo(x)
  return (x**2 + 2*x + 1)

x = np.random.rand(1000000)

client = Client('scheduler:8786')

#        . 
#     
r3 = client.map(zoo, x) 

#  foo  bar     ,
#            
client.upload_file('my_module.py')
client.upload_file('my_package.zip')

#    
r1 = client.map(foo, x)
r2 = client.map(bar, x) 

joblib


joblib . joblib β€” :


joblib


from joblib import Parallel, delayed
...
res = Parallel(n_jobs=-1)(delayed(my_proc)(c, ref_data) for c in candidates)

joblib + dask


# 
from joblib import Parallel, delayed, parallel_backend
from dask.distributed import Client
...
client = Client('scheduler:8786')

with parallel_backend('dask'): #  ""    
  res = Parallel(n_jobs=-1)(delayed(my_proc)(c, ref_data) for c in candidates)

, , . , β€” :



16 , .



β€” 10-20 , 200.


, - .


# 
from joblib import Parallel, delayed, parallel_backend
from dask.distributed import Client
...
client = Client('scheduler:8786')

with parallel_backend('dask', scatter = [ref_data]):
  res = Parallel(n_jobs=-1, batch_size=<N>, pre_dispatch='3*n_jobs')(delayed(my_proc)(c, ref_data) for c in candidates)

batch_size. β€” , , .
pre_dispatch.



, .



  • β€” . , .
  • β€” ( )
  • β€”

3.5-4 , . : , - , , .


, batch_size pre_dispatch . 8-10 .


, , - (, , ), scatter. .


, .


GridSearchCV


scikit-learn joblib , β€” dask


:


...

lr = LogisticRegression(C=1, solver="liblinear", penalty='l1', max_iter=300)

grid = {"C": 10.0 ** np.arange(-2, 3)}

cv = GridSearchCV(lr, param_grid=grid, n_jobs=-1, cv=3, 
                  scoring='f1_weighted', 
                  verbose=True, return_train_score=True )

client = Client('scheduler:8786')

with joblib.parallel_backend('dask'):
    cv.fit(x1, y)

clf = cv.best_estimator_
print("Best params:", cv.best_params_)
print("Best score:", cv.best_score_)

:


Fitting 3 folds for each of 5 candidates, totalling 15 fits
[Parallel(n_jobs=-1)]: Using backend DaskDistributedBackend with 12 concurrent workers.
[Parallel(n_jobs=-1)]: Done   8 out of  15 | elapsed:  2.0min remaining:  1.7min
[Parallel(n_jobs=-1)]: Done  15 out of  15 | elapsed: 16.1min finished
/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/sklearn/linear_model/_logistic.py:1539: UserWarning: 'n_jobs' > 1 does not have any effect when 'solver' is set to 'liblinear'. Got 'n_jobs' = 16.
  " = {}.".format(effective_n_jobs(self.n_jobs)))
Best params: {'C': 10.0}
Best score: 0.9748830491726451

dask. -.
β€” .


, ( ) β€” . β€” .




Pustaka Dask adalah alat yang hebat untuk penskalaan untuk kelas tugas tertentu. Bahkan jika Anda hanya menggunakan dask dasar yang didistribusikan dan mengabaikan ekstensi khusus dask.dataframe, dask.array, dask.ml - Anda dapat mempercepat percobaan secara signifikan. Dalam beberapa kasus, percepatan perhitungan yang hampir linier dapat dicapai.


Dan semua ini didasarkan pada apa yang sudah Anda miliki di rumah, dan digunakan untuk menonton video, gulir melalui feed berita atau game tanpa akhir. Gunakan sumber daya ini sepenuhnya!


All Articles