自定义 Python

PythonDumpTask

PythonDumpTask 是一种特殊的数据源,可以自定义 Python 代码来实现其他数据源无法满足的需求。 如写 Python 脚本从网上下载一个 CSV 文件作为输入文件来导入到 Hive.

Python Code

要输入的 Python 代码,需要满足两个条件

  • 必须实现 execute,第一个参数是 filename. 该函数作为入口给 OneFlow 调用。

  • execute 负责把数据以 CSV 格式写入到 filename

如果程序内要用到数据库配置,可以使用 OneFlow 提供的 DataSource,不要在代码里暴露数据库配置和密码。 定义 execute 函数时,使用特殊的参数名来指定需要的数据源 OneFlow 在执行时会传入对应的 pigeon connector 对象,可用于数据库交互。 参数名格式:datasource_xxx,即必须要有 datasource_ 前缀。如:datasource_impala='ym_impala_default' 在运行时,OneFlow 会传入 pigeon.connector.ImpalaConnector 对象。使用 PythonDump 时,Load Task 需要指定 Create Table DDL,因为 OneFlow 无法自动推导出数据的 schema.可以参考 TiDB Disk Monitorarrow-up-right, 这里截取 execute 函数的实现

def execute(filename, datasource_tidb='ym_tidb_default', *args, **kwargs):
    m = Monitor(datasource_tidb)
    df = m.execute()
    df.to_csv(filename, index=False, header=False, quoting=csv.QUOTE_ALL)

Last updated