最近更新时间:2026-06-08 16:37:55
金山云数据处理平台支持多种类型的计算任务,包括Ray任务、Spark任务和数据同步任务,可用于大规模数据的分布式处理、批处理和跨存储系统的数据迁移。
本文档基于图片批量处理场景,介绍如何在数据处理平台上创建和运行以上三种任务类型。
代码文件、资源组、KS3数据必须在同一个Region,否则任务无法访问代码包或数据。
Head和Worker内存建议设置为2GB或以上,资源不足可能导致任务失败。
处理KS3文件需要使用boto3包(Ray任务场景)。
KS3路径和文件路径必须以/结尾(数据同步任务场景)。
已创建1个及以上通用类型的资源池,资源池内已安装数据处理组件,具体操作参见资源池。
资源池中已创建1个及以上的队列,具体操作参见队列。
建议在资源池中开启关联托管Prometheus、Klog服务,以便后续查看任务监控及日志。
示例说明:将KS3桶里的图片进行批量裁剪操作。
登录对象存储KS3控制台。
在左侧导航栏,选择存储空间,点击待上传代码数据和图片数据的空间名称。
该存储空间需与后续创建的数据处理任务,在同一个地区。
在存储空间详情页面,选择内容管理,点击创建目录,输入目录名为/python,并将已保存为zip压缩包形式的代码文件上传至该目录。具体代码内容参考代码示例ray_images.py。
返回根目录,点击创建目录,在弹出的提示窗口中,输入目录名为/datasets。在目录下,依次创建如下目录,用于存储数据。
请严格按照本实践给出的名称进行创建,避免示例代码运行错误。
/flowers-min/ :用于保存待处理的图片文件,点击上传文件,选择待处理的图片进行上传即可。本实践的示例代码支持处理jpg、jpeg或png格式图片。
/process-demo-output/:用于保存处理完成后的文件。
登录星流平台训推平台,在顶部导航栏,数据准备 > 数据处理。
在数据处理页,单击+新建,选择新建Ray任务。
在新建Ray任务页面中,按照如下要求,配置各项参数。
任务名称:自定义即可。
资源类型:选择自运维资源。
资源组:选择准备工作中提前创建好的资源组。
队列:选择准备工作中提前创建好的队列。
AK/SK:填写当前登录账号的AccessKeyID和密钥,具体获取方式可参考AK/SK相关内容。
Entrypoint.cmd:指定Python脚本启动文件,本次实践为python ray_images.py。
高级设置:展现相应下拉框,依次设置如下参数:
Head核数:调整为≥2,本实践为2。
Head内存:调整为≥2Gi,本实践为4Gi。
Worker核数:调整为≥2,本实践为2。
Worker内存:调整为≥2Gi,本实践为4Gi。
Runtime_env:填写JSON格式的working_dir,表示启动文件所在的KS3路径,本实践为{"working_dir":"s3://<bucket-name>/python/ray_images.zip"},其中<bucket-name>为上传了数据和代码文件的KS3存储空间名称,需根据实际情况进行修改。
其他参数保持默认或根据实际情况进行修改即可。
完成上述配置后,点击确定。
参数说明:
INPUT_S3_PREFIX:待处理数据保存路径,其中<bucket-name>为上传了数据和代码文件的KS3存储空间名称,需根据实际情况进行修改。
OUTPUT_S3_PREFIX:处理完成后的文件所保存的路径,其中<bucket-name>为上传了数据和代码文件的KS3存储空间名称,需根据实际情况进行修改。
AK、SK:当前登录账号的AccessKeyID和密钥,具体获取方式可参考AK/SK相关内容。
import io
import os
import ray
import boto3
from PIL import Image
from botocore.config import Config
# ========== 配置 ==========
INPUT_S3_PREFIX = "s3://<bucket-name>/datasets/flowers-min/"
OUTPUT_S3_PREFIX = "s3://<bucket-name>/datasets/process-demo-output/ray-images/"
RESIZE_TO = (256, 256)
# ==== 配置您的KS3信息 ====
AK = "xxxxxx"
SK = "xxxxxx"
ENDPOINT = "http://ks3-cn-ningxia-internal.ksyuncs.com"
def _make_s3_client():
return boto3.client(
"s3",
aws_access_key_id=AK,
aws_secret_access_key=SK,
endpoint_url=ENDPOINT,
region_name="BEIJING",
config=Config(s3={"addressing_style": "virtual"}),
)
s3 = _make_s3_client()
def parse_s3_path(s3_path: str):
assert s3_path.startswith("s3://")
path = s3_path[5:]
bucket, prefix = path.split("/", 1)
return bucket, prefix
def list_s3_images(bucket, prefix):
keys = []
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
for obj in page.get("Contents", []):
key = obj["Key"]
if key.lower().endswith((".jpg", ".jpeg", ".png")):
keys.append(key)
return keys
@ray.remote
def resize_image_task(bucket: str, input_key: str, output_bucket: str, output_prefix: str):
s3_client = _make_s3_client()
obj = s3_client.get_object(Bucket=bucket, Key=input_key)
image_bytes = obj["Body"].read()
img = Image.open(io.BytesIO(image_bytes)).convert("RGB")
img = img.resize(RESIZE_TO)
buf = io.BytesIO()
img.save(buf, format="JPEG")
buf.seek(0)
filename = os.path.basename(input_key)
out_key = f"{output_prefix.rstrip('/')}/{filename}"
s3_client.put_object(
Bucket=output_bucket,
Key=out_key,
Body=buf.getvalue(),
ContentType="image/jpeg",
)
return f"s3://{output_bucket}/{out_key}"
def main():
ray.init()
in_bucket, in_prefix = parse_s3_path(INPUT_S3_PREFIX)
out_bucket, out_prefix = parse_s3_path(OUTPUT_S3_PREFIX)
image_keys = list_s3_images(in_bucket, in_prefix)
print(f"Found {len(image_keys)} images")
tasks = [
resize_image_task.remote(in_bucket, key, out_bucket, out_prefix)
for key in image_keys
]
results = ray.get(tasks)
print("Done:")
for r in results:
print(r)
if __name__ == "__main__":
main()示例说明:本实践的Spark任务为计算圆周率。
点击 spark-examples_2.12-3.3.1.jar下载作业示例,本实践为计算圆周率。
登录对象存储KS3控制台。
在左侧导航栏,选择存储空间,点击待上传代码数据空间名称。
在存储空间详情页面,选择内容管理,点击创建目录,输入目录名为/python。
将已下载的.jar代码文件上传至该目录。
登录星流平台训推平台,在顶部导航栏,数据准备 > 数据处理。
在数据处理页,单击+新建,选择新建Spark任务。
在新建任务页面中,按照如下要求,配置各项参数。
任务名称:自定义即可。
资源类型:选择自运维资源。
资源组:选择准备工作中提前创建好的资源组。
队列:选择准备工作中提前创建好的队列。
AK/SK:填写当前登录账号的AccessKeyID和密钥,具体获取方式可参考AK/SK相关内容。
AppResource:填写Spark任务代码包的KS3位置,如ks3://<bucket-name>/python/spark-examples_2.12-3.3.1.jar,其中<bucket-name>为上传了代码文件的KS3存储空间名称,需根据实际情况进行修改。
Class:指定Spark任务的入口类(主类),该类必须包含main方法,Spark会执行该类的main方法作为程序起点,本实践为org.apache.spark.examples.SparkPi。若运行Python脚本,则填写org.apache.spark.deploy.PythonRunner。
高级设置:展现相应下拉框,依次设置如下参数:
Driver核数:调整为≥2,本实践为2。
Driver内存:调整为≥2Gi,本实践为4Gi。
Executor数量:本实践为2。
Executor核数:本实践为2。
Executor内存:本实践为4Gi。
其他参数保持默认或根据实际情况进行修改即可。
完成上述配置后,点击确定。
等待数据处理任务状态变为运行中或完成。
在数据处理任务列表中,点击指定任务名称/ID,可进入其详情页面.
若当前任务在运行中,则可查看相应监控数据。其他状态可直接点击日志页签,查看相应日志。
示例说明:将KS3数据同步至KPFS性能型文件存储。
将同步源所在的桶和同步目标所在桶的路径配置到存储配置里。
创建KS3存储配置。注意,此处选择待同步数据的KS3实例和路径。
创建KPFS存储配置。注意,此处所选的文件系统路径容量配额须大于待同步数据的大小。
登录星流平台训推平台,在顶部导航栏,数据准备 > 数据处理。
在数据处理页,单击+新建,选择新建Spark任务。
在新建任务页面中,按照如下要求,配置各项参数。
任务名称:自定义即可。
资源类型:选择自运维资源。
资源组:选择准备工作中提前创建好的资源组。
队列:选择准备工作中提前创建好的队列。
AK/SK:填写当前登录账号的AccessKeyID和密钥,具体获取方式可参考AK/SK相关内容。
同步源:选择源数据所在的KS3存储配置路径。注意,KS3路径和文件路径要以/结尾。
同步目标:选择数据需要至的目标KPFS存储配置路径。
完成上述配置后,点击确定。
等待数据处理任务状态变为运行中或完成。
在数据处理任务列表中,点击指定任务名称/ID,可进入其详情页面.
若当前任务在运行中,则可查看相应监控数据。其他状态可直接点击日志页签,查看相应日志。
任务完成后,可点击同步目标所在的KPFS存储配置,检查数据是否完整。
纯净模式
