[Week 4] - word count with Spark
2020. 2. 1. 02:39

     

This week was week 4 of the server development camp, and it was when our team project started in earnest.

     

1. My milestone for week 4: analyze words with a Spark word count + design the API
  • Study chapters 2 and 3 of the Spark book
  • Study chapters 12 and 13 of the Spark book
  • Study chapter 21 of the Spark book (stream processing)
  • Receive data in real time and run a word count on it

Of these, I got through everything except the API design!

     

    โญ๏ธ ์ด๋ฒˆ์ฃผ ์ •๋ฆฌ

    Spark - word count

     

Spark Streaming - a Spark submodule that processes data in short intervals (micro-batches) instead of one large batch

Out of the box, Spark Streaming can receive data from sources such as:

• Kafka
• Flume
• Kinesis
• TCP sockets
• file systems, e.g. HDFS, S3 ...
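The micro-batch idea itself can be sketched in plain Python. Each record is assigned to a fixed-length batch based on its arrival time. This only illustrates the concept and is not how Spark's receivers are implemented. The function name and the sample timestamps are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def micro_batches(events: Iterable[Tuple[float, str]],
                  batch_interval: float) -> Dict[int, List[str]]:
    """Group (arrival_time_in_seconds, record) pairs into fixed-length batches.

    Batch k holds every record that arrived in
    [k * batch_interval, (k + 1) * batch_interval).
    """
    batches: Dict[int, List[str]] = defaultdict(list)
    for arrival_time, record in events:
        batches[int(arrival_time // batch_interval)].append(record)
    return dict(batches)


if __name__ == "__main__":
    # Hypothetical arrival times (seconds since the stream started).
    events = [(0.4, "tweet a"), (3.9, "tweet b"), (5.0, "tweet c"),
              (9.99, "tweet d"), (12.1, "tweet e")]
    for index, records in sorted(micro_batches(events, 5.0).items()):
        print(f"batch {index}: {records}")
```

Each batch here corresponds to one RDD in a DStream. Unlike this sketch, Spark still produces an (empty) batch for an interval in which nothing arrived.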

     

     

I haven't hooked up Kafka yet, so I used TCP socket communication to feed the raw data from the Twitter API into Spark.

Honestly, this was my first time doing socket programming, so I struggled quite a bit with the sockets alone.... heh

     

     

Socket communication flow

A quick socket summary (Python's socket module)

     

Server

1. socket.socket() - create a socket object

2. bind() - bind the socket to a specific network interface (HOST) and port number (PORT)

  • HOST can be a hostname, an IP address, or the empty string "".
  • With the empty string, the server accepts connections on all network interfaces.
  • PORT can be any number from 1 to 65535.

3. listen() - put the socket into listening mode so the server can accept client connections

4. accept() - block until a client connects, then return a new socket for that connection together with the client's address

5. recv() - receive a message sent by the client

6. send() - send a message to the client

    ํด๋ผ์ด์–ธํŠธ

    1. socket.socket() - ํด๋ผ์ด์–ธํŠธ์—์„œ๋„ ์„œ๋ฒ„์™€ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ ์†Œ์ผ“ ๊ฐ์ฒด ์ƒ์„ฑ
    2. connect() - ์†Œ์ผ“๊ณผ ์—ฐ๊ฒฐ๋œ host, port ๋กœ ์ ‘์†
    3. recv() - ์„œ๋ฒ„๋กœ ๋ถ€ํ„ฐ ๋ฉ”์„ธ์ง€ ์ˆ˜์‹ 
    4. send() - ์„œ๋ฒ„๋กœ ๋ฉ”์„ธ์ง€ ์†ก์‹ 
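Here is a minimal sketch that runs the server steps and client steps above end to end. It uses only Python's standard `socket` and `threading` modules and runs both sides in one process over 127.0.0.1. The function names and the echo behaviour are illustrative assumptions and are not part of the project code.

```python
import socket
import threading


def recv_all(sock: socket.socket) -> bytes:
    """Call recv() until the peer closes its sending side (recv returns b"")."""
    chunks = []
    while True:
        chunk = sock.recv(1024)
        if not chunk:
            break
        chunks.append(chunk)
    return b"".join(chunks)


def serve_one_client(server_sock: socket.socket) -> None:
    """Server steps 4-6: accept one client, read its message, send a reply."""
    conn, _client_addr = server_sock.accept()              # 4. accept(): new socket + address
    with conn:
        message = recv_all(conn).decode("utf-8")           # 5. recv(): bytes -> str
        conn.sendall(f"echo: {message}".encode("utf-8"))   # 6. send(): str -> bytes
    # Leaving the with-block closes conn, so the client's recv() sees EOF.


def client_round_trip(host: str, port: int, text: str) -> str:
    """Client side: socket(), connect(), send(), then recv() the reply."""
    with socket.socket() as client_sock:
        client_sock.connect((host, port))
        client_sock.sendall(text.encode("utf-8"))
        client_sock.shutdown(socket.SHUT_WR)   # signal "no more data" to the server
        return recv_all(client_sock).decode("utf-8")


def echo_once(text: str) -> str:
    """Server steps 1-3 on the loopback interface, then one client exchange."""
    with socket.socket() as server_sock:                    # 1. socket.socket()
        server_sock.bind(("127.0.0.1", 0))                  # 2. bind(); port 0 = any free port
        server_sock.listen(1)                               # 3. listen() before the client connects
        host, port = server_sock.getsockname()
        server = threading.Thread(target=serve_one_client,
                                  args=(server_sock,), daemon=True)
        server.start()
        reply = client_round_trip(host, port, text)
        server.join()
    return reply


if __name__ == "__main__":
    print(echo_once("hello #BTS"))  # echo: hello #BTS
```

Binding to port 0 asks the OS for any free port, so a quick test does not clash with a fixed port like 5555. TCP has no built-in message boundaries, and `shutdown(SHUT_WR)` is one simple way for the client to mark the end of its message.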
    ์œ ์˜ํ•  ์  : 
    - ์†Œ์ผ“์œผ๋กœ ๋ฉ”์„ธ์ง€ ์ „์†กํ•  ๋•Œ๋Š” byte ๋กœ ์ „์†กโ—๏ธ๋”ฐ๋ผ์„œ twitter api ๋กœ ๋ฐ›์€ tweet ๋“ค encode('utf-8') ๋กœ ๋ณด๋‚ด๊ณ  decode()๋กœ ์ฝ์–ด์•ผ ํ•œ๋‹ค.
    - tweet๋“ค ์ „๋ถ€ ๋‹ค ๋ณด๋‚ด๋ ค๋ฉด raw_tweet = json.dumps( data ).encode('utf-8') ๋ฅผ ํ†ตํ•ด ๋ณด๋‚ธ๋‹ค. 
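Both notes above can be checked with the standard json module alone. In this minimal sketch the tweet dict is made-up sample data. The variable `data` only stands in for the raw JSON string that the listener below passes to `json.loads`.

```python
import json

# A tweet-like dict; the id and text are made-up sample values.
tweet = {"id": 1, "text": "#BTS 최고"}

# Sending only the text: str -> UTF-8 bytes on the sender side ...
text_bytes = tweet["text"].encode("utf-8")
# ... and bytes -> str on the receiver side.
text_back = text_bytes.decode("utf-8")

# Sending the whole tweet: dict -> JSON string -> UTF-8 bytes ...
payload = json.dumps(tweet).encode("utf-8")
# ... and back: bytes -> JSON string -> dict.
tweet_back = json.loads(payload.decode("utf-8"))

# Pitfall: if `data` is already a JSON string, json.dumps(data)
# encodes it a second time into a quoted string literal.
data = json.dumps(tweet)                   # stands in for the raw `data` argument
double_encoded = json.dumps(data)
once_decoded = json.loads(double_encoded)  # still a str, not a dict

print(text_back == tweet["text"])          # True
print(tweet_back == tweet)                 # True
print(type(once_decoded).__name__)         # str
```

By default json.dumps escapes non-ASCII characters as \uXXXX, so the payload above is pure ASCII. Passing ensure_ascii=False keeps the Korean characters as UTF-8 bytes instead.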

     

```python
# Tweet producer: listens on 127.0.0.1:5555 and streams tweet text to the
# client that connects (here, Spark's socketTextStream).
# Uses the tweepy 3.x streaming API (StreamListener was removed in tweepy 4).
import json
import socket

from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener

consumer_key = ''
consumer_secret = ''
access_token_key = ''
access_token_secret = ''


class TweetsListener(StreamListener):

    def __init__(self, csocket):
        super().__init__()
        self.client_socket = csocket

    def on_data(self, data):
        try:
            raw_tweet = json.loads(data)  # data is the raw JSON string of one tweet
            print(raw_tweet['text'])
            # socketTextStream reads UTF-8 lines, so end each tweet with "\n";
            # sendall() keeps sending until every byte has been written
            self.client_socket.sendall((raw_tweet['text'] + '\n').encode('utf-8'))
        except Exception as e:  # e.g. KeyError for stream messages without 'text'
            print("Error on_data: %s" % str(e))
        return True  # keep the stream open

    def on_error(self, status):
        print(status)
        return True


def sendData(c_socket):
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token_key, access_token_secret)

    twitter_stream = Stream(auth, TweetsListener(c_socket))  # create the stream
    twitter_stream.filter(track=['#BTS'])                     # start streaming tweets matching #BTS


if __name__ == "__main__":
    s = socket.socket()         # create a socket object
    host = "127.0.0.1"          # loopback address: only local clients can connect
    port = 5555                 # port that Spark will connect to
    s.bind((host, port))        # bind the socket to the address

    print("Listening on port: %s" % str(port))

    s.listen(5)                 # start listening (backlog of 5)
    c, addr = s.accept()        # block until a client (Spark) connects

    print("Received request from: " + str(addr))

    sendData(c)
```

As it turns out, the Twitter client and the socket server can live in a single script like this,,

but I had gone and written separate client-socket and server-socket files and run them one by one.... Still, doing it the long way taught me properly how sockets work!
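TCP delivers a stream of bytes, not individual messages. According to its PySpark docstring, socketTextStream interprets the incoming bytes as UTF-8 text delimited by "\n", so each tweet should reach Spark as one line ending in a newline. This sketch mimics both ends of that framing in plain Python. The sender appends "\n" to each tweet, and the receiver rebuilds whole lines however the bytes happen to be chunked. The function names are hypothetical. Replacing newlines inside a tweet with spaces is an assumption (one tweet per record), not something Spark requires.

```python
from typing import Iterable, Iterator


def frame_tweets(texts: Iterable[str]) -> bytes:
    """Sender side: one UTF-8 line per tweet (inner newlines become spaces)."""
    return b"".join((text.replace("\n", " ") + "\n").encode("utf-8") for text in texts)


def read_lines(chunks: Iterable[bytes]) -> Iterator[str]:
    """Receiver side: rebuild complete lines from arbitrarily split chunks."""
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            # Decode only whole lines, so a multi-byte character split
            # across two chunks is never decoded half-way.
            yield line.decode("utf-8")
    # Anything left in buffer has no trailing "\n" yet and is not emitted.


if __name__ == "__main__":
    stream = frame_tweets(["first tweet", "두 번째\n트윗"])
    # Simulate TCP handing the bytes over in 5-byte pieces.
    chunks = [stream[i:i + 5] for i in range(0, len(stream), 5)]
    print(list(read_lines(chunks)))  # ['first tweet', '두 번째 트윗']
```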

     

     

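Connecting to Spark

• The script above listens on 127.0.0.1:5555 and forwards the tweets coming in through the Twitter API to whichever client connects. Here, that client is Spark.

• StreamingContext is the main entry point for all streaming functionality.

  Create a local StreamingContext with two execution threads and a batch interval of 5 seconds. At least two threads are needed locally because the socket receiver occupies one thread and the received data still has to be processed on another.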

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working threads and a batch interval of 5 seconds
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 5)
```
• Using this context, create a DStream that represents streaming data from a TCP source, specified as a hostname (here 127.0.0.1) and a port (here 5555).

```python
socket_stream = ssc.socketTextStream("127.0.0.1", 5555)
```
• lines is a windowed view of that stream. window(20) covers the last 20 seconds of received data (four 5-second batches) and slides forward by one batch interval. Each record is one line of text sent by the socket server.

```python
lines = socket_stream.window(20)
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
# Count each word in each window
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
```
• After running the code above in Jupyter Notebook, start the Python socket script. It waits in accept() until Spark connects.
• Start the word count with ssc.start().

```python
ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
```
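The pipeline above can be traced in plain Python on hypothetical data to show what each DStream operation computes. This simulates the semantics and is not Spark code. word_count mirrors flatMap, map, and reduceByKey for the lines of one window. windowed mimics window(20) on a 5-second batch interval, where each window holds the last 20 / 5 = 4 batches and slides by one batch. The early windows simply hold fewer batches.

```python
from collections import Counter
from typing import List


def word_count(lines: List[str]) -> Counter:
    """Plain-Python mirror of the Spark pipeline for one window of lines."""
    words = [word for line in lines for word in line.split(" ")]  # flatMap(line.split(" "))
    pairs = [(word, 1) for word in words]                         # map(word -> (word, 1))
    counts: Counter = Counter()
    for word, one in pairs:                                       # reduceByKey(x + y)
        counts[word] += one
    return counts


def windowed(batches: List[List[str]], window_seconds: int,
             batch_seconds: int) -> List[List[str]]:
    """For each batch i, collect the lines of batches i-k+1..i, where k = window / batch."""
    k = window_seconds // batch_seconds
    return [[line for batch in batches[max(0, i - k + 1): i + 1] for line in batch]
            for i in range(len(batches))]


if __name__ == "__main__":
    # Hypothetical 5-second batches of tweet text (one list per batch).
    batches = [["BTS BTS"], ["hello BTS"], [], ["hi"], ["bye"]]
    for i, window_lines in enumerate(windowed(batches, 20, 5)):
        print(i, dict(word_count(window_lines)))
```

Because split(" ") is used exactly as in the Spark code, two consecutive spaces produce an empty-string "word". That is one source of the meaningless entries in the output.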

     

Results

Most of the output was meaningless data,,,, so next I'll strip out everything unnecessary and print the words in order of count.
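Here is one possible shape for that cleanup, sketched in plain Python. It lowercases the words, drops stop words, links, and @mentions, and then sorts by count. The stop-word list and the filtering rules are assumptions, not the rules the project settled on. In the Spark job itself, the same idea might map onto words.filter(...) before counting and wordCounts.transform(lambda rdd: rdd.sortBy(lambda kv: kv[1], ascending=False)) so that pprint() shows the top ten of each window. That mapping is untested here.

```python
from collections import Counter
from typing import Iterable, List, Tuple

# Hypothetical stop-word list; a real one would be tuned to the collected tweets.
STOP_WORDS = {"rt", "the", "a", "an", "to", "and", "is", "of", "in"}


def is_meaningful(word: str) -> bool:
    """Drop empty tokens, stop words, links, and @mentions (assumed rules)."""
    return (
        bool(word)
        and word not in STOP_WORDS
        and not word.startswith(("http", "@"))
    )


def top_words(lines: Iterable[str], n: int = 10) -> List[Tuple[str, int]]:
    """Count meaningful lowercase words and return the n most frequent."""
    counts = Counter(
        word
        for line in lines
        for word in (token.lower() for token in line.split())
        if is_meaningful(word)
    )
    return counts.most_common(n)  # sorted by count, highest first


if __name__ == "__main__":
    sample = ["RT @user: BTS is back https://t.co/x", "BTS BTS concert"]
    print(top_words(sample, 3))
```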

     


References:

- Spark Streaming Programming Guide, Spark 2.4.4 Documentation, spark.apache.org

     
