R on Hadoop and Amazon EMR

R 2014.08.19 15:44

Amazon EMR은 R을 지원한다. 

AMI Versions 3.0.0 이상부터는 Hadoop과 함께 R 3.0.2버전이 함께 Includes되어 설치된다.

현재 R최신버전은 3.1.1이지만 R 3.0.2버전이면 거의 최신버전이라고 할 수 있다.


Amazon 서비스에서 R을 지원하기 때문에 EMR을 이용하여 R Script를 실행하는 방법을 알아보고자 한다.


우선 고려해야 될 사항을 나열해보면 아래와 같다.

첫째 AMI를 통해 설치되는 R버전이 내가 사용하고자 하는 R package에서 지원하는 버전대인지?

둘째 EMR Hadoop Cluster 세팅 시 R은 설치 되지만 R Package는 설치되지 않기 때문에 설치가 가는한지?


우선 첫번째 문제부터 살펴보면 R 3.0.2버전이면 거의 왠만한 R package는 사용가능하다. 

rdocumentation.org 에서 제공하는 R package rank내의 ggplot2, plyr등 모두 사용가능하며 이슈는 없다.

둘째로는 사용하고자 하는 R package가 설치가 가능한지인데 이 부분도 EMR에서는 Bootstrap Actions라는 기능이 있다. 해당 기능을 통해서 R package뿐만 아니라 기타 여러 패키지가 설치 가능하다.


그럼 이제 EMR에서 R Script를 실행하는 방법을 알아보자.


AMI version

위에서 언급한 바와 같이 3.0.0이상 가장 최신버전을 선택하도록 하자.


Bootstrap Actions

Bootstrap Custom Actions을 통해 R Script에서 사용할 package와 기타 필요한 패키지들을 설치할 수 있도록 해야 한다.

여기서는 R package 하나와 mysql을 설치하도록 지정했다.

아래와 같이 shell script파일을 생성한다.

 
#!/bin/bash
sudo yum -y install mysql mysql-server mysql-connector-odbc mysql-devel
wget http://cran.r-project.org/src/contrib/DBI_0.2-7.tar.gz
sudo R CMD INSTALL DBI_0.2-7.tar.gz

만약 설치되어야 하는 R package가 많다면  아래와 같이 RScript Install.package구문으로도 가능하다.

 
 sudo Rscript -e "install.packages(c('DBI', 'rJava', 'etc'),repos='http://ftp.heanet.ie/mirrors/cran.r-project.org/')"

mysql설치는 apt-get 명령어가 아닌 yum을 사용했다. AMI 3.x대 버전 이후 부터는 apt-get이 아닌 yum을 이용하여

필요한 패키지들을 설치할 수 있다. 해당 shell script를 S3내의 내 버킷공간에 저장해 둔다.


S3에 저장한 shell script를 S3 location에 Path를 지정한 후 Add해주면 Bootstrap Action설정은 모두 끝난다.


Steps

R Script 작성 시 맨 상단에 "#! /usr/bin/env Rscript" 해당 구문을 추가해주어야 한다.

추가하지 않으면 EMR에서 R Script를 해석하지 못한다.

#! /usr/bin/env Rscript
library(DBI, quietly=TRUE)
.....

실행할 R Script를 Bootstrap의 Shell script와 마찬가지로 S3내의 내 버컷공간에 저장해 둔다.

그리고 Streaming program을 선택 후 Step을 추가해주면 된다.



Mapper, Reduce에서 싫행될 R Script를 각각 지정 후 Input, Output 장소도 지정한다.

Output의 경우 Reduce에서 실행되는 R Script의 실행 결과물이 파일로 저장된다.



위와 같이 최종 설정 후 Create cluster를 생성하면 EMR내에서 R Script가 실행가능하다.



참고
http://www.joyofdata.de/blog/mapreduce-r-hadoop-amazon-emr/
https://forums.cascading.io/index.php?/topic/9-unable-to-run-emr-bootstrapscript/
http://mrjob.readthedocs.org/en/latest/guides/emr-bootstrap-cookbook.html#ami-2-x-and-ami-3-x-version-differences


저작자 표시
신고

'R' 카테고리의 다른 글

R을 이용한 이상치 분석  (0) 2014.10.01
R on Hadoop and Amazon EMR  (0) 2014.08.19
R Oracle Connection  (4) 2014.02.24
R svn commit history 시각화  (0) 2014.02.21

WRITTEN BY
빵군
Web Programmer HOONS닷넷(http://www.hoons.kr) 2011 ASP.NET 시삽 http://about.me/y2kpooh

받은 트랙백이 없고 , 댓글이 없습니다.
secret

AWS JAVA SDK를 이용하여 개발한 MapReduce를 EMR Custom Jar로 등록하여 Job Flow를 생성 및 실행하는 방법이다.

private static final String EMR_SETUP_NAME = "Setup hadoop debugging";
private static final String HADOOP_SETUP_JAR  = "s3://elasticmapreduce/libs/script-runner/script-runner.jar";
private static final List<String> HADOOP_SETUP_ARGS_AS_LIST = Arrays.asList("s3://elasticmapreduce/libs/state-pusher/0.1/fetch");
private static final String HADOOP_VERSION = "하둡버전";
private static final int INSTANCE_COUNT = 4;   //master 1개, slave 3개
private static final String INSTANCE_TYPE = InstanceType.C3Xlarge.toString();
private static final String FLOW_NAME = "FLOW_NAME";
private static final String EMR_NAME = "EMR_NAME";
private static final String S3N_HADOOP_JAR = "s3n://버킷이름/폴더명/커스텀.jar";
private static final String S3N_LOG_URI  = "s3://버킷이름/Log/";
private static final String[] JOB_ARGS = new String[] { "" };
private static final List<String> ARGS_AS_LIST = Arrays.asList(JOB_ARGS);
private static final List<JobFlowExecutionState> DONE_STATES = Arrays.asList(new JobFlowExecutionState[] { JobFlowExecutionState.COMPLETED,
                                             JobFlowExecutionState.FAILED,
                                             JobFlowExecutionState.TERMINATED });

static AmazonElasticMapReduce emr;
private static void init() throws Exception {
    AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
    emr = new AmazonElasticMapReduceClient(credentials);
}

public static void main(String[] args) throws Exception {
    init();
    try {
        JobFlowInstancesConfig instances = new JobFlowInstancesConfig();
        instances.setHadoopVersion(HADOOP_VERSION);
        instances.setInstanceCount(INSTANCE_COUNT);
        instances.setMasterInstanceType(INSTANCE_TYPE);
        instances.setSlaveInstanceType(INSTANCE_TYPE);
        /*각종 Job Flow 설정 추가 ex) instances.setTerminationProtected(true); */

        RunJobFlowRequest request = new RunJobFlowRequest(FLOW_NAME, instances);
        request.setLogUri(S3N_LOG_URI);

        //step 추가로 초기 서버에 하둡을 세팅하기 위한 step이다.            
        HadoopJarStepConfig jarConfig_setting = new HadoopJarStepConfig(HADOOP_SETUP_JAR);
        jarConfig_setting.setArgs(HADOOP_SETUP_ARGS_AS_LIST);
        StepConfig stepConfig_setting = new StepConfig(EMR_SETUP_NAME, jarConfig_setting);
        //custom jar step추가
        HadoopJarStepConfig jarConfig = new HadoopJarStepConfig(S3N_HADOOP_JAR);
        jarConfig.setArgs(ARGS_AS_LIST);
        StepConfig stepConfig = new StepConfig(EMR_NAME, jarConfig);

        request.setSteps(Arrays.asList(new StepConfig[] {stepConfig_setting, stepConfig}));

        RunJobFlowResult result = emr.runJobFlow(request);

        // 생성한 EMR에 대한 실행 상태코드 체크
        String lastState = "";
        STATUS_LOOP: while (true) {
            DescribeJobFlowsRequest desc = new DescribeJobFlowsRequest(Arrays.asList(new String[] {result.getJobFlowId()}));
            DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);
            for (JobFlowDetail detail : descResult.getJobFlows()) {
                if (isDone(detail.getExecutionStatusDetail().getState())) {
                    System.out.println(detail.getExecutionStatusDetail().getState());
                    break STATUS_LOOP;
                } else if (!lastState.equals(detail.getExecutionStatusDetail().getState())) {
                    lastState = detail.getExecutionStatusDetail().getState();
                    System.out.println(lastState);
                }
            }
          Thread.sleep(10000);
        }
    } catch (AmazonServiceException ase) {
        System.out.println("Caughtption: "      + ase.getMessage());
        System.out.println("Reponseus Code: "   + ase.getStatusCode());
        System.out.println("Error: "            + ase.getErrorCode());
        System.out.println("Request"            + ase.getRequestId());
    }
}

public static boolean isDone(String value)
{
    JobFlowExecutionState state = JobFlowExecutionState.fromValue(value);
    return DONE_STATES.contains(state);
}

끝.


참고 : http://mpouttuclarke.wordpress.com/2011/06/24/how-to-run-an-elastic-mapreduce-job-using-the-java-sdk/

저작자 표시
신고

WRITTEN BY
빵군
Web Programmer HOONS닷넷(http://www.hoons.kr) 2011 ASP.NET 시삽 http://about.me/y2kpooh

받은 트랙백이 없고 , 댓글이 없습니다.
secret

S3 특정 bucket에 저장된 Object 파일리스트 목록을 가져올때 아래와 같이 코딩을 할 수 있다.

AWSCredentials crd = new BasicAWSCredentials(accessKey, secretKey);
AmazonS3 s3 = new AmazonS3Client(crd);
ObjectListing objects = s3.listObjects(bucketName, folderName);
do {
        //1000개 단위로 읽음
	for (S3ObjectSummary objectSummary : objects.getObjectSummaries()) {
      
	}
      objects = s3.listNextBatchOfObjects(objects);  <--이녀석은 1000개 단위로만 가져옴..
} while (objects.isTruncated());

지정된 bucket에 S3 Object가 10,300개가 있다고 가정하면 위와 같이 코딩을 하게 되면

1000개 단위 10번 즉 10,000개의 Object만 가져올 수 있다.


전체 데이터를 다 가져오려면 아래와 같이 코딩을 해야 한다.

AWSCredentials crd = new BasicAWSCredentials(accessKey, secretKey);
AmazonS3 s3 = new AmazonS3Client(crd);
ListObjectsRequest listObject = new ListObjectsRequest();
listObject.setBucketName(bucketName);
listObject.setPrefix(folderName);
            
ObjectListing objects = s3.listObjects(listObject);
do {
	objects = s3.listObjects(listObject);
        //1000개 단위로 읽음
	for (S3ObjectSummary objectSummary : objects.getObjectSummaries()) {
      
	}
      //objects = s3.listNextBatchOfObjects(objects);  <--이녀석은 1000개 단위로만 가져옴..
      listObject.setMarker(objects.getNextMarker());
} while (objects.isTruncated());
끝.


저작자 표시
신고

WRITTEN BY
빵군
Web Programmer HOONS닷넷(http://www.hoons.kr) 2011 ASP.NET 시삽 http://about.me/y2kpooh

받은 트랙백이 없고 , 댓글 하나 달렸습니다.
secret