管道服务

虽然用 Python 生成器很容易就实现一个管道服务功能,但 Noba 的管道服务还是要灵活一些。目前管道功能还是比较简单,仅仅实现了通过管道对某个对象1顺序进行加工处理的功能。

一次完整的管道服务代码包括这几个步骤:创建加工类创建管道服务通过管道加工对象。最后会介绍管道的进阶用法

创建加工类

加工类是指一系列对需要被加工对象进行处理的类。本例在 pipeline 文件夹下创建 handle_data.py 代码如下:

# pipeline/handle_data.py

class ChangeFieldName():
    def handle(self, data, next, *what):
        print("更改列名")
        # 省略具体加工代码
        return next(data)
        
class ChangeDataType():
    def handle(self, data, next, *what):
        print("更改数据类型")
        # 省略具体加工代码
        return next(data)
        
class RepeatRowData():
    def handle(self, data, next, *what):
        print("删除重复行")
        # 省略具体加工代码
        return next(data)

class ExceptionData():
    def handle(self, data, next, *what):
        print("处理异常数据")
        # 省略具体加工代码
        return next(data)

class MissingData():
    def handle(self, data, next, *what):
        print("处理缺失数据")
        # 省略具体加工代码
        return next(data)

创建管道服务

管道服务是通过 ioc 容器创建出来的

# main.py

from noba import core

if __name__ == '__main__':
    pipeline_service = core.make('pipeline')

通过管道加工对象

Noba 的管道目前是单向管道。通过管道可以把加工类顺序作用在需要被加工的对象上

# main.py

from noba import core

def target(data):
    # 省略一些逻辑代码
    return data    


if __name__ == '__main__':
    pipeline_service = core.make('pipeline')

    # 通过 ioc 创建 pandas 服务
    pd = core.make('pd')

    # processed_data 为处理前的原始数据
    raw_data = pd.DataFrame([[10.2, 9.5, None, 9.2],[11, 8.5, None, 9.1],[None, None, None, None],[12.3, 13, 13.1, 12.1]],columns=['open', 'close', 'high', 'low'])

    # pipeline 为处理管道
    pipeline = ['handle_data.ChangeFieldName', 'handle_data.ChangeDataType', 'handle_data.RepeatRowData', 'handle_data.ExceptionData', 'handle_data.MissingData']


    # processed_data 为处理后的数据
    processed_data = pipeline_service.send(raw_data).through(pipeline).then(target)

管道的进阶用法

  1. 在第 3 个例子中定义 pipeline 中的加工类时并没有使用完整的路径2。原因是 Noba 默认加工类都是放在 pipeline 文件夹下的。可以通过如下代码来更改默认路径。
    # main.py
    
    pipeline_service.folder('your pipleline folder')
    
  2. 默认加工类的处理方法是 handle。可以通过如下代码来更改处理方法。
    # main.py
    
    pipeline_service.via("handle_pipeline")
    
  3. 定义管道时可以为加工类传递额外参数。如下面代码就给 ChangeFieldNamehandle方法传递了参数 ('close price', 'open price')
    # main.py
    
    pipeline = ['handle_data.ChangeFieldName:close price,open price', 'handle_data.ChangeDataType', 'handle_data.RepeatRowData', 'handle_data.ExceptionData', 'handle_data.MissingData']
    

1

对象可以是一个数据对象(比如需要对某个 dataframe 填补空值、修正异常值等数据清理)或则其他任何需要顺序加工的对象

2

ChangeFieldName 类为例子,完整路径应该是 pipeline.handle_data.ChangeFieldName。但在第 3 例时,定义的路径是 handle_data.ChangeFieldName