Data Analysis with Spark.SQL: Transforming

Creating SQL view from Mordor APT29 dataset

Create Spark session

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Spark_Data_Analysis") \
    .config("spark.sql.caseSensitive","True") \
    .getOrCreate()
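
Note: spark.sql.caseSensitive makes column names case sensitive in SQL queries, so ProcessId and processid would refer to different columns. As an optional sanity check (not part of the original walkthrough), you can read the setting back from the session:

# Optional: confirm the setting took effect
print(spark.conf.get("spark.sql.caseSensitive"))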

Expose the dataframe as a SQL view

apt29Json = '../datasets/apt29_evals_day1_manual_2020-05-01225525.json'

apt29Df = spark.read.json(apt29Json)

apt29Df.createOrReplaceTempView('apt29')
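
Once registered, the view is queryable by name. A quick optional check (illustrative, not part of the original flow):

# Optional: count the records exposed through the new 'apt29' view
spark.sql('SELECT COUNT(*) AS TotalRecords FROM apt29').show()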

Transforming data with Spark Built-In functions

Convert ProcessId (String) to Integer format

IntegerProcessId = spark.sql(
'''
SELECT ProcessId, cast(ProcessId as Integer) as IntegerProcessId
FROM apt29
WHERE lower(Channel) LIKE '%sysmon%'
    AND EventID = 1
''')

print('This dataframe has {} records!!'.format(IntegerProcessId.count()))
IntegerProcessId.printSchema()
IntegerProcessId.show(n = 5, truncate = False)
This dataframe has 447 records!!
root
 |-- ProcessId: string (nullable = true)
 |-- IntegerProcessId: integer (nullable = true)

+---------+----------------+
|ProcessId|IntegerProcessId|
+---------+----------------+
|8524     |8524            |
|5156     |5156            |
|2772     |2772            |
|5944     |5944            |
|4152     |4152            |
+---------+----------------+
only showing top 5 rows
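
The same transformation can also be written with the DataFrame API instead of spark.sql. A minimal equivalent sketch (the variable name IntegerProcessIdDf is illustrative):

from pyspark.sql.functions import col, lower

# Equivalent cast using the DataFrame API instead of a SQL query
IntegerProcessIdDf = apt29Df \
    .filter(lower(col('Channel')).like('%sysmon%') & (col('EventID') == 1)) \
    .select('ProcessId', col('ProcessId').cast('integer').alias('IntegerProcessId'))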

Convert ProcessId (Integer) to Hexadecimal format

HexadecimalProcessId = spark.sql(
'''
SELECT ProcessId, hex(cast(ProcessId as Integer)) as HexadecimalProcessId
FROM apt29
WHERE lower(Channel) LIKE '%sysmon%'
    AND EventID = 1
''')

print('This dataframe has {} records!!'.format(HexadecimalProcessId.count()))
HexadecimalProcessId.printSchema()
HexadecimalProcessId.show(n = 5, truncate = False)
This dataframe has 447 records!!
root
 |-- ProcessId: string (nullable = true)
 |-- HexadecimalProcessId: string (nullable = true)

+---------+--------------------+
|ProcessId|HexadecimalProcessId|
+---------+--------------------+
|8524     |214C                |
|5156     |1424                |
|2772     |AD4                 |
|5944     |1738                |
|4152     |1038                |
+---------+--------------------+
only showing top 5 rows
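
Spark's built-in conv() converts numbers between arbitrary bases, so you can also go the other way and turn the hexadecimal values back into decimal. A minimal sketch (DecimalProcessId is an illustrative name):

DecimalProcessId = spark.sql(
'''
SELECT ProcessId, conv(hex(cast(ProcessId as Integer)), 16, 10) as BackToDecimal
FROM apt29
WHERE lower(Channel) LIKE '%sysmon%'
    AND EventID = 1
''')

DecimalProcessId.show(n = 5, truncate = False)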

Transforming data with Spark User-Defined Functions (UDFs)

Calculate the number of characters of CommandLine values in Sysmon 1 (Process Creation) events

  • Define function

def LenCommand(value):
    # Return the number of characters in the CommandLine value;
    # return None for nulls so the UDF does not fail on missing values
    return len(value) if value is not None else None

  • Import IntegerType from pyspark.sql.types

from pyspark.sql.types import IntegerType

  • Register UDF

spark.udf.register("LengthCommand", LenCommand, IntegerType())
<function __main__.LenCommand(value)>

  • Use UDF

commandLine = spark.sql(
'''
SELECT CommandLine, LengthCommand(CommandLine) as LengthCommandLine
FROM apt29
WHERE lower(Channel) LIKE '%sysmon%'
    AND EventID = 1
''')

print('This dataframe has {} records!!'.format(commandLine.count()))
commandLine.printSchema()
commandLine.show(n = 5, truncate = 80)
This dataframe has 447 records!!
root
 |-- CommandLine: string (nullable = true)
 |-- LengthCommandLine: integer (nullable = true)

+--------------------------------------------------------------------------------+-----------------+
|                                                                     CommandLine|LengthCommandLine|
+--------------------------------------------------------------------------------+-----------------+
|                                     "C:\ProgramData\victim\‮cod.3aka3.scr" /S|               43|
|\\?\C:\windows\system32\conhost.exe --headless --width 80 --height 25 --signa...|               99|
|                                                   "C:\windows\system32\cmd.exe"|               29|
|                                                                      powershell|               10|
|"C:\windows\system32\SearchProtocolHost.exe" Global\UsGthrFltPipeMssGthrPipe6...|              308|
+--------------------------------------------------------------------------------+-----------------+
only showing top 5 rows
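
For a simple character count like this, Spark's built-in length() function returns the same result without the per-row serialization cost of a Python UDF, so prefer built-ins when one exists. An equivalent query for comparison (builtInLength is an illustrative name):

builtInLength = spark.sql(
'''
SELECT CommandLine, length(CommandLine) as LengthCommandLine
FROM apt29
WHERE lower(Channel) LIKE '%sysmon%'
    AND EventID = 1
''')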

Thank you! I hope you enjoyed it!