Undersample data to CSV from original dataset
lopesoll committed Feb 28, 2023
1 parent 5d29a26 commit 001e83e5131fea17d8279df9ac946348333f7947
Showing 1 changed file with 167 additions and 0 deletions.
@@ -0,0 +1,167 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "54441b41",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"<pyspark.sql.session.SparkSession object at 0x000001A634E423A0>\n"
]
}
],
"source": [
"import findspark\n",
"findspark.init()\n",
"import pyspark\n",
"\n",
"from pyspark.sql.functions import *\n",
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.types import IntegerType\n",
"\n",
"\n",
"spark=spark = SparkSession \\\n",
" .builder \\\n",
" .appName(\"US_accidents\") \\\n",
" .getOrCreate()\n",
"print(spark)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "f6198dcd",
"metadata": {},
"outputs": [],
"source": [
"df = spark.read.options(header=\"True\", InferSchema=\"True\", nullValue=\"null\" ).csv(\"dataset/US_Accidents_Dec21_updated.csv\")\n",
"#split the date and creates variable duration stores in minutes\n",
"#cast boolean columns to int\n",
"df = df.withColumn(\"dayofweek\", dayofweek(col(\"Start_Time\")).alias(\"dayofweek\"))\\\n",
" .withColumn(\"year\", year(col(\"Start_Time\")).alias(\"year\"))\\\n",
" .withColumn(\"month\", month(col(\"Start_Time\")).alias(\"month\"))\\\n",
" .withColumn(\"dayofmonth\", dayofmonth(col(\"Start_Time\")).alias(\"dayofmonth\"))\\\n",
" .withColumn(\"hour\", hour(col(\"Start_Time\")).alias(\"hour\"))\\\n",
" .withColumn(\"Duration\", (col(\"End_Time\").cast(\"long\") - col(\"Start_Time\").cast(\"long\"))/60)\\\n",
" .withColumn(\"Amenity\",df.Amenity.cast(IntegerType()))\\\n",
" .withColumn(\"Crossing\",df.Crossing.cast(IntegerType()))\\\n",
" .withColumn(\"Give_Way\",df.Give_Way.cast(IntegerType()))\\\n",
" .withColumn(\"Junction\",df.Junction.cast(IntegerType()))\\\n",
" .withColumn(\"No_Exit\",df.No_Exit.cast(IntegerType()))\\\n",
" .withColumn(\"Railway\",df.Railway.cast(IntegerType()))\\\n",
" .withColumn(\"Roundabout\",df.Roundabout.cast(IntegerType()))\\\n",
" .withColumn(\"Station\",df.Station.cast(IntegerType()))\\\n",
" .withColumn(\"Stop\",df.Stop.cast(IntegerType()))\\\n",
" .withColumn(\"Traffic_Calming\",df.Traffic_Calming.cast(IntegerType()))\\\n",
" .withColumn(\"Traffic_Signal\",df.Traffic_Signal.cast(IntegerType()))"
]
},
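{
"cell_type": "markdown",
"id": "b3c1d2e0",
"metadata": {},
"source": [
"Optional sanity check (a sketch, not part of the original pipeline): confirm the derived time columns and one of the boolean-to-int casts before moving on. `printSchema` and `show` are standard DataFrame calls; the column names are the ones created above."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c4d5e6f7",
"metadata": {},
"outputs": [],
"source": [
"# Verify the derived columns and the integer cast of a former boolean column\n",
"df.printSchema()\n",
"df.select(\"dayofweek\", \"year\", \"month\", \"hour\", \"Duration\", \"Amenity\").show(5)"
]
},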
{
"cell_type": "code",
"execution_count": 3,
"id": "2f991065",
"metadata": {},
"outputs": [],
"source": [
"#dropping columns\n",
"#drop starttime, endtime, Weather_Tiestamp because I no longer need them\n",
"#drop number because it has 1700000 missing values and extreemly low correlation with target variable\n",
"#drop country because it's always the same \n",
"#drop ID and Description because were causing an error with fitting the model on the pipeline both are unnecessary\n",
"#drop end_lng, end_lat, bump, wind_chill concluded from the correlation analysis (high corr with other variables)\n",
"#drop turningloop because has no correlation with other variables\n",
"#drop sunrizes and sunsets because I dont need them, I have hour\n",
"#drop airport code and Timezone because are not important and have a few missing values\n",
"df = df.drop('Start_Time', 'End_Time', 'Weather_Timestamp', 'Number', 'Country', 'Description','ID',\n",
" 'End_Lng', 'End_Lat', 'Bump', 'Turning_Loop', 'Wind_Chill(F)', 'Sunrise_Sunset','Civil_Twilight',\n",
" 'Nautical_Twilight','Astronomical_Twilight','Airport_Code','Timezone')\n",
"\n",
"#instead of dropping precipitation, fill with 0 because most of the missing data in this column\n",
"#is when the weather_condition wasn't rainy, therefor are no measures for precipitation\n",
"df=df.na.fill(value=0,subset=[\"Precipitation(in)\"])\n",
"\n",
"#remaining columns with missing values:::\n",
"#Street:2; City:137; Zipcode:1319; Temperature:69274; Humidity:73092; Pressure:59200; Visibility:70546;\n",
"#Wind_Direction:73775; Wind_Speed:157944; Weather_Condition:70636;\n",
"df = df.na.drop() \n",
"#drop rows with missing values, in total 187383 rows lost"
]
},
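{
"cell_type": "markdown",
"id": "d8e9f0a1",
"metadata": {},
"source": [
"The per-column missing-value counts quoted in the comment above can be reproduced with an aggregation like the one below (a sketch; it has to run *before* the `na.drop()` above, and the exact figures depend on the dataset version)."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e2f3a4b5",
"metadata": {},
"outputs": [],
"source": [
"import pyspark.sql.functions as F\n",
"# Count nulls per column; run before na.drop() to see the figures listed in the comment above\n",
"null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])\n",
"null_counts.show()"
]
},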
{
"cell_type": "code",
"execution_count": 4,
"id": "e0999444",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------+-------+\n",
"|Severity| count|\n",
"+--------+-------+\n",
"| 3| 135914|\n",
"| 4| 112405|\n",
"| 2|2384141|\n",
"| 1| 25499|\n",
"+--------+-------+\n",
"\n"
]
}
],
"source": [
"newdf=df.groupby(\"Severity\").count().show()"
]
},
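{
"cell_type": "markdown",
"id": "f6a7b8c9",
"metadata": {},
"source": [
"The counts above show a heavy imbalance toward Severity 2. One way to undersample the majority classes (a sketch of stratified sampling with `sampleBy`, not necessarily the exact strategy used downstream) is to draw each class down toward the size of the smallest one, Severity 1 with 25,499 rows:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a0b1c2d3",
"metadata": {},
"outputs": [],
"source": [
"# Fractions chosen so each class lands near the Severity-1 count (~25,499 rows);\n",
"# sampleBy is approximate, so the resulting counts vary slightly from run to run\n",
"fractions = {1: 1.0, 2: 25499 / 2384141, 3: 25499 / 135914, 4: 25499 / 112405}\n",
"balanced = df.sampleBy(\"Severity\", fractions=fractions, seed=42)\n",
"balanced.groupBy(\"Severity\").count().show()"
]
},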
{
"cell_type": "code",
"execution_count": 5,
"id": "7d0b3752",
"metadata": {},
"outputs": [],
"source": [
"import pyspark.sql.functions as F\n",
"#orders the dataframe by a random column to create garantee that the partitions are not ordered by date, \n",
"#this ensures that various dates (years) are included\n",
"df = df.select(\"*\").orderBy(F.rand())"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "93f7a82e",
"metadata": {},
"outputs": [],
"source": [
"df.repartition(1).write.option(\"header\",True) \\\n",
" .partitionBy(\"Severity\") \\\n",
" .csv(\"dataset/severity\")"
]
}
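,
{
"cell_type": "markdown",
"id": "b4c5d6e7",
"metadata": {},
"source": [
"Reading the partitioned output back later recovers `Severity` from the directory names (`Severity=1/`, `Severity=2/`, ...), since Spark infers partition columns from the path layout. A usage sketch:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c8d9e0f1",
"metadata": {},
"outputs": [],
"source": [
"# Spark rebuilds the Severity column from the partition directories under dataset/severity\n",
"sample = spark.read.options(header=\"True\", inferSchema=\"True\").csv(\"dataset/severity\")\n",
"sample.groupBy(\"Severity\").count().show()"
]
}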
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.12"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
