
Creating and Running an EMR (Elastic MapReduce) Job Flow with the AWS Java SDK

This post shows how to register a MapReduce job built with the AWS Java SDK as an EMR Custom JAR step, then create and run a Job Flow with it.
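For reference, the listing below targets the AWS SDK for Java 1.x. The imports it needs are roughly the following (note that the InstanceType enum lives in the EC2 model package):

import java.util.Arrays;
import java.util.List;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.*;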

// Placeholder AWS credentials: replace with your own keys.
private static final String accessKey = "your-access-key";
private static final String secretKey = "your-secret-key";

private static final String EMR_SETUP_NAME = "Setup hadoop debugging";
private static final String HADOOP_SETUP_JAR = "s3://elasticmapreduce/libs/script-runner/script-runner.jar";
private static final List<String> HADOOP_SETUP_ARGS_AS_LIST = Arrays.asList("s3://elasticmapreduce/libs/state-pusher/0.1/fetch");
private static final String HADOOP_VERSION = "your-hadoop-version";
private static final int INSTANCE_COUNT = 4;   // 1 master, 3 slaves
private static final String INSTANCE_TYPE = InstanceType.C3Xlarge.toString();
private static final String FLOW_NAME = "FLOW_NAME";
private static final String EMR_NAME = "EMR_NAME";
private static final String S3N_HADOOP_JAR = "s3n://your-bucket/your-folder/your-custom.jar";
private static final String S3N_LOG_URI = "s3://your-bucket/Log/";
private static final String[] JOB_ARGS = new String[] { "" };
private static final List<String> ARGS_AS_LIST = Arrays.asList(JOB_ARGS);
private static final List<JobFlowExecutionState> DONE_STATES = Arrays.asList(JobFlowExecutionState.COMPLETED,
                                             JobFlowExecutionState.FAILED,
                                             JobFlowExecutionState.TERMINATED);

static AmazonElasticMapReduce emr;
private static void init() throws Exception {
    AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
    emr = new AmazonElasticMapReduceClient(credentials);
}
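On newer 1.x releases of the SDK the client constructor above is deprecated in favor of the builder. A minimal sketch of a builder-based init(), assuming SDK 1.11+ (the region here is an assumption; use whichever region the cluster should run in):

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;

private static void init() throws Exception {
    AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
    emr = AmazonElasticMapReduceClientBuilder.standard()
            .withCredentials(new AWSStaticCredentialsProvider(credentials))
            .withRegion(Regions.AP_NORTHEAST_1)   // assumption: pick your own region
            .build();
}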

public static void main(String[] args) throws Exception {
    init();
    try {
        JobFlowInstancesConfig instances = new JobFlowInstancesConfig();
        instances.setHadoopVersion(HADOOP_VERSION);
        instances.setInstanceCount(INSTANCE_COUNT);
        instances.setMasterInstanceType(INSTANCE_TYPE);
        instances.setSlaveInstanceType(INSTANCE_TYPE);
        /* Additional job flow settings go here, e.g. instances.setTerminationProtected(true); */

        RunJobFlowRequest request = new RunJobFlowRequest(FLOW_NAME, instances);
        request.setLogUri(S3N_LOG_URI);

        // First step: run script-runner to set up Hadoop debugging on the newly launched cluster.
        HadoopJarStepConfig jarConfig_setting = new HadoopJarStepConfig(HADOOP_SETUP_JAR);
        jarConfig_setting.setArgs(HADOOP_SETUP_ARGS_AS_LIST);
        StepConfig stepConfig_setting = new StepConfig(EMR_SETUP_NAME, jarConfig_setting);
        // Second step: the custom JAR containing the MapReduce job.
        HadoopJarStepConfig jarConfig = new HadoopJarStepConfig(S3N_HADOOP_JAR);
        jarConfig.setArgs(ARGS_AS_LIST);
        StepConfig stepConfig = new StepConfig(EMR_NAME, jarConfig);

        request.setSteps(Arrays.asList(stepConfig_setting, stepConfig));

        RunJobFlowResult result = emr.runJobFlow(request);

        // Poll the job flow's execution state until it reaches a terminal state.
        String lastState = "";
        STATUS_LOOP: while (true) {
            DescribeJobFlowsRequest desc = new DescribeJobFlowsRequest(Arrays.asList(result.getJobFlowId()));
            DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);
            for (JobFlowDetail detail : descResult.getJobFlows()) {
                if (isDone(detail.getExecutionStatusDetail().getState())) {
                    System.out.println(detail.getExecutionStatusDetail().getState());
                    break STATUS_LOOP;
                } else if (!lastState.equals(detail.getExecutionStatusDetail().getState())) {
                    lastState = detail.getExecutionStatusDetail().getState();
                    System.out.println(lastState);
                }
            }
            Thread.sleep(10000);
        }
    } catch (AmazonServiceException ase) {
        System.out.println("Caughtption: "      + ase.getMessage());
        System.out.println("Reponseus Code: "   + ase.getStatusCode());
        System.out.println("Error: "            + ase.getErrorCode());
        System.out.println("Request"            + ase.getRequestId());
    }
}

public static boolean isDone(String value) {
    JobFlowExecutionState state = JobFlowExecutionState.fromValue(value);
    return DONE_STATES.contains(state);
}
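One caveat: DescribeJobFlows has since been deprecated by Amazon. On newer SDK releases the same polling loop can be written against DescribeCluster instead (the request class is covered by the model.* import above). A minimal sketch, assuming SDK 1.11+; note that runJobFlow's job flow id doubles as the cluster id, and that terminal cluster states differ from the job flow states used above:

public static void waitForCluster(String clusterId) throws InterruptedException {
    // A cluster that auto-terminates after its steps ends in TERMINATED,
    // or TERMINATED_WITH_ERRORS if a step failed.
    List<String> doneStates = Arrays.asList("TERMINATED", "TERMINATED_WITH_ERRORS");
    String lastState = "";
    while (true) {
        DescribeClusterRequest desc = new DescribeClusterRequest().withClusterId(clusterId);
        String state = emr.describeCluster(desc).getCluster().getStatus().getState();
        if (!state.equals(lastState)) {
            System.out.println(state);
            lastState = state;
        }
        if (doneStates.contains(state)) {
            break;
        }
        Thread.sleep(10000);
    }
}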

Done.


Reference: http://mpouttuclarke.wordpress.com/2011/06/24/how-to-run-an-elastic-mapreduce-job-using-the-java-sdk/