Apache Spark Dataset: create new columns from an existing column by splitting it
I want to convert the input Spark DataFrame `dataframe1`:
+-----+---------------+-----------------------------------------------------------------------------------------------------------------+
|table|err_timestamp  |err_message                                                                                                      |
+-----+---------------+-----------------------------------------------------------------------------------------------------------------+
|t1   |7/26/2017 13:56|[error = ri_violation, field = user_id, value = 'null']                                                          |
|t2   |7/26/2017 13:58|[error = null_check, field = geo_id, value = 'null'] [error = datatype_check, field = emp_id, value = 'fiwoere8']|
+-----+---------------+-----------------------------------------------------------------------------------------------------------------+

into the output DataFrame `dataframe2` shown below. Each bracketed error in `err_message` should become its own row, with the error type, field and value in separate columns:
+-----+--------------+---------+--------------+---------+
|table|      err_date|err_field|      err_type|err_value|
+-----+--------------+---------+--------------+---------+
|   t1|7/26/2017 0:00|  user_id|  ri_violation|     null|
|   t2|7/26/2017 0:00|   geo_id|    null_check|     null|
|   t2|7/26/2017 0:00|   emp_id|datatype_check| fiwoere8|
+-----+--------------+---------+--------------+---------+
Here is a solution for what you need. The number of steps could probably be reduced further in some cases.
import spark.implicits._
import org.apache.spark.sql.functions._

// Create dummy data: err_message may contain one or more
// "[error = ..., field = ..., value = ...]" groups.
val df = spark.sparkContext.parallelize(Seq(
  ("t1", "7/26/2017 13:56", "[error = ri_violation, field = user_id, value = null]"),
  ("t2", "7/26/2017 13:58", "[error = null_check, field = geo_id, value = null] [error = datatype_check, field = emp_id, value = fiwoere8]")
)).toDF("table", "err_timestamp", "err_message")

// UDF: return the text inside every "[...]" group of the message as an array of strings.
// group(1) is the capture without the brackets, so no replaceAll clean-up is needed.
// A null message yields an empty array instead of a NullPointerException inside the task.
val splitValue = udf { (value: String) =>
  val groups: Seq[String] = Option(value).toList.flatMap { msg =>
    "\\[(.*?)\\]".r.findAllMatchIn(msg).map(_.group(1)).toList
  }
  groups
}

// Explode the array so every bracketed group becomes its own row.
// Note: explode drops rows whose array is empty (e.g. a null or bracket-less message).
val df1 = df.withColumn("err_message", explode(splitValue($"err_message")))
df1.show(false)
// +-----+---------------+--------------------------------------------------------+
// |table|err_timestamp  |err_message                                             |
// +-----+---------------+--------------------------------------------------------+
// |t1   |7/26/2017 13:56|error = ri_violation, field = user_id, value = null     |
// |t2   |7/26/2017 13:58|error = null_check, field = geo_id, value = null        |
// |t2   |7/26/2017 13:58|error = datatype_check, field = emp_id, value = fiwoere8|
// +-----+---------------+--------------------------------------------------------+

// Split each "key = value, key = value, key = value" message on commas, then take the text
// after "=" in each part. trim strips the spaces around "=", so values compare cleanly
// (e.g. "user_id" rather than " user_id ").
val splitExpr = split($"err_message", ",")
def valueAt(i: Int) = trim(split(splitExpr(i), "=")(1))

// Create 3 new columns from the key/value pairs (positions: 0 = error, 1 = field, 2 = value).
// NOTE: err_timestamp is kept unchanged; producing an err_date with the time reset to 0:00
// would need an extra date-conversion step.
df1.withColumn("err_field", valueAt(1))
  .withColumn("err_type", valueAt(0))
  .withColumn("err_value", valueAt(2))
  .drop("err_message")
  .show(false)
// Output:
+-----+---------------+---------+---------------+---------+
|table|err_timestamp  |err_field|err_type       |err_value|
+-----+---------------+---------+---------------+---------+
|t1   |7/26/2017 13:56| user_id | ri_violation  | null    |
|t2   |7/26/2017 13:58| geo_id  | null_check    | null    |
|t2   |7/26/2017 13:58| emp_id  | datatype_check| fiwoere8|
+-----+---------------+---------+---------------+---------+

Hope this helps!
Comments
Post a Comment