全部文档
当前文档

共搜索到 0 条结果

暂无内容

如果没有找到您期望的内容,请尝试其他搜索词

文档中心

数据处理操作指南

最近更新时间:2026-06-08 16:37:55

背景信息

金山云数据处理平台支持多种类型的计算任务,包括Ray任务、Spark任务和数据同步任务,可用于大规模数据的分布式处理、批处理和跨存储系统的数据迁移。

本文档基于图片批量处理场景,介绍如何在数据处理平台上创建和运行以上三种任务类型。

约束限制

  • 代码文件、资源组、KS3数据必须在同一个Region,否则任务无法访问代码包或数据。

  • Head和Worker内存建议设置为2GB或以上,资源不足可能导致任务失败。

  • 处理KS3文件需要使用boto3包(Ray任务场景)。

  • KS3路径和文件路径必须以/结尾(数据同步任务场景)。

准备工作

  • 已创建1个及以上通用类型的资源池,资源池内已安装数据处理组件,具体操作参见资源池

  • 资源池中已创建1个及以上的队列,具体操作参见队列

  • 建议在资源池中开启关联托管Prometheus、Klog服务,以便后续查看任务监控及日志。

Ray任务

示例说明:将KS3桶里的图片进行批量裁剪操作。

步骤一:数据准备

  1. 登录对象存储KS3控制台

  2. 在左侧导航栏,选择存储空间,点击待上传代码数据和图片数据的空间名称。

    该存储空间需与后续创建的数据处理任务,在同一个地区

  3. 在存储空间详情页面,选择内容管理,点击创建目录,输入目录名/python,并将已保存为zip压缩包形式的代码文件上传至该目录。具体代码内容参考代码示例ray_images.py

  4. 返回根目录,点击创建目录,在弹出的提示窗口中,输入目录名/datasets。在目录下,依次创建如下目录,用于存储数据。

    请严格按照本实践给出的名称进行创建,避免示例代码运行错误。

    • /flowers-min/ :用于保存待处理的图片文件,点击上传文件,选择待处理的图片进行上传即可。本实践的示例代码支持处理jpgjpegpng格式图片。

    • /process-demo-output/:用于保存处理完成后的文件。

步骤二:新建Ray任务

  1. 登录星流平台训推平台,在顶部导航栏,数据准备 > 数据处理

  2. 在数据处理页,单击+新建,选择新建Ray任务

  3. 在新建Ray任务页面中,按照如下要求,配置各项参数。

    • 任务名称:自定义即可。

    • 资源类型:选择自运维资源

    • 资源组:选择准备工作中提前创建好的资源组。

    • 队列:选择准备工作中提前创建好的队列。

    • AK/SK:填写当前登录账号的AccessKeyID密钥,具体获取方式可参考AK/SK相关内容

    • Entrypoint.cmd:指定Python脚本启动文件,本次实践为python ray_images.py

    • 高级设置:展现相应下拉框,依次设置如下参数:

      • Head核数:调整为≥2,本实践为2。

      • Head内存:调整为≥2Gi,本实践为4Gi。

      • Worker核数:调整为≥2,本实践为2。

      • Worker内存:调整为≥2Gi,本实践为4Gi。

      • Runtime_env:填写JSON格式的working_dir,表示启动文件所在的KS3路径,本实践为{"working_dir":"s3://<bucket-name>/python/ray_images.zip"},其中<bucket-name>为上传了数据和代码文件的KS3存储空间名称,需根据实际情况进行修改。

    • 其他参数保持默认或根据实际情况进行修改即可。

  4. 完成上述配置后,点击确定。

步骤三:查看任务详情

  1. 等待数据处理任务状态变为运行中完成

  2. 数据处理任务列表中,点击指定任务名称/ID,可进入其详情页面。

  3. 若当前任务在运行中,则可查看相应监控数据。其他状态可直接点击日志页签,查看相应日志。

示例代码

参数说明:

  • INPUT_S3_PREFIX:待处理数据保存路径,其中<bucket-name>为上传了数据和代码文件的KS3存储空间名称,需根据实际情况进行修改。

  • OUTPUT_S3_PREFIX:处理完成后的文件所保存的路径,其中<bucket-name>为上传了数据和代码文件的KS3存储空间名称,需根据实际情况进行修改。

  • AK、SK:当前登录账号的AccessKeyID密钥,具体获取方式可参考AK/SK相关内容

import io
import os
import ray
import boto3
from PIL import Image
from botocore.config import Config

# ========== 配置 ==========
INPUT_S3_PREFIX = "s3://<bucket-name>/datasets/flowers-min/"
OUTPUT_S3_PREFIX = "s3://<bucket-name>/datasets/process-demo-output/ray-images/"
RESIZE_TO = (256, 256)
# ==== 配置您的KS3信息 ====
AK = "xxxxxx"
SK = "xxxxxx"
ENDPOINT = "http://ks3-cn-ningxia-internal.ksyuncs.com"

def _make_s3_client():
    return boto3.client(
        "s3",
        aws_access_key_id=AK,
        aws_secret_access_key=SK,
        endpoint_url=ENDPOINT,
        region_name="BEIJING",
        config=Config(s3={"addressing_style": "virtual"}),
    )

s3 = _make_s3_client()

def parse_s3_path(s3_path: str):
    assert s3_path.startswith("s3://")
    path = s3_path[5:]
    bucket, prefix = path.split("/", 1)
    return bucket, prefix

def list_s3_images(bucket, prefix):
    keys = []
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if key.lower().endswith((".jpg", ".jpeg", ".png")):
                keys.append(key)
    return keys

@ray.remote
def resize_image_task(bucket: str, input_key: str, output_bucket: str, output_prefix: str):
    s3_client = _make_s3_client()
    obj = s3_client.get_object(Bucket=bucket, Key=input_key)
    image_bytes = obj["Body"].read()
    img = Image.open(io.BytesIO(image_bytes)).convert("RGB")
    img = img.resize(RESIZE_TO)
    buf = io.BytesIO()
    img.save(buf, format="JPEG")
    buf.seek(0)
    filename = os.path.basename(input_key)
    out_key = f"{output_prefix.rstrip('/')}/{filename}"
    s3_client.put_object(
        Bucket=output_bucket,
        Key=out_key,
        Body=buf.getvalue(),
        ContentType="image/jpeg",
    )
    return f"s3://{output_bucket}/{out_key}"

def main():
    ray.init()
    in_bucket, in_prefix = parse_s3_path(INPUT_S3_PREFIX)
    out_bucket, out_prefix = parse_s3_path(OUTPUT_S3_PREFIX)
    image_keys = list_s3_images(in_bucket, in_prefix)
    print(f"Found {len(image_keys)} images")
    tasks = [
        resize_image_task.remote(in_bucket, key, out_bucket, out_prefix)
        for key in image_keys
    ]
    results = ray.get(tasks)
    print("Done:")
    for r in results:
        print(r)

if __name__ == "__main__":
    main()

Spark任务

示例说明:本实践的Spark任务为计算圆周率。

步骤一:数据准备

  1. 点击 spark-examples_2.12-3.3.1.jar下载作业示例,本实践为计算圆周率。

  2. 登录对象存储KS3控制台

  3. 在左侧导航栏,选择存储空间,点击待上传代码数据空间名称。

  4. 在存储空间详情页面,选择内容管理,点击创建目录,输入目录名/python

  5. 将已下载的.jar代码文件上传至该目录。

步骤二:新建spark任务

  1. 登录星流平台训推平台,在顶部导航栏,数据准备 > 数据处理

  2. 在数据处理页,单击+新建,选择新建Spark任务

  3. 在新建任务页面中,按照如下要求,配置各项参数。

    • 任务名称:自定义即可。

    • 资源类型:选择自运维资源

    • 资源组:选择准备工作中提前创建好的资源组。

    • 队列:选择准备工作中提前创建好的队列。

    • AK/SK:填写当前登录账号的AccessKeyID密钥,具体获取方式可参考AK/SK相关内容

    • AppResource:填写Spark任务代码包的KS3位置,如ks3://<bucket-name>/python/spark-examples_2.12-3.3.1.jar,其中<bucket-name>为上传了代码文件的KS3存储空间名称,需根据实际情况进行修改。

    • Class:指定Spark任务的入口类(主类),该类必须包含main方法,Spark会执行该类的main方法作为程序起点,本实践为org.apache.spark.examples.SparkPi。若运行Python脚本,则填写org.apache.spark.deploy.PythonRunner

    • 高级设置:展现相应下拉框,依次设置如下参数:

      • Driver核数:调整为≥2,本实践为2。

      • Driver内存:调整为≥2Gi,本实践为4Gi。

      • Executor数量:本实践为2

      • Executor核数:本实践为2。

      • Executor内存:本实践为4Gi。

    • 其他参数保持默认或根据实际情况进行修改即可。

  4. 完成上述配置后,点击确定。

步骤三:查看任务详情

  1. 等待数据处理任务状态变为运行中完成

  2. 数据处理任务列表中,点击指定任务名称/ID,可进入其详情页面.

  3. 若当前任务在运行中,则可查看相应监控数据。其他状态可直接点击日志页签,查看相应日志。

数据同步任务

示例说明:将KS3数据同步至KPFS性能型文件存储

步骤一:存储配置准备

将同步源所在的桶和同步目标所在桶的路径配置到存储配置里。

  1. 创建KS3存储配置。注意,此处选择待同步数据的KS3实例和路径。

  2. 创建KPFS存储配置。注意,此处所选的文件系统路径容量配额须大于待同步数据的大小。

步骤二:新建数据同步任务

  1. 登录星流平台训推平台,在顶部导航栏,数据准备 > 数据处理

  2. 在数据处理页,单击+新建,选择新建Spark任务

  3. 在新建任务页面中,按照如下要求,配置各项参数。

    • 任务名称:自定义即可。

    • 资源类型:选择自运维资源

    • 资源组:选择准备工作中提前创建好的资源组。

    • 队列:选择准备工作中提前创建好的队列。

    • AK/SK:填写当前登录账号的AccessKeyID密钥,具体获取方式可参考AK/SK相关内容

    • 同步源:选择源数据所在的KS3存储配置路径。注意,KS3路径和文件路径要以/结尾。

    • 同步目标:选择数据需要至的目标KPFS存储配置路径。

  4. 完成上述配置后,点击确定。

步骤三:查看任务详情

  1. 等待数据处理任务状态变为运行中完成

  2. 数据处理任务列表中,点击指定任务名称/ID,可进入其详情页面.

  3. 若当前任务在运行中,则可查看相应监控数据。其他状态可直接点击日志页签,查看相应日志。

  4. 任务完成后,可点击同步目标所在的KPFS存储配置,检查数据是否完整。

文档导读
纯净模式常规模式

纯净模式

点击可全屏预览文档内容
文档反馈