管道服务
虽然用 Python 生成器很容易就实现一个管道服务功能,但 Noba 的管道服务还是要灵活一些。目前管道功能还是比较简单,仅仅实现了通过管道对某个对象1顺序进行加工处理的功能。
一次完整的管道服务代码包括这几个步骤:创建加工类、创建管道服务、通过管道加工对象。最后会介绍管道的进阶用法
创建加工类
加工类是指一系列对需要被加工对象进行处理的类。本例在 pipeline
文件夹下创建 handle_data.py
代码如下:
# pipeline/handle_data.py
class ChangeFieldName():
def handle(self, data, next, *what):
print("更改列名")
# 省略具体加工代码
return next(data)
class ChangeDataType():
def handle(self, data, next, *what):
print("更改数据类型")
# 省略具体加工代码
return next(data)
class RepeatRowData():
def handle(self, data, next, *what):
print("删除重复行")
# 省略具体加工代码
return next(data)
class ExceptionData():
def handle(self, data, next, *what):
print("处理异常数据")
# 省略具体加工代码
return next(data)
class MissingData():
def handle(self, data, next, *what):
print("处理缺失数据")
# 省略具体加工代码
return next(data)
创建管道服务
管道服务是通过 ioc 容器创建出来的
# main.py
from noba import core
if __name__ == '__main__':
pipeline_service = core.make('pipeline')
通过管道加工对象
Noba 的管道目前是单向管道。通过管道可以把加工类顺序作用在需要被加工的对象上
# main.py
from noba import core
def target(data):
# 省略一些逻辑代码
return data
if __name__ == '__main__':
pipeline_service = core.make('pipeline')
# 通过 ioc 创建 pandas 服务
pd = core.make('pd')
# processed_data 为处理前的原始数据
raw_data = pd.DataFrame([[10.2, 9.5, None, 9.2],[11, 8.5, None, 9.1],[None, None, None, None],[12.3, 13, 13.1, 12.1]],columns=['open', 'close', 'high', 'low'])
# pipeline 为处理管道
pipeline = ['handle_data.ChangeFieldName', 'handle_data.ChangeDataType', 'handle_data.RepeatRowData', 'handle_data.ExceptionData', 'handle_data.MissingData']
# processed_data 为处理后的数据
processed_data = pipeline_service.send(raw_data).through(pipeline).then(target)
管道的进阶用法
- 在第 3 个例子中定义
pipeline
中的加工类时并没有使用完整的路径2。原因是 Noba 默认加工类都是放在pipeline
文件夹下的。可以通过如下代码来更改默认路径。# main.py pipeline_service.folder('your pipleline folder')
- 默认加工类的处理方法是
handle
。可以通过如下代码来更改处理方法。# main.py pipeline_service.via("handle_pipeline")
- 定义管道时可以为加工类传递额外参数。如下面代码就给
ChangeFieldName
的handle
方法传递了参数('close price', 'open price')
。# main.py pipeline = ['handle_data.ChangeFieldName:close price,open price', 'handle_data.ChangeDataType', 'handle_data.RepeatRowData', 'handle_data.ExceptionData', 'handle_data.MissingData']
1
对象可以是一个数据对象(比如需要对某个 dataframe 填补空值、修正异常值等数据清理)或则其他任何需要顺序加工的对象
2
以 ChangeFieldName
类为例子,完整路径应该是 pipeline.handle_data.ChangeFieldName
。但在第 3 例时,定义的路径是 handle_data.ChangeFieldName