ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spark
    ⏰ 오늘의 공부/기타 2020. 3. 2. 20:06

     

    프로젝트를 통해 내가 부딪혔던 부분들 위주로 정리하려고 한다.

     

    SparkSession - Spark 모든 기능에 대한 진입점

    spark = pyspark.sql.SparkSession.builder.appName("pysaprk_python").getOrCreate()

    builder - 객체 생성

    master - 실행 환경 설정

    • local : 로컬 실행
    • local[4] : 4코어로 로컬 실행

     

    config - 실행 옵션 설정, SparkConf 및 SparkSession 자체 구성에 자동으로 전파

    getOrCreate - 기존 SparkSession을 가져오거나 없는 경우 실더에 설정된 옵션을 기반으로 새로운 SparkSession을 생성

     

     

    Spark collect () / select()

     

    collect : (Action)

    Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

    ⇒ collect()를 통해서 RDD안에 있는 데이터를 드라이버 프로그램 속으로 리스트 모양으로 넣을 수 있다.

    ⇒ collect() 연산은 모든 data 를 드라이버 메모리로 가져오기 때문에 반드시 충분한 메모리가 있는 상황에서 사용해야 한다!

     

    select : (Transformation)

    Projects a set of expressions and returns a new DataFrame.

     

     

    정렬된 요소 가져오기 ( top / takeOrdered)

    top / takeOrdered : RDD 상위 또는 하위 n 개 요소 가져오는 행동 연산자

     

    *유의점

    top과 takeOrdered는 전체 데이터를 정렬하지 않는다.

    각 파티션에서 상위(또는 하위) n개 요소를 가져온 후 이 결과를 병합하고, 이 중 상위(또는 하위) n개 요소를 반환한다.

    따라서 top과 takeOrdered 연산자는 훨씬 더 적은 양의 데이터를 네트워크로 전송

    sortBy와 take를 개별적으로 호출하는 것보다 무척 빠르다.

     

    하지만 collect와 마찬가지로 모든 결과를 드라이버의 메모리로 가져오기 때문에 n에 너무 큰 값을 지정해서는 안 된다!

    큰 데이터를 다룰 때는 sort()!

     

     

    transformation (실제 연산 수행 X)

     

    filter(): rdd 내용 중 내가 원하는 내용만 걸러주는 함수

    ( filter() 이후로도 RDD 자체는 바뀌지 않는다 !)

     

    flatMap() : 하나의 RDD를 여러 RDD 결과로 반환하고 싶을 때

    map() : 하나의 RDD → 다른 하나의 RDD로 변환

     

    withColumn() :

    Returns a new DataFrame by adding a column or replacing the existing column that has the same name.

    Parameters:

    • colName – string, name of the new column.
    • col – a Column expression for the new column.

     

    2020.2.17 오늘의 배움 1 :

    col 에는 Column object, lit()을 통한 default 객체...등 만 넣을 수 있다..

    col 에는 같은 DataFrame 내의 col 연산 수행한 값만 넣을 수 있다.

    # (favorite + quote + retweet) count 값 합산한 새로운 column 'total' 생성
    df = df.withColumn('total', df.favorite_count + df.quote_count + df.retweeted_count)

     df 라고 정의된 DataFrame 내의 col 만 새로운 칼럼으로 추가 가능..

     

    결과

     

     

    zip():

    Zips this RDD with another one, returning key-value pairs with the first element in each RDD second element in each RDD, etc.

     zip 연산자는 두 RDD의 각 요소를 순서대로 짝을 지은 새로운 RDD[(T, U)]를 반환한다.

    >>> x = sc.parallelize(range(0,5))
    >>> y = sc.parallelize(range(1000, 1005))
    >>> x.zip(y).collect()
    [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]

    ✔️ List 값들로 새로운 DataFrame 생성하기

    # schema 생성
    struct = StructType([StructField("tweet_id", LongType(), True),
                         StructField("tweet_favorite_count", LongType(), True),
                         StructField("tweet_quote_count", LongType(), True),
                         StructField("tweet_retweeted_count", LongType(), True)])
    # 리트윗된 트윗들만 추출
    rdd = lines.selectExpr(schema).where(lines.retweeted == True)
    
    # df 에서 필요한 data 추출하여 list 로 만듬
    tweet_id = rdd.select('tweet_content').rdd.map(lambda value: json.loads(value[0]))\
                                                     .map(lambda v : v['id']).collect()
    tweet_favorite_count = rdd.select('tweet_content').rdd.map(lambda value: json.loads(value[0]))\
                                                     .map(lambda v : v['favorite_count']).collect()
    tweet_quoted_count = rdd.select('tweet_content').rdd.map(lambda value: json.loads(value[0]))\
                                                     .map(lambda v : v['quote_count']).collect()
    tweet_retweet_count = rdd.select('tweet_content').rdd.map(lambda value: json.loads(value[0]))\
                                                     .map(lambda v : v['retweet_count']).collect()
    
    # 새로운 dataframe 생성
    newDF = spark.createDataFrame(zip(tweet_id, tweet_favorite_count,tweet_quoted_count,tweet_retweet_count), struct)
    newDF.show()

    방법 :

    • list 들을 zip()을 통해 모아준다. [(list1, list2, list3, list4), (), () ] - - - > zip을 통해 이런 형태로 바꿔줌
    • createDataFrame() = Schema - row : 생성한 스키마 - zip()을 통해 모은 값으로 df 생성

     

     이 부분 때문에 엄청 헤매었는 데 성공해서 굉장히 뿌듯하다...

     

     

     


    참조: https://hero0926.tistory.com/6?category=736452

    '⏰ 오늘의 공부 > 기타' 카테고리의 다른 글

    특정 commit 삭제하기  (0) 2022.01.21
    Spark - RDD  (0) 2020.02.01
    Promise & async / await  (0) 2020.01.27
    Redis  (0) 2020.01.24
    Proxy 설정  (0) 2020.01.24

    댓글

Designed by Tistory.