  • [Quadcore Team/Trend] Popular Tweet Ranking
    it's me/👩‍💻 Project Write-ups 2020. 3. 2. 18:50

     

    Project write-up 2, which I'm only now getting around to posting...

     

    * How the popular tweet ranking is extracted

     

    1) Cassandra to Spark

    • Extract only the retweeted data from the raw data in Cassandra (more than 80% of incoming tweets are retweets)
    • Extract data from the 30 seconds before the current time
    import datetime
    import time

    from pyspark.sql.functions import col

    # spark (SparkSession), schema, SECONDS (window length, in the same unit as
    # the timestamp column), process_tweet and save_tweet are defined elsewhere.

    if __name__ == "__main__":
        while True:
            # Current time in microseconds
            current_time = int(time.time() * 1000000)
            # Time format used for the redis key (year/month/day/hour/minute)
            current_time_format = datetime.datetime.fromtimestamp(int(current_time / 1000000)).strftime('%Y/%m/%d/%H/%M')
            # Load the last 30 seconds of retweeted data from Cassandra
            lines = spark.read \
                .format("org.apache.spark.sql.cassandra") \
                .options(table="master_dataset", keyspace="bts") \
                .load().select(schema) \
                .where(col('timestamp') >= current_time - SECONDS) \
                .where(col('timestamp') <= current_time) \
                .where(col('retweeted') == True).limit(100).cache()
            print(current_time_format)
            print(current_time)  # print the current time

            result = process_tweet(lines)
            if result is not False:
                save_tweet(result, current_time_format)
            else:
                print('there is no data')
            time.sleep(10)
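The window bounds and the redis key from the loop above can be checked without Spark or Cassandra. This is a minimal sketch under assumptions: `window_bounds`, `redis_key`, `MICROS` and `WINDOW_SECONDS` are hypothetical names, and the 30-second window comes from the bullet above. Since `current_time` is in microseconds, a 30-second `SECONDS` would presumably have to be 30 * 1,000,000 for the `where` filters to cover 30 seconds.

```python
import datetime
import time

MICROS = 1_000_000   # microseconds per second
WINDOW_SECONDS = 30  # assumed: "the last 30 seconds" from the bullet above


def window_bounds(now_us, window_s=WINDOW_SECONDS):
    """Return (start, end) in microseconds covering the last window_s seconds."""
    return now_us - window_s * MICROS, now_us


def redis_key(now_us):
    """Format a microsecond timestamp as a local-time 'YYYY/MM/DD/HH/MM' key."""
    return datetime.datetime.fromtimestamp(now_us // MICROS).strftime('%Y/%m/%d/%H/%M')


if __name__ == "__main__":
    now = int(time.time() * MICROS)
    start, end = window_bounds(now)
    print(start, end, redis_key(now))
```

Because the loop sleeps 10 seconds while each window spans 30 seconds, consecutive windows overlap; each run still produces its own complete ranking.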

     

    2) Tweet Processing

     

    If there is data, extract

    • id, favorite_count, quote_count and retweet_count from retweeted_status
    • the whole retweeted_status
    • the timestamp of when the tweet was written

     

    and build a new dataframe from them. I really struggled with this step... (I plan to write it up separately)

     

    StructType of the new dataframe

    import json

    from pyspark.sql.types import LongType, StringType, StructField, StructType

    struct = StructType([StructField("tweet_id", LongType(), False),
                         StructField("tweet_content", StringType(), False),
                         StructField("favorite_count", LongType(), False),
                         StructField("quote_count", LongType(), False),
                         StructField("retweeted_count", LongType(), False),
                         StructField("timestamp", LongType(), False)])

    def process_tweet(data):
        print('entered process_tweet')
        if bool(data.take(1)):
            # Extract the values needed for the new df: retweeted tweet id, tweet content
            # (the whole retweeted_status JSON), like count, quote count, retweet count, time
            tweet_content = data.select('retweeted_status').rdd.flatMap(lambda value: value).collect()
            tweet_id = data.select('retweeted_status').rdd.map(lambda value: json.loads(value[0])) \
                .map(lambda v: v['id']).collect()
            favorite_count = data.select('retweeted_status').rdd.map(lambda value: json.loads(value[0])) \
                .map(lambda v: v['favorite_count']).collect()
            quoted_count = data.select('retweeted_status').rdd.map(lambda value: json.loads(value[0])) \
                .map(lambda v: v['quote_count']).collect()
            retweeted_count = data.select('retweeted_status').rdd.map(lambda value: json.loads(value[0])) \
                .map(lambda v: v['retweet_count']).collect()
            timestamp = data.select('timestamp').rdd.flatMap(lambda value: value).collect()

            # Build the new dataframe (spark is the SparkSession defined elsewhere)
            newDF = spark.createDataFrame(
                list(zip(tweet_id, tweet_content, favorite_count, quoted_count, retweeted_count, timestamp)), struct)
            newDF.show()
            result = rank_tweet(newDF)
            return result
        else:
            return False
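Each `collect()` above is a separate Spark job, and `zip` assumes they all return rows in the same order. With a cached DataFrame that usually holds, but extracting every field from the same row in one pass removes the dependency and parses the JSON once instead of four times. The sketch below shows only that per-row logic in plain Python; `to_row` is a hypothetical helper and the sample record is made up. In Spark it could presumably be applied as `data.select('retweeted_status', 'timestamp').rdd.map(lambda r: to_row(r[0], r[1]))` and the resulting RDD passed to `spark.createDataFrame(..., struct)`.

```python
import json


def to_row(retweeted_status, timestamp):
    """Build one (tweet_id, tweet_content, favorite_count, quote_count,
    retweeted_count, timestamp) tuple from a raw retweeted_status JSON string.

    Field order matches the StructType above; tweet_content keeps the whole
    retweeted_status JSON, as process_tweet does.
    """
    status = json.loads(retweeted_status)  # parse once per row
    return (
        status['id'],
        retweeted_status,
        status['favorite_count'],
        status['quote_count'],
        status['retweet_count'],
        timestamp,
    )


if __name__ == "__main__":
    # Made-up sample record, for illustration only
    raw = json.dumps({"id": 1, "favorite_count": 5, "quote_count": 1, "retweet_count": 3})
    print(to_row(raw, 1583139000000000))
```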

     

    3) Tweet Ranking

    • In the new dataframe, create a new column total that sums the favorite, quote and retweet counts
    • For each tweet_id, keep only the most recent row and sort by total
    • Convert the content of the top 10 tweets into a list
    from pyspark.sql import Window
    from pyspark.sql import functions as f

    # Aggregate the tweet ranking
    def rank_tweet(df):
        if bool(df.take(1)):
            print('entered rank_tweet')
            # New column 'total' = favorite + quote + retweet counts
            df = df.withColumn('total', df.favorite_count + df.quote_count + df.retweeted_count)
            # Keep only the most recent row per tweet_id, then sort by total.
            # (f.first() after groupBy does not guarantee which row it picks.)
            latest = Window.partitionBy('tweet_id').orderBy(f.col('timestamp').desc())
            rank = df.withColumn('rn', f.row_number().over(latest)) \
                .filter(f.col('rn') == 1) \
                .select('tweet_id', 'tweet_content', 'total', 'timestamp') \
                .orderBy('total', ascending=False)
            rank.show()
            # Convert the content of the top 10 tweets into a list
            tweet_list = rank.select('tweet_content').rdd.flatMap(lambda x: x).take(10)
            return tweet_list
        else:
            print('dataframe is empty')
            return False
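The ranking rule (total = favorite + quote + retweet, keep the most recent row per tweet_id, sort by total, take the top 10) can be sketched in plain Python to check the expected order on a few rows. `rank_rows` is a hypothetical helper, not the Spark code itself; rows are tuples in the StructType order above.

```python
def rank_rows(rows, top_n=10):
    """rows: (tweet_id, tweet_content, favorite_count, quote_count,
    retweeted_count, timestamp) tuples. Returns the top_n tweet_content values."""
    latest = {}
    for row in rows:
        tweet_id, timestamp = row[0], row[5]
        # Keep only the most recent row for each tweet_id
        if tweet_id not in latest or timestamp > latest[tweet_id][5]:
            latest[tweet_id] = row
    # total = favorite + quote + retweet, highest first
    ranked = sorted(latest.values(), key=lambda r: r[2] + r[3] + r[4], reverse=True)
    return [r[1] for r in ranked[:top_n]]
```

Note that an older row with a higher total is ignored: only the latest counts for each tweet take part in the ranking.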

     

    4) Save Tweet

    • Convert the Python list into bytes so it can be stored in redis
    • Store it in redis with the current time (year/month/day/hour/minute) as the key and a one-week expiry
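The post does not include `save_tweet`, so the following is only a minimal sketch of the two bullets above. Assumptions: the list is serialized as UTF-8 JSON (the original may have used another format such as pickle), and `client` is a redis-py `redis.Redis` instance, whose `set(name, value, ex=...)` accepts the expiry as seconds or a `timedelta`. `encode_tweets`, `decode_tweets` and the extra `client` argument are hypothetical.

```python
import json
from datetime import timedelta

ONE_WEEK = timedelta(days=7)  # expiry from the post: one week


def encode_tweets(tweets):
    """Serialize a Python list of tweet strings to UTF-8 JSON bytes (assumed format)."""
    return json.dumps(tweets, ensure_ascii=False).encode("utf-8")


def decode_tweets(payload):
    """Inverse of encode_tweets, for reading a ranking back out of redis."""
    return json.loads(payload.decode("utf-8"))


def save_tweet(tweets, key, client):
    """Store the ranking under key (e.g. '2020/03/02/18/50') for one week.

    client is assumed to be a redis.Redis instance (redis-py).
    """
    client.set(key, encode_tweets(tweets), ex=ONE_WEEK)
```

Because the key only has minute resolution and the loop runs every 10 seconds, several runs share a key; with a plain SET, the last run in each minute overwrites the earlier ones.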

     

     

    Result

     

    The ranking sorted by count, highest first
