Metadata-Version: 1.0
Name: sparkit-learn
Version: 0.2.4
Summary: Scikit-learn on PySpark
Home-page: https://github.com/lensacom/sparkit-learn
Author: Krisztian Szucs, Andras Fulop
Author-email: krisztian.szucs@lensa.com, andras.fulop@lensa.com
License: Apache License, Version 2.0
Description: Sparkit-learn
        =============
        
        |Build Status| |PyPi|
        
        **PySpark + Scikit-learn = Sparkit-learn**
        
        GitHub: https://github.com/lensacom/sparkit-learn
        
        About
        =====
        
        Sparkit-learn aims to provide scikit-learn functionality and API on
        PySpark. The main goal of the library is to create an API that stays
        close to sklearn's.
        
        The driving principle was to *"Think locally, execute distributively."*
        To accommodate this concept, the basic data block is always an array or
        a (sparse) matrix, and the operations are executed at the block level.
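
        For intuition, here is a purely local sketch of that principle, using
        nothing but NumPy (no Spark involved): the dataset is chopped into
        fixed-size blocks, each block is an ordinary *numpy.ndarray*, and a
        global result is assembled from per-block partial results.

        .. code:: python

            import numpy as np

            data = np.arange(20)
            bsize = 5

            # "think locally": each block is a plain ndarray that local code
            # understands
            blocks = [data[i:i + bsize] for i in range(0, len(data), bsize)]

            # "execute distributively": the global result is assembled from
            # per-block partials
            total = sum(block.sum() for block in blocks)  # == data.sum() == 190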
        
        
        Requirements
        ============
        
        -  **Python 2.7.x or 3.4.x**
        -  **Spark[>=1.3.0]**
        -  NumPy[>=1.9.0]
        -  SciPy[>=0.14.0]
        -  Scikit-learn[>=0.16]
        
        
        
        Run IPython from the notebooks directory
        ========================================
        
        .. code:: bash
        
            PYTHONPATH=${PYTHONPATH}:.. IPYTHON_OPTS="notebook" ${SPARK_HOME}/bin/pyspark --master local\[4\] --driver-memory 2G
        
        
        Run tests with
        ==============
        
        .. code:: bash

            ./runtests.sh
        
        
        Quick start
        ===========
        
        Sparkit-learn introduces three important distributed data formats:
        
        -  **ArrayRDD:**
        
           A *numpy.array*-like distributed array
        
           .. code:: python
        
               from splearn.rdd import ArrayRDD
        
               data = range(20)
               # PySpark RDD with 2 partitions
               rdd = sc.parallelize(data, 2) # each partition with 10 elements
               # ArrayRDD
               # each partition will contain blocks with 5 elements
               X = ArrayRDD(rdd, bsize=5) # 4 blocks, 2 in each partition
        
           Basic operations:
        
           .. code:: python
        
               len(X) # 20 - number of elements in the whole dataset
               X.blocks # 4 - number of blocks
               X.shape # (20,) - the shape of the whole dataset
        
               X # returns an ArrayRDD
               # <class 'splearn.rdd.ArrayRDD'> from PythonRDD...
        
               X.dtype # returns the type of the blocks
               # numpy.ndarray
        
               X.collect() # get the dataset
               # [array([0, 1, 2, 3, 4]),
               #  array([5, 6, 7, 8, 9]),
               #  array([10, 11, 12, 13, 14]),
               #  array([15, 16, 17, 18, 19])]
        
               X[1].collect() # indexing
               # [array([5, 6, 7, 8, 9])]
        
               X[1] # also returns an ArrayRDD!
        
               X[1::2].collect() # slicing
               # [array([5, 6, 7, 8, 9]),
               #  array([15, 16, 17, 18, 19])]
        
               X[1::2] # returns an ArrayRDD as well
        
               X.tolist() # returns the dataset as a list
               # [0, 1, 2, ... 17, 18, 19]
               X.toarray() # returns the dataset as a numpy.array
               # array([ 0,  1,  2, ... 17, 18, 19])
        
               # pyspark.rdd operations will still work (see the sketch after this list)
               X.getNumPartitions() # 2 - number of partitions
        
        
        - **SparseRDD:**

          The sparse counterpart of the *ArrayRDD*; the main difference is that
          the blocks are sparse matrices. The reason behind this split is to
          follow the distinction between *numpy.ndarray* objects and
          *scipy.sparse* matrices. Usually a *SparseRDD* is created by
          *splearn*'s transformers, but one can instantiate it manually, too.
        
          .. code:: python
        
               # generate a SparseRDD from a text using SparkCountVectorizer
               from splearn.rdd import ArrayRDD, SparseRDD
               from sklearn.feature_extraction.tests.test_text import ALL_FOOD_DOCS
               ALL_FOOD_DOCS
               # (u'the pizza pizza beer copyright',
               # u'the pizza burger beer copyright',
               # u'the the pizza beer beer copyright',
               # u'the burger beer beer copyright',
               # u'the coke burger coke copyright',
               # u'the coke burger burger',
               # u'the salad celeri copyright',
               # u'the salad salad sparkling water copyright',
               # u'the the celeri celeri copyright',
               # u'the tomato tomato salad water',
               # u'the tomato salad water copyright')
        
               # ArrayRDD created from the raw data
               X = ArrayRDD(sc.parallelize(ALL_FOOD_DOCS, 4), 2)
               X.collect()
               # [array([u'the pizza pizza beer copyright',
               #         u'the pizza burger beer copyright'], dtype='<U31'),
               #  array([u'the the pizza beer beer copyright',
               #         u'the burger beer beer copyright'], dtype='<U33'),
               #  array([u'the coke burger coke copyright',
               #         u'the coke burger burger'], dtype='<U30'),
               #  array([u'the salad celeri copyright',
               #         u'the salad salad sparkling water copyright'], dtype='<U41'),
               #  array([u'the the celeri celeri copyright',
               #         u'the tomato tomato salad water'], dtype='<U31'),
               #  array([u'the tomato salad water copyright'], dtype='<U32')]
        
               # Feature extraction executed
               from splearn.feature_extraction.text import SparkCountVectorizer
               vect = SparkCountVectorizer()
               X = vect.fit_transform(X)
               # and we have a SparseRDD
               X
               # <class 'splearn.rdd.SparseRDD'> from PythonRDD...
        
               # its dtype is scipy.sparse's general parent class
               X.dtype
               # scipy.sparse.base.spmatrix
        
               # slicing works just like in ArrayRDDs
               X[2:4].collect()
               # [<2x11 sparse matrix of type '<type 'numpy.int64'>'
               #   with 7 stored elements in Compressed Sparse Row format>,
               #  <2x11 sparse matrix of type '<type 'numpy.int64'>'
               #   with 9 stored elements in Compressed Sparse Row format>]
        
               # general mathematical operations are available
               X.sum(), X.mean(), X.max(), X.min()
               # (55, 0.45454545454545453, 2, 0)
        
               # even with axis parameters provided
               X.sum(axis=1)
               # matrix([[5],
               #         [5],
               #         [6],
               #         [5],
               #         [5],
               #         [4],
               #         [4],
               #         [6],
               #         [5],
               #         [5],
               #         [5]])
        
               # It can be transformed to dense ArrayRDD
               X.todense()
               # <class 'splearn.rdd.ArrayRDD'> from PythonRDD...
               X.todense().collect()
               # [array([[1, 0, 0, 0, 1, 2, 0, 0, 1, 0, 0],
               #         [1, 1, 0, 0, 1, 1, 0, 0, 1, 0, 0]]),
               #  array([[2, 0, 0, 0, 1, 1, 0, 0, 2, 0, 0],
               #         [2, 1, 0, 0, 1, 0, 0, 0, 1, 0, 0]]),
               #  array([[0, 1, 0, 2, 1, 0, 0, 0, 1, 0, 0],
               #         [0, 2, 0, 1, 0, 0, 0, 0, 1, 0, 0]]),
               #  array([[0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0],
               #         [0, 0, 0, 0, 1, 0, 2, 1, 1, 0, 1]]),
               #  array([[0, 0, 2, 0, 1, 0, 0, 0, 2, 0, 0],
               #         [0, 0, 0, 0, 0, 0, 1, 0, 1, 2, 1]]),
               #  array([[0, 0, 0, 0, 1, 0, 1, 0, 1, 1, 1]])]
        
               # One can also instantiate a SparseRDD manually:
               import numpy as np
               import scipy.sparse as sp

               sparse = sc.parallelize(np.array([sp.eye(2).tocsr()] * 20), 2)
               sparse = SparseRDD(sparse, bsize=5)
               sparse
               # <class 'splearn.rdd.SparseRDD'> from PythonRDD...
        
               sparse.collect()
               # [<10x2 sparse matrix of type '<type 'numpy.float64'>'
               #   with 10 stored elements in Compressed Sparse Row format>,
               #  <10x2 sparse matrix of type '<type 'numpy.float64'>'
               #   with 10 stored elements in Compressed Sparse Row format>,
               #  <10x2 sparse matrix of type '<type 'numpy.float64'>'
               #   with 10 stored elements in Compressed Sparse Row format>,
               #  <10x2 sparse matrix of type '<type 'numpy.float64'>'
               #   with 10 stored elements in Compressed Sparse Row format>]
        
        
        -  **DictRDD:**
        
           A column-based data format, each column with its own type.
        
           .. code:: python
        
               import numpy as np

               from splearn.rdd import DictRDD
        
               X = range(20)
               y = list(range(2)) * 10
               # PySpark RDD with 2 partitions
               X_rdd = sc.parallelize(X, 2) # each partition with 10 elements
               y_rdd = sc.parallelize(y, 2) # each partition with 10 elements
               # DictRDD
               # each partition will contain blocks with 5 elements
               Z = DictRDD((X_rdd, y_rdd),
                           columns=('X', 'y'),
                           bsize=5,
                           dtype=[np.ndarray, np.ndarray]) # 4 blocks, 2/partition
               # if no dtype is provided, the type of the blocks will be determined
               # automatically
        
               # or, from a single two-column source:
               data = np.array([list(range(20)), list(range(2)) * 10]).T
               rdd = sc.parallelize(data, 2)
               Z = DictRDD(rdd,
                           columns=('X', 'y'),
                           bsize=5,
                           dtype=[np.ndarray, np.ndarray])
        
           Basic operations:
        
           .. code:: python
        
               len(Z) # 4 - number of blocks
               Z.columns # returns ('X', 'y')
               Z.dtype # returns the types in correct order
               # [numpy.ndarray, numpy.ndarray]
        
               Z # returns a DictRDD
               #<class 'splearn.rdd.DictRDD'> from PythonRDD...
        
               Z.collect()
               # [(array([0, 1, 2, 3, 4]), array([0, 1, 0, 1, 0])),
               #  (array([5, 6, 7, 8, 9]), array([1, 0, 1, 0, 1])),
               #  (array([10, 11, 12, 13, 14]), array([0, 1, 0, 1, 0])),
               #  (array([15, 16, 17, 18, 19]), array([1, 0, 1, 0, 1]))]
        
               Z[:, 'y'] # column select - returns an ArrayRDD
               Z[:, 'y'].collect()
               # [array([0, 1, 0, 1, 0]),
               #  array([1, 0, 1, 0, 1]),
               #  array([0, 1, 0, 1, 0]),
               #  array([1, 0, 1, 0, 1])]
        
               Z[:-1, ['X', 'y']] # slicing - DictRDD
               Z[:-1, ['X', 'y']].collect()
               # [(array([0, 1, 2, 3, 4]), array([0, 1, 0, 1, 0])),
               #  (array([5, 6, 7, 8, 9]), array([1, 0, 1, 0, 1])),
               #  (array([10, 11, 12, 13, 14]), array([0, 1, 0, 1, 0]))]
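
        All three formats are thin wrappers around a PySpark RDD whose elements
        are blocks, so plain *pyspark.rdd* operations compose naturally with
        block-level NumPy code. A minimal sketch; *map* below is the plain
        PySpark method, so treat the exact return type (plain RDD of blocks vs.
        re-wrapped *ArrayRDD*) as an implementation detail:

        .. code:: python

            from splearn.rdd import ArrayRDD

            X = ArrayRDD(sc.parallelize(range(20), 2), bsize=5)

            # "think locally": a function written against one NumPy block...
            def center(block):
                return block - block.mean()

            # ..."execute distributively": applied block by block
            X.map(center).collect()
            # [array([-2., -1.,  0.,  1.,  2.]),
            #  array([-2., -1.,  0.,  1.,  2.]), ...]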
        
        
        Basic workflow
        --------------
        
        Using the data structures described above, the basic workflow is almost
        identical to sklearn's.
        
        Distributed vectorizing of texts
        ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        
        SparkCountVectorizer
        ^^^^^^^^^^^^^^^^^^^^
        
        .. code:: python
        
            from splearn.rdd import ArrayRDD
            from splearn.feature_extraction.text import SparkCountVectorizer
            from sklearn.feature_extraction.text import CountVectorizer
        
            X = [...]  # list of texts
            X_rdd = ArrayRDD(sc.parallelize(X, 4))  # sc is SparkContext
        
            local = CountVectorizer()
            dist = SparkCountVectorizer()
        
            result_local = local.fit_transform(X)
            result_dist = dist.fit_transform(X_rdd)  # SparseRDD
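
        To sanity-check the distributed result against the local one, the sparse
        blocks can be collected and stacked on the driver. A sketch for small
        data only (*collect* pulls everything to the driver); since both
        vectorizers sort their vocabularies, the column order should match:

        .. code:: python

            import numpy as np
            import scipy.sparse as sp

            # stack the collected sparse blocks into a single matrix and compare
            dist_matrix = sp.vstack(result_dist.collect())
            assert np.array_equal(result_local.toarray(), dist_matrix.toarray())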
        
        
        SparkHashingVectorizer
        ^^^^^^^^^^^^^^^^^^^^^^
        
        .. code:: python
        
            from splearn.rdd import ArrayRDD
            from splearn.feature_extraction.text import SparkHashingVectorizer
            from sklearn.feature_extraction.text import HashingVectorizer
        
            X = [...]  # list of texts
            X_rdd = ArrayRDD(sc.parallelize(X, 4))  # sc is SparkContext
        
            local = HashingVectorizer()
            dist = SparkHashingVectorizer()
        
            result_local = local.fit_transform(X)
            result_dist = dist.fit_transform(X_rdd)  # SparseRDD
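
        Note that *HashingVectorizer* is stateless: there is no vocabulary to
        learn, so fitting is essentially free, which makes hashing an especially
        good match for distributed pipelines.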
        
        
        SparkTfidfTransformer
        ^^^^^^^^^^^^^^^^^^^^^
        
        .. code:: python
        
            from splearn.rdd import ArrayRDD
            from splearn.feature_extraction.text import SparkHashingVectorizer
            from splearn.feature_extraction.text import SparkTfidfTransformer
            from splearn.pipeline import SparkPipeline
        
            from sklearn.feature_extraction.text import HashingVectorizer
            from sklearn.feature_extraction.text import TfidfTransformer
            from sklearn.pipeline import Pipeline
        
            X = [...]  # list of texts
            X_rdd = ArrayRDD(sc.parallelize(X, 4))  # sc is SparkContext
        
            local_pipeline = Pipeline((
                ('vect', HashingVectorizer()),
                ('tfidf', TfidfTransformer())
            ))
            dist_pipeline = SparkPipeline((
                ('vect', SparkHashingVectorizer()),
                ('tfidf', SparkTfidfTransformer())
            ))
        
            result_local = local_pipeline.fit_transform(X)
            result_dist = dist_pipeline.fit_transform(X_rdd)  # SparseRDD
        
        
        Distributed Classifiers
        ~~~~~~~~~~~~~~~~~~~~~~~
        
        .. code:: python
        
            import numpy as np

            from splearn.rdd import DictRDD
            from splearn.feature_extraction.text import SparkHashingVectorizer
            from splearn.feature_extraction.text import SparkTfidfTransformer
            from splearn.svm import SparkLinearSVC
            from splearn.pipeline import SparkPipeline
        
            from sklearn.feature_extraction.text import HashingVectorizer
            from sklearn.feature_extraction.text import TfidfTransformer
            from sklearn.svm import LinearSVC
            from sklearn.pipeline import Pipeline
        
            X = [...]  # list of texts
            y = [...]  # list of labels
            X_rdd = sc.parallelize(X, 4)
            y_rdd = sc.parallelize(y, 4)
            Z = DictRDD((X_rdd, y_rdd),
                        columns=('X', 'y'),
                        dtype=[np.ndarray, np.ndarray])
        
            local_pipeline = Pipeline((
                ('vect', HashingVectorizer()),
                ('tfidf', TfidfTransformer()),
                ('clf', LinearSVC())
            ))
            dist_pipeline = SparkPipeline((
                ('vect', SparkHashingVectorizer()),
                ('tfidf', SparkTfidfTransformer()),
                ('clf', SparkLinearSVC())
            ))
        
            local_pipeline.fit(X, y)
            dist_pipeline.fit(Z, clf__classes=np.unique(y))
        
            y_pred_local = local_pipeline.predict(X)
            y_pred_dist = dist_pipeline.predict(Z[:, 'X'])
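
        The *clf__classes* fit parameter is needed because each worker only sees
        its own blocks of *y*, so the complete set of labels has to be supplied
        up front. To compare the two predictions on the driver, a sketch
        (assuming *predict* returns an *ArrayRDD*, whose *toarray* collects the
        result, so small data only):

        .. code:: python

            # collect the distributed predictions and compare with the local ones
            assert np.array_equal(y_pred_local, y_pred_dist.toarray())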
        
        
        Distributed Model Selection
        ~~~~~~~~~~~~~~~~~~~~~~~~~~~
        
        .. code:: python
        
            import numpy as np

            from splearn.rdd import DictRDD
            from splearn.grid_search import SparkGridSearchCV
            from splearn.naive_bayes import SparkMultinomialNB
        
            from sklearn.grid_search import GridSearchCV
            from sklearn.naive_bayes import MultinomialNB
        
            X = [...]
            y = [...]
            X_rdd = sc.parallelize(X, 4)
            y_rdd = sc.parallelize(y, 4)
            Z = DictRDD((X_rdd, y_rdd),
                        columns=('X', 'y'),
                        dtype=[np.ndarray, np.ndarray])
        
            parameters = {'alpha': [0.1, 1, 10]}
            fit_params = {'classes': np.unique(y)}
        
            local_estimator = MultinomialNB()
            local_grid = GridSearchCV(estimator=local_estimator,
                                      param_grid=parameters)
        
            estimator = SparkMultinomialNB()
            grid = SparkGridSearchCV(estimator=estimator,
                                     param_grid=parameters,
                                     fit_params=fit_params)
        
            local_grid.fit(X, y)
            grid.fit(Z)
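
        After fitting, the familiar *GridSearchCV* attributes should be available
        on the distributed version as well, in line with the library's goal of
        staying close to sklearn's API (a sketch, not a guaranteed interface):

        .. code:: python

            # the usual sklearn attributes, assuming SparkGridSearchCV keeps them
            local_grid.best_params_
            grid.best_params_
            grid.best_score_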
        
        
        Special thanks
        ==============
        
        - scikit-learn community
        - spylearn community
        - pyspark community
        
        |Analytics|
        
        .. |Build Status| image:: https://travis-ci.org/lensacom/sparkit-learn.png?branch=master
           :target: https://travis-ci.org/lensacom/sparkit-learn
        .. |PyPi| image:: https://img.shields.io/pypi/v/sparkit-learn.svg
           :target: https://pypi.python.org/pypi/sparkit-learn
        .. |Analytics| image:: https://ga-beacon.appspot.com/UA-57495026-1/sparkit-learn/readme?pixel
           :target: https://github.com/lensacom/sparkit-learn
        
Platform: UNKNOWN
