全部文档
当前文档

暂无内容

如果没有找到您期望的内容,请尝试其他搜索词

文档中心

数据处理操作指南

最近更新时间:2026-03-26 12:26:41

一、前置条件

  1. 已创建1个及以上通用类型的资源池,资源池具体操作参见资源池,资源池内需安装数据处理组件

  2. 资源池中已创建1个及以上的队列,具体操作参见队列

  3. 建议您在资源池中开启关联托管Prometheus、Klog服务,以便后续查看任务监控及日志。

二、新建Ray任务

1.入口

数据准备-数据处理-新建-新建Ray任务

2.步骤

示例说明:将KS3桶里的图片进行批量裁剪操作

前置操作

  • 将数据处理的代码文件ray_images.py打包成zip包放入对应资源组所在region的ks3桶中

  • 图片数据存放在相应的ks3路径s3://bucket/datasets/flowers-min/

  • 创建输出路径s3://bucket/datasets/process-demo-output/

填写基础信息

填写任务名称、选择优先级、填写任务描述

填写任务配置

  • 配置项

    • AKSK

    • Entrypoint.cmd,填写python脚本启动文件

      • eg: python ray_images.py

    • Head核数调整为>=2

    • Work核数调整为>=2

    • Runtime_env填写json格式的working_dir,表示启动文件所在的ks3路径

      • eg: {"working_dir":"s3://bucket/python/ray_images.zip"}

  • 注意事项

配置项

说明

示例

Region 一致性

代码文件存放的 KS3 region 必须和资源组在同一个 region

代码在 cn-beijing-6,资源组也要在 cn-beijing-6

启动命令

在 Entrypoint.cmd 中填写,通过 python 命令指定启动文件

python ray_image_processing.py

RuntimeEnv

代码文件路径,需写成 JSON 格式

{"working_dir":"s3://bucket-name/python/ray_images.zip"}

内存配置

Head 和 Worker 内存建议设置为 2GB 或以上

资源充足时建议 ≥ 2GB

依赖包

处理 KS3 文件需要使用 boto3 包

boto3 包支持读取ks3文件,具体可以看示例代码

  • 要点:

    • 代码文件、资源组、KS3 数据必须在同一个 region

    • 启动命令:python ray_image_processing.py

    • RuntimeEnv 使用 JSON 格式:{"working_dir":"s3://bucket/path/to/code.zip"}

    • 内存建议 ≥ 2GB(Head 和 Worker)

    • 需要安装 boto3 包访问 KS3

选择资源并提交

  • 自运维资源:使用关联KCE集群方式创建资源池

  • serverless资源:潮汐数据清洗场景下使用Serverless资源,无需繁琐的整机扩缩容操作,满足资源灵活弹性、低成本需求。

查看任务详情

附件:示例代码

import io
import os
import ray
import boto3
from PIL import Image
from botocore.config import Config

# ========== 配置 ==========
INPUT_S3_PREFIX = "s3://bucket/datasets/flowers-min/"
OUTPUT_S3_PREFIX = "s3://bucket/datasets/process-demo-output/ray-images/"
RESIZE_TO = (256, 256)
# ==== 配置你的 KS3 信息 ====
AK = "xxxxxx"
SK = "xxxxxx"
ENDPOINT = "http://ks3-cn-ningxia-internal.ksyuncs.com"  # HTTP 内网

def _make_s3_client():
    return boto3.client(
        "s3",
        aws_access_key_id=AK,
        aws_secret_access_key=SK,
        endpoint_url=ENDPOINT,
        region_name="BEIJING",
        config=Config(s3={"addressing_style": "virtual"}),
    )


# 主进程用:列 list 等
s3 = _make_s3_client()


# ========== 工具函数 ==========
def parse_s3_path(s3_path: str):
    """
    s3://bucket/prefix -> (bucket, prefix)
    """
    assert s3_path.startswith("s3://")
    path = s3_path[5:]
    bucket, prefix = path.split("/", 1)
    return bucket, prefix


def list_s3_images(bucket, prefix):
    keys = []

    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if key.lower().endswith((".jpg", ".jpeg", ".png")):
                keys.append(key)

    return keys


# ========== Ray 并行任务 ==========
@ray.remote
def resize_image_task(
        bucket: str,
        input_key: str,
        output_bucket: str,
        output_prefix: str,
):
  
    s3_client = _make_s3_client()

    # 1. 读取图片
    obj = s3_client.get_object(Bucket=bucket, Key=input_key)
    image_bytes = obj["Body"].read()

    img = Image.open(io.BytesIO(image_bytes)).convert("RGB")

    # 2. resize
    img = img.resize(RESIZE_TO)

    # 3. 写回 S3
    buf = io.BytesIO()
    img.save(buf, format="JPEG")
    buf.seek(0)

    filename = os.path.basename(input_key)
    out_key = f"{output_prefix.rstrip('/')}/{filename}"

    s3_client.put_object(
        Bucket=output_bucket,
        Key=out_key,
        Body=buf.getvalue(),
        ContentType="image/jpeg",
    )

    return f"s3://{output_bucket}/{out_key}"


# ========== 主流程 ==========
def main():
    ray.init()

    in_bucket, in_prefix = parse_s3_path(INPUT_S3_PREFIX)
    out_bucket, out_prefix = parse_s3_path(OUTPUT_S3_PREFIX)

    image_keys = list_s3_images(in_bucket, in_prefix)
    print(f"Found {len(image_keys)} images")

    tasks = [
        resize_image_task.remote(
            in_bucket,
            key,
            out_bucket,
            out_prefix,
        )
        for key in image_keys
    ]

    results = ray.get(tasks)
    print("Done:")
    for r in results:
        print(r)


if __name__ == "__main__":
    main()

三、新建Spark任务

1.入口

数据准备-数据处理-新建-新建Spark任务

2.步骤

示例说明:计算 Pi

前置操作

  • 代码包准备

  • 将代码包放入对应资源组所在region的ks3桶中

填写基础信息

填写任务名称、选择优先级、填写任务描述

填写任务配置

  • 配置项

    • AKSK

    • AppResource: Spark 任务的代码包ks3位置

      • eg: ks3://bucket-test/python/spark-examples_2.12-3.3.1jar

    • Class:指定 Spark 任务的入口类(主类),这个类必须包含 main 方法,Spark 会执行这个类的 main 方法作为程序起点

      • eg: org.apache.spark.examples.SparkPi

    • Head核数调整为>=2

    • Work核数调整为>=2

  • 注意事项

配置项

说明

示例

Region 一致性

代码文件存放的 KS3 region 必须和资源组在同一个 region

代码在 cn-beijing-6,资源组也要在 cn-beijing-6

启动类Class

Python 任务填写 org.apache.spark.deploy.PythonRunner

用于运行 Python 脚本

启动类Class

Java 任务填写主类,如 org.apache.spark.examples.SparkPi

根据实际主类填写

  • 要点:

    • 代码文件、资源组、KS3 数据必须在同一个 region

    • Python 任务使用 org.apache.spark.deploy.PythonRunner

    • Java 任务填写实际的主类名

选择资源并提交

  • 自运维资源:使用关联KCE集群方式创建资源池

  • serverless资源:潮汐数据清洗场景下使用Serverless资源,无需繁琐的整机扩缩容操作,满足资源灵活弹性、低成本需求。

查看任务详情

四、新建数据同步任务

1.入口

数据准备-数据处理-新建-新建数据同步任务

2.步骤

示例说明:将KS3数据同步至KPFS性能型

前置操作

  • 存储配置,将同步源所在的桶和同步目标所在桶的路径配置到存储配置里

填写基础信息

填写任务名称、选择优先级、填写任务描述

填写任务配置

  • 配置项

    • 同步源:选择存储配置、填写路径

    • 同步目标:选择存储配置、填写路径

  • 注意事项

    • KS3路径和文件路径要以/结尾

选择资源并提交

  • 自运维资源:使用关联KCE集群方式创建资源池

  • serverless资源:潮汐数据清洗场景下使用Serverless资源,无需繁琐的整机扩缩容操作,满足资源灵活弹性、低成本需求。

查看任务详情

  • 可在任务详情中查看

  • 也可在数据同步任务完成后,可以在文件系统观察数据同步结果

文档导读
纯净模式常规模式

纯净模式

点击可全屏预览文档内容
文档反馈