最近更新时间:2026-03-26 12:26:41
已创建1个及以上通用类型的资源池,资源池具体操作参见资源池,资源池内需安装数据处理组件。
资源池中已创建1个及以上的队列,具体操作参见队列。
建议您在资源池中开启关联托管Prometheus、Klog服务,以便后续查看任务监控及日志。
数据准备-数据处理-新建-新建Ray任务
示例说明:将KS3桶里的图片进行批量裁剪操作
将数据处理的代码文件ray_images.py打包成zip包放入对应资源组所在region的ks3桶中
图片数据存放在相应的ks3路径s3://bucket/datasets/flowers-min/
创建输出路径s3://bucket/datasets/process-demo-output/
填写任务名称、选择优先级、填写任务描述
配置项
AKSK
Entrypoint.cmd,填写python脚本启动文件
eg: python ray_images.py
Head核数调整为>=2
Work核数调整为>=2
Runtime_env填写json格式的working_dir,表示启动文件所在的ks3路径
eg: {"working_dir":"s3://bucket/python/ray_images.zip"}
注意事项
配置项 | 说明 | 示例 |
Region 一致性 | 代码文件存放的 KS3 region 必须和资源组在同一个 region | 代码在 |
启动命令 | 在 Entrypoint.cmd 中填写,通过 python 命令指定启动文件 |
|
RuntimeEnv | 代码文件路径,需写成 JSON 格式 |
|
内存配置 | Head 和 Worker 内存建议设置为 2GB 或以上 | 资源充足时建议 ≥ 2GB |
依赖包 | 处理 KS3 文件需要使用 boto3 包 | boto3 包支持读取ks3文件,具体可以看示例代码 |
要点:
代码文件、资源组、KS3 数据必须在同一个 region
启动命令:python ray_image_processing.py
RuntimeEnv 使用 JSON 格式:{"working_dir":"s3://bucket/path/to/code.zip"}
内存建议 ≥ 2GB(Head 和 Worker)
需要安装 boto3 包访问 KS3
自运维资源:使用关联KCE集群方式创建资源池
serverless资源:潮汐数据清洗场景下使用Serverless资源,无需繁琐的整机扩缩容操作,满足资源灵活弹性、低成本需求。
import io
import os
import ray
import boto3
from PIL import Image
from botocore.config import Config
# ========== 配置 ==========
INPUT_S3_PREFIX = "s3://bucket/datasets/flowers-min/"
OUTPUT_S3_PREFIX = "s3://bucket/datasets/process-demo-output/ray-images/"
RESIZE_TO = (256, 256)
# ==== 配置你的 KS3 信息 ====
AK = "xxxxxx"
SK = "xxxxxx"
ENDPOINT = "http://ks3-cn-ningxia-internal.ksyuncs.com" # HTTP 内网
def _make_s3_client():
return boto3.client(
"s3",
aws_access_key_id=AK,
aws_secret_access_key=SK,
endpoint_url=ENDPOINT,
region_name="BEIJING",
config=Config(s3={"addressing_style": "virtual"}),
)
# 主进程用:列 list 等
s3 = _make_s3_client()
# ========== 工具函数 ==========
def parse_s3_path(s3_path: str):
"""
s3://bucket/prefix -> (bucket, prefix)
"""
assert s3_path.startswith("s3://")
path = s3_path[5:]
bucket, prefix = path.split("/", 1)
return bucket, prefix
def list_s3_images(bucket, prefix):
keys = []
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
for obj in page.get("Contents", []):
key = obj["Key"]
if key.lower().endswith((".jpg", ".jpeg", ".png")):
keys.append(key)
return keys
# ========== Ray 并行任务 ==========
@ray.remote
def resize_image_task(
bucket: str,
input_key: str,
output_bucket: str,
output_prefix: str,
):
s3_client = _make_s3_client()
# 1. 读取图片
obj = s3_client.get_object(Bucket=bucket, Key=input_key)
image_bytes = obj["Body"].read()
img = Image.open(io.BytesIO(image_bytes)).convert("RGB")
# 2. resize
img = img.resize(RESIZE_TO)
# 3. 写回 S3
buf = io.BytesIO()
img.save(buf, format="JPEG")
buf.seek(0)
filename = os.path.basename(input_key)
out_key = f"{output_prefix.rstrip('/')}/{filename}"
s3_client.put_object(
Bucket=output_bucket,
Key=out_key,
Body=buf.getvalue(),
ContentType="image/jpeg",
)
return f"s3://{output_bucket}/{out_key}"
# ========== 主流程 ==========
def main():
ray.init()
in_bucket, in_prefix = parse_s3_path(INPUT_S3_PREFIX)
out_bucket, out_prefix = parse_s3_path(OUTPUT_S3_PREFIX)
image_keys = list_s3_images(in_bucket, in_prefix)
print(f"Found {len(image_keys)} images")
tasks = [
resize_image_task.remote(
in_bucket,
key,
out_bucket,
out_prefix,
)
for key in image_keys
]
results = ray.get(tasks)
print("Done:")
for r in results:
print(r)
if __name__ == "__main__":
main()数据准备-数据处理-新建-新建Spark任务
示例说明:计算 Pi
代码包准备
将代码包放入对应资源组所在region的ks3桶中
填写任务名称、选择优先级、填写任务描述
配置项
AKSK
AppResource: Spark 任务的代码包ks3位置
eg: ks3://bucket-test/python/spark-examples_2.12-3.3.1jar
Class:指定 Spark 任务的入口类(主类),这个类必须包含 main 方法,Spark 会执行这个类的 main 方法作为程序起点
eg: org.apache.spark.examples.SparkPi
Head核数调整为>=2
Work核数调整为>=2
注意事项
配置项 | 说明 | 示例 |
Region 一致性 | 代码文件存放的 KS3 region 必须和资源组在同一个 region | 代码在 |
启动类Class | Python 任务填写 | 用于运行 Python 脚本 |
启动类Class | Java 任务填写主类,如 | 根据实际主类填写 |
要点:
代码文件、资源组、KS3 数据必须在同一个 region
Python 任务使用 org.apache.spark.deploy.PythonRunner
Java 任务填写实际的主类名
自运维资源:使用关联KCE集群方式创建资源池
serverless资源:潮汐数据清洗场景下使用Serverless资源,无需繁琐的整机扩缩容操作,满足资源灵活弹性、低成本需求。
数据准备-数据处理-新建-新建数据同步任务
示例说明:将KS3数据同步至KPFS性能型
存储配置,将同步源所在的桶和同步目标所在桶的路径配置到存储配置里
填写任务名称、选择优先级、填写任务描述
配置项
同步源:选择存储配置、填写路径
同步目标:选择存储配置、填写路径
注意事项
KS3路径和文件路径要以/结尾
自运维资源:使用关联KCE集群方式创建资源池
serverless资源:潮汐数据清洗场景下使用Serverless资源,无需繁琐的整机扩缩容操作,满足资源灵活弹性、低成本需求。
可在任务详情中查看
也可在数据同步任务完成后,可以在文件系统观察数据同步结果
纯净模式
