Metadata-Version: 2.1
Name: datafushion-plugins-sparkpy
Version: 1.0.7
Summary: DataFushion的SparkPy算法插件
Home-page: UNKNOWN
Author: 肖林朋
Author-email: 1553990434@qq.com
License: XiaoLinpeng Licence
Description: # DataFushion_Plugins_SparkPy说明
        
        ## 1.简介
        
        针对Spark的Python版本算法(pyspark)在DataFushion平台使用所给出的插件,主要用于规范化算法的输入输出
        
        ## 2.通常算法使用
        
        - [x] Step1:引入datafushion_spark包中的operation模块
        - [x] Step2:使用资源管理器进行数据拆解处理,并在其中实现自己需要实现的业务算法逻辑
        
        ```python
        from datafushion_spark import operation, HandleDataFrameSet, HandleInputDataStruct, DataFrame, SparkSession, \
            FileExtractFormatEnum
        
        
        if __name__ == '__main__':
            with operation(app_name="AvgWindPowerByStatus", master="local") as destruction:  # type:HandleDataFrameSet
                input_data_struct_list = destruction.input_data_struct_list
                mapping_flags = destruction.mapping_flags
                param_map = destruction.param_map
                spark = destruction.spark  # type:SparkSession
        
                data_result = None  # type DataFrame
        
                # 算法逻辑部分
                for index, input_data_struct in enumerate(input_data_struct_list):  # type: HandleInputDataStruct
                    # 注意:此时的DataFrame的列名已经是映射过的列名,可以直接使用
                    data_list = input_data_struct.data_list  # type: DataFrame
                    data_list.show()
                    if index == 0:
                        data_result = data_list.groupby("status").agg({
                            "power": "mean"
                        }).withColumnRenamed("avg(power)", "powerAvg")
                    else:
                        data_result = data_result.union(data_list.groupby("status").agg({
                            "power": "mean"
                        }).withColumnRenamed("avg(power)", "powerAvg"))
        
                # 保存最终结果
                destruction.data_result = data_result
                # 保存存储的格式,需要与打包的配置文件对应
                destruction.output_type = FileExtractFormatEnum.JSON.value
        ```
        
        注意:
        
        ------
        
        如果是Windows开发的话需要在脚本文件前加入，findspark请自行下载，没有在包中做依赖管理
        
        ```python
        import findspark
        
        
        findspark.init()
        ```
        
        
        
        ------
        
        destruction为解构的`HandleDataFrameSet`实体类
        
        ------
        
        input_data_struct_list中包含了输入数据的封装,其类型为List
        
        其元素为`HandleInputDataStruct`类,包含的属性为file_type,file_path,file_input_mapping,data_list
        
        算法需要使用的是file_input_mapping和data_list
        
        data_list是输入数据的`DataFrame`
        
        file_input_mapping为输入数据字段的映射
        
        spark为sparkSession对象
        
        mapping_flags为映射标识字典，key为每个单独输入的映射标识，key为输入映射
        
        ------
        
        param_map为算法的参数字典
        
        ------
        
        在对数据进行业务算法处理完成后,需要将拆解的destruction中的data_result属性赋值为业务算法的最终数据结果
        
        ------
        
        在对数据进行业务算法处理完成后,需要将拆解的destruction中的output_type属性赋值为业务算法需要输出的文件格式`FileExtractFormatEnum.JSON.value`中提供了`JSON,CSV,PARQUET,GENERAL`四类格式
        
        ------
        
        目前`PARQUET`类的输出格式只支持作为Spark类型的算法积木中的输入
        
        ## 3.模型训练算法使用
        
        - [x] Step1:引入datafushion_spark包中的operation模块
        
        - [x] Step2:使用资源管理器进行数据拆解处理,并在其中实现自己需要实现的业务算法逻辑
        
          ***此处以鸢尾花训练为例进行逻辑回归模型训练***
        
          ```python
          from datafushion_spark import operation, HandleDataFrameSet, HandleInputDataStruct, DataFrame, SparkSession, \
              FileExtractFormatEnum, TrainFiledEnum, TrainModelResult
          from pyspark.ml.feature import VectorAssembler
          from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
          from pyspark.ml import Pipeline, PipelineModel
          
          
          if __name__ == '__main__':
              with operation(app_name="IrisClassify", mapping_data=False,
                             master="local") as destruction:  # type:HandleDataFrameSet
                  input_data_struct_list = destruction.input_data_struct_list
                  mapping_flags = destruction.mapping_flags  # type: dict
                  param_map = destruction.param_map
                  spark = destruction.spark  # type:SparkSession
          
                  algo_iter = param_map['iter']
                  algo_reg = param_map['reg']
                  algo_elastic_net = param_map['elasticNet']
                  mapping_list = []
                  for k, v in mapping_flags.items():
                      mapping_list.append(v)
                  data_result = None
          
                  # 算法逻辑部分
                  for index, input_data_struct in enumerate(input_data_struct_list):  # type: HandleInputDataStruct
                      # 注意:此时的DataFrame的列名已经是映射过的列名,可以直接使用
                      data = input_data_struct.data_list  # type: DataFrame
                      mapping = mapping_list[index]
                      feature_fields = mapping[TrainFiledEnum.FEATURE.value]
                      label_field = mapping[TrainFiledEnum.LABEL.value][0]
                      train_data = data.withColumnRenamed(label_field, TrainFiledEnum.LABEL.value)
                      featureAssembler = VectorAssembler().setInputCols(feature_fields).setOutputCol('features')
                      logistic_regression = LogisticRegression().setMaxIter(algo_iter).setRegParam(algo_reg).setElasticNetParam(
                              algo_elastic_net)
                      pipeline_model: PipelineModel = Pipeline().setStages([featureAssembler, logistic_regression]).fit(
                              train_data)
                      # 将data_result实例化为一个TrainModelResult对象
                      data_result = TrainModelResult(train_data=train_data, pipeline_model=pipeline_model)
                      lg_model: LogisticRegressionModel = pipeline_model.stages[1]
                      for item in lg_model.summary.objectiveHistory:
                          print(item)
          
                      # 保存最终结果
                  destruction.data_result = data_result
                  # 保存存储的格式,需要与打包的配置文件对应
                  destruction.output_type = FileExtractFormatEnum.MODEL.value
          ```
        
          注意:
        
          ------
        
          如果需要训练模型的话，一般情况下
        
          1.将operation中设置为mapping_data=False,因为一般我们需要自己根据标识来确定怎样处理特征数据
        
          2.将data_result需要设置为TrainModelResult实例，其中TrainModelResult包括的数据有train_data和pipeline_model，即训练数据和管道模型
        
          3.最后需要设置解构回调对象的output_type为model格式`destruction.output_type = FileExtractFormatEnum.MODEL.value`
Platform: any
Description-Content-Type: text/markdown
