-
Spark⏰ 오늘의 공부/기타 2020. 3. 2. 20:06
프로젝트를 통해 내가 부딪혔던 부분들 위주로 정리하려고 한다.
SparkSession - Spark 모든 기능에 대한 진입점
spark = pyspark.sql.SparkSession.builder.appName("pysaprk_python").getOrCreate()
builder - 객체 생성
master - 실행 환경 설정
- local : 로컬 실행
- local[4] : 4코어로 로컬 실행
config - 실행 옵션 설정, SparkConf 및 SparkSession 자체 구성에 자동으로 전파
getOrCreate - 기존 SparkSession을 가져오거나 없는 경우 실더에 설정된 옵션을 기반으로 새로운 SparkSession을 생성
Spark collect () / select()
collect : (Action)
Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
⇒ collect()를 통해서 RDD안에 있는 데이터를 드라이버 프로그램 속으로 리스트 모양으로 넣을 수 있다.
⇒ collect() 연산은 모든 data 를 드라이버 메모리로 가져오기 때문에 반드시 충분한 메모리가 있는 상황에서 사용해야 한다!
select : (Transformation)
Projects a set of expressions and returns a new DataFrame.
정렬된 요소 가져오기 ( top / takeOrdered)
top / takeOrdered : RDD 상위 또는 하위 n 개 요소 가져오는 행동 연산자
*유의점
top과 takeOrdered는 전체 데이터를 정렬하지 않는다.
각 파티션에서 상위(또는 하위) n개 요소를 가져온 후 이 결과를 병합하고, 이 중 상위(또는 하위) n개 요소를 반환한다.
따라서 top과 takeOrdered 연산자는 훨씬 더 적은 양의 데이터를 네트워크로 전송
sortBy와 take를 개별적으로 호출하는 것보다 무척 빠르다.
하지만 collect와 마찬가지로 모든 결과를 드라이버의 메모리로 가져오기 때문에 n에 너무 큰 값을 지정해서는 안 된다!
큰 데이터를 다룰 때는 sort()!
transformation (실제 연산 수행 X)
filter(): rdd 내용 중 내가 원하는 내용만 걸러주는 함수
( filter() 이후로도 RDD 자체는 바뀌지 않는다 !)
flatMap() : 하나의 RDD를 여러 RDD 결과로 반환하고 싶을 때
map() : 하나의 RDD → 다른 하나의 RDD로 변환
withColumn() :
Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
Parameters:
- colName – string, name of the new column.
- col – a Column expression for the new column.
2020.2.17 오늘의 배움 1 :
col 에는 Column object, lit()을 통한 default 객체...등 만 넣을 수 있다..
col 에는 같은 DataFrame 내의 col 연산 수행한 값만 넣을 수 있다.
# (favorite + quote + retweet) count 값 합산한 새로운 column 'total' 생성 df = df.withColumn('total', df.favorite_count + df.quote_count + df.retweeted_count)
⇒ df 라고 정의된 DataFrame 내의 col 만 새로운 칼럼으로 추가 가능..
결과
zip():
Zips this RDD with another one, returning key-value pairs with the first element in each RDD second element in each RDD, etc.
⇒ zip 연산자는 두 RDD의 각 요소를 순서대로 짝을 지은 새로운 RDD[(T, U)]를 반환한다.
>>> x = sc.parallelize(range(0,5)) >>> y = sc.parallelize(range(1000, 1005)) >>> x.zip(y).collect() [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
✔️ List 값들로 새로운 DataFrame 생성하기
# schema 생성 struct = StructType([StructField("tweet_id", LongType(), True), StructField("tweet_favorite_count", LongType(), True), StructField("tweet_quote_count", LongType(), True), StructField("tweet_retweeted_count", LongType(), True)]) # 리트윗된 트윗들만 추출 rdd = lines.selectExpr(schema).where(lines.retweeted == True) # df 에서 필요한 data 추출하여 list 로 만듬 tweet_id = rdd.select('tweet_content').rdd.map(lambda value: json.loads(value[0]))\ .map(lambda v : v['id']).collect() tweet_favorite_count = rdd.select('tweet_content').rdd.map(lambda value: json.loads(value[0]))\ .map(lambda v : v['favorite_count']).collect() tweet_quoted_count = rdd.select('tweet_content').rdd.map(lambda value: json.loads(value[0]))\ .map(lambda v : v['quote_count']).collect() tweet_retweet_count = rdd.select('tweet_content').rdd.map(lambda value: json.loads(value[0]))\ .map(lambda v : v['retweet_count']).collect() # 새로운 dataframe 생성 newDF = spark.createDataFrame(zip(tweet_id, tweet_favorite_count,tweet_quoted_count,tweet_retweet_count), struct) newDF.show()
방법 :
- list 들을 zip()을 통해 모아준다. [(list1, list2, list3, list4), (), () ] - - - > zip을 통해 이런 형태로 바꿔줌
- createDataFrame() = Schema - row : 생성한 스키마 - zip()을 통해 모은 값으로 df 생성
⇒ 이 부분 때문에 엄청 헤매었는 데 성공해서 굉장히 뿌듯하다...
'⏰ 오늘의 공부 > 기타' 카테고리의 다른 글
특정 commit 삭제하기 (0) 2022.01.21 Spark - RDD (0) 2020.02.01 Promise & async / await (0) 2020.01.27 Redis (0) 2020.01.24 Proxy 설정 (0) 2020.01.24