  • [Quadcore Team/Trend] Hashtag Analysis
    it's me/👩‍💻 Project Summaries 2020. 2. 26. 16:15

     

    After putting it off again and again, I am finally posting project summary 1...

    * Hashtag Ranking process

    1) Preprocessing raw tweets

    • From the raw tweet df, extract 'entities.hashtags.text' (only the hashtag column).
    • Convert it to an rdd, then call collect() so that all rows are stored in a single list.
    def process(rdd):
        try:
            rawTweet = spark.read.json(rdd)
            # Register a temporary view scoped to the current SparkSession.
            rawTweet.createOrReplaceTempView("tweets")
            # Each element of this RDD is the list of hashtag texts of one tweet.
            hashtag = rawTweet.selectExpr('entities.hashtags.text as hashtag').rdd.flatMap(lambda x: x)
            # Collect once and reuse the result, so the Spark job does not run twice.
            collected = hashtag.collect()
            print(collected)
            # Preprocess the hashtags that arrived in the current batch
            result = hashtag_processing(collected)

            # Turn the result back into an RDD for the word count
            rdd = spark.sparkContext.parallelize(result)
            word_count(rdd)

        except Exception as e:
            # Report the error instead of silently swallowing it
            print(e)
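
    To make the shape of the data concrete, here is a minimal sketch in plain Python (no Spark) of what selecting 'entities.hashtags.text' yields: one list of hashtag strings per tweet. The sample tweets and the helper name extract_hashtags are made up for illustration, and the sketch assumes each tweet carries entities.hashtags as a list of objects with a text field.

```python
import json

# Hypothetical sample input: two raw tweets as JSON strings (not real data).
raw_tweets = [
    '{"text": "hello", "entities": {"hashtags": [{"text": "BTS"}, {"text": "Jimin"}]}}',
    '{"text": "no tags", "entities": {"hashtags": []}}',
]


def extract_hashtags(lines):
    """Return one list of hashtag texts per tweet (a list of lists)."""
    per_tweet = []
    for line in lines:
        tweet = json.loads(line)
        tags = tweet.get("entities", {}).get("hashtags", [])
        per_tweet.append([tag["text"] for tag in tags])
    return per_tweet


hashtags = extract_hashtags(raw_tweets)
print(hashtags)  # [['BTS', 'Jimin'], []]
```

    A tweet without hashtags produces an empty list, which is why the preprocessing step skips empty entries before joining.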

     

     

    2) Collect all hashtags that arrived within the time window and preprocess them

    • Remove similar words within each tweet

    (e.g. member names: map the English and Korean spellings to a single word, then use a set to remove the duplicates and keep only one.)

     

    Why do it this way?

    - Counting every repeated member name inside a single tweet does not fit the purpose of a trend.

    - A trend ranking (what many different users are mentioning) only needs to reflect each tweet once. → 1 tweet, 1 count

    • Remove stopwords from the hashtags of all tweets
    # Hashtag preprocessing
    from itertools import chain

    # Canonical name for each alias group; the order matches similarwords[0..6].
    # similarwords and mystopwords are defined elsewhere in the project.
    CANONICAL = ['JUNGKOOK', 'JIMIN', 'JHOPE', 'SUGA', 'V', 'RM', 'JIN']

    def hashtag_processing(text):
        del_similar = []
        # Remove similar words within each tweet
        for v in text:
            if not v:
                continue
            # Join one tweet's hashtags into a single uppercase string
            words = '-'.join(v).upper()
            # Replace every alias with its canonical name
            for name, aliases in zip(CANONICAL, similarwords):
                for alias in aliases:
                    words = words.replace(alias, name)
            # A set keeps each hashtag once per tweet (1 tweet, 1 count)
            del_similar.append(list(set(words.split('-'))))

        # Flatten the list of lists into a single list
        total = list(chain.from_iterable(del_similar))
        # Remove stopwords
        return [w for w in total if w not in mystopwords]
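
    The effect of the "1 tweet, 1 count" rule can be checked with a small standalone sketch. ALIASES is a hypothetical stand-in for similarwords, whose real contents are not shown in this post, and the sketch uses exact matching on each hashtag rather than the substring replacement used in the function above.

```python
from collections import Counter

# Hypothetical alias table standing in for similarwords (real contents not shown).
ALIASES = {"JIMIN": ["지민", "PARKJIMIN"]}


def canonical(tag):
    """Uppercase a hashtag and map known aliases to their canonical name."""
    tag = tag.upper()
    for name, variants in ALIASES.items():
        if tag in variants:
            return name
    return tag


tweets = [
    ["Jimin", "지민", "ParkJimin"],  # one tweet naming the same member three times
    ["jimin", "BTS"],
]

# Counting every mention lets a single tweet inflate the ranking.
every_mention = Counter(canonical(tag) for tags in tweets for tag in tags)

# Deduplicating with a set per tweet counts each hashtag once per tweet.
once_per_tweet = Counter(
    name for tags in tweets for name in {canonical(tag) for tag in tags}
)

print(every_mention["JIMIN"])   # 4
print(once_per_tweet["JIMIN"])  # 2
```

    With the set in place, the first tweet contributes one count however many spellings of the name it contains.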

     

     

    3) Word count of the hashtags aggregated after preprocessing

    • Once hashtag preprocessing is done, the result is turned back into an rdd for the word count.
    • The transformations use map and reduceByKey

      (with a filter that keeps only hashtags appearing at least 3 times)

       

    • The action is performed with takeOrdered

      (only the top entries are fetched; the code below takes 15)
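
    The steps above can be sketched without Spark to show the logic in isolation. This is only an illustration: rank_hashtags and its parameters min_count and top_n are hypothetical names, Counter stands in for map plus reduceByKey, and heapq.nsmallest on the negated count stands in for takeOrdered.

```python
import heapq
from collections import Counter


def rank_hashtags(words, min_count, top_n):
    """Count words, keep those seen at least min_count times, return the top_n by count."""
    counts = Counter(words)  # plays the role of map + reduceByKey
    frequent = [(w, c) for w, c in counts.items() if c >= min_count]  # filter step
    # Smallest negated count first means largest count first.
    return heapq.nsmallest(top_n, frequent, key=lambda kv: -kv[1])


words = ["BTS"] * 5 + ["JIMIN"] * 4 + ["SUGA"] * 3 + ["V"] * 2
print(rank_hashtags(words, min_count=3, top_n=2))  # [('BTS', 5), ('JIMIN', 4)]
```

    In the example, V is dropped by the frequency filter and SUGA is dropped by the top-n cut, so the two steps can be told apart.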

     

     

    4) Save the final result to redis (stored with a one-week expiry so it can be returned quickly when a client requests it)

    import json

    # Word count of the extracted words
    def word_count(rdd):
        print('entered word count')
        pairs = rdd.map(lambda word: (word, 1))
        # Keep hashtags that appear more than twice (at least 3 times), then take the top 15
        wordCounts = pairs.reduceByKey(lambda x, y: x + y).filter(lambda args: args[1] > 2)
        ranking = wordCounts.takeOrdered(15, lambda args: -args[1])
        print(ranking)
        return ranking


    # Save the hashtag ranking
    def save_hashtag(data, time):
        # key: 'hashtag', value: the ranking serialized as JSON and stored in redis
        # myRedis is a redis client created elsewhere; ex is the expiry in seconds (one week)
        rank_to_json = json.dumps(data)
        myRedis.set('hashtag', rank_to_json, ex=60 * 60 * 24 * 7)
        print('saved')
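
    One detail matters when a client reads the stored value back: JSON has no tuple type, so the (hashtag, count) pairs return as two-element lists. The sketch below shows that round trip with the standard json module only. The sample ranking is invented, no redis connection is involved, and ensure_ascii=False is an optional choice that keeps Korean hashtags readable in the stored string.

```python
import json

# Hypothetical ranking in the shape takeOrdered returns: a list of (hashtag, count) tuples.
ranking = [("BTS", 12), ("지민", 7)]

# Expiry used when saving: one week expressed in seconds.
ONE_WEEK_SECONDS = 60 * 60 * 24 * 7

payload = json.dumps(ranking, ensure_ascii=False)  # string that would be stored
restored = json.loads(payload)                     # what a client gets after parsing

print(ONE_WEEK_SECONDS)  # 604800
print(payload)           # [["BTS", 12], ["지민", 7]]
print(restored)          # [['BTS', 12], ['지민', 7]]

# Convert back to tuples if the consumer expects the original shape.
as_tuples = [tuple(item) for item in restored]
```

    A consumer that needs tuples can rebuild them as in the last line; otherwise indexing item[0] and item[1] works on the lists directly.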

     

     

    Result

     
