Apache Spark is an open-source cluster computing framework. It is a powerful, fast processing engine that is easy to use and ships with higher-level libraries for SQL queries, streaming data, machine learning, and graph processing.
In this tutorial I'll show how to use Spark Streaming with text files that are generated dynamically in a specific directory; Spark can read those files as soon as they are created.
You should read the following first:
- Apache Spark (https://databricks.com/spark/about)
- Pyspark (Python library https://pypi.python.org/pypi/pyspark)
Installations:
- Apache Spark (https://spark.apache.org/downloads.html)
- pyspark in a virtualenv using pip install pyspark
Spark's Python API, pyspark, has many built-in APIs (http://spark.apache.org/docs/latest/api/python/index.html) that make it easy to perform common operations with Spark.
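As a quick sanity check that pyspark is installed correctly, a minimal local word count like the sketch below should run without errors. The master 'local[2]', the app name, and the sample words are just illustrative values, not part of this tutorial's files.

from pyspark import SparkContext

# Minimal sanity check: run a small word count on an in-memory list.
# 'local[2]' and the sample words are illustrative, not part of this tutorial.
sc = SparkContext("local[2]", appName="PysparkSanityCheck")
words = sc.parallelize(["spark", "streaming", "spark", "python"])
counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
print(counts.collect())  # e.g. [('spark', 2), ('streaming', 1), ('python', 1)]
sc.stop()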
Now let's start the coding part for Spark Streaming in Python using the pyspark library.
First we'll write Python code that creates data files dynamically in a folder. I create a file.py in a directory that also contains a lorem.txt file with dummy text data.
In the code below I randomly select content from the lorem.txt file and write it to log files. The script creates 30 log files in the log directory, one every 5 seconds: log1.txt, log2.txt, ..., log30.txt.
file.py:
from random import randint
import time

"""
This is used to create 30 files, one by one, at 5-second intervals.
These files store content picked dynamically from 'lorem.txt' by the code below.
"""

def main():
    a = 1
    with open('lorem.txt', 'r') as file:  # reading content from the 'lorem.txt' file
        lines = file.readlines()
        while a <= 30:
            totalline = len(lines)
            linenumber = randint(0, totalline - 10)
            with open('log/log{}.txt'.format(a), 'w') as writefile:
                writefile.write(' '.join(line for line in lines[linenumber:totalline]))
            print('creating file log{}.txt'.format(a))
            a += 1
            time.sleep(5)

if __name__ == '__main__':
    main()
Next we'll write the Spark code that reads the content of each file and calculates the word count of each file's dummy data.
In the code below I use the pyspark API to implement the word count task for each file. Spark checks the directory every 3 seconds and reads the content of files created after the streaming process starts; it will not read files that already exist in the log directory.
streaming.py:
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

"""
This is used to stream text from the txt files that file.py creates dynamically.
The Spark Streaming job runs every 3 seconds and shows the word counts from each file dynamically.
"""

def main():
    sc = SparkContext(appName="PysparkStreaming")
    ssc = StreamingContext(sc, 3)  # streaming batch interval of 3 seconds

    lines = ssc.textFileStream('log/')  # 'log/' is the directory to monitor
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
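Note that reduceByKey above produces word counts per batch, not a running total across batches. If you want cumulative counts over all files, Spark Streaming's updateStateByKey is the usual approach; a minimal sketch (the checkpoint directory name 'checkpoint/' is just an example) could look like this, reusing the ssc and lines variables from streaming.py:

# Sketch: cumulative word counts across batches using updateStateByKey.
# updateStateByKey requires a checkpoint directory; 'checkpoint/' is an example path.
ssc.checkpoint('checkpoint/')

def update_total(new_values, running_total):
    # new_values: counts from the current batch; running_total: previous total (or None)
    return sum(new_values) + (running_total or 0)

running_counts = lines.flatMap(lambda line: line.split(" ")) \
                      .map(lambda x: (x, 1)) \
                      .updateStateByKey(update_total)
running_counts.pprint()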
We will start the Spark Streaming job first, before the file-generating process. Spark must already be installed and available on the system for this to work.
Run the command below in a terminal to start the streaming process.
spark-submit streaming.py
This will start the Spark Streaming process.
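Depending on how Spark is set up, you may need to specify a master explicitly with spark-submit's --master option; for local testing something like the following typically works (local[2] is just an example value):

spark-submit --master local[2] streaming.py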
Now execute file.py with Python; it will create log files in the log directory and Spark Streaming will read them.
python file.py
The screenshot above shows that while python file.py is creating new files in the log directory, Spark is displaying the word counts at the same time on the right side of the screenshot.
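For reference, the per-batch output printed by counts.pprint() in the terminal looks roughly like this (the timestamp, words, and counts below are only illustrative):

-------------------------------------------
Time: 2018-01-01 00:00:03
-------------------------------------------
('Lorem', 12)
('ipsum', 9)
('dolor', 7)
...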
I'm sharing a video of this tutorial.