Is Big data giving you nightmares? Try this!
In this article, we are going to learn how to use PySpark for big data machine learning
Since the boom of the internet and the widespread accessibility of mobile phones, humans spend most of their time creating data, willingly or unwillingly, and that volume has grown far beyond what a single computer system can store.
Below is a chart that shows the exponential growth of data volume.
Earlier, spreadsheets were used to build predictive models; over time, programming languages emerged that help us build those models.
From spreadsheets to databases to concepts like cloud computing, we have come a long way when it comes to storing data.
Cloud computing is essentially the ability to access data that lives on a server other than your local machine. Now let’s discuss what is meant by distributed computation.
Distributed Computer System
A distributed computer system works as a single unit but is made up of multiple systems, or nodes, that sit elsewhere and can be accessed from the submitting machine, as shown in the image below. These nodes can be physically close to each other or miles apart.
Below is what a distributed computer system looks like:-
Benefits of Distributed Computer Systems:-
- Scalability: such systems are easily scalable, as you just have to add more nodes, i.e. more machines.
- No downtime: if one node becomes dysfunctional for a while, the other nodes can support its operations, leading to little or no downtime.
- RAM: with the growth of data volume, it is often impossible to fit an entire dataset into memory at once. Frameworks like PySpark, Dask, etc. load the data in chunks, allowing the user to process enormous amounts of data that don’t even fit into memory.
Let’s now go ahead and build a regression model using Spark. To kick things off, we import the required libraries and create a Spark session using SparkSession.
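For reference, here is a minimal set of imports covering everything used in the rest of this walkthrough (PySpark’s SQL and ML modules, plus NumPy, seaborn, and matplotlib for plotting); the exact module paths assume a standard PySpark 3.x install:

#Spark session and data types
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
#Feature engineering utilities
from pyspark.ml.feature import StringIndexer, VectorAssembler
#Plotting and numeric helpers
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt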
spark = SparkSession.builder.master("local[2]").appName("Linear-Regression").getOrCreate()
The spark variable here is essentially our entry point to Spark: a session through which we can connect to and process data that may be stored in different locations, which is exactly the kind of access cloud computing helps us achieve.
For reading a CSV, you can use the syntax below. Make sure to set the header argument to True, otherwise you might have a hard time with the column names when processing the data.
spark_df = spark.read.csv(file_path, header=True)
We also ran a small experiment to see how pandas performs on 3,567,694 rows of data. Just as we expected, pandas took 22.45 seconds while PySpark took only 6.3 seconds. The difference might not seem like much, but with a larger dataset pandas is far more likely to struggle.
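Here is a minimal sketch of how such a timing comparison could be done; the file name and the use of count() as the action that forces Spark to actually read the data are assumptions, not the article’s exact benchmark:

import time
import pandas as pd

file_path = 'flight_prices.csv'  #hypothetical path to the Kaggle CSV

#pandas reads the entire file into memory eagerly
start = time.time()
pdf = pd.read_csv(file_path)
print('pandas read took', round(time.time() - start, 2), 'seconds for', len(pdf), 'rows')

#Spark reads lazily, so we trigger the work with an action such as count()
start = time.time()
sdf = spark.read.csv(file_path, header=True)
n_rows = sdf.count()  #count() is what actually triggers the read
print('PySpark read + count took', round(time.time() - start, 2), 'seconds for', n_rows, 'rows')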
We are going to predict flight prices using the dataset available on Kaggle. When we print the spark_df variable, here is what we get:-
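If you want to check the column types directly rather than eyeballing the printed rows, Spark’s printSchema method lists every column along with its data type; here it would show that everything has been read in as strings:

#Print each column name along with the data type Spark assigned to it
spark_df.printSchema()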
If you notice, all the columns are in string format, hence we will convert some of them to numbers using the snippet below.
numeric_cols = ['elapsedDays', 'baseFare', 'totalFare', 'seatsRemaining', 'totalTravelDistance',
                'segmentsDepartureTimeEpochSeconds', 'segmentsDurationInSeconds', 'segmentsDistance']
#Casting each of the string columns above to a numeric (double) type
for col in numeric_cols:
    spark_df = spark_df.withColumn(col, spark_df[col].cast(DoubleType()))
You can use spark_df.show(5) to display the top 5 rows of the data frame. Let’s go ahead and plot the distribution of the target values.
#Making a list of the target column values
data = [row['totalFare'] for row in spark_df.select('totalFare').collect()]
#Only keeping the first 10_000 values for plotting
data = data[:10_000]
#Plotting the data
sns.histplot(data)
plt.gca().spines['top'].set_visible(False)
plt.gca().spines['right'].set_visible(False)
plt.title('Total Fare Distribution')
plt.xlabel('Total Fare')
plt.show()
We can see that the total fare peaks around the 300 mark, with a count of more than 1200. Let’s now check the correlation and the count of null values in our data.
#Checking for the null values
for i in spark_df.columns:
    print(i, 'with null values:', spark_df.filter(spark_df[i].isNull()).count())

#Checking correlation with totalFare for each of the numeric_cols
for i in numeric_cols:
    print("Correlation to Total Fare for", i, spark_df.stat.corr('totalFare', i))
There are a bunch of columns with null values; however, based on the correlation, we only want to fill the null values of columns that explain our target variable well.
For example, segmentsDurationInSeconds has 73,000 null values, but its correlation with the dependent variable is only -0.1048, hence we won’t be treating the null values of this column. We are filling the null values with 0 for only one column, since this is a column we intend to use to make the final predictions.
#na.fill returns a new DataFrame, so assign the result back
spark_df = spark_df.na.fill(value=0, subset=["totalTravelDistance"])
We also need to convert our categorical feature values to numbers and vectorize the inputs. We will go ahead and encode the 3 columns and save the result to a new data frame.
#Encoding first column
indexer = StringIndexer(inputCol='isBasicEconomy', outputCol='isBasicEconomy_indexed')
indexed = indexer.fit(spark_df).transform(spark_df)
#Encoding second column
indexer = StringIndexer(inputCol='isNonStop', outputCol='isNonStop_indexed')
indexed = indexer.fit(indexed).transform(indexed)
#Encoding third column
indexer = StringIndexer(inputCol='isRefundable', outputCol='isRefundable_indexed')
indexed = indexer.fit(indexed).transform(indexed)
The PySpark LinearRegression model expects the input features to be packed into a single vector column, which can be done using VectorAssembler. In the inputCols argument, you pass the list of column names as strings, in outputCol you give the name of the assembled column, and then you apply the transform method, as sketched below.
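Here is a minimal sketch of that step. The exact list of input columns is an assumption based on the columns prepared above, while the output column name and the finaldata variable match the training code that follows; handleInvalid='skip' simply drops any rows that still contain nulls:

#Packing the numeric and indexed columns into a single feature vector
assembler = VectorAssembler(
    inputCols=['elapsedDays', 'baseFare', 'seatsRemaining', 'totalTravelDistance',
               'segmentsDepartureTimeEpochSeconds', 'segmentsDurationInSeconds', 'segmentsDistance',
               'isBasicEconomy_indexed', 'isNonStop_indexed', 'isRefundable_indexed'],
    outputCol='Independent Features',
    handleInvalid='skip')
finaldata = assembler.transform(indexed).select('Independent Features', 'totalFare')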
Our final data frame now looks like this:-
We are now ready to split the data and train our model
from pyspark.ml.regression import LinearRegression
regressor = LinearRegression(featuresCol='Independent Features', labelCol='totalFare')
#Splitting the data
train_data, test_data = finaldata.randomSplit([0.75, 0.25], seed=42)
#Training the model
history = regressor.fit(train_data)
Okay, now that our model has been trained, let’s make predictions and see how well our model did.
pred_results = history.evaluate(test_data)
pred_results.predictions.show()
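Since evaluate returns a regression summary for the test data, we can also read the test-set metrics straight off pred_results, mirroring the training metrics printed further below:

#Test-set error metrics from the evaluation summary
print('Test RMSE : %f' % pred_results.rootMeanSquaredError)
print('Test R-Squared : %f' % pred_results.r2)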
Let’s now access some of the attributes of the training.
print("Coefficients: " + str(history.coefficients))
print("Intercept: " + str(history.intercept))
trainingSummary = history.summary
print("RMSE : %f" % trainingSummary.rootMeanSquaredError)
print("R-Squared : %f" % trainingSummary.r2)
#Plotting the errors/residuals
errors = np.array([i for i in trainingSummary.residuals.collect()]).reshape(-1).tolist()
sns.histplot(errors)
plt.gca().spines['top'].set_visible(False)
plt.gca().spines['right'].set_visible(False)
plt.title('Error Distribution', fontsize=18)
plt.xlabel('Errors')
plt.show()
From the plot above, we can conclude that the model errors follow a normal distribution, which is one of the assumptions of linear regression and is crucial for the model to work. Also, notice how most of the errors fall between -20 and 20, while some go as high as 40 to 60.
The plot above shows that our predictions and the targets are quite close, which means our model has done a good job. The model can be improved further with hyperparameter tuning, the documentation for which can be found here.
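As a rough sketch of what that tuning could look like (the grid values below are illustrative assumptions, not tuned settings), PySpark’s ParamGridBuilder and CrossValidator can search over the model’s regularization parameters:

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#Grid of candidate regularization settings (values are illustrative)
param_grid = (ParamGridBuilder()
              .addGrid(regressor.regParam, [0.0, 0.1, 0.5])
              .addGrid(regressor.elasticNetParam, [0.0, 0.5, 1.0])
              .build())

#3-fold cross-validation, scoring each candidate by RMSE
evaluator = RegressionEvaluator(labelCol='totalFare', predictionCol='prediction', metricName='rmse')
cv = CrossValidator(estimator=regressor, estimatorParamMaps=param_grid,
                    evaluator=evaluator, numFolds=3)

cv_model = cv.fit(train_data)
print('Best model RMSE on the test data:',
      evaluator.evaluate(cv_model.bestModel.transform(test_data)))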
Conclusion:-
- A distributed computer system consists of multiple software components that sit on multiple computers but run as a single system.
- A distributed computer system is great because it is easily scalable and offers little to no downtime.
- PySpark is useful whenever the dataset is larger than the memory of the system.
- Unlike pandas, frameworks like PySpark and Dask do not load the whole dataset into memory when we import it as a Spark data frame.
If you liked the article and it proved to be helpful, I’d appreciate it if you could give it a clap and follow me for more.