{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "54441b41",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"<pyspark.sql.session.SparkSession object at 0x000001A634E423A0>\n"
]
}
],
"source": [
"import findspark\n",
"findspark.init()\n",
"import pyspark\n",
"\n",
"from pyspark.sql.functions import *\n",
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.types import IntegerType\n",
"\n",
"\n",
"spark=spark = SparkSession \\\n",
" .builder \\\n",
" .appName(\"US_accidents\") \\\n",
" .getOrCreate()\n",
"print(spark)"
]
},
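{
"cell_type": "code",
"execution_count": null,
"id": "a1b2c3d4",
"metadata": {},
"outputs": [],
"source": [
"# optional quick check: confirm the session is live and which Spark build\n",
"# findspark picked up (this cell is an illustrative addition)\n",
"print(spark.version)\n",
"print(spark.sparkContext.appName)"
]
},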
{
"cell_type": "code",
"execution_count": 2,
"id": "f6198dcd",
"metadata": {},
"outputs": [],
"source": [
"df = spark.read.options(header=\"True\", InferSchema=\"True\", nullValue=\"null\" ).csv(\"dataset/US_Accidents_Dec21_updated.csv\")\n",
"#split the date and creates variable duration stores in minutes\n",
"#cast boolean columns to int\n",
"df = df.withColumn(\"dayofweek\", dayofweek(col(\"Start_Time\")).alias(\"dayofweek\"))\\\n",
" .withColumn(\"year\", year(col(\"Start_Time\")).alias(\"year\"))\\\n",
" .withColumn(\"month\", month(col(\"Start_Time\")).alias(\"month\"))\\\n",
" .withColumn(\"dayofmonth\", dayofmonth(col(\"Start_Time\")).alias(\"dayofmonth\"))\\\n",
" .withColumn(\"hour\", hour(col(\"Start_Time\")).alias(\"hour\"))\\\n",
" .withColumn(\"Duration\", (col(\"End_Time\").cast(\"long\") - col(\"Start_Time\").cast(\"long\"))/60)\\\n",
" .withColumn(\"Amenity\",df.Amenity.cast(IntegerType()))\\\n",
" .withColumn(\"Crossing\",df.Crossing.cast(IntegerType()))\\\n",
" .withColumn(\"Give_Way\",df.Give_Way.cast(IntegerType()))\\\n",
" .withColumn(\"Junction\",df.Junction.cast(IntegerType()))\\\n",
" .withColumn(\"No_Exit\",df.No_Exit.cast(IntegerType()))\\\n",
" .withColumn(\"Railway\",df.Railway.cast(IntegerType()))\\\n",
" .withColumn(\"Roundabout\",df.Roundabout.cast(IntegerType()))\\\n",
" .withColumn(\"Station\",df.Station.cast(IntegerType()))\\\n",
" .withColumn(\"Stop\",df.Stop.cast(IntegerType()))\\\n",
" .withColumn(\"Traffic_Calming\",df.Traffic_Calming.cast(IntegerType()))\\\n",
" .withColumn(\"Traffic_Signal\",df.Traffic_Signal.cast(IntegerType()))"
]
},
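{
"cell_type": "code",
"execution_count": null,
"id": "b2c3d4e5",
"metadata": {},
"outputs": [],
"source": [
"# optional sanity check (an illustrative sketch): casting a timestamp to long\n",
"# yields epoch seconds, so (End_Time - Start_Time) / 60 is the duration in minutes\n",
"df.select(\"Start_Time\", \"End_Time\", \"Duration\").show(5, truncate=False)"
]
},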
{
"cell_type": "code",
"execution_count": 3,
"id": "2f991065",
"metadata": {},
"outputs": [],
"source": [
"#dropping columns\n",
"#drop starttime, endtime, Weather_Tiestamp because I no longer need them\n",
"#drop number because it has 1700000 missing values and extreemly low correlation with target variable\n",
"#drop country because it's always the same \n",
"#drop ID and Description because were causing an error with fitting the model on the pipeline both are unnecessary\n",
"#drop end_lng, end_lat, bump, wind_chill concluded from the correlation analysis (high corr with other variables)\n",
"#drop turningloop because has no correlation with other variables\n",
"#drop sunrizes and sunsets because I dont need them, I have hour\n",
"#drop airport code and Timezone because are not important and have a few missing values\n",
"df = df.drop('Start_Time', 'End_Time', 'Weather_Timestamp', 'Number', 'Country', 'Description','ID',\n",
" 'End_Lng', 'End_Lat', 'Bump', 'Turning_Loop', 'Wind_Chill(F)', 'Sunrise_Sunset','Civil_Twilight',\n",
" 'Nautical_Twilight','Astronomical_Twilight','Airport_Code','Timezone')\n",
"\n",
"#instead of dropping precipitation, fill with 0 because most of the missing data in this column\n",
"#is when the weather_condition wasn't rainy, therefor are no measures for precipitation\n",
"df=df.na.fill(value=0,subset=[\"Precipitation(in)\"])\n",
"\n",
"#remaining columns with missing values:::\n",
"#Street:2; City:137; Zipcode:1319; Temperature:69274; Humidity:73092; Pressure:59200; Visibility:70546;\n",
"#Wind_Direction:73775; Wind_Speed:157944; Weather_Condition:70636;\n",
"df = df.na.drop() \n",
"#drop rows with missing values, in total 187383 rows lost"
]
},
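{
"cell_type": "code",
"execution_count": null,
"id": "c3d4e5f6",
"metadata": {},
"outputs": [],
"source": [
"# a sketch of one way to reproduce the per-column null counts quoted above;\n",
"# run it before df.na.drop() to see the non-zero counts\n",
"import pyspark.sql.functions as F\n",
"df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()"
]
},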
{
"cell_type": "code",
"execution_count": 4,
"id": "e0999444",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------+-------+\n",
"|Severity| count|\n",
"+--------+-------+\n",
"| 3| 135914|\n",
"| 4| 112405|\n",
"| 2|2384141|\n",
"| 1| 25499|\n",
"+--------+-------+\n",
"\n"
]
}
],
"source": [
"newdf=df.groupby(\"Severity\").count().show()"
]
},
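{
"cell_type": "code",
"execution_count": null,
"id": "d4e5f6a7",
"metadata": {},
"outputs": [],
"source": [
"# an illustrative follow-up: express the class counts above as fractions to make\n",
"# the imbalance explicit (from the output above, Severity 2 is roughly 90% of rows)\n",
"import pyspark.sql.functions as F\n",
"total = df.count()\n",
"df.groupBy(\"Severity\").count()\\\n",
"  .withColumn(\"fraction\", F.round(F.col(\"count\") / total, 3))\\\n",
"  .orderBy(\"Severity\").show()"
]
},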
{
"cell_type": "code",
"execution_count": 5,
"id": "7d0b3752",
"metadata": {},
"outputs": [],
"source": [
"import pyspark.sql.functions as F\n",
"#orders the dataframe by a random column to create garantee that the partitions are not ordered by date, \n",
"#this ensures that various dates (years) are included\n",
"df = df.select(\"*\").orderBy(F.rand())"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "93f7a82e",
"metadata": {},
"outputs": [],
"source": [
"df.repartition(1).write.option(\"header\",True) \\\n",
" .partitionBy(\"Severity\") \\\n",
" .csv(\"dataset/severity\")"
]
}
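,
{
"cell_type": "code",
"execution_count": null,
"id": "f6a7b8c9",
"metadata": {},
"outputs": [],
"source": [
"# a sketch of reading one severity partition back; the Severity=<n> subfolder\n",
"# layout is what partitionBy(\"Severity\") produces, and the partition column\n",
"# itself lives in the directory name rather than inside the files\n",
"sev4 = spark.read.options(header=\"True\", inferSchema=\"True\")\\\n",
"    .csv(\"dataset/severity/Severity=4\")\n",
"print(sev4.count())"
]
}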
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.12"
}
},
"nbformat": 4,
"nbformat_minor": 5
}