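Step (Job)
Last updated: 2022-05-18
Add steps
A job (step) is a resource associated with a cluster, so every operation on a job must specify the cluster ID.
BMR supports several job types, and each type has its own configuration options. The following code adds Custom Jar, Streaming, Hive, and Pig jobs to a specified hadoop-type cluster; the sample also contains a Spark job. Note: when you adapt the sample code below, change the BOS paths in the job parameters to BOS paths that are available to your account.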
Python
# Assumes bmr_client, cluster_id and LOG were initialized earlier.
steps = [
    # Custom Jar job
    BmrClient.step(
        'Java',
        'Continue',
        BmrClient.java_step_properties(
            'bos://benchmark/hadoop/hadoop-mapreduce-examples.jar',
            'org.apache.hadoop.examples.WordCount',
            'bos://helloworld/input/install.log bos://tester01/sdk/output_java/out1'
        ),
        'sdk-job-01'
    ),
    # Streaming job
    BmrClient.step(
        'Streaming',
        'Continue',
        BmrClient.streaming_step_properties(
            'bos://helloworld/input/install.log',
            'bos://tester01/sdk/output_streaming/out1',
            'cat'),
        'sdk-job-02'
    ),
    # Hive job
    BmrClient.step(
        'Hive',
        'Continue',
        BmrClient.hive_step_properties(
            'bos://chy3/hive/hql/hive_src.hql',
            '--hivevar LOCAT=bos://chy3/hive/tables/src',
            'bos://chy3/hive/data/hive_src.data',
            'bos://tester01/sdk/output_hive/out1'
        ),
        'sdk-job-03'
    ),
    # Pig job
    BmrClient.step(
        'Pig',
        'Continue',
        BmrClient.pig_step_properties(
            'bos://chy3/pig/script/pig_grep.pig',
            input='bos://chy3/pig/data/pig_grep.data',
            output='bos://tester01/sdk/output_pig/out1'
        ),
        'sdk-job-04'
    ),
    # Spark job
    BmrClient.step(
        'Spark',
        'Continue',
        BmrClient.spark_step_properties(
            'bos://bmr-public-bj/sample/spark-1.0-SNAPSHOT.jar',
            '--class com.baidu.cloud.bmr.spark.AccessLogAnalyzer',
            'bos://bmr-public-bj/data/log/accesslog-1k.log bos://tester01/sdk/output/out'
        ),
        'sdk-job-05'
    ),
    # Streaming job that also ships an additional file with the step
    BmrClient.step(
        'Streaming',
        'Continue',
        BmrClient.streaming_step_properties(
            'bos://helloworld/input/install.log',
            'bos://tester01/sdk/output_streaming/out1',
            'cat',
            'cat',
            '-libjars testB.jar'
        ),
        'sdk-job-06',
        [BmrClient.additional_file("bos://path/to/testA.jar", "testB.jar")]
    )
]

try:
    response = bmr_client.add_steps(cluster_id, steps)
    LOG.debug('add steps response: %s' % response)
except BceHttpClientError as e:
    if isinstance(e.last_error, BceServerError):
        LOG.error('add_steps failed. Response %s, code: %s, msg: %s'
                  % (e.last_error.status_code, e.last_error.code, e.last_error.args))
    else:
        LOG.error('add_steps failed. Unknown exception: %s' % e)
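The sample above points at BOS buckets such as `helloworld` and `tester01`, which will most likely not be accessible from your account. As a minimal sketch, and not part of the BMR SDK, a small helper can swap the bucket in each `bos://` path before the step properties are built. The function names `rebase_bos_path` and `rebase_arguments` and the bucket `my-bucket` are hypothetical, chosen only for illustration.

```python
def rebase_bos_path(path, bucket):
    """Return `path` with its bucket replaced by `bucket`.

    Hypothetical helper, not part of the BMR SDK. It only handles
    paths of the form bos://<bucket>/<key>.
    """
    prefix = 'bos://'
    if not path.startswith(prefix):
        raise ValueError('not a BOS path: %r' % path)
    # Split "<bucket>/<key>" once; the key may itself contain slashes.
    _, _, key = path[len(prefix):].partition('/')
    return '%s%s/%s' % (prefix, bucket, key)


def rebase_arguments(arguments, bucket):
    """Rebase every token that starts with bos:// in an argument string.

    Tokens that merely embed a path (for example LOCAT=bos://...) are
    left untouched and need to be edited by hand.
    """
    tokens = arguments.split(' ')
    return ' '.join(
        rebase_bos_path(t, bucket) if t.startswith('bos://') else t
        for t in tokens
    )


if __name__ == '__main__':
    print(rebase_bos_path('bos://helloworld/input/install.log', 'my-bucket'))
```

The result of `rebase_bos_path` or `rebase_arguments` can then be passed wherever the sample uses a literal BOS path or argument string.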
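List all steps
The following code lists all jobs on a specified cluster. You can set pageNo and pageSize to limit the maximum number of jobs returned by a single request: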
Python
try:
    response = bmr_client.list_steps(cluster_id, pageNo=1, pageSize=50)
    for step in response.steps:
        LOG.debug('list step %s: %s' % (step.id, step))
except BceHttpClientError as e:
    if isinstance(e.last_error, BceServerError):
        LOG.error('list_steps failed. Response %s, code: %s, msg: %s'
                  % (e.last_error.status_code, e.last_error.code, e.last_error.args))
    else:
        LOG.error('list_steps failed. Unknown exception: %s' % e)
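Because a single request returns at most pageSize jobs, a cluster with more jobs than that needs several requests. The sketch below shows one way to walk the pages. It rests on two assumptions that this document does not confirm: pages are numbered from 1, and a page holding fewer than pageSize items is the last one. The name `iter_all_steps` and the `fetch_page` callable are hypothetical; `fetch_page` is kept separate so the loop does not depend on the SDK.

```python
def iter_all_steps(fetch_page, page_size=50):
    """Yield every step by requesting pages until one comes back short.

    `fetch_page(page_no, page_size)` is a caller-supplied function that
    returns the list of steps on that page (assumed to be 1-indexed).
    """
    page_no = 1
    while True:
        steps = list(fetch_page(page_no, page_size))
        for step in steps:
            yield step
        # Assumption: a page with fewer than page_size items is the last one.
        if len(steps) < page_size:
            break
        page_no += 1


if __name__ == '__main__':
    # Stand-in data source so the sketch runs without a cluster.
    fake_steps = ['step-%d' % i for i in range(7)]

    def fake_fetch(page_no, page_size):
        start = (page_no - 1) * page_size
        return fake_steps[start:start + page_size]

    for name in iter_all_steps(fake_fetch, page_size=3):
        print(name)
```

With the client from the example above, `fetch_page` could be written as `lambda no, size: bmr_client.list_steps(cluster_id, pageNo=no, pageSize=size).steps`.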
Query a specific step
The following code retrieves the information for a specified job:
Python
try:
    response = bmr_client.get_step(cluster_id, step_id)
    LOG.debug('describe step response: %s' % response)
except BceHttpClientError as e:
    if isinstance(e.last_error, BceServerError):
        LOG.error('get_step failed. Response %s, code: %s, msg: %s'
                  % (e.last_error.status_code, e.last_error.code, e.last_error.args))
    else:
        LOG.error('get_step failed. Unknown exception: %s' % e)
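A common use of the query above is to wait until a job has finished. The following is a minimal sketch of such a polling loop, not an SDK feature. This document does not show which field of the `get_step` response holds the state, nor the state names, so both are left to the caller: `get_state` is a hypothetical callable that returns the current state as a string, and `terminal_states` is whatever set of final states you choose. The name `wait_for_step` is also hypothetical.

```python
import time


def wait_for_step(get_state, terminal_states, interval=30.0, timeout=3600.0,
                  sleep=time.sleep, clock=time.monotonic):
    """Poll `get_state()` until it returns a terminal state or time runs out.

    `get_state` is a caller-supplied function returning the step's current
    state as a string. `terminal_states` is the set of states the caller
    treats as final; no state names are defined here.
    """
    deadline = clock() + timeout
    while True:
        state = get_state()
        if state in terminal_states:
            return state
        if clock() >= deadline:
            raise TimeoutError('step still in state %r after %s seconds'
                               % (state, timeout))
        sleep(interval)


if __name__ == '__main__':
    # Illustrative state names only; they are not taken from the BMR API.
    demo_states = iter(['Pending', 'Running', 'Done'])
    final = wait_for_step(lambda: next(demo_states), {'Done'},
                          interval=0.0, timeout=5.0)
    print(final)
```

The `sleep` and `clock` parameters exist only so the loop can be exercised without real waiting; in normal use the defaults are sufficient.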