Creating a Spark DataFrame

Importing Spark libraries

from pyspark.sql import SparkSession

Creating Spark session

spark = SparkSession \
    .builder \
    .appName("Spark_example") \
    .config("spark.sql.caseSensitive", "True") \
    .getOrCreate()
spark

SparkSession - in-memory

SparkContext

Spark UI

Version
v3.0.0
Master
local[*]
AppName
Spark_example

Creating a Spark Sample DataFrame

Create sample data

Security event logs

eventLogs = [('Sysmon',1,'Process creation'),
             ('Sysmon',2,'A process changed a file creation time'),
             ('Sysmon',3,'Network connection'),
             ('Sysmon',4,'Sysmon service state changed'),
             ('Sysmon',5,'Process terminated'),
             ('Security',4688,'A process has been created'),
             ('Security',4697,'A service was installed in the system')]
type(eventLogs)
list

Define dataframe schema

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
   StructField("Channel", StringType(), True),
   StructField("Event_Id", IntegerType(), True),
   StructField("Description", StringType(), True)])

Create Spark DataFrame

eventLogsDf = spark.createDataFrame(eventLogs, schema)
eventLogsDf.show(truncate=False)
+--------+--------+--------------------------------------+
|Channel |Event_Id|Description                           |
+--------+--------+--------------------------------------+
|Sysmon  |1       |Process creation                      |
|Sysmon  |2       |A process changed a file creation time|
|Sysmon  |3       |Network connection                    |
|Sysmon  |4       |Sysmon service state changed          |
|Sysmon  |5       |Process terminated                    |
|Security|4688    |A process has been created            |
|Security|4697    |A service was installed in the system |
+--------+--------+--------------------------------------+
type(eventLogsDf)
pyspark.sql.dataframe.DataFrame

Exposing Spark DataFrame as a SQL View

eventLogsDf.createOrReplaceTempView('eventLogs')

Testing a SQL-like Query

Filtering on Sysmon event logs

sysmonEvents = spark.sql(
'''
SELECT *
FROM eventLogs
WHERE Channel = 'Sysmon'
''')
sysmonEvents.show(truncate=False)
+-------+--------+--------------------------------------+
|Channel|Event_Id|Description                           |
+-------+--------+--------------------------------------+
|Sysmon |1       |Process creation                      |
|Sysmon |2       |A process changed a file creation time|
|Sysmon |3       |Network connection                    |
|Sysmon |4       |Sysmon service state changed          |
|Sysmon |5       |Process terminated                    |
+-------+--------+--------------------------------------+

Thank you! I hope you enjoyed it!