-
[4์ฃผ์ฐจ] - word count with Sparkit's me/๐ฉ๐ป ํ๋ก์ ํธ ์ ๋ฆฌ 2020. 2. 1. 02:39
์๋ฒ๊ฐ๋ฐ์บ ํ 4์ฃผ์ฐจ์ธ ์ด๋ฒ์ฃผ๋ถํฐ๋ ๋ณธ๊ฒฉ์ ์ผ๋ก ํ ํ๋ก์ ํธ๋ฅผ ์์ํ๋ค.
- 4์ฃผ์ฐจ ๋์ Milestone : spark ํ์ฉํ์ฌ word count ๋ก ๋จ์ด ๋ถ์ + api ์ค๊ณ
- ์คํํฌ ์ฑ 2.3์ฅ ํ์ต
- ์คํํฌ ์ฑ 12.13์ฅ ํ์ต
- ์คํํฌ ์ฑ 21์ฅ ์คํธ๋ฆฌ๋ฐ ์ฒ๋ฆฌ ํ์ต
- ์ค์๊ฐ์ผ๋ก ๋ฐ์ดํฐ ๋ฐ์์์ word count ์คํ
์ด ์ค api ์ค๊ณ๋ฅผ ์ ์ธํ๊ณ ๋ ๊ทธ๋๋ ๋ค ์งํํ๋ค !
โญ๏ธ ์ด๋ฒ์ฃผ ์ ๋ฆฌ
Spark - word count
Spark ์์๋ ๊ธฐ๋ณธ์ ์ผ๋ก
- kafka
- flume
- kinesis
- Tcp sockets
- ํ์ผ ์์คํ ex) hdfs , s3 ...
์ผ๋ก๋ถํฐ data source ๋ค์ ๋ฐ์์ฌ ์ ์๋ค.
๋๋ ์์ง kafka ์ฐ๊ฒฐ์ ํ์ง ์์๊ธฐ ๋๋ฌธ์ Tcp socket ํต์ ์ ์ด์ฉํ์ฌ twitter api๋ก ๋ฐ์์จ raw data ๋ฅผ spark ์ ์ฐ๊ฒฐํ๋ค.
์ฌ์ค ์์ผ ํต์ ์ ์ฒ์์ด๋ผ ์ด๊ฒ๋ถํฐ ๋ง์ด ์ฝ์งํ๋ค.... ํํ์์ผ ํต์ ํ๋ฆ
๊ฐ๋จ ์์ผ ์์ฝ
์๋ฒ
-
socket.socket() ์ ํตํด ์์ผ ๊ฐ์ฒด ์์ฑ
-
bind() - ํน์ ๋คํธ์ํฌ ์ธํฐํ์ด์ค: Host ์ ํฌํธ ๋ฒํธ : Port ๋ฅผ ์์ผ๊ณผ ์ฐ๊ฒฐ
- HOST๋ hostname, ip address, ๋น ๋ฌธ์์ด ""์ด ๋ ์ ์๋ค.
- ๋น ๋ฌธ์์ด์ด๋ฉด ๋ชจ๋ ๋คํธ์ํฌ ์ธํฐํ์ด์ค๋ก๋ถํฐ์ ์ ์์ ํ์ฉ
- PORT๋ 1-65535 ์ฌ์ด์ ์ซ์๋ฅผ ์ฌ์ฉํ ์ ์๋ค.
-
listen() - ์๋ฒ๊ฐ ํด๋ผ์ด์ธํธ ์ ์ ํ์ฉ
-
accept() - ๋๊ธฐํ๋ค๊ฐ ํด๋ผ์ด์ธํธ๊ฐ ์ ์ํ๋ฉด ์๋ก์ด ์์ผ ๋ฆฌํด
-
recv() - ํด๋ผ์ด์ธํธ๊ฐ ๋ณด๋ธ ๋ฉ์ธ์ง ์์
-
send() - ํด๋ผ์ด์ธํธ๋ก ๋ฉ์ธ์ง ์ ์ก
ํด๋ผ์ด์ธํธ
- socket.socket() - ํด๋ผ์ด์ธํธ์์๋ ์๋ฒ์ ๋ง์ฐฌ๊ฐ์ง๋ก ์์ผ ๊ฐ์ฒด ์์ฑ
- connect() - ์์ผ๊ณผ ์ฐ๊ฒฐ๋ host, port ๋ก ์ ์
- recv() - ์๋ฒ๋ก ๋ถํฐ ๋ฉ์ธ์ง ์์
- send() - ์๋ฒ๋ก ๋ฉ์ธ์ง ์ก์
์ ์ํ ์ :
- ์์ผ์ผ๋ก ๋ฉ์ธ์ง ์ ์กํ ๋๋ byte ๋ก ์ ์กโ๏ธ๋ฐ๋ผ์ twitter api ๋ก ๋ฐ์ tweet ๋ค encode('utf-8') ๋ก ๋ณด๋ด๊ณ decode()๋ก ์ฝ์ด์ผ ํ๋ค.
- tweet๋ค ์ ๋ถ ๋ค ๋ณด๋ด๋ ค๋ฉด raw_tweet = json.dumps( data ).encode('utf-8') ๋ฅผ ํตํด ๋ณด๋ธ๋ค.from tweepy import OAuthHandler from tweepy import Stream from tweepy.streaming import StreamListener import socket import json consumer_key = '' consumer_secret = '' access_token_key = '' access_token_secret = '' class TweetsListener(StreamListener): def __init__(self, csocket): self.client_socket = csocket def on_data(self, data): try: raw_tweet = json.loads( data ) print(raw_tweet['text']) print( raw_tweet['text'].encode('utf-8') ) self.client_socket.send( raw_tweet['text'].encode('utf-8') ) return True except BaseException as e: print("Error on_data: %s" % str(e)) return True def on_error(self, status): print(status) return True def sendData(c_socket): auth = OAuthHandler(consumer_key, consumer_secret) auth.set_access_token(access_token_key, access_token_secret) twitter_stream = Stream(auth, TweetsListener(c_socket)) # Create a Stream twitter_stream.filter(track=['#BTS']) # Starting a Stream if __name__ == "__main__": s = socket.socket() # Create a socket object host = "127.0.0.1" # Get local machine name port = 5555 # Reserve a port for your service. s.bind((host, port)) # Bind to the port print("Listening on port: %s" % str(port)) s.listen(5) # Now wait for client connection. c, addr = s.accept() # Establish connection with client. print( "Received request from: " + str( addr ) ) sendData( c )
์ด๋ ๊ฒ ํด๋ผ์ด์ธํธ์ ์๋ฒ๋ฅผ ํ๋ฒ์ ์ฒ๋ฆฌํ ์ ์์๋๋ฐ,,
๋๋ ํด๋ผ ์์ผ ํ์ผ๊ณผ ์๋ฒ ์์ผ ํ์ผ์ ๋ฐ๋ก ๋ง๋ค์ด์ ์คํ ํ๋ค๊ณ ํ๋ค....๊ทธ์น๋ง ์ด ๊ณผ์ ์ ํตํด ์์ผ ์ฒ๋ฆฌ๊ฐ ์ด๋ป๊ฒ ๋๋์ง๋ ์ ๋๋ก ์๊ฒ ๋์๋ค !
Spark ์ฐ๊ฒฐ
-
twitter api ๋ฅผ ํตํด ๋ค์ด์จ ํธ์๋ค ํด๋ผ์ด์ธํธ๊ฐ 120.0.0.1:5555 ์์ผ ์๋ฒ๋ก ๋ณด๋
-
StreamingContext, which is the main entry point for all streaming functionality.
Create a local StreamingContext with two execution threads, and batch interval of 5 second.
from pyspark import SparkContext from pyspark.streaming import StreamingContext # Create a local StreamingContext with two working thread and batch interval of 5 second sc = SparkContext("local[2]", "NetworkWordCount") ssc = StreamingContext(sc, 5)
- Using this context, create a DStream that represents streaming data from a TCP source, specified as hostname (e.g. localhost) and port (e.g. 5555).
socket_stream = ssc.socketTextStream("127.0.0.1", 5555)
- lines DStream represents the stream of data that will be received from the data server.
lines = socket_stream.window(20) words = lines.flatMap(lambda line: line.split(" ")) #Count each word in each batch pairs = words.map(lambda word: (word, 1)) wordCounts = pairs.reduceByKey(lambda x, y: x + y) # Print the first ten elements of each RDD generated in this DStream to the console wordCounts.pprint()
- jupyter notebook ์์ 2.3.4 ๋ฒ ์คํ ํ ํ์ด์ฌ์ผ๋ก ์์ฑํ ์์ผ ํ์ผ ์คํ
- ssc.start() ๋ก word count ์คํ
ssc.start() # Start the computation ssc.awaitTermination() # Wait for the computation to terminate
๊ฒฐ๊ณผ
์๋ฏธ ์๋ ๋ฐ์ดํฐ๊ฐ ๋๋ถ๋ถ,,,,๋ถํ์ํ ๊ฒ๋ค ๋ค ์ ๊ฑฐํ๊ณ count ์๋๋ก ์ถ๋ ฅํ ๊ฒ์ด๋ค.
์ฐธ๊ณ :
- ํธ์ํฐ ์คํธ๋ฆฌ๋ฐ ์ฒ๋ฆฌ : https://hero0926.tistory.com/5
- Spark Streaming Programming Guide : https://spark.apache.org/docs/latest/streaming-programming-guide.html
'it's me > ๐ฉโ๐ป ํ๋ก์ ํธ ์ ๋ฆฌ' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Quadcore Team/Trend] ์ธ๊ธฐ ํธ์ ๋ญํน (0) 2020.03.02 [Quadcore Team/Trend]ํด์ํ๊ทธ ๋ถ์ (0) 2020.02.26 QuadCore Team Project (0) 2020.02.01 Authorization System (0) 2020.01.15 - 4์ฃผ์ฐจ ๋์ Milestone : spark ํ์ฉํ์ฌ word count ๋ก ๋จ์ด ๋ถ์ + api ์ค๊ณ