-
์ด์ ์์ผ ์ฌ๋ฆฌ๋ ํ๋ก์ ํธ ์ ๋ฆฌ 2...
* ์ธ๊ธฐ ํธ์ ๋ญํน ์ถ์ถ ๊ณผ์
1) Cassandra to Spark
- ์นด์ฐ๋๋ผ์ raw data ์ค ๋ฆฌํธ์ ๋ data ์ถ์ถ ( ์ฌ๋ผ์ค๋ ํธ์ ์ค 80% ์ด์์ด ๋ฆฌํธ์ ๋ฐ์ดํฐ)
- ํ์ฌ์๊ฐ ๊ธฐ์ค 30์ด ์ ๊น์ง data ์ถ์ถ
if __name__ == "__main__": while True: # ํ์ฌ์๊ฐ ๋ง์ดํฌ๋ก ์ธ์ปจ์ฆ ๊น์ง current_time = int(time.time() * 1000000) # ํ์ฌ์๊ฐ ๋ง์ดํฌ๋ก ์ธ์ปจ์ฆ ๊น์ง # redis ์ ์ฅ ํฌ๋งท ์๊ฐ ํ์ ( ๋ /์/์ผ/์/๋ถ) ์ผ๋ก current_time_format = datetime.datetime.fromtimestamp(int(current_time/1000000)).strftime('%Y/%m/%d/%H/%M') # ์นด์ฐ๋๋ผ๋ก๋ถํฐ data ๋ถ๋ฌ์ค๊ธฐ (30์ด ๋ง๋ค) lines = spark.read \ .format("org.apache.spark.sql.cassandra") \ .options(table="master_dataset", keyspace="bts") \ .load().select(schema) .where(col('timestamp') >= current_time - SECONDS)\ .where(col('timestamp') <= current_time)\ .where(col('retweeted') == True).limit(100).cache() print(current_time_format) print(current_time) # ํ์ฌ์๊ฐ ์ถ๋ ฅ result = process_tweet(lines) if result is not False: #print(result) save_tweet(result, current_time_format) else: print('there is no data') time.sleep(10)
2) Tweet Processing
data ๊ฐ ์กด์ฌํ ๊ฒฝ์ฐ
- retweeted_status ์ค id, favorite_count, quote_count, retweet_count
- retweeted_status ์ ์ฒด
- ํด๋น ๊ธ์ด ์ฐ์ฌ์ง ์๊ฐ timestamp ๊ฐ
๋ฝ์๋ด์ด ์๋ก์ด dataframe ์์ฑ --> ์ด ๊ณผ์ ์์ ์์ฒญ ํค๋งธ๋ค...(๋ฐ๋ก ์ ๋ฆฌํ ์์ )
์๋ก์ด dataframe ์ StructType
struct = StructType([StructField("tweet_id", LongType(), False), StructField("tweet_content", StringType(), False), StructField("favorite_count", LongType(), False), StructField("quote_count", LongType(), False), StructField("retweeted_count", LongType(), False), StructField("timestamp", LongType(), False)])
def process_tweet(data): print('process ๋ค์ด์ด') if bool(data.take(1)): # df ์์ฑ์ ํ์ํ ๋ฐ์ดํฐ ์ถ์ถ (๋ฆฌํธ์ ๋ ํธ์ id, ํธ์ ๋ด์ฉ, ํธ์ ์ข์์ ๊ฐ์, ํธ์ ์ธ์ฉ ๊ฐ์, ๋ฆฌํธ์ ๊ฐ์, ์์ฑ์๊ฐ) tweet_content = data.select('retweeted_status').rdd.flatMap(lambda value: value).collect() tweet_id = data.select('retweeted_status').rdd.map(lambda value: json.loads(value[0])) \ .map(lambda v: v['id']).collect() favorite_count = data.select('retweeted_status').rdd.map(lambda value: json.loads(value[0])) \ .map(lambda v: v['favorite_count']).collect() quoted_count = data.select('retweeted_status').rdd.map(lambda value: json.loads(value[0])) \ .map(lambda v: v['quote_count']).collect() retweeted_count = data.select('retweeted_status').rdd.map(lambda value: json.loads(value[0])) \ .map(lambda v: v['retweet_count']).collect() timestamp = data.select('timestamp').rdd.flatMap(lambda value: value).collect() # ์๋ก์ด dataframe ์์ฑ newDF = spark.createDataFrame( zip(tweet_id, tweet_content, favorite_count, quoted_count, retweeted_count, timestamp), struct) newDF.show() result = rank_tweet(newDF) return result else: return False
3) tweet Ranking
- ์๋กญ๊ฒ ๋ง๋ dataframe ์์ favorite, quote, retweet count ๊ฐ ํฉ์ฐํ ์๋ก์ด ์นผ๋ผ total ์์ฑ
- ๊ฐ์ tweet_id ์ค ์ต์ ๊ฒ๋ง ์ง๊ณํ์ฌ total ์์ผ๋ก ์ ๋ ฌ
- ์์ 10๊ฐ tweet ๋ด์ฉ๋ง ๋ฆฌ์คํธ๋ก ๋ณํ
# ํธ์ ๋ญํน ์ง๊ณ def rank_tweet(df): if bool(df.take(1)): print('rank ๋ค์ด์ด') # (favorite + quote + retweet) count ๊ฐ ํฉ์ฐํ ์๋ก์ด column 'total' ์์ฑ df = df.withColumn('total', df.favorite_count + df.quote_count + df.retweeted_count) # ๊ฐ์ tweet_id ์ค ์ต์ ๊ฒ๋ง ์ง๊ณ (total ์์ผ๋ก) rank = df.groupBy(df.tweet_id).agg( f.first('tweet_content').alias('tweet_content'), f.first('total').alias('total'), f.max('timestamp').alias('timestamp') ).orderBy('total', ascending=False) rank.show() # ์์ ๋ญํน 10๊ฐ์ tweet ๋ด์ฉ ๋ฆฌ์คํธ๋ก ๋ณํ tweet_list = rank.select('tweet_content').rdd.flatMap(lambda x: x).take(10) return tweet_list else: print('๋ฐ์ดํฐํ๋ ์ ๋น์ด์์') return False
4) Save Tweet
- python list ํํ์ data ๋ฅผ redis ์ ์ ์ฅํ๊ธฐ ์ํด bytes ๋ก ๋ณํ์ํด
- ํ์ฌ์๊ฐ ( ๋ /์/์ผ/์/๋ถ ) ๊ธฐ์ค: key ์ผ๋ก redis์ ์ผ์ฃผ์ผ ๋ง๋ฃ ํ์ ์ง์ ํด์ ์ ์ฅ
๊ฒฐ๊ณผ
'it's me > ๐ฉโ๐ป ํ๋ก์ ํธ ์ ๋ฆฌ' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Quadcore Team/Trend]ํด์ํ๊ทธ ๋ถ์ (0) 2020.02.26 [4์ฃผ์ฐจ] - word count with Spark (0) 2020.02.01 QuadCore Team Project (0) 2020.02.01 Authorization System (0) 2020.01.15