ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spark - RDD
    ⏰ 오늘의 공부/기타 2020. 2. 1. 00:23

    저수준 api

    • RDD : 분산형 데이터 처리 (여러 서버에 분산되어 있는 데이터)
      → 이 RDD들이 각 노드에서 병렬적으로 동시에 처리되어 나중에 합쳐지거나 또 추가적인 연산 진행
    • 브로드캐스트 변수, 어큐뮬레이터 → 분산형 공유 변수를 베포하고 다루기 위한 api

     

    언제 사용 ?

    • 고수준 api 에서 제공하지 않는 기능이 필요한 경우
    • RDD 이용해 개발된 기존 코드 유지하는 경우
    • 사용자가 정의한 공유 변수를 다뤄야 하는 경우

     

    즉, 물리적으로 분산된 데이터(자체적으로 구성한 데이터 파티셔닝) 에 세부적인 제어가 필요할 때

    → 기본적으로는 구조적 api 사용하는 것이 좋다.

     

    RDD

    • RDD 레코드 : 프로그래머가 선택한 java, python, scala 등의 객체
    • 객체이므로 사용자가 완벽하게 제어 가능
    • 강력한 제어권을 가지고 있지만 모든 값을 다루거나, 값 사이의 상호작용 과정을 반드시 수동으로 정의해야함
    • 구조적 api 와 다르게 레코드의 내부 구조를 스파크에서 파악할 수 없어서 최적화를 위한 수작업이 필요
    • data lineage 가 tracking 되어 fault tolerant 하다.

     

    종류

    • 제네릭 RDD
    • 키 - 값 RDD (특수 연산, 키를 이용한 사용자 지정 파티셔닝 개념 가짐)

     

    주요속성

    • 파티션의 목록
    • 각 조각을 연산하는 함수
    • 다른 RDD 와의 의존성 목록
    • 부가적으로 RDD 를 위한 파티셔너
    • 부가적으로 각 조각을 연산하기 위한 기본 위치 목록

     

    생성하기

    • 기존에 사용하던 DataFrame 이나 Dataset을 이용하여 생성 
    • textFile 을 통해 파일 읽어와서 생성
    // 로컬 컬렉션으로 RDD 생성
    spark.range(10).rdd
    
    // 데이터 처리 위해서는 row 객체를 올바른 데이터 타입으로 변환하거나 row 객체에서 값 추출
    spark.range(10).toDF("id").rdd.map(lambda row: row[0])
    
    // 데이터소스로 RDD 생성
    spark.sparkContext.textFile('./some/path/withTextFiles', n)
    • 마지막 파라미터 n 은 데이터 셋이 나눠진 파티션의 개수
    • 한 클러스터 내에서 2-4개 파티션으로 데이터셋 나누는 것이 가장 좋다 보통.

     

    컬렉션 객체를 RDD 로 만들 때

    • sparkContext 의 parallelize 메서드 호출 ( 단일 노드에 있는 컬렉션을 병렬 컬렉션으로 전환 / 파티션 수 명시적으로 지정)
    myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple"\
    .split(" ")
    
    words = spark.sparkContext.parallelize(myCollection, 2)

     

    지연평가

    • RDD의 연산 종류는 transformation 과 action 이 있다.
    • 액션은 데이터를 드라이버로 되돌려주든지(count, collect 등) 데이터를 외부 저장 시스템에 쓰는것(CopyToHadoop) 등의 일이다.
    • 액션은 스케쥴러를 시작하게 하며, RDD 트랜스포메이션 간의 종속성에 바탕을 둔 DAG 를 생성한다.
    • 트랜스포메이션의 로직 에러가 발생할 때, 지연 평가 때문에, 액션이 수행된 지점에서 실패한 것으로 나타나는 경우에 유의하자
      • ex) word count 프로그램에서 null pointer exception 이 발생한다고 가정할때, 코드가 contains를 체크하는 시점이 아니라 collect 단계에서 예외가 발생한다.

     

    트랜스포메이션

    • 데이터셋 형태 만든다.
    • filter : RDD 의 레코드를 모두 확인한 후 조건 함수를 만족하는 레코드 반환
    • map :
    • flatmap
    • sortBy
    • randomSplit

     

    액션

    • Reduce : 모든 값을 하나의 값으로 만듬
    • count : RDD 전체의 로우 수
    • take : RDD에서 가져올 값의 개수 파라미터로 사용

     

    체크포인팅

    • RDD 를 디스크에 저장하는 방식
    • 저장된 RDD 를 참조할 때는 원본 데이터 소스를 다시 계산해 RDD 를 생성하지 않고 디스크에 저장된 중간 결과 파티션을 참조
    • 반복적인 연산 수행 시 매우 유용하다.
    spark.sparkContext.setCheckpointDir("some/path/for/checkpointing")
    words.checkpoint()

     

    스파크 잡 스케쥴링

    • 잡 실행 과정
      • 스파크 프로그램 자체는 드라이버 노드에서 실행되며 일련의 명령들을 익스큐터에게 보낸다.
      • 애플리케이션들은 클러스터 매니저에 의해 스케쥴링되고, 각각 하나의 SparkContext를 가진다.
      • 스파크 애플리케이션들은 공존하는 여러 개의 잡을 차례로 실행할 수 있다.
      • Job들은 애플리케이션의 한 RDD가 호출하는 각 액션에 대응한다.

     

     

    '⏰ 오늘의 공부 > 기타' 카테고리의 다른 글

    특정 commit 삭제하기  (0) 2022.01.21
    Spark  (0) 2020.03.02
    Promise & async / await  (0) 2020.01.27
    Redis  (0) 2020.01.24
    Proxy 설정  (0) 2020.01.24

    댓글

Designed by Tistory.