Step
Overview
A step is a cluster-level resource, so every step operation requires the ID of the target cluster.
Add Steps
BMR supports several kinds of steps, each with its own configuration. The following code adds Custom Jar, Streaming, Hive, Pig, and Spark steps to a specified Hadoop-type cluster.
When adding steps, you can make the request idempotent by setting the clientToken property of the AddStepsRequest object with addStepsRequest.withClientToken(clientToken). The clientToken is an ASCII string of no more than 64 characters.
The AddStepsResponse object returned by the request contains the IDs of the new steps as a List<String>, which is obtained via response.getStepIds().
public void addSteps(BmrClient bmrClient, String clusterId) {
    List<StepConfig> steps = new ArrayList<StepConfig>();
    // Custom Jar step
    steps.add(
        new JavaStepConfig()
            .withName("java-step")
            .withActionOnFailure("Continue")
            .withJar("bos://benchmark/hadoop/hadoop-mapreduce-examples.jar")
            .withMainClass("org.apache.hadoop.examples.WordCount")
            .withArguments("bos://path/to/input bos://path/to/java_output")
    );
    // Streaming step
    steps.add(
        new StreamingStepConfig()
            .withName("streaming-step")
            .withActionOnFailure("Continue")
            .withInput("bos://path/to/input_streaming")
            .withMapper("cat")
            .withOutput("bos://path/to/output_streaming")
    );
    // Streaming step that uses an additional (attached) file
    steps.add(
        new StreamingStepConfig()
            .withName("streaming-step2")
            .withActionOnFailure("Continue")
            .withInput("bos://path/to/input_streaming2")
            .withMapper("cat")
            .withReducer("cat")
            .withOutput("bos://path/to/output_streaming2")
            .withAdditionalFile("bos://path/to/testA.jar", "testB.jar")
            .withArguments("-libjars testB.jar")
    );
    // Hive step
    steps.add(
        new HiveStepConfig()
            .withName("hive-step")
            .withActionOnFailure("Continue")
            .withScript("bos://path/to/hive/hql/hive_src.hql")
            .withInput("bos://path/to/hive/data/hive_src.data")
            .withOutput("bos://path/to/output_hive")
            .withArguments("--hivevar LOCAT=bos://chy3/hive/tables/src")
    );
    // Pig step
    steps.add(
        new PigStepConfig()
            .withName("pig-step")
            .withActionOnFailure("Continue")
            .withScript("bos://path/to/pig/script/pig_grep.pig")
            .withInput("bos://path/to/pig/data/pig_grep.data")
            .withOutput("bos://path/to/output_pig")
    );
    // Spark step
    steps.add(
        new SparkStepConfig()
            .withName("spark-step")
            .withActionOnFailure("Continue")
            .withJar("bos://bmr-public-bj/sample/spark-1.0-SNAPSHOT.jar")
            .withSubmitOptions("--class com.baidu.cloud.bmr.spark.AccessLogAnalyzer")
            .withArguments("bos://bmr-public-bj/data/log/accesslog-1k.log bos://tester01/sdk/output/out")
    );
    try {
        AddStepsResponse response = bmrClient.addSteps(
            new AddStepsRequest().withClusterId(clusterId)
                .withSteps(steps)
        );
        // print the IDs of the added steps
        for (String stepId : response.getStepIds()) {
            System.out.println(stepId);
        }
    } catch (BceServiceException e) {
        System.out.println("Add steps failed: " + e.getErrorMessage());
    } catch (BceClientException e) {
        System.out.println(e.getMessage());
    }
}
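The clientToken only helps if a retried request reuses the token of the original attempt. The sketch below is illustrative and not part of the BMR SDK: the hypothetical ClientTokens helper builds a token that meets the documented rule (ASCII, at most 64 characters) from a random UUID, and the hypothetical FakeStepService is an in-memory stand-in showing how a server can return the original step IDs when it sees a repeated token. With the real SDK, the token would be passed through addStepsRequest.withClientToken(token).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper: builds and checks tokens against the documented rule
// (ASCII string, at most 64 characters).
class ClientTokens {
    static String newToken() {
        // A random UUID string is 36 ASCII characters, well under the limit
        return UUID.randomUUID().toString();
    }

    static boolean isValid(String token) {
        return token != null
                && token.length() <= 64
                && StandardCharsets.US_ASCII.newEncoder().canEncode(token);
    }
}

// Hypothetical in-memory stand-in for the service side, NOT the BMR API:
// it shows how a server can deduplicate retries that reuse a clientToken.
class FakeStepService {
    private final Map<String, List<String>> idsByToken = new HashMap<>();
    private int nextId = 1;

    List<String> addSteps(String clientToken, int stepCount) {
        // A repeated token returns the step IDs created by the first request
        List<String> existing = idsByToken.get(clientToken);
        if (existing != null) {
            return existing;
        }
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < stepCount; i++) {
            ids.add("s-" + nextId++);
        }
        idsByToken.put(clientToken, ids);
        return ids;
    }
}
```

In practice, generate the token once per logical "add steps" operation and keep it across retries; generating a fresh token per attempt would defeat the deduplication.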
List All Steps
The following code lists all steps of the specified cluster. The query parameter maxKeys limits the number of steps returned per request, and the query parameter marker specifies the position from which the query starts. The marker value is generated and returned by the BMR system: omit it in the first request, then pass the returned value in each subsequent request of a paging loop.
public void listSteps(BmrClient bmrClient, String clusterId) {
    int maxKeys = 10;
    try {
        // method 1: list the steps of the specified cluster
        ListStepsResponse response1 = bmrClient.listSteps(clusterId);
        // method 2: single query request with maxKeys set
        ListStepsResponse response2 = bmrClient.listSteps(clusterId, maxKeys);
        // method 3: paging loop with maxKeys and marker set
        boolean isTruncated = true;
        String marker = null;
        int page = 0;
        while (isTruncated) {
            ListStepsResponse response3 = bmrClient.listSteps(clusterId, marker, maxKeys);
            page++;
            System.out.format("Page %d: Step count: %d%n", page, response3.getSteps().size());
            isTruncated = response3.isTruncated();
            marker = response3.getNextMarker();
        }
        // method 4: query request built from a customized ListStepsRequest object
        ListStepsResponse response4 = bmrClient.listSteps(
            new ListStepsRequest().withClusterId(clusterId).withMaxKeys(maxKeys)
        );
        // print the state of each step
        for (Step step : response4.getSteps()) {
            System.out.println(step.getStatus().getState());
        }
    } catch (BceServiceException e) {
        System.out.println("List steps failed: " + e.getErrorMessage());
    } catch (BceClientException e) {
        System.out.println(e.getMessage());
    }
}
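To see the marker protocol of method 3 in isolation, the sketch below runs the same loop against a hypothetical in-memory lister instead of BmrClient. StepPage mirrors only the three accessors the loop uses (getSteps(), isTruncated(), getNextMarker()); FakeStepLister, StepPagination, and the index-based marker are assumptions for illustration. A real client should treat the marker as an opaque value produced by BMR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical page type mirroring the ListStepsResponse accessors used in
// "method 3": getSteps(), isTruncated() and getNextMarker().
class StepPage {
    private final List<String> steps;
    private final boolean truncated;
    private final String nextMarker;

    StepPage(List<String> steps, boolean truncated, String nextMarker) {
        this.steps = steps;
        this.truncated = truncated;
        this.nextMarker = nextMarker;
    }

    List<String> getSteps() { return steps; }
    boolean isTruncated() { return truncated; }
    String getNextMarker() { return nextMarker; }
}

// Hypothetical in-memory lister. Here the marker is simply the index of the
// next record; a real client should treat it as an opaque server value.
class FakeStepLister {
    private final List<String> allSteps;
    int calls = 0;

    FakeStepLister(List<String> allSteps) {
        this.allSteps = allSteps;
    }

    StepPage listSteps(String marker, int maxKeys) {
        calls++;
        int start = (marker == null) ? 0 : Integer.parseInt(marker);
        int end = Math.min(start + maxKeys, allSteps.size());
        boolean truncated = end < allSteps.size();
        List<String> page = new ArrayList<>(allSteps.subList(start, end));
        return new StepPage(page, truncated, truncated ? String.valueOf(end) : null);
    }
}

class StepPagination {
    // Same loop shape as "method 3": no marker on the first call, then the
    // returned marker on each later call, until a page is not truncated.
    static List<String> listAll(FakeStepLister lister, int maxKeys) {
        List<String> all = new ArrayList<>();
        String marker = null;
        boolean isTruncated = true;
        while (isTruncated) {
            StepPage page = lister.listSteps(marker, maxKeys);
            all.addAll(page.getSteps());
            isTruncated = page.isTruncated();
            marker = page.getNextMarker();
        }
        return all;
    }
}
```

With 25 steps and maxKeys = 10, the loop makes three calls (10, 10, and 5 steps) and stops when the last page reports isTruncated() as false.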
The ListStepsResponse object returned by the request contains a list of step objects, List<Step>, which is obtained via response.getSteps(). Each step object (Step) holds the step's configuration information, and every property has a getter method.
public class Step {
    private String id;                      // step ID
    private String actionOnFailure;         // step failure policy
    private String type;                    // step type
    private Map<String, String> properties; // step properties
    private String name;                    // step name
    private StepStatus status;              // step status
}
public class StepStatus {
    private String createDateTime;          // step submission time
    private String endDateTime;             // step end time
    private String startDateTime;           // step start time
    private String state;                   // step state
}
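Because the state is reached through step.getStatus().getState(), a list returned by response.getSteps() can be summarized with streams. The sketch below uses hypothetical DemoStep and DemoStepStatus classes that copy only that getter chain, so it runs without the SDK; the state strings in the usage example ("Completed", "Running") are illustrative assumptions, not a confirmed list of BMR states.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical minimal stand-ins with the same getter chain as the SDK's
// Step and StepStatus (step.getStatus().getState()).
class DemoStepStatus {
    private final String state;
    DemoStepStatus(String state) { this.state = state; }
    String getState() { return state; }
}

class DemoStep {
    private final String id;
    private final DemoStepStatus status;
    DemoStep(String id, String state) {
        this.id = id;
        this.status = new DemoStepStatus(state);
    }
    String getId() { return id; }
    DemoStepStatus getStatus() { return status; }
}

class StepReport {
    // Counts steps per state, sorted by state name for stable output
    static Map<String, Long> countByState(List<DemoStep> steps) {
        return steps.stream().collect(Collectors.groupingBy(
                step -> step.getStatus().getState(), TreeMap::new, Collectors.counting()));
    }

    // IDs of the steps in one given state, in list order
    static List<String> idsInState(List<DemoStep> steps, String state) {
        return steps.stream()
                .filter(step -> state.equals(step.getStatus().getState()))
                .map(DemoStep::getId)
                .collect(Collectors.toList());
    }
}
```

For the steps s-1 (Completed), s-2 (Running), and s-3 (Completed), countByState returns {Completed=2, Running=1} and idsInState(steps, "Completed") returns [s-1, s-3]. With the SDK, the same lambdas would be applied to Step objects.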
Query a Specified Step
The following code queries the information of a specified step. The GetStepResponse object returned by the request provides getter methods for the step properties, so the target step's information can be read directly from the response.
public void getStep(BmrClient bmrClient, String clusterId, String stepId) {
    try {
        // method 1: query a step by the specified cluster ID and step ID
        GetStepResponse response1 = bmrClient.getStep(clusterId, stepId);
        // method 2: query request built from a customized GetStepRequest object
        GetStepResponse response2 = bmrClient.getStep(
            new GetStepRequest().withClusterId(clusterId).withStepId(stepId)
        );
        // print the step state
        System.out.println(response1.getStatus().getState());
    } catch (BceServiceException e) {
        System.out.println("Get step failed: " + e.getErrorMessage());
    } catch (BceClientException e) {
        System.out.println(e.getMessage());
    }
}
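A common use of getStep is to wait until a submitted step finishes. The sketch below is a generic polling loop, not an SDK feature: the state lookup is passed in as a Supplier<String> so the code runs without BmrClient, and the set of terminal state names ("Completed", "Failed", "Cancelled") is an assumption to be checked against the BMR documentation.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

class StepPoller {
    // Assumed terminal state names for illustration only; confirm the actual
    // values returned by step.getStatus().getState() in the BMR documentation.
    static final Set<String> TERMINAL_STATES = Collections.unmodifiableSet(
            new HashSet<>(Arrays.asList("Completed", "Failed", "Cancelled")));

    // Calls fetchState until it returns a terminal state or maxAttempts is used up.
    // With the SDK, fetchState could wrap:
    //   () -> bmrClient.getStep(clusterId, stepId).getStatus().getState()
    static String waitForTerminalState(Supplier<String> fetchState, int maxAttempts, long intervalMillis) {
        String state = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            state = fetchState.get();
            if (TERMINAL_STATES.contains(state)) {
                return state;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    throw new IllegalStateException("Interrupted while polling step state", e);
                }
            }
        }
        throw new IllegalStateException(
                "Step not in a terminal state after " + maxAttempts + " attempts; last state: " + state);
    }
}
```

Passing the lookup as a Supplier keeps the retry logic independent of the client and easy to test; in production code you would also decide how to handle BceServiceException thrown by an individual getStep call inside the supplier.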