-
[Quadcore Team/Trend]ํด์ํ๊ทธ ๋ถ์it's me/๐ฉ๐ป ํ๋ก์ ํธ ์ ๋ฆฌ 2020. 2. 26. 16:15
๋ฏธ๋ฃจ๊ณ ๋ฏธ๋ฃจ๋ค ์ด์ ์์ผ ์ฌ๋ฆฌ๋ ํ๋ก์ ํธ ์ ๋ฆฌ 1...
* Hashtag Ranking ๊ณผ์
1) raw tweet ์ ์ฒ๋ฆฌ
- raw tweet df ์ค 'entities.hashtags.text' ์ถ์ถ ( ํด์ํ๊ทธ column ๋ง ์ถ์ถ )
- rdd ๋ก ๋ง๋ ํ collect()๋ฅผ ํตํด ๋ชจ๋ row ํ๋์ ๋ฆฌ์คํธ ์์ ์ ์ฅ๋๋๋ก ํ๊ธฐ
def process(rdd): try: rawTweet = spark.read.json(rdd) #creates an in-memory table that is scoped to the cluster in which it was created. rawTweet.registerTempTable("tweets") hashtag = rawTweet.selectExpr('entities.hashtags.text as hashtag').rdd.flatMap(lambda x : x) print(hashtag.collect()) # ํ์ฌ ํ์์ ๋ค์ด์จ hashtag ์ ์ฒ๋ฆฌ result = hashtag_processing(hashtag.collect()) #word count ์์ ์ ์ํด ๊ฒฐ๊ณผ rdd๋ก ๋ง๋ค์ด์ค rdd = spark.sparkContext.parallelize(result) word_count(rdd) except: pass
2) ํด๋น ์๊ฐ ์์ ๋ค์ด์จ hashtag ๋ชจ๋ ๋ชจ์ ์ ์ฒ๋ฆฌ
- ๊ฐ tweet ๋ณ๋ก ์ ์ฌ ๋จ์ด ์ ๊ฑฐ
( ex) ๋ฉค๋ฒ ์ด๋ฆ : ์์ด - ํ๊ธ ํ๋์ ๋จ์ด๋ก ๋ง๋ ํ set ์ ํตํด ์ค๋ณต ์ ๊ฑฐํ๊ณ ํ๋๋ง ๋จ๊ธด๋ค.)
์ด๋ ๊ฒ ํ๋ ์ด์ ?
- ํ๋์ ํธ์ ์์์ ์ค๋ณต ๋ฉค๋ฒ ์ด๋ฆ์ ๋ค ์ง๊ณํ๋ ๊ฒ์ ํธ๋ ๋ ์ทจ์ง์ ๋ง์ง ์๋๋ค๊ณ ํ๋จ!
- ํธ๋ ๋(๋ค์ํ ์ ์ ๋ค์ด ์ธ๊ธ ๋ง์ด ํ๋) ์์ ์ง๊ณ์ ํ๋ฒ๋ง ๋ฐ์ํ๋ฉด ๋๋ค๊ณ ์๊ฐํ๋ค. → 1 ํธ์ 1 ์ง๊ณ
- ์ ์ฒด ํธ์ ํด์ํ๊ทธ ์์์ ๋ถ์ฉ์ด ์ ๊ฑฐ
# hashtag ์ ์ฒ๋ฆฌ def hashtag_processing(text): del_similar = [] result = [] # ๊ฐ tweet ๋ณ ์ ์ฌ์ด ์ ๊ฑฐ for v in text: if not v: continue words = '-'.join(v) words = words.upper() temp = [] for i in similarwords[0]: if i in words: words = words.replace(i, 'JUNGKOOK') for i in similarwords[1]: if i in words: words = words.replace(i, 'JIMIN') for i in similarwords[2]: if i in words: words = words.replace(i, 'JHOPE') for i in similarwords[3]: if i in words: words = words.replace(i, 'SUGA') for i in similarwords[4]: if i in words: words = words.replace(i, 'V') for i in similarwords[5]: if i in words: words = words.replace(i, 'RM') for i in similarwords[6]: if i in words: words = words.replace(i, 'JIN') temp = words.split('-') temp = list(set(temp)) del_similar.append(temp) total = list(chain.from_iterable(del_similar)) # ๋ฆฌ์คํธ ์์ ๋ฆฌ์คํธ ํ๋์ ๋ฆฌ์คํธ๋ก ํฉ์น๊ธฐ # ๋ถ์ฉ์ด ์ ๊ฑฐ for i in total: if i not in mystopwords: result.append(i) return result
3) ์ ์ฒ๋ฆฌ ํ ์ง๊ณ ๋ hashtag word count
- ํด์ํ๊ทธ ์ ์ฒ๋ฆฌ๊น์ง ์๋ฃํ ํ word count ์์ ์ ์ํด ๋ค์ rdd ๋ก ๋ง๋ค์ด ์ค๋ค.
-
map, reduceByKey ํ์ฉํ์ฌ transformation ์ํ
( ์ด ๋, ์ต์ 3๋ฒ ์ด์ ๋ฑ์ฅํ hashtag ๋ง filter ์ ์ฉ)
-
takeOrdered ๋ฅผ ํตํด action ์ํ
( ์ด ๋, ์์ 20 ๊ฐ๋ง ๊ฐ์ ธ์ค๋๋ก )
4) ์ต์ข ๊ฒฐ๊ณผ redis์ ์ ์ฅ ( ํด๋ผ์ด์ธํธ์์ ์์ฒญ ์ ๋น ๋ฅด๊ฒ ์ ๋ฌํ๊ธฐ ์ํด redis ์ ์ผ์ฃผ์ผ ๋ง๋ฃ๋ก ์ ์ฅ)
# ์ถ์ถ๋ ๋จ์ด word count def word_count(list): print('word count ๋ค์ด์ด') pairs = list.map(lambda word: (word, 1)) # ์์ 10๊ฐ๋ง ๊ฐ์ ธ์ค๊ธฐ + ๋ฑ์ฅ๋น๋ 2๋ฒ ์ด์ wordCounts = pairs.reduceByKey(lambda x, y: x + y).filter(lambda args: args[1] > 2) ranking = wordCounts.takeOrdered(15, lambda args: -args[1]) print(ranking) return ranking # ํด์ํ๊ทธ ์์ ์ ์ฅ def save_hashtag(data, time): # key : 'hashtag' , value : ์์ ๊ฒฐ๊ณผ json ์ผ๋ก redis ์ ์ฅ rank_to_json = json.dumps(data) myRedis.set('hashtag', rank_to_json, ex=60 * 60 * 24 * 7) print('์ ์ฅ์๋ฃ')
๊ฒฐ๊ณผ
'it's me > ๐ฉโ๐ป ํ๋ก์ ํธ ์ ๋ฆฌ' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Quadcore Team/Trend] ์ธ๊ธฐ ํธ์ ๋ญํน (0) 2020.03.02 [4์ฃผ์ฐจ] - word count with Spark (0) 2020.02.01 QuadCore Team Project (0) 2020.02.01 Authorization System (0) 2020.01.15