What Is Spark .read .option("header", "true") .option("inferSchema", "true")

Data Engineering with Apache Spark (Part 2)

This is a continuation of my previous post here. We begin with the exploration stage of Data Engineering. Our goal here is to learn more about the dataset. We will work on a sample dataset and apply the findings to the rest. We will work with April 2018 data.

Exploring the data

The files are organized by year, and each file corresponds to one month of data for the yellow or green taxi. The filename has the format

          <TAXI_TYPE>_tripdata_<YEAR>-<MONTH>.csv
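
For example, 'yellow_tripdata_2018-04.csv' holds the yellow taxi trips for April 2018, and 'green_tripdata_2018-04.csv' holds the green taxi trips for the same month.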

Schema

Let's start with the schema.

df = spark.read.option("header", True).csv(
    ["/2018/green_tripdata_2018-04.csv", "/2018/yellow_tripdata_2018-04.csv"])
df.printSchema()
--OUTPUT--
root
|-- VendorID: string (nullable = true)
|-- tpep_pickup_datetime: string (nullable = true)
|-- tpep_dropoff_datetime: string (nullable = true)
|-- passenger_count: string (nullable = true)
|-- trip_distance: string (nullable = true)
|-- RatecodeID: string (nullable = true)
|-- store_and_fwd_flag: string (nullable = true)
|-- PULocationID: string (nullable = true)
|-- DOLocationID: string (nullable = true)
|-- payment_type: string (nullable = true)
|-- fare_amount: string (nullable = true)
|-- extra: string (nullable = true)
|-- mta_tax: string (nullable = true)
|-- tip_amount: string (nullable = true)
|-- tolls_amount: string (nullable = true)
|-- improvement_surcharge: string (nullable = true)
|-- total_amount: string (nullable = true)

Now all the columns are of type string, which is inaccurate. To get the right data types, we can set another option, 'inferSchema', to 'True'.

df = spark.read.option("header", True).option("inferSchema", True).csv(
    ["/2018/green_tripdata_2018-04.csv", "/2018/yellow_tripdata_2018-04.csv"])
df.printSchema()
--OUTPUT--
root
|-- VendorID: integer (nullable = true)
|-- tpep_pickup_datetime: timestamp (nullable = true)
|-- tpep_dropoff_datetime: timestamp (nullable = true)
|-- passenger_count: integer (nullable = true)
|-- trip_distance: double (nullable = true)
|-- RatecodeID: integer (nullable = true)
|-- store_and_fwd_flag: string (nullable = true)
|-- PULocationID: integer (nullable = true)
|-- DOLocationID: integer (nullable = true)
|-- payment_type: integer (nullable = true)
|-- fare_amount: double (nullable = true)
|-- extra: double (nullable = true)
|-- mta_tax: double (nullable = true)
|-- tip_amount: double (nullable = true)
|-- tolls_amount: double (nullable = true)
|-- improvement_surcharge: double (nullable = true)
|-- total_amount: double (nullable = true)

We have the right data types for all columns. This approach is costly since Spark has to go through the entire dataset once. Instead, we can pass a manual schema or take a smaller sample file for inferring the schema.
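
For instance, here is a minimal sketch of the sample-file idea, reusing the file paths from the snippets above: infer the schema once from the smaller green taxi file, then pass that schema when reading both files.

sample_df = spark.read.option("header", True) \
    .option("inferSchema", True) \
    .csv("/2018/green_tripdata_2018-04.csv")   # smaller file, schema inferred once

df = spark.read.option("header", True) \
    .schema(sample_df.schema) \
    .csv(["/2018/green_tripdata_2018-04.csv", "/2018/yellow_tripdata_2018-04.csv"])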

from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, FloatType

taxi_schema = StructType(
    [StructField("VendorID", IntegerType(), False),
     StructField("pickup_datetime", TimestampType(), False),
     StructField("dropoff_datetime", TimestampType(), False),
     StructField("store_and_fwd_flag", StringType(), False),
     StructField("RatecodeID", IntegerType(), False),
     StructField("PULocationID", IntegerType(), False),
     StructField("DOLocationID", IntegerType(), False),
     StructField("passenger_count", IntegerType(), False),
     StructField("trip_distance", FloatType(), False),
     StructField("fare_amount", FloatType(), False),
     StructField("extra", FloatType(), False),
     StructField("mta_tax", FloatType(), False),
     StructField("tip_amount", FloatType(), False),
     StructField("tolls_amount", FloatType(), False),
     StructField("ehail_fee", FloatType(), False),
     StructField("improvement_surcharge", FloatType(), False),
     StructField("total_amount", FloatType(), False),
     StructField("payment_type", IntegerType(), False),
     StructField("trip_type", IntegerType(), False)])

df = spark.read.option("header", True).schema(taxi_schema).csv(
    ["/2018/green_tripdata_2018-04.csv", "/2018/yellow_tripdata_2018-04.csv"])
df.show(5, truncate=False)

Here we have created a manual schema and specified it while reading the CSV file.

WARN CSVDataSource: CSV header does not conform to the schema.

Header: VendorID, passenger_count, trip_distance, RatecodeID, store_and_fwd_flag, PULocationID, DOLocationID, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount, ,

Schema: VendorID, store_and_fwd_flag, RatecodeID, PULocationID, DOLocationID, passenger_count, trip_distance, fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee, improvement_surcharge, total_amount, payment_type, trip_type
Expected: store_and_fwd_flag but found: passenger_count
CSV file: file:///2018/yellow_tripdata_2018-04.csv

Now we get the above warning before printing out the results. We built the schema based on the green taxi data. The columns 'ehail_fee' and 'trip_type' are missing from the yellow taxi data, hence the warning.

As a solution, we can read the yellow and green data into separate data frames with different manual schemas. We will then add the columns 'ehail_fee' and 'trip_type' to the yellow data with defaults '0.0' and 'N/A'. The pick-up and drop-off time columns are renamed to 'pickup_datetime' and 'dropoff_datetime', and a new column 'taxi_type' is also added to distinguish yellow and green taxis. Finally, we perform a union on the data frames.

from pyspark.sql.functions import lit

yellow_df = spark.read.option("header", True) \
    .schema(yellow_taxi_schema) \
    .csv("/NYC_TaxiRide/2018/yellow_tripdata_2018-04.csv") \
    .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime") \
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime") \
    .withColumn("taxi_type", lit("yellow")) \
    .withColumn("ehail_fee", lit(0.0)) \
    .withColumn("tripType", lit("N/A"))

green_df = spark.read.option("header", True) \
    .schema(green_taxi_schema) \
    .csv("/NYC_TaxiRide/2018/green_tripdata_2018-04.csv") \
    .withColumnRenamed("lpep_pickup_datetime", "pickup_datetime") \
    .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime") \
    .withColumn("taxi_type", lit("green"))

taxi_df = yellow_df.union(green_df)

print(f"Number of records = {taxi_df.count():,.0f}")
--OUTPUT--
Number of records = 10,105,599

'taxi_df' contains data for both yellow and green taxis and has over 10.1 million records.

Let's find the min and max of the 'pickup_datetime' and 'dropoff_datetime' columns on the combined data frame.

from pyspark.sql.functions import max, min

taxi_df.select(min("pickup_datetime"),
               max("pickup_datetime"),
               min("dropoff_datetime"),
               max("dropoff_datetime")).show()
--OUTPUT--
+--------------------+--------------------+
|min(pickup_datetime)|max(pickup_datetime)|
+--------------------+--------------------+
|2001-01-01 00:07:04 |2029-05-06 20:43:14 |
+--------------------+--------------------+
+---------------------+---------------------+
|min(dropoff_datetime)|max(dropoff_datetime)|
+---------------------+---------------------+
|2001-01-01 00:07:30 |2029-05-06 21:03:14 |
+---------------------+---------------------+

The dataset contains data from the past and the future. These are invalid records and must be filtered out. Let's find the records that are not from April 2018.

non_april_record = taxi_df.where(
    "pickup_datetime < '2018-04-01 00:00:00' or "
    "pickup_datetime > '2018-04-30 23:59:59'"
)
print(f"Number of non April record : {non_april_record.count()}")
--OUTPUT--
Number of non April record : 655

We have found 655 records that are not from April 2018. We need to filter out such records and store them separately. To filter out these invalid records, we need to match the year and month from the 'pickup_datetime' column with the corresponding filename.

from pyspark.sql.functions import lit, input_file_name

yellow_df = spark.read.option("header", True) \
    .schema(yellow_taxi_schema) \
    .csv("/2018/yellow_tripdata_2018-04.csv") \
    .withColumn("file_name", input_file_name()) \
    .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime") \
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime") \
    .withColumn("tripType", lit("N/A")) \
    .withColumn("ehail_fee", lit(0.0))

green_df = spark.read.option("header", True) \
    .schema(green_taxi_schema) \
    .csv("/2018/green_tripdata_2018-04.csv") \
    .withColumn("file_name", input_file_name()) \
    .withColumnRenamed("lpep_pickup_datetime", "pickup_datetime") \
    .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")

yellow_df.select("file_name").show(1, False)
--OUTPUT--
+----------------------------------------+
|file_name |
+----------------------------------------+
|file:///2018/yellow_tripdata_2018-04.csv|
+----------------------------------------+

We are adding a new column 'file_name' with Spark's input_file_name function, which returns the absolute file name.

from pyspark.sql import DataFrame
from pyspark.sql.functions import split, reverse, format_string, col, concat_ws, year, month

def transform_raw_data(df: DataFrame) -> DataFrame:
    return df \
        .withColumn("split_file_name",
                    split(reverse(split(reverse("file_name"), "/")[0]), "_")) \
        .withColumn("taxi_type", col("split_file_name")[0]) \
        .withColumn("file_year_and_month",
                    split(col("split_file_name")[2], "\\.")[0]) \
        .withColumn("data_year_month",
                    concat_ws("-", year("pickup_datetime"),
                              format_string("%02d", month("pickup_datetime")))) \
        .drop("file_name", "split_file_name")

Remember, the filename has the format '<TAXI_TYPE>_tripdata_<YEAR>-<MONTH>.csv'. The above function extracts the values for the 'taxi_type' and 'file_year_and_month' columns from the 'file_name' column. The 'file_year_and_month' column will have the data in the format <YEAR>-<MONTH>. We also find the year and month of each row and store them in the 'data_year_month' column.
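
As a rough illustration, the same parsing logic in plain Python, applied to the 'file_name' value shown earlier, works out like this:

file_name = "file:///2018/yellow_tripdata_2018-04.csv"
base_name = file_name[::-1].split("/")[0][::-1]    # "yellow_tripdata_2018-04.csv"
parts = base_name.split("_")                       # ["yellow", "tripdata", "2018-04.csv"]
taxi_type = parts[0]                               # "yellow"
file_year_and_month = parts[2].split(".")[0]       # "2018-04"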

          transformed_yellow_df = transform_raw_data(yellow_df)
transformed_green_df = transform_raw_data(green_df)

The transformations are applied to both the yellow and green taxi data.

          valid_taxi_df = transformed_yellow_df.where('data_year_month == file_year_and_month').union(
transformed_green_df.where('data_year_month == file_year_and_month'))
invalid_taxi_df = transformed_yellow_df.where('data_year_month != file_year_and_month').union(
transformed_green_df.where('data_year_month != file_year_and_month'))

print(f"Number of invalid record : {invalid_taxi_df.count()}")
print(f"Number of valid record : {valid_taxi_df.count()}")

--OUTPUT--
Number of invalid record : 655
Number of valid record : 10104944

Valid and invalid data are filtered out into separate datasets. We consider only the valid dataset for further processing.
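
A minimal sketch of storing the two datasets separately (the output paths and Parquet format here are placeholders, not from the original code):

valid_taxi_df.write.mode("overwrite").parquet("/2018/valid/2018-04")      # placeholder path
invalid_taxi_df.write.mode("overwrite").parquet("/2018/invalid/2018-04")  # placeholder path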

In the next post, we will further explore the data.

Code up to this point can be found in my GitHub repository here.

<<Previous Post


Source: https://medium.com/hasifsubair/data-engineering-with-apache-spark-part-2-c4f8150ec98d
