python - How does Spark handle Timestamp types during Pandas DataFrame conversion?
I have a pandas DataFrame with timestamp columns of type pandas.tslib.Timestamp. I looked through the PySpark source code for 'createDataFrame' (link to source), and it seems to convert the data to a NumPy record array and then to a list:
data = [r.tolist() for r in data.to_records(index=False)]
However, the timestamp values are converted in the process to a list of longs:
> df = pd.DataFrame(pd.date_range(start=datetime.datetime.now(), periods=5, freq='s'))
> df
0 2017-07-25 11:53:29.353923
1 2017-07-25 11:53:30.353923
2 2017-07-25 11:53:31.353923
3 2017-07-25 11:53:32.353923
4 2017-07-25 11:53:33.353923
> df.to_records(index=False).tolist()
[(1500983799614193000L,), (1500983800614193000L,), (1500983801614193000L,), (1500983802614193000L,), (1500983803614193000L,)]
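To reproduce the conversion outside Spark, this minimal sketch applies the same list comprehension quoted from createDataFrame to a small frame. The fixed start time and the explicit `datetime64[ns]` cast are assumptions added so the output is reproducible. The `L` suffix in the session above is Python 2's long type; in Python 3 the same values are plain `int`s holding nanoseconds since the Unix epoch.

```python
import datetime

import pandas as pd

# Same shape as the question's frame, but with a fixed start and an explicit
# nanosecond dtype so the numbers are reproducible across pandas versions.
df = pd.DataFrame(
    pd.date_range(start=datetime.datetime(2017, 7, 25, 11, 53, 29), periods=5, freq="s")
).astype("datetime64[ns]")

# The conversion quoted from createDataFrame: each datetime64[ns] field comes back
# from tolist() as a plain int counting nanoseconds since the Unix epoch.
data = [r.tolist() for r in df.to_records(index=False)]

print(data[0])  # (1500983609000000000,)
```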
Now if I pass such a list to an RDD, do some operations on it (not touching the timestamp column), and then call
> spark.createDataFrame(rdd, schema)  # schema declares that column as TimestampType
TypeError: TimestampType can not accept object 1465197332112000000L in type <type 'long'>
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
What should I do (before converting the list to an RDD) to preserve the datetime type?
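One possible approach, sketched under assumptions: cast the frame to `object` dtype before calling `to_records`, so the record array holds `pandas.Timestamp` objects instead of raw int64 nanoseconds. This cast is a swapped-in technique, not something PySpark does itself, and the column names `ts` and `value` are hypothetical. `pandas.Timestamp` is a subclass of `datetime.datetime`; that Spark's `TimestampType` then accepts these values is an assumption based on the error message above, not verified here against a particular PySpark version.

```python
import datetime

import pandas as pd

# Hypothetical frame standing in for the question's data: one timestamp column, one value column.
df = pd.DataFrame({
    "ts": pd.date_range(start=datetime.datetime(2017, 7, 25, 11, 53, 29), periods=3, freq="s"),
    "value": [1, 2, 3],
})

# Casting to object dtype boxes each datetime value as a pandas.Timestamp,
# so the record array stores Python objects instead of int64 nanoseconds.
rows = [r.tolist() for r in df.astype(object).to_records(index=False)]

first_ts = rows[0][0]
print(type(first_ts).__name__)                  # Timestamp
print(isinstance(first_ts, datetime.datetime))  # True
```

The trade-off is that every cell becomes a boxed Python object, which costs more memory and time than the int64 representation on large frames.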
Edit 1
A few methods I'm aware of that involve processing after the DataFrame is created are:
- Add timezone information to the datetime object in pandas. This seems unnecessary, and can lead to errors depending on your own timezone.
- Convert the long to a timestamp using the datetime library. Assuming tstampl is the input:
  tstamp = datetime(1970, 1, 1) + timedelta(microseconds=tstampl/1000)
- Convert the datetime to a string on the pandas DataFrame side, then cast it to a datetime on the Spark DataFrame side, as explained in Suresh's answer below.
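The second method can be wrapped in a small helper. The function name `long_to_datetime` is hypothetical, and floor division (`//`) is swapped in for `/` so the microsecond count stays an exact integer under Python 3. Adding a `timedelta` to a naive `datetime(1970, 1, 1)`, rather than calling `datetime.fromtimestamp`, avoids any conversion to the machine's local timezone: pandas stores naive timestamps as wall-clock time counted from the epoch, so the result is the original wall-clock value.

```python
from datetime import datetime, timedelta


def long_to_datetime(tstampl):
    """Convert nanoseconds since the Unix epoch (as produced by to_records().tolist())
    back to a naive datetime. Hypothetical helper name."""
    # datetime cannot store nanoseconds, so anything below one microsecond is truncated.
    return datetime(1970, 1, 1) + timedelta(microseconds=tstampl // 1000)


print(long_to_datetime(1500983799614193000))  # 2017-07-25 11:56:39.614193
```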
However, I'm looking for a simpler way to take care of the processing before the DataFrame creation itself.
I tried converting the timestamp column to string type and then applying tolist() on the pandas Series. I then use that list to create a Spark DataFrame and convert the column to a timestamp there.
>>> df = pd.DataFrame(pd.date_range(start=datetime.datetime.now(), periods=5, freq='s'))
>>> df
                        0
0 2017-07-25 21:51:53.963
1 2017-07-25 21:51:54.963
2 2017-07-25 21:51:55.963
3 2017-07-25 21:51:56.963
4 2017-07-25 21:51:57.963
>>> df1 = df[0].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
>>> type(df1)
<class 'pandas.core.series.Series'>
>>> df1.tolist()
['2017-07-25 21:51:53', '2017-07-25 21:51:54', '2017-07-25 21:51:55', '2017-07-25 21:51:56', '2017-07-25 21:51:57']
>>> from pyspark.sql.types import StringType, TimestampType
>>> sdf = spark.createDataFrame(df1.tolist(), StringType())
>>> sdf.printSchema()
root
 |-- value: string (nullable = true)
>>> sdf = sdf.select(sdf['value'].cast('timestamp'))
>>> sdf.printSchema()
root
 |-- value: timestamp (nullable = true)
>>> sdf.show(5, False)
+---------------------+
|value                |
+---------------------+
|2017-07-25 21:51:53.0|
|2017-07-25 21:51:54.0|
|2017-07-25 21:51:55.0|
|2017-07-25 21:51:56.0|
|2017-07-25 21:51:57.0|
+---------------------+
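In the session above the `.963` fractions disappear because the format string `'%Y-%m-%d %H:%M:%S'` has no sub-second field, which is why Spark shows `.0`. A minimal pandas-side sketch, assuming the sub-second part should survive, appends `%f` (microseconds). It further assumes, without verifying it here, that Spark's string-to-timestamp cast parses the fractional part; the Spark side is left out because it needs a running session.

```python
import datetime

import pandas as pd

# Fixed timestamps with a millisecond part, standing in for the .963 values above.
s = pd.Series(
    pd.date_range(start=datetime.datetime(2017, 7, 25, 21, 51, 53, 963000), periods=3, freq="s")
)

# The format used in the session drops the sub-second part...
no_fraction = s.dt.strftime("%Y-%m-%d %H:%M:%S").tolist()
# ...while appending %f keeps it as six-digit microseconds.
with_fraction = s.dt.strftime("%Y-%m-%d %H:%M:%S.%f").tolist()

print(no_fraction[0])    # 2017-07-25 21:51:53
print(with_fraction[0])  # 2017-07-25 21:51:53.963000
```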