scala - Getting a Unique Count over a Particular Time Frame with Spark DataFrames -


i'm trying figure out if i'm trying accomplish possible in spark. let's have csv if read in dataframe looks so:

+---------------------+-----------+-------+-------------+ |      timestamp      | customer  | user  | application | +---------------------+-----------+-------+-------------+ | 2017-01-01 00:00:01 | customer1 | user1 | app1        | | 2017-01-01 12:00:05 | customer1 | user1 | app1        | | 2017-01-01 14:00:03 | customer1 | user2 | app2        | | 2017-01-01 23:50:50 | customer1 | user1 | app1        | | 2017-01-02 00:00:02 | customer1 | user1 | app1        | +---------------------+-----------+-------+-------------+ 

i'm trying produce dataframe includes count of number of times unique user customer has visited application in last 24 hours. result so:

+---------------------+-----------+-------+-------------+----------------------+ |      timestamp      | customer  | user  | application | uniqueuservisitedapp | +---------------------+-----------+-------+-------------+----------------------+ | 2017-01-01 00:00:01 | customer1 | user1 | app1        |                    0 | | 2017-01-01 12:00:05 | customer1 | user2 | app1        |                    1 | | 2017-01-01 13:00:05 | customer1 | user2 | app1        |                    2 | | 2017-01-01 14:00:03 | customer1 | user1 | app1        |                    2 | | 2017-01-01 23:50:50 | customer1 | user3 | app1        |                    2 | | 2017-01-01 23:50:51 | customer2 | user4 | app2        |                    0 | | 2017-01-02 00:00:02 | customer1 | user1 | app1        |                    3 | +---------------------+-----------+-------+-------------+----------------------+ 

so can tumbling window following code below, that's not quite looking for.

val data = spark.read.csv('path/to/csv')  val tumblingwindow = data     .groupby(col("customer"), col("application"), window(data.col("timestamp"), "24 hours"))     .agg(countdistinct("user")).as("uniqueusersvisitedapp") 

the result this:

+-----------+-------------+-------------------------+-----------------------+ | customer  | application |         window          | uniqueusersvisitedapp | +-----------+-------------+-------------------------+-----------------------+ | customer1 | app1        | [2017-01-01 00:00:00... |                     2 | | customer2 | app2        | [2017-01-01 00:00:00... |                     1 | | customer1 | app1        | [2017-01-02 00:00:00... |                     1 | +-----------+-------------+-------------------------+-----------------------+ 

any appreciated.

i have tried using pyspark window function, creating subpartition each date , apply count on them.not sure how efficient are. here code snippet,

>>> pyspark.sql import functions f >>> pyspark.sql.types import timestamptype  >>> l = [('2017-01-01 00:00:01','customer1','user1','app1'),('2017-01-01 12:00:05','customer1','user1','app1'),('2017-01-01 14:00:03','customer1','user2','app2'),('2017-01-01 23:50:50','customer1','user1','app1'),('2017-01-02 00:00:02','customer1','user1','app1'),('2017-01-02 12:00:02','customer1','user1','app1'),('2017-01-03 14:00:02','customer1','user1','app1'),('2017-01-02 00:00:02','customer1','user2','app2'),('2017-01-01 16:04:01','customer1','user1','app1'),('2017-01-01 23:59:01','customer1','user1','app1'),('2017-01-01 18:00:01','customer1','user2','app2')] >>> df = spark.createdataframe(l,['timestamp','customer','user','application']) >>> df = df.withcolumn('timestamp',df['timestamp'].cast('timestamp')).withcolumn('date',f.to_date(f.col('timestamp'))) >>> df.show() +-------------------+---------+-----+-----------+----------+ |          timestamp| customer| user|application|      date| +-------------------+---------+-----+-----------+----------+ |2017-01-01 00:00:01|customer1|user1|       app1|2017-01-01| |2017-01-01 12:00:05|customer1|user1|       app1|2017-01-01| |2017-01-01 14:00:03|customer1|user2|       app2|2017-01-01| |2017-01-01 23:50:50|customer1|user1|       app1|2017-01-01| |2017-01-02 00:00:02|customer1|user1|       app1|2017-01-02| |2017-01-02 12:00:02|customer1|user1|       app1|2017-01-02| |2017-01-03 14:00:02|customer1|user1|       app1|2017-01-03| |2017-01-02 00:00:02|customer1|user2|       app2|2017-01-02| |2017-01-01 16:04:01|customer1|user1|       app1|2017-01-01| |2017-01-01 23:59:01|customer1|user1|       app1|2017-01-01| |2017-01-01 18:00:01|customer1|user2|       app2|2017-01-01| +-------------------+---------+-----+-----------+----------+  >>> df.printschema() root  |-- timestamp: timestamp (nullable = true)  |-- customer: string (nullable = true)  |-- user: string (nullable = true)  |-- application: string (nullable = true)  |-- date: date (nullable = true)  >>> w = window.partitionby('customer','user','application','date').orderby('timestamp') >>> diff = f.coalesce(f.datediff("timestamp", f.lag("timestamp", 1).over(w)), f.lit(0)) >>> subpartition = f.count(diff<1).over(w) >>> df.select("*",(subpartition-1).alias('count')).drop('date').orderby('customer','user','application','timestamp').show() +-------------------+---------+-----+-----------+-----+ |          timestamp| customer| user|application|count| +-------------------+---------+-----+-----------+-----+ |2017-01-01 00:00:01|customer1|user1|       app1|    0| |2017-01-01 12:00:05|customer1|user1|       app1|    1| |2017-01-01 16:04:01|customer1|user1|       app1|    2| |2017-01-01 23:50:50|customer1|user1|       app1|    3| |2017-01-01 23:59:01|customer1|user1|       app1|    4| |2017-01-02 00:00:02|customer1|user1|       app1|    0| |2017-01-02 12:00:02|customer1|user1|       app1|    1| |2017-01-03 14:00:02|customer1|user1|       app1|    0| |2017-01-01 14:00:03|customer1|user2|       app2|    0| |2017-01-01 18:00:01|customer1|user2|       app2|    1| |2017-01-02 00:00:02|customer1|user2|       app2|    0| +-------------------+---------+-----+-----------+-----+ 

Comments

Popular posts from this blog

node.js - Node js - Trying to send POST request, but it is not loading javascript content -

javascript - Replicate keyboard event with html button -

javascript - Web audio api 5.1 surround example not working in firefox -