批量计算 创建作业(DAG 类型)

By | 2021年4月22日

create_job

参数说明:

所有类型的参数将被转换为包含属性信息的字典对象。

参数 类型 描述
job_desc JobDescription object, str, dict 作业的简单描述和作业对象中各个任务的描述信息,以及各个任务之间的DAG依赖关系

返回值说明:

create_job 方法将返回一个CreateResponse对象, 以下是 CreateResponse 对象的属性。可以通过 response.Id 的方式获取新任务的 ID。

属性 类型 描述
Id str 新任务的任务标识符

e.g.

  
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. from batchcompute import Client, ClientError
  4. from batchcompute import CN_ZHANGJIAKOU as REGION
  5. from batchcompute.resources import (
  6. ClusterDescription, GroupDescription, Configs, Networks, VPC,
  7. JobDescription, TaskDescription, DAG,Mounts,
  8. AutoCluster,Disks,Notification,
  9. )
  10. access_key_id = "" # your access key id
  11. access_key_secret = "" # your access key secret
  12. image_id = "m-8vbd8lo9xxxx" # the id of a image created before,镜像需要确保已经注册给批量计算
  13. instance_type = "ecs.sn1.medium" # instance type
  14. inputOssPath = "oss://xxx/input/" # your input oss path
  15. outputOssPath = "oss://xxx/output/" #your output oss path
  16. stdoutOssPath = "oss://xxx/log/stdout/" #your stdout oss path
  17. stderrOssPath = "oss://xxx/log/stderr/" #your stderr oss path
  18. def getAutoClusterDesc():
  19. auto_desc = AutoCluster()
  20. auto_desc.ECSImageId = image_id
  21. #任务失败保留环境,程序调试阶段设置。环境保留费用会继续产生请注意及时手动清除环境任务失败保留环境,
  22. # 程序调试阶段设置。环境保留费用会继续产生请注意及时手动清除环境
  23. auto_desc.ReserveOnFail = False
  24. # 实例规格
  25. auto_desc.InstanceType = instance_type
  26. #case1 设置上限价格的竞价实例;
  27. # auto_desc.ResourceType = "Spot"
  28. # auto_desc.SpotStrategy = "SpotWithPriceLimit"
  29. # auto_desc.SpotPriceLimit = 0.5
  30. #case2 系统自动出价,最高按量付费价格
  31. # auto_desc.ResourceType = "Spot"
  32. # auto_desc.SpotStrategy = "SpotAsPriceGo"
  33. #case3 按量
  34. auto_desc.ResourceType = "OnDemand"
  35. #Configs
  36. configs = Configs()
  37. #Configs.Networks
  38. networks = Networks()
  39. vpc = VPC()
  40. #case1 只给CidrBlock
  41. vpc.CidrBlock = '192.168.0.0/16'
  42. #case2 CidrBlock和VpcId 都传入,必须保证VpcId的CidrBlock 和传入的CidrBlock保持一致
  43. # vpc.CidrBlock = '172.26.0.0/16'
  44. # vpc.VpcId = "vpc-8vbfxdyhxxxx"
  45. networks.VPC = vpc
  46. configs.Networks = networks
  47. # 设置系统盘type(cloud_efficiency/cloud_ssd)以及size(单位GB)
  48. configs.add_system_disk(size=40, type_='cloud_efficiency')
  49. #设置数据盘type(必须和系统盘type保持一致) size(单位GB) 挂载点
  50. # case1 linux环境
  51. # configs.add_data_disk(size=40, type_='cloud_efficiency', mount_point='/path/to/mount/')
  52. # case2 windows环境
  53. # configs.add_data_disk(size=40, type_='cloud_efficiency', mount_point='E:')
  54. # 设置节点个数
  55. configs.InstanceCount = 1
  56. auto_desc.Configs = configs
  57. return auto_desc
  58. def getDagJobDesc(clusterId = None):
  59. job_desc = JobDescription()
  60. dag_desc = DAG()
  61. mounts_desc = Mounts()
  62. job_desc.Name = "testBatchSdkJob"
  63. job_desc.Description = "test job"
  64. job_desc.Priority = 1
  65. # 订阅job完成或者失败事件
  66. noti_desc = Notification()
  67. noti_desc.Topic['Name'] = "test-topic"
  68. noti_desc.Topic['Endpoint'] = "http://[UserId].mns.[Region].aliyuncs.com/"
  69. noti_desc.Topic['Events'] = ["OnJobFinished", "OnJobFailed"]
  70. # job_desc.Notification = noti_desc
  71. job_desc.JobFailOnInstanceFail = False
  72. # 作业运行成功后户自动会被立即释放掉
  73. job_desc.AutoRelease = False
  74. job_desc.Type = "DAG"
  75. echo_task = TaskDescription()
  76. # 程序的输入路径映射,程序直接访问/home/test/input/来访问oss://xxx/input/中的文件
  77. # 支持文件挂载,在程序中直接访问文件
  78. # echo_task.InputMapping = {"oss://xxx/input/": "/home/test/input/",
  79. # "oss://xxx/test/file": "/home/test/test/file"}
  80. echo_task.InputMapping = {inputOssPath: "/home/test/input/"}
  81. # 程序的输出路径映射,可执行程序将结果输出到/home/test/output/,
  82. # 程序执行完毕后批量计算将/home/test/output/中的结果上传到oss://xxx/output/中
  83. # 输入和输出oss路径不要有交叉,如输入为oss://xxx/input/,输出为oss://xxx/input/output/;
  84. # 这样会导致未定义行为程序执行性能不能保证
  85. echo_task.OutputMapping = {"/home/test/output/":outputOssPath}
  86. #触发程序运行的命令行
  87. #case1 执行linux命令行
  88. echo_task.Parameters.Command.CommandLine = "/bin/bash -c 'echo BatchcomputeService'"
  89. #case2 执行Windows CMD.exe
  90. # echo_task.Parameters.Command.CommandLine = "cmd /c 'echo BatchcomputeService'"
  91. #case3 输入可执行文件
  92. # PackagePath存放commandLine中的可执行文件或者二进制包
  93. # echo_task.Parameters.Command.PackagePath = "oss://xxx/package/test.sh"
  94. # echo_task.Parameters.Command.CommandLine = "sh test.sh"
  95. # 设置程序运行过程中相关环境变量信息
  96. echo_task.Parameters.Command.EnvVars["key1"] = "value1"
  97. echo_task.Parameters.Command.EnvVars["key2"] = "value2"
  98. # 设置docker参数
  99. #case1 docker镜像在oss registry上
  100. # echo_task.Parameters.Command.EnvVars["BATCH_COMPUTE_DOCKER_IMAGE"] = "localhost:5000/yuorBucket/dockers:0.1"
  101. # echo_task.Parameters.Command.EnvVars["BATCH_COMPUTE_DOCKER_REGISTRY_OSS_PATH"] = "oss://your-bucket/dockers"
  102. #case2 docker镜像在容器仓库
  103. # echo_task.Parameters.Command.Docker.Image = "registry.cn-beijing.aliyuncs.com/demotest/test:0.1"
  104. # 设置程序的标准输出地址,程序中的print打印会实时上传到指定的oss地址
  105. echo_task.Parameters.StdoutRedirectPath = stdoutOssPath
  106. # 设置程序的标准错误输出地址,程序抛出的异常错误会实时上传到指定的oss地址
  107. echo_task.Parameters.StderrRedirectPath = stderrOssPath
  108. # 设置任务的超时时间
  109. echo_task.Timeout = 600
  110. # 设置任务所需实例个数
  111. # 环境变量BATCH_COMPUTE_INSTANCE_ID为0到InstanceCount-1
  112. # 在执行程序中访问BATCH_COMPUTE_INSTANCE_ID,实现数据访问的切片实现单任务并发执行
  113. echo_task.InstanceCount = 1
  114. # 设置任务失败后重试次数
  115. echo_task.MaxRetryCount = 0
  116. # NAS数据挂载
  117. #采用NAS时必须保证网络和NAS在同一个VPC内
  118. nasMountEntry = {
  119. "Source": "nas://xxxx.nas.aliyuncs.com:/",
  120. "Destination": "/home/mnt/",
  121. "WriteSupport":True,
  122. }
  123. mounts_desc.add_entry(nasMountEntry)
  124. mounts_desc.Locale = "utf-8"
  125. mounts_desc.Lock = False
  126. # echo_task.Mounts = mounts_desc
  127. # 采用固定集群提交作业
  128. # echo_task.ClusterId = clusterId
  129. #采用auto集群提交作业
  130. echo_task.AutoCluster = getAutoClusterDesc()
  131. # 添加任务
  132. dag_desc.add_task('echoTask', echo_task)
  133. # 可以设置多个task,每个task可以根据需求进行设置各项参数
  134. # dag_desc.add_task('echoTask2', echo_task)
  135. # Dependencies设置多个task之间的依赖关系,echoTask2依赖echoTask;echoTask3依赖echoTask2
  136. # dag_desc.Dependencies = {"echoTask":["echoTask2"], "echoTask2":["echoTask3"]}
  137. #
  138. job_desc.DAG = dag_desc
  139. return job_desc
  140. if __name__ == "__main__":
  141. client = Client(REGION, access_key_id, access_key_secret)
  142. try:
  143. job_desc = getDagJobDesc()
  144. job_id = client.create_job(job_desc).Id
  145. print('job created: %s' % job_id)
  146. except ClientError,e:
  147. print (e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg())

Notice:

关于Mounts的注意事项

  1. Job 级别的 Mounts 参数会覆盖 Cluster 级别的配置信息;
  2. Modify Cluster 后,需要调用 RecreateInstance 接口才能使新指定的 Mounts 配置生效;
  3. 挂载 NAS 需要以 nas:// 做为 source 的前缀,否则会出错;
  4. 每个类的具体成员信息参见以下表格

(1)JobDescription 类

参数说明:

参数 类型 描述
properties dict, str, JobDescription object 包含作业描述信息的对象

属性说明:

序号 属性 类型 描述
1. Name str 作业名称
2. Description str 作业的简短描述信息
3. Priority int 优先级用一个[0,1000]范围内的整数指定,数值越高表示作业调度时的优先级越高
4. Notification dict 消息通知配置,可以配置 MNS 服务的 Topic 和 Job 相关事件
5. JobFailOnInstanceFail bool Instance 失败是否直接使 Job 失败
6. AutoRelease boolean 表示 Job 运行成功自动会被立即释放(删除)掉,默认为 False
7. Type str 目前仅支持有向无环图(directed acycline graph,DAG)形式描述任务
8. DAG dict, DAG object DAG 描述

(2)DAG 类

参数说明:

参数 类型 描述
properties dict, str, DAG object 所有任务的映射以及任务间依赖关系的描述信息

属性说明:

序号 属性 类型 描述
1. Tasks dict 所有任务名与任务描述的映射关系
2. Dependencies dict 所有任务间的相互依赖关系

方法说明 :

序号 方法 描述
1. add_task(task_name, task) 增加一个任务
2. get_task(task_name) 通过任务名获取任务信息
3. delete_task(task_name) 删除某个任务

(3) TaskDescription 类

参数说明:

参数 类型 描述
properties dict, str, TaskDescription object 单个任务的描述信息

属性说明:

序号 属性 类型 描述
1. Parameters dict, Parameters object 任务参数详情
2. InputMapping dict OSS 到本地路径的映射
3. OutputMapping dict 本地路径到 OSS 的映射
4. LogMapping dict 本地日志路径对 OSS 映射
5. Timeout int 任务超时时间
6. InstanceCount int 任务中实例的个数,正数
7. MaxRetryCount int 最大重试次数,默认为0
8. ClusterId str 集群标识符
9. Mounts dict, Mounts object 实例的网络挂载配置信息,由 Mounts 描述,目前支持 NAS 和 OSS 挂载。
10. AutoCluster dict, AutoCluster object 匿名集群,和集群标示符最多只能指定一个

(4) Parameters 类

参数说明:

参数 类型 描述
properties dict, str, Parameters object 任务参数的描述信息

属性说明:

序号 属性 类型 描述
1. Command dict, Command object 用户程序相关命令行参数
2. InputMappingConfig dict, InputMappingConfig object NFS 挂载服务配置项
3. StdoutRedirectPath str 标准输出的 OSS 路径
4. StderrRedirectPath str 标准错误的 OSS 路径

(5) AutoCluster 类

参数说明:

参数 类型 描述
properties dict, str, AutoCluster object 匿名集群信息

属性说明:

序号 属性 类型 描述
1. ECSImageId str ECS 镜像 ID,可以使用系统提供的镜像
2. InstanceType str 实例规格,实例类型
3. ResourceType str 资源类型,目前支持: “OnDemand” 和 “Spot”,默认为“OnDemand”
4. UserData dict 一个 KeyValue 映射,用户自定义的信息,使用 ECS 的 metaserver 获取
5. Configs Configs object 集群的配置信息, 详见4.13 节中 ClusterDescription 的介绍
6. SpotStrategy str 实例的竞价策略,只有在 ResourceType 为 Spot 的情况下有效。取值范围: SpotWithPriceLimit:设置上限价格的竞价实例; SpotAsPriceGo:系统自动出价,最高按量付费价格。
7. SpotPriceLimit float 实例的每小时最高价格(每个实例规格的价格而非每核小时的价格)。支持最大 3 位小数,SpotStrategy 为 SpotWithPriceLimit 生效。
8. ReserveOnFail bool 任务失败时不释放相关的虚拟机,会继续收取这些资源的费用直到用户删除作业,默认为 False,仅用于调查问题。
9. DependencyIsvService string 执行程序依赖的阿里云提供的ISV服务,目前提供的ISV服务有:“GTX”,默认为””,不依赖任何ISV服务。

(6) Command 类

参数说明:

参数 类型 描述
properties dict, str, Command object 用户程序相关命令行参数

属性说明:

序号 属性 类型 描述
1. CommandLine str 执行用户程序的命令
2. PackagePath str 用户程序所在 OSS 路径
3. EnvVars dict 用户程序执行时的环境变量

(7) InputMappingConfig 类

参数说明:

参数 类型 描述
properties dict, str, InputMappingConfig object NFS 挂载服务配置项

属性说明:

序号 属性 类型 描述
1. Locale str OSS object 挂载到本地时使用的字符集。可选范围包括 GBK、GB2312-80、BIG5、ANSI、EUC-JP、EUC-TW、EUC-KR、SHIFT-JIS、KSC5601 等
2. Lock bool NFS 挂载服务是否支持网络文件锁

(8) Notification 类

参数说明:

参数 类型 描述
properties dict, str, Command object 用户程序相关命令行参数

属性说明:

序号 属性 类型 描述
1. Topic Topic Object 消息 Topic

(9) Topic 类

参数说明:

参数 类型 描述
properties dict, str, Command object 用户程序相关命令行参数

属性说明:

序号 属性 类型 描述
1. Endpoint str MNS 区域 endpoint,格式如: http://${your_user_id}.mns.${region}-internal.aliyuncs.com/ ,请尽量使用内网 Endpoint。
2. Name str Topic 名称。
3. Events list 事件列表,请填写 cluster 相关的事件名。

(10) Mounts 类

参数说明:

参数 类型 描述
properties dict, str, Command object 创建集群时的网络磁盘挂载配置信息。

属性说明:

序号 属性 类型 描述
1. Entries array 网络磁盘挂载点信息列表, 由 MountPoint 描述。
2. Locale str 挂载 OSS,NAS 存储时语言选项。
3. Lock bool 挂载 OSS,NAS 存储时文件锁支持选项。
4. NAS dict NAS 配置信息。
5. OSS dict OSS 配置信息。

(11) MountPoint 类

参数说明:

参数 类型 描述
properties dict, str, Command object 网络挂载点。

属性说明:

序号 属性名称 类型 描述
1. Source str 网络磁盘挂载来源路径,可以是 nas://oss:// 开头的字符串。
2. Destination str 网络磁盘本地挂载点路径。
3. WriteSupport bool 挂载点是否可写。

(12) NAS 类

参数说明:

参数 类型 描述
properties dict, str, Command object NAS 配置信息。

属性说明:

序号 属性名称 类型 描述
1. AccessGroup list 需要将集群实例加入到的 NAS 访问组。
2. FileSystem list 需要访问的文件系统。

(13) OSS 类

参数说明:

参数 类型 描述
properties dict, str, Command object OSS 配置信息。

属性说明:

序号 属性名称 类型 描述
1. AccessKeyId str OSS挂载使用的 Access ID。
2. AccessKeySecret str OSS挂载使用的 Access Secret。
3. SecurityToken str OSS挂载使用的 Security Token。

请关注公众号获取更多资料

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注