数据转换(Transform)

Recurve 的 Transfer Operator 中支持用户自定义 Python 代码来实现复杂灵活的数据转换,比如增减字段,转换格式等。这种转换是流式的,每次只会处理一行数据。

配置 Transform 过程

在 Transfer Operator 的配置页面中,提供了 Transform 的配置项。可输入 Python 代码,系统在运行时会自动调用。页面上提供默认一个默认的代码结构,可根据此进行调整。

from recurvedata.core.transformer import Transformer


class MyTransformer(Transformer):
    def transform_impl(self, row, *args, **kwargs):
        return row


transformer = MyTransformer()
  • 可在 transform_impl 方法中添加自定义的数据处理逻辑,其中 rowOrderedDict (即有序的字典)类型,可使用 row["col"] 进行引用。

  • 输入参数 row 中的字段顺序,表示一行数据,与上游表结构或 SELECT 中的顺序语句保持一致。

  • 返回值允许多种情况:

    • 返回一个 OrderedDict 对象,表示返回一行数据

    • 返回包含多个 OrderedDict 对象的数组,表示将一行数据裂变成多行数据

    • 上述两种情形,其字段顺序及类型应与下游 Load 阶段中的表结构保持一致

    • 返回 None, 表示该行数据被丢弃

Transformer 内置方法

Transformer 基类提供了一系列内置方法,可用于常见的数据处理,可直接使用 self 调用。

解压缩

  • mysql_uncompress(value: bytes, return_str=False) -> str | bytes

等价于 MySQL 的 UNCOMPRESS 函数,可将压缩后的结果解压出来。

  • mysql_compress(value: str) -> bytes | None

等价于 MySQL 的 COMPRESS 函数。

JSON 反序列化和序列化

  • json_loads(*args, **kwargs)

  • json_dumps(*args, **kwargs)

加解密、哈希

  • aes_encrypt(key_name: str, data: str | bytes, context: str = DEFAULT) -> str

对数据进行 AES 加密,采用 CBC 模式,返回加密后的字符串。其中 key_name 表示于密钥名,运行时会替换成真实密钥(详见 数据脱敏)。该函数默认使用固定的 IV,因此对于相同的明文会生成相同的密文。可以传入 context来修改该行为:传入 None 会生成随机 IV,其他字符串则会进行 sha256 之后取前 16 位作为 IV.

  • aes_decrypt(key_name: str, data: bytes | str) -> str

aes_encrypt 相对,返回解密后的字符串。

  • base64_encode(data: str | bytes) -> str

将字符串或字节数据编码为 Base64 格式,返回编码后的字符串。

  • base64_decode(data: str | bytes) -> str

将 Base64 编码的数据解码为原始字符串,返回解码后的结果。

  • md5(data: str | bytes) -> str

计算字符串或字节数据的 MD5 哈希值,返回小写的 32 位十六进制字符串。

  • sha1(data: str | bytes) -> str

计算字符串或字节数据的 SHA-1 哈希值,返回小写的 40 位十六进制字符串。

  • sha256(data: str | bytes) -> str

计算字符串或字节数据的 SHA-256 哈希值,返回小写的 64 位十六进制字符串。

Python 库

系统底层使用 Python 3.11 环境,所有内置的标准库(如 datetime, json, re 等)均可使用,也支持常用的第三方库。部分三方库及其版本如下所示,有可能会更新,若有新增库的需求,可随时联系我们。

cryptography==44.0.0
cytoolz==1.0.1
httpx==0.28.1
humanize==4.11.0
jmespath==0.10.0
jsonschema==4.23.0
lxml==5.3.0
numpy==2.2.1
orjson==3.10.14
pandas==2.2.3
pendulum==3.0.0
pydantic==2.10.5
python-dateutil==2.9.0.post0
pytz==2024.2
pyyaml==6.0.2
requests==2.32.3
sqlalchemy==2.0.37
tenacity==8.5.0
ujson==5.10.0
urllib3==2.3.0

示例

import datetime

from recurvedata.core.transformer import Transformer


class MyTransformer(Transformer):
    def transform_impl(self, row, *args, **kwargs):
        # 将名字转换为全小写
        row["name"] = row["name"].lower()
        
        # 根据手机号计算一个 MD5 值
        row["mobile_md5"] = self.md5(row["mobile"])
        
        # 从 JSON 中提取内容
        event_detail = self.json_loads(row["event_detail"])
        row["event_name"] = event_detail["event_detail"]
        row["event_time"] = datetime.datetime.fromtimestamp(event_detail["timestamp"])

        return row


transformer = MyTransformer()

Last updated