This post shows how to use the AWS SDK for Java to register a MapReduce job you have built as an EMR custom JAR step, and then create and run the job flow.
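For context, the custom JAR that the job flow registers below is assumed to be an ordinary Hadoop MapReduce driver. The following is a minimal, hypothetical word-count style sketch (the class name, the mapper/reducer logic, and the input/output argument convention are illustrative assumptions, not taken from this post); whatever you put into JOB_ARGS in the job flow code below is handed to this driver's main() when the step runs.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver for the custom JAR; names and logic are placeholders.
public class CustomJobDriver {

    public static class TokenMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokens = new StringTokenizer(value.toString());
            while (tokens.hasMoreTokens()) {
                word.set(tokens.nextToken());
                context.write(word, ONE);
            }
        }
    }

    public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        // args[0] = input path, args[1] = output path (e.g. s3:// locations),
        // supplied via JOB_ARGS when the step is registered.
        Job job = Job.getInstance(new Configuration(), "custom-emr-job");
        job.setJarByClass(CustomJobDriver.class);
        job.setMapperClass(TokenMapper.class);
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}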
import java.util.Arrays;
import java.util.List;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsResult;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowDetail;
import com.amazonaws.services.elasticmapreduce.model.JobFlowExecutionState;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;

// Note: the enclosing class name and the credential/bucket values below are placeholders.
public class EmrCustomJarJobFlow {

    private static final String EMR_SETUP_NAME = "Setup hadoop debugging";
    private static final String HADOOP_SETUP_JAR = "s3://elasticmapreduce/libs/script-runner/script-runner.jar";
    private static final List<String> HADOOP_SETUP_ARGS_AS_LIST =
            Arrays.asList("s3://elasticmapreduce/libs/state-pusher/0.1/fetch");
    private static final String HADOOP_VERSION = "hadoop-version";
    private static final int INSTANCE_COUNT = 4; // 1 master, 3 slaves
    private static final String INSTANCE_TYPE = InstanceType.C3Xlarge.toString();
    private static final String FLOW_NAME = "FLOW_NAME";
    private static final String EMR_NAME = "EMR_NAME";
    private static final String S3N_HADOOP_JAR = "s3n://your-bucket/your-folder/your-custom.jar";
    private static final String S3N_LOG_URI = "s3://your-bucket/Log/";
    private static final String[] JOB_ARGS = new String[] { "" };
    private static final List<String> ARGS_AS_LIST = Arrays.asList(JOB_ARGS);
    private static final List<JobFlowExecutionState> DONE_STATES = Arrays.asList(
            JobFlowExecutionState.COMPLETED,
            JobFlowExecutionState.FAILED,
            JobFlowExecutionState.TERMINATED);

    // AWS credentials (placeholders)
    private static final String accessKey = "your-access-key";
    private static final String secretKey = "your-secret-key";

    static AmazonElasticMapReduce emr;

    private static void init() throws Exception {
        AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
        emr = new AmazonElasticMapReduceClient(credentials);
    }

    public static void main(String[] args) throws Exception {
        init();
        try {
            JobFlowInstancesConfig instances = new JobFlowInstancesConfig();
            instances.setHadoopVersion(HADOOP_VERSION);
            instances.setInstanceCount(INSTANCE_COUNT);
            instances.setMasterInstanceType(INSTANCE_TYPE);
            instances.setSlaveInstanceType(INSTANCE_TYPE);
            /* Add any other job flow settings here,
               e.g. instances.setTerminationProtected(true); */

            RunJobFlowRequest request = new RunJobFlowRequest(FLOW_NAME, instances);
            request.setLogUri(S3N_LOG_URI);

            // First step: sets up Hadoop debugging on the newly launched cluster.
            HadoopJarStepConfig jarConfig_setting = new HadoopJarStepConfig(HADOOP_SETUP_JAR);
            jarConfig_setting.setArgs(HADOOP_SETUP_ARGS_AS_LIST);
            StepConfig stepConfig_setting = new StepConfig(EMR_SETUP_NAME, jarConfig_setting);

            // Second step: run the custom JAR.
            HadoopJarStepConfig jarConfig = new HadoopJarStepConfig(S3N_HADOOP_JAR);
            jarConfig.setArgs(ARGS_AS_LIST);
            StepConfig stepConfig = new StepConfig(EMR_NAME, jarConfig);

            request.setSteps(Arrays.asList(stepConfig_setting, stepConfig));

            RunJobFlowResult result = emr.runJobFlow(request);

            // Poll the execution state of the job flow we just created.
            String lastState = "";
            STATUS_LOOP:
            while (true) {
                DescribeJobFlowsRequest desc =
                        new DescribeJobFlowsRequest(Arrays.asList(result.getJobFlowId()));
                DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);
                for (JobFlowDetail detail : descResult.getJobFlows()) {
                    if (isDone(detail.getExecutionStatusDetail().getState())) {
                        System.out.println(detail.getExecutionStatusDetail().getState());
                        break STATUS_LOOP;
                    } else if (!lastState.equals(detail.getExecutionStatusDetail().getState())) {
                        lastState = detail.getExecutionStatusDetail().getState();
                        System.out.println(lastState);
                    }
                }
                Thread.sleep(10000);
            }
        } catch (AmazonServiceException ase) {
            System.out.println("Caught Exception: " + ase.getMessage());
            System.out.println("Response Status Code: " + ase.getStatusCode());
            System.out.println("Error Code: " + ase.getErrorCode());
            System.out.println("Request ID: " + ase.getRequestId());
        }
    }

    public static boolean isDone(String value) {
        JobFlowExecutionState state = JobFlowExecutionState.fromValue(value);
        return DONE_STATES.contains(state);
    }
}
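The loop above only waits for a done state. With the settings used here EMR shuts the cluster down on its own after the last step, but if the job flow were kept alive (for example with instances.setKeepJobFlowAliveWhenNoSteps(true)), it would have to be terminated explicitly. Below is a minimal sketch of that, assuming the same emr client and the job flow id from result; the helper class name is hypothetical.

import java.util.Arrays;

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsRequest;

public class JobFlowTerminator {

    // Terminates the given job flow (cluster); call this after the STATUS_LOOP above
    // reports a done state, passing the same emr client and result.getJobFlowId().
    public static void terminate(AmazonElasticMapReduce emr, String jobFlowId) {
        TerminateJobFlowsRequest terminateRequest =
                new TerminateJobFlowsRequest(Arrays.asList(jobFlowId));
        emr.terminateJobFlows(terminateRequest);
    }
}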
That's it.
Reference: http://mpouttuclarke.wordpress.com/2011/06/24/how-to-run-an-elastic-mapreduce-job-using-the-java-sdk/