MapReduce-Understading BigData(7)
Intro
학교 수강과목에서 학습한 내용을 복습하는 용도의 포스트입니다.
빅데이터 개념과 오픈소스인 아파치 하둡과 맵리듀스 및 스파크를 이용한 빅데이터 적용을 공부합니다.
맵 리듀스의 경우 사용하기에 다소 진입장벽이 있는편입니다.
스파크처럼 통합 환경을 제공하지 않아 원하는 유틸리티나 라이브러리를 별도로 연결해서 사용해야하기 때문입니다. 이를 해소하는 것이 스파크라는 분산 데이터 처리 통합 엔진입니다.
따라서 맵 리듀스로 먼저 공부해보고, 스파크로 넘어갑니다.
스파크 엔진의 경우 Java가 아닌 Scalar라는 언어로 사용하며, 기존 우리가 알고 있는 SQL을 통해 고급 질의가 가능하며, 시각화나 스트림 처리 및 기계학습등 까지의 높은 수준의 분석을 제공하는 통합 프레임 워크입니다.
빅데이터 컴퓨팅(분산시스템상의 분산처리 환경)의 기본 개념과 원리를 이해하고 이를 실습해보는 과정에서 2대 이상의 리눅스 클러스터 서버를 구축 및 활용할 것입니다.
빅데이터이해(1) 보러가기
빅데이터이해(2) 보러가기
빅데이터이해(3) 보러가기
빅데이터이해(4) 보러가기
빅데이터이해(5) 보러가기
빅데이터이해(6) 보러가기
빅데이터이해(8) 보러가기
이번 시간에는 하둡의 병렬처리시스템인 맵리듀스를 소개합니다.
MapR Academy과 구글 논문 MapReduce을 참고하여 포스팅합니다.
MapReduce
맵리듀스 프로그램은 클러스터 상에서 실행될 병렬처리 프로그램입니다.
그래서 하둡의 프레임 구조 하에서 해야하므로 프로그램 작성이 쉽지 않습니다.
분할된 데이터에 대해 동일한 작업을 실행하는 병렬처리 프로그램이므로 그나마 다른 병렬처리 프로그램보다는 간단합니다.
따라서 어떻게 동작하는지에 대한 이해가 필요해서 그 내용을 다뤄보도록 하겠습니다.
MapReduce 작업
맵리듀스의 기본 개념은 Lisp 함수형 언어에서 도입되었습니다.
Map함수는 리스트의 각 입력 당 멱승 함수를 적용하여 출력리스트를 생성합니다.
(1 2 3 4)라는 리스트가 있다고 가정할 때 개별 원소에 대해 동일한 기능을 하게 하는 함수가 바로 Map입니다.
(map square '(1 2 3 4)) =(1 4 9 16)
Reduce는 맵의 출력 이후 생성된 리스트에 대해 적용되는 함수로, 단일 출력 값을 생성하는 함수입니다.
(reduce +'(1 4 9 16)) = 30
하둡 클러스터에서는 이 개념을 차용하여 각각의 리스트의 원소가 노드 및 데이터 블록이 될 것입니다.
각 데이터 블록에 대해 동시에 동일한 기능을 하도록 하는 것이 Map이 되며, 동일한 함수의 결과를 suffle하여 가져와서 최종 결과를 내도록 Reduce를 호출하지요.
구글은 1997년 19번째로 검색엔진을 개발했는데, 웹 검색 엔진 구현에 맵리듀스를 적용했다고합니다.
웹 검색 단계는 아래와 같습니다.
- 웹 페이지의 링크를 따라 웹 크롤링
- URL 기준으로 페이지 정렬
- 잘못된 페이지(junk) 제거
- 주어진 단어의 빈도, 히트 수, 페이지의 갱신 여부에 따라 URL을 정렬하여 랭크(등수)를 부여(PageRank 기법 등)
- 사용자에 표시할 인덱스 생성(각 단어에 대해, 단어를 포함하는 URL의 리스트를 생성)
이 중에서 가장 성능에 영향을 미치는 것이 바로 단어의 빈도 카운트 부분 등인데, 이 때 맵리듀스를 활용하면 빠르게 처리가 가능합니다.
구글의 단어 카운트 알고리즘과 관련하여 논문이 공개되어있습니다.
MapRedce: Simplified Data Processing on Large Clustes
논문 url : https://research.google.com/archive/mapreduce.html
간단히 정리하자면, Map함수에서 각 문서의 단어를 카운트한 후 Reduce함수에서 취합하고있습니다.
map(String key, String value):
// key: document name
// value : document contents
for each word w in value:
EmitIntermediate(w,"1);
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result=0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
Map 메소드는 키와 값을 입력으로 받습니다. 이 때 키는 문서의 이름, 값은 문서의 내용입니다.
문서의 각 단어에 대해 루프를 수행하는데 (단어, 1)의 2-튜플을 생성합니다.
Reduce 메소드는 키와 값들의 리스트를 입력으로 받습니다. 이 때 키는 단어이고, 리스트는 단어의 카운트리스트로 1 값들의 리스트입니다.
리스트의 값들에 대해 루프를 수행하여 더한 후 단어에 대한 최종 카운트 값을 생성합니다.
웹 페이지의 단어의 빈도수를 확인하여 어떤 문서가 가장 관련성이 있는가를 보고 랭킹을 매겨주는 것이지요.
MapReduce 계산 모델
이전 장에서 MapReduce는 Master와 Slave로 동작한다고 설명했었습니다.
Master에 해당하는 것이 Job Tracker이고, Slave에 해당하는 프로세스가 Task Tracker라고 했습니다.
그래서 실질적으로 맵리듀스 연산을 하는 것은 Task Tracker가 되겠지요.
맵리듀스는 맵, 셔플, 리듀스의 3단계로 수행되게됩니다.
-
Task Tracker 노드에 분산 배치된 입력 데이터에 대해 맵 단계에서 한 번에 한 레코드의 입력에 대해 맵 함수를 적용하여 key-value들의 쌍을 출력합니다.
-
다른 노드에 있는 task tracker에 전달하는 셔플 단계에서는 각 노드의 맵의 부분 결과를 통합한 후 리듀스 단계로 전송합니다.
-
리듀스 단계에서는 각 키에 대해 노드 단위로 분할된 모든 데이터를 리스트로 받아 리듀스 함수를 적용하여 0개 이상의 key-value 쌍들을 출력합니다.
만약 리듀스가 여러개 있다면 한 노드의 리듀스에서 수행할 수 있는 키값의 영역과 또다른 노드의 리듀스에서 수행할 수 있는 키값의 영역이 따로 있을 건데, 여러개의 각각의 통합된 결과도 또다시 리듀스를 통해 key-value 쌍들을 출력하게 될 것입니다.
하둡 런타임 모델(Hadoop Runtime Model)
맵리듀스 모델은 데이터가 위치한 노드로 계산 프로그램(클라이언트에서 작성된 프로그램)을 전송합니다.
맵리듀스 작업이 실행되면 맵과 리듀스 태스크들을 클러스터의 노드에 전송합니다.
맵을 실행할 때 가능한 노드의 로컬 디스크 데이터 상의 계산을 수행하여 네트워크 트래픽을 최소화합니다.
다른 노드에 있는 리듀스로 보내는 것을 suffle이라고 했는데 그럴 때 네트워크 부하가 심하기 때문입니다.
맵리듀스 작업 단계
하둡 프레임워크이 입력 노드에 데이터를 분할하여 저장합니다.
각 분할 데이터는 텍스트, 멀티미디어, 구조화된 데이터 등 임의의 타입의 많은 레코드로 구성될 것입니다.
맵태스크가 각 분할 데이터를 처리합니다.
특정 키의 모든 레코드들이 같은 리듀스 태스크에 전달되도록 맵의 출력들이 분할됩니다.
병합(combine) 단계에서는 같은 키의 레코드들을 결합하여 노드 간에 복사되는 레코드들의 수를 줄입니다.
한 노드에서 일어나는 일이라는 점을 제외하면 리듀스와 동일한 메서드를 사용합니다.
셔플 단계에서 맵 태스크의 중간 결과를 리듀스 태스크로 전송합니다.
리듀스 태스크를 수행하여 0개 이상의 키-값 쌍들을 생성합니다.
최종적으로 프레임워크가 리듀스 태스크의 결과를 취합하여 출력합니다.
사용자가 맵과 리듀스 태스크의 코드 작성만하면 하둡 프레임워크가 나머지 대부분을 처리하게되지만 그것마저 쉽지 않을 수 있다는 것이죠.
MapReduce 데이터 흐름
이번에는 데이터의 흐름 관점에서 맵리듀스를 살펴봅니다.
맵리듀스 작업 흐름 단계
- 클러스터에 데이터를 적재
- 맵리듀스로 데이터를 분석 처리
- 분석 결과를 HDFS에 저장
- 클러스터의 결과를 읽어서 비즈니스 로직 분석
순서대로 자세히 살펴봅시다.
클러스터에 데이터 적재 구조화 및 비구조화 데이터 소스에서 데이터를 하둡 클러스터에 적재하는 다양한 툴(에코 시스템) 사용
- sqoop은 SQL 데이터를 분산 파일 시스템에 적재합니다.
- Flume은 로그 데이터를 분산 파일 시스템에 적재합니다.
맵리듀스의 데이터 실행 흐름(1)
참고로 파티셔너의 경우 리듀스함수의 개수에 따라 여러개로 출력을 뱉기도 합니다.
- 하둡 파일 시스템으로부터 데이터를 적재
- 작업이 입력 데이터 형식을 정의(InputFormat)
- 모든 노드의 각 map()메소드에서 실행될 데이터를 분리(split)
레코트 리터가 데이터를 map()메소드의 입력이되는 키-값쌍으로 파싱합니다. - map() 메소드가 파티셔너로 보내지는 키-값 쌍들을 생성(map)
- 여러개의 리듀서가 있는 경우 각 리듀스 태스크 당 하나의 파티션을 생성(partitioner)
- 키 값 기준으로 하나의 파티션을 갖도록 셔플(shuffle)
- 각 파티션에서 키 값 기준으로 키-값 쌍들을 정렬(sort)
- reduce() 메소드가 중간 키-값 쌍(intermediate k-value pairs)들을 입력 받아 최종 키-값 쌍들의 리스트로 줄임(reduce)
- 작업이 출력 데이터 형식을 정의(outputformat)
- 출력 데이터가 하둡 파일 시스템에 저장
InputFormat 클래스
InputFormat 클래스는 작업의 입력 데이터를 검증하고, 맵 처리를 위해 파일들을 분리하고, RecoredReader 객체의 인스턴스를 생성합니다.
파일의 분할되는 크기는 디폴트로 블록의 크기인데요, 하둡의 디폴트 블록 크기는 64MB입니다.
분할된 데이터(input Split)는 레코드의 집합으로 맵 단계의 입력 키-값 쌍으로 전달합니다.
각 노드에 태스크가 할당되면 TaskTracker가 InputSplit을 record reader 생성자에게 전달합니다.
record reader는 레코드 단위로 읽어들인 키-값 쌍들을 map()메소드에게 전달합니다.
record reader는 한 라인을 한 레코드로 간주하는 것이 디폴트입니다.
inputFormat 클래스에서 멀티 라인 레코드 등 다른 형식의 레코드 형식을 정의할 수도 있습니다.
inputSplit의 모든 레코드를 읽어 들였으면 record reader는 중지됩니다.
Mapper 클래스
맵 단계는 Mapper 클래스의 map() 메소드로 구현됩니다.
분리된 데이터의 모든 레코드에 대해 입력을 받을 때까지 map()이 실행됩니다.
map메소드는 3개의 인수로 원형이 정의됩니다.
key, value, context가 그것들입니다.
디폴트로 record reader는 입력 파일에서 레코드의 바이트 오프셋을 키로, 해당 바이트 오프셋의 라인을 값으로 정의합니다.
map() 메소드는 입력의 값들을 토큰화하여 처리합니다.
물론 각 토큰에 대해 무엇을 할 것인지는 프로그램의 로직 내용에 따라 좌우됩니다.
맵 컨텍스트 객체는 map() 메소드의 출력을 취합하여 다음 단계의 파티셔너로 전달합니다.
Partitioner 클래스
partitioner 클래스는 map() 메소드의 중간 키-값 쌍들을 입력 받아서, 레코드 키를 해싱(hashing)하고, 해시된 키에 기반하여 파티션을 생성합니다.
같은 키의 레코드들은 같은 파티션으로 저장하여 같은 리듀서로 전송되게 됩니다.
셔플 단계(Shuffle Phase)
셔플 단게에서는 파티션을 정렬하고 통합을 하여 새로운 파티션을 구성한 후 리듀서에게 전송합니다.
서로 다른 노드에게 전송을하는 과정에서는 물리적으로 떨어져있는 경우도 많기 때문에, HTTP나 RPC 등 네트워크 프로토콜을 사용하여 전송하므로 맵리듀스 프로그램에서 가장 큰 네트워크 부하를 유발합니다.
Reducer 클래스
파티션의 각 키와 해당 값들의 리스트에 대해 reduce() 메소드가 호출됩니다.
각 값들의 리스트에 대해 처리한 결과를 컨텍스트에 저장됩니다.
컨텍스트의 outputcomitter가 실행되는 리듀서의 수만큼 즉, 리듀서 당 하나의 출력파일을 생성하게 됩니다.
맵리듀스 작업의 결과
맵리듀스 작업의 결과는 사용자가 지정한 디렉토리에 저장됩니다.
_SUCCESS 빈 파일은 작업의 성공을 표시합니다.
_logs/history* 파일들에 작업의 이력들이 캡쳐됩니다.
reduce() 메소드의 출력은 각 리듀서에 대해 part-r-00000, part-r-00001, … 등의 파일들에 각각 저장됩니다.
맵 단계만 수행되는 작업의 경우에는 part-m-00000, part-m-00001, … 등의 파일들에 맵의 출력이 저장됩니다.
맵리듀스 작업 실행 프레임워크
마스터 노드(Job Tracker)에 작업을 제출하는 클라이언트 노드에는 맵리듀스 드라이브가 있어서 JobClient를 호출해 job을 실행합니다.
그렇게 맵리듀스프로그램이 시작되면, JobClient는 JobTracker에 Job을 제출하고 JobTracker는 각 Task Tracker에게 Job을 배분합니다.
Task Tracker는 자신에게 할당된 Job을 실행하기 위해서 자식 프로세스를 생성하고 JVM 상에서 맵과 리듀스 동작을 실행합니다.
이 모든 내용에 대해 모니터링하기 위해서 job에 대한 정보를 계속 주고받을 수 있습니다.
슬레이브 노드에서 마스터노드로 보내는 heartbeat은 주기적으로 자신의 상태를 보고하는 내용이 되겠습니다.
다시 한번 요약해보겠습니다.
하둡의 맵리듀스 작업 실행 과정
- 사용자가 클라이언트 노드에서 JobClient 객체를 생성하여 맵리듀스 프로그램 실행을 시작
- JobClient 객체가 JobTracker에 작업을 제출
- JobTracker는 Job 객체를 생성하고, 적당한 TaskTracker 노드에 전송
- TaskTracker는 맵과 리듀스 작업을 차례로 수행하는 자식 프로세스를 생성하고 시작
- 작업 수행 중에 TaskTracker의 상태와 카운터 등이 계속 갱신
Heartbeat 적용
TaskTracker는 JobTracker에 주기적으로 heartbeats를 전송하여 살아있음을 알립니다.
heartbeats에는 태스크의 상태, 태스크 카운터, 데이터의 읽기/쓰기 상태 등이 포함됩니다.
한 태스크 트랙커로부터 heartbeats의 수신이 안되면 JobTracker는 다른 TaskTracker 노드에 중단된 작업을 재 스케줄합니다.
중단된 노드는 다운된 것으로 표시하고 이후 작업 스케줄 대상에서 제외됩니다.
하둡 작업 스케줄링
하둡에는 2가지 스케줄러가 있습니다.
-
페어 스케줄러(Fair Scheduler)_페이스북개발
각 사용자는 디폴트로 자신의 풀을가지는데, 각 풀(pool)에 자원이 공평하게 분배 한 잡을 수행하기 전에 다른 잡이 밀어내고 수행할 수 있도록 선점(pre-emption)을 지원합니다. -
커패시티 스케줄러(Capacity Scheduler)_야후개발
관리자가 계층적 풀(큐)을 구성하여 조직에 따른 차별적인 접근을 반영합니다.
각 큐별로 이용할 수 있는 자원의 용량을 정해주면 그 용량에 맞게 자원을 할당합니다.
자원기반 스케줄링과 작업 우선순위를 지원합니다.
하둡 실행 프레임워크의 제약
맵리듀스 버전 1 프레임워크는 여러 제약점이 있어서, 맵리듀스 버전 2 또는 YARN(Yet Another Resource Negotiator)이 개발되었습니다.
확장성 면에서 노드 4000개까지만 지원한다던지(scalability), 노드에서 실행하는 맵과리듀스의 슬롯들이 고정되어있다던지(inflexibility), 맵리듀스 작업만 지원하고 다른 에코시스템은 실행할 수 없다던지(program support) 하는 등의 제약이 있었지요.
YARN은 위의 제약점을 해결하고, 맵리듀스 버전 1과 같은 API와 CLI를 사용합니다.
자원의 관리와 작업의 관리를 분리함으로써 가능해졌습니다.
클러스터 자원의 관리와 작업 스케줄링은 ResourceManager가 관리하며, 자원의 협상과 작업 모니터링은 클러스터에서 수행되는 각 응용의 ApplicationMaster가 관리합니다.
왼쪽 그림이 맵리듀스 v1이고, 오른쪽 그림이 YARN입니다.
YARN 아키텍처
크게 컨테이너와 RM, NM,AM으로 구성됩니다.
할당된 컴퓨팅 자원의 논리적인 개체(CPU,메모리)를 컨테이너라고 합니다.
RM은 자원관리자로서, 클러스터 내의 컴퓨팅 자원을 할당하며 컨테이너를 생성 또는 삭제하고, NM을 추적합니다.
NM은 노드관리자로서 응용과 AM 실행을 시작하고 상태를 보고합니다.
AM은 응용마스터로서 응용 프로그램을 위해 컨테이너를 요청하고 실행합니다.
YARN의 작업 실행
- 클라이언트는 YARN의 자원 관리자에게 응용을 제출(이 때 컨테이너를 생성하기 위한 정보인 Container Launch Context와 함께 전달)
- 자원 관리자 내부에 있는 응용관리자가 한 컨테이너를 위해 협상하고, 해당 응용의 응용마스터 인스턴스를 생성
- 응용 마스터는 자원 관리자에 등록되고, 노드 관리자에 각 컨테이너의 CLC를 전달하고 컨테이너를 생성
- 응용 마스터는 응용의 실행을 관리하고, 실행 상황과 상태 정보 등을 모니터링(클라이언트는 자원 관리자에게 질의하거나, 직접 응용 마스터와 통신하여 응용의 상태를 모니터링)
- 응용 마스터는 응용의 실행이 완료되면 자원 관리자에게 알림
- 마지막으로 응용마스터는 자원관리자에서 등록을 해제하고, 자원 관리자는 컨테이너를 해제
YARN의 작업 모니터링을 제공하는 작업 이력 서버(Job History Server : 포트 19888)가 있습니다.
웹 UI 상에서 실행된 작업의 요약을 프레젠테이션하며, 작업을 클릭하면 상세한 이력을 표시해줍니다.
맵리듀스 프로그래밍 팁
드라이버, 매퍼, 리듀서 클래스들의 템플릿을 가지고 시작하는 경우가 많습니다.
응용 로직 이외의 맵리듀스 코드의 대부분은 공통적으로 작성을 하며 공통된 부분을 템플릿으로 작성하는 것이지요.
예를들어 import 문들, 클래스 정의들, 메서드 시그니쳐등이 해당됩니다.
따라서 우리는 응용의 로직부분을 수정해서 사용하게 되는 경우가 많을 것입니다.
그렇게 하기 위해서는 위에서 배운 맵리듀스 프레임워크 내에서 데이터의 흐름과 변환되는 4단계 변환 과정의 이해가 필수적입니다.
입력 파일들로부터 변환되어 매퍼에 전달되는 방법과 매퍼에서 데이터가 변환되는 방법, 데이터가 정렬되고 통합되어 리듀서에게 전달되는 방법, 리듀서가 데이터를 변환하여 출력 파일들에 내보내는 방법에 대해 개괄적인 지식이 있어야 한다는 뜻 입니다.
개인이 공부하고 포스팅하는 블로그입니다. 작성한 글 중 오류나 틀린 부분이 있을 경우 과감한 지적 환영합니다!