# 数据转换（Transform）

Recurve 的 Transfer Operator 中支持用户自定义 Python 代码来实现复杂灵活的数据转换，比如增减字段，转换格式等。这种转换是流式的，每次只会处理一行数据。

## 配置 Transform 过程

在 Transfer Operator 的配置页面中，提供了 Transform 的配置项。可输入 Python 代码，系统在运行时会自动调用。页面上提供默认一个默认的代码结构，可根据此进行调整。

```python
from recurvedata.core.transformer import Transformer


class MyTransformer(Transformer):
    def transform_impl(self, row, *args, **kwargs):
        return row


transformer = MyTransformer()
```

* 可在 `transform_impl` 方法中添加自定义的数据处理逻辑，其中 `row` 是 `OrderedDict` （即有序的字典）类型，可使用 `row["col"]` 进行引用。
* 输入参数 `row` 中的字段顺序，表示一行数据，与上游表结构或 `SELECT` 中的顺序语句保持一致。
* 返回值允许多种情况：
  * 返回一个 `OrderedDict` 对象，表示返回一行数据
  * 返回包含多个 `OrderedDict` 对象的数组，表示将一行数据裂变成多行数据
  * 上述两种情形，其字段顺序及类型应与下游 Load 阶段中的表结构保持一致
  * 返回 `None`， 表示该行数据被丢弃

## Transformer 内置方法

`Transformer` 基类提供了一系列内置方法，可用于常见的数据处理，可直接使用 `self` 调用。

### 解压缩

* `mysql_uncompress(value: bytes, return_str=False) -> str | bytes`

等价于 MySQL 的 UNCOMPRESS 函数，可将压缩后的结果解压出来。

* `mysql_compress(value: str) -> bytes | None`

等价于 MySQL 的 `COMPRESS` 函数。

### JSON 反序列化和序列化

* `json_loads(*args, **kwargs)`
* `json_dumps(*args, **kwargs)`

### 加解密、哈希

* `aes_encrypt(key_name: str, data: str | bytes, context: str = DEFAULT) -> str`

对数据进行 AES 加密，采用 CBC 模式，返回加密后的字符串。其中 `key_name` 表示于密钥名，运行时会替换成真实密钥（详见 [数据脱敏](/cn-reorc-help-center/data-security/shu-ju-tuo-min.md)）。该函数默认使用固定的 IV，因此对于相同的明文会生成相同的密文。可以传入 `context`来修改该行为：传入 None 会生成随机 IV，其他字符串则会进行 sha256 之后取前 16 位作为 IV.

* `aes_decrypt(key_name: str, data: bytes | str) -> str`

与 `aes_encrypt` 相对，返回解密后的字符串。

* `base64_encode(data: str | bytes) -> str`

将字符串或字节数据编码为 Base64 格式，返回编码后的字符串。

* `base64_decode(data: str | bytes) -> str`

将 Base64 编码的数据解码为原始字符串，返回解码后的结果。

* `md5(data: str | bytes) -> str`

计算字符串或字节数据的 MD5 哈希值，返回小写的 32 位十六进制字符串。

* `sha1(data: str | bytes) -> str`

计算字符串或字节数据的 SHA-1 哈希值，返回小写的 40 位十六进制字符串。

* `sha256(data: str | bytes) -> str`

计算字符串或字节数据的 SHA-256 哈希值，返回小写的 64 位十六进制字符串。

## Python 库

系统底层使用 Python 3.11 环境，所有内置的标准库（如 `datetime`, `json`, `re` 等）均可使用，也支持常用的第三方库。部分三方库及其版本如下所示，有可能会更新，若有新增库的需求，可随时联系我们。

```
cryptography==44.0.0
cytoolz==1.0.1
httpx==0.28.1
humanize==4.11.0
jmespath==0.10.0
jsonschema==4.23.0
lxml==5.3.0
numpy==2.2.1
orjson==3.10.14
pandas==2.2.3
pendulum==3.0.0
pydantic==2.10.5
python-dateutil==2.9.0.post0
pytz==2024.2
pyyaml==6.0.2
requests==2.32.3
sqlalchemy==2.0.37
tenacity==8.5.0
ujson==5.10.0
urllib3==2.3.0
```

## 示例

```python
import datetime

from recurvedata.core.transformer import Transformer


class MyTransformer(Transformer):
    def transform_impl(self, row, *args, **kwargs):
        # 将名字转换为全小写
        row["name"] = row["name"].lower()
        
        # 根据手机号计算一个 MD5 值
        row["mobile_md5"] = self.md5(row["mobile"])
        
        # 从 JSON 中提取内容
        event_detail = self.json_loads(row["event_detail"])
        row["event_name"] = event_detail["event_detail"]
        row["event_time"] = datetime.datetime.fromtimestamp(event_detail["timestamp"])

        return row


transformer = MyTransformer()
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.reorc.com/cn-reorc-help-center/data-ingestion/shu-ju-zhuan-huan-transform.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
