您最多选择25个主题 主题必须以中文或者字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 
 

90 行
2.4 KiB

import kfp.dsl as dsl
import kfp.gcp as gcp
def evaluate_op(
    *,
    gcs_bucket,
    pose_estimation_gcs_path,
    log_dir,
    docker_image,
    memory_limit,
    num_gpu,
    gpu_type,
    checkpoint_file=None,
):
    """Create a Kubeflow ContainerOp that runs the ``evaluate`` CLI command.

    The container executes ``python -m pose_estimation.cli evaluate`` with
    the dataset location and log directory passed as command-line flags.

    Args:
        gcs_bucket: GCS bucket where the datasets are located.
        pose_estimation_gcs_path: Path inside the GCS bucket where the
            datasets are located.
        log_dir: Path to save the Tensorboard event files.
        docker_image (str): Docker image registry URI.
        memory_limit (str): Memory limit for this operator. For simplicity,
            the memory request is set to the same value.
        num_gpu (int): Number of GPUs for this operator.
        gpu_type (str): Type of GPU; used as the value of the
            ``cloud.google.com/gke-accelerator`` node selector.
        checkpoint_file: Accepted for interface compatibility but currently
            NOT forwarded to the container command.
            TODO(review): confirm the CLI flag for the checkpoint and wire
            it into ``arguments`` if evaluation should load a specific file.

    Returns:
        kfp.dsl.ContainerOp: The op, implemented by a container image, that
        runs the evaluation.
    """
    command = ["python", "-m", "pose_estimation.cli"]
    arguments = [
        "evaluate",
        "--config-file=config.yaml",
        "--download-data-gcp=True",
        f"--gcs-bucket={gcs_bucket}",
        f"--pose-estimation-gcs-path={pose_estimation_gcs_path}",
        f"--log-dir={log_dir}",
    ]

    # The step was previously named "train" although it runs the "evaluate"
    # sub-command; name it after what it actually does.
    evaluate = dsl.ContainerOp(
        name="evaluate",
        image=docker_image,
        command=command,
        arguments=arguments,
    )

    # GPU: request the accelerators and pin the step to nodes of that type.
    evaluate.set_gpu_limit(num_gpu)
    evaluate.add_node_selector_constraint(
        "cloud.google.com/gke-accelerator", gpu_type
    )

    # Memory: request == limit.
    evaluate.set_memory_request(memory_limit)
    evaluate.set_memory_limit(memory_limit)

    # Apply the "user-gcp-sa" GCP secret to the op.
    evaluate.apply(gcp.use_gcp_secret("user-gcp-sa"))

    return evaluate
@dsl.pipeline(
    name="evalulate pipeline",
    description="evalulate the model using kubeflow pipeline",
)
def evalulate_pipeline_single_cube(
    docker_image: str = "",
    gcs_bucket: str = "",
    pose_estimation_gcs_path: str = "",
    logdir: str = "",
):
    # Pipeline with a single step: one evaluate_op call configured from the
    # pipeline parameters plus a fixed resource profile.
    #
    # Parameters:
    #   docker_image: forwarded to evaluate_op as ``docker_image``.
    #   gcs_bucket: forwarded to evaluate_op as ``gcs_bucket``.
    #   pose_estimation_gcs_path: forwarded as ``pose_estimation_gcs_path``.
    #   logdir: forwarded to evaluate_op as ``log_dir``.
    #
    # (Documented with comments rather than a docstring so this rewrite adds
    # nothing new for the pipeline decorator/compiler to inspect.)

    # Fixed resource profile for the evaluation step.
    resources = {
        "memory_limit": "64Gi",
        "num_gpu": 1,
        "gpu_type": "nvidia-tesla-v100",
    }

    # Pipeline definition
    evaluate_op(
        docker_image=docker_image,
        gcs_bucket=gcs_bucket,
        pose_estimation_gcs_path=pose_estimation_gcs_path,
        log_dir=logdir,
        **resources,
    )
if __name__ == "__main__":
    # When run as a script, compile the pipeline into a package written next
    # to this file: "<this file's path>.tar.gz".
    import kfp.compiler as compiler

    output_package = __file__ + ".tar.gz"
    pipeline_compiler = compiler.Compiler()
    pipeline_compiler.compile(evalulate_pipeline_single_cube, output_package)