-
Spark - RDD⏰ 오늘의 공부/기타 2020. 2. 1. 00:23
저수준 api
- RDD : 분산형 데이터 처리 (여러 서버에 분산되어 있는 데이터)
→ 이 RDD들이 각 노드에서 병렬적으로 동시에 처리되어 나중에 합쳐지거나 또 추가적인 연산 진행 - 브로드캐스트 변수, 어큐뮬레이터 → 분산형 공유 변수를 베포하고 다루기 위한 api
언제 사용 ?
- 고수준 api 에서 제공하지 않는 기능이 필요한 경우
- RDD 이용해 개발된 기존 코드 유지하는 경우
- 사용자가 정의한 공유 변수를 다뤄야 하는 경우
즉, 물리적으로 분산된 데이터(자체적으로 구성한 데이터 파티셔닝) 에 세부적인 제어가 필요할 때
→ 기본적으로는 구조적 api 사용하는 것이 좋다.
RDD
- RDD 레코드 : 프로그래머가 선택한 java, python, scala 등의 객체
- 객체이므로 사용자가 완벽하게 제어 가능
- 강력한 제어권을 가지고 있지만 모든 값을 다루거나, 값 사이의 상호작용 과정을 반드시 수동으로 정의해야함
- 구조적 api 와 다르게 레코드의 내부 구조를 스파크에서 파악할 수 없어서 최적화를 위한 수작업이 필요
- data lineage 가 tracking 되어 fault tolerant 하다.
종류
- 제네릭 RDD
- 키 - 값 RDD (특수 연산, 키를 이용한 사용자 지정 파티셔닝 개념 가짐)
주요속성
- 파티션의 목록
- 각 조각을 연산하는 함수
- 다른 RDD 와의 의존성 목록
- 부가적으로 RDD 를 위한 파티셔너
- 부가적으로 각 조각을 연산하기 위한 기본 위치 목록
생성하기
- 기존에 사용하던 DataFrame 이나 Dataset을 이용하여 생성
- textFile 을 통해 파일 읽어와서 생성
// 로컬 컬렉션으로 RDD 생성 spark.range(10).rdd // 데이터 처리 위해서는 row 객체를 올바른 데이터 타입으로 변환하거나 row 객체에서 값 추출 spark.range(10).toDF("id").rdd.map(lambda row: row[0]) // 데이터소스로 RDD 생성 spark.sparkContext.textFile('./some/path/withTextFiles', n)
- 마지막 파라미터 n 은 데이터 셋이 나눠진 파티션의 개수
- 한 클러스터 내에서 2-4개 파티션으로 데이터셋 나누는 것이 가장 좋다 보통.
컬렉션 객체를 RDD 로 만들 때
- sparkContext 의 parallelize 메서드 호출 ( 단일 노드에 있는 컬렉션을 병렬 컬렉션으로 전환 / 파티션 수 명시적으로 지정)
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple"\ .split(" ") words = spark.sparkContext.parallelize(myCollection, 2)
지연평가
- RDD의 연산 종류는 transformation 과 action 이 있다.
- 액션은 데이터를 드라이버로 되돌려주든지(count, collect 등) 데이터를 외부 저장 시스템에 쓰는것(CopyToHadoop) 등의 일이다.
- 액션은 스케쥴러를 시작하게 하며, RDD 트랜스포메이션 간의 종속성에 바탕을 둔 DAG 를 생성한다.
- 트랜스포메이션의 로직 에러가 발생할 때, 지연 평가 때문에, 액션이 수행된 지점에서 실패한 것으로 나타나는 경우에 유의하자
- ex) word count 프로그램에서 null pointer exception 이 발생한다고 가정할때, 코드가 contains를 체크하는 시점이 아니라 collect 단계에서 예외가 발생한다.
트랜스포메이션
- 데이터셋 형태 만든다.
- filter : RDD 의 레코드를 모두 확인한 후 조건 함수를 만족하는 레코드 반환
- map :
- flatmap
- sortBy
- randomSplit
액션
- Reduce : 모든 값을 하나의 값으로 만듬
- count : RDD 전체의 로우 수
- take : RDD에서 가져올 값의 개수 파라미터로 사용
체크포인팅
- RDD 를 디스크에 저장하는 방식
- 저장된 RDD 를 참조할 때는 원본 데이터 소스를 다시 계산해 RDD 를 생성하지 않고 디스크에 저장된 중간 결과 파티션을 참조
- 반복적인 연산 수행 시 매우 유용하다.
spark.sparkContext.setCheckpointDir("some/path/for/checkpointing") words.checkpoint()
스파크 잡 스케쥴링
- 잡 실행 과정
- 스파크 프로그램 자체는 드라이버 노드에서 실행되며 일련의 명령들을 익스큐터에게 보낸다.
- 애플리케이션들은 클러스터 매니저에 의해 스케쥴링되고, 각각 하나의 SparkContext를 가진다.
- 스파크 애플리케이션들은 공존하는 여러 개의 잡을 차례로 실행할 수 있다.
- Job들은 애플리케이션의 한 RDD가 호출하는 각 액션에 대응한다.
'⏰ 오늘의 공부 > 기타' 카테고리의 다른 글
특정 commit 삭제하기 (0) 2022.01.21 Spark (0) 2020.03.02 Promise & async / await (0) 2020.01.27 Redis (0) 2020.01.24 Proxy 설정 (0) 2020.01.24 - RDD : 분산형 데이터 처리 (여러 서버에 분산되어 있는 데이터)