scala - Getting a Unique Count over a Particular Time Frame with Spark DataFrames
I'm trying to figure out whether what I want to accomplish is possible in Spark. Let's say I have a CSV that, if read in as a DataFrame, looks like so:
+---------------------+-----------+-------+-------------+
| timestamp           | customer  | user  | application |
+---------------------+-----------+-------+-------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1        |
| 2017-01-01 12:00:05 | customer1 | user1 | app1        |
| 2017-01-01 14:00:03 | customer1 | user2 | app2        |
| 2017-01-01 23:50:50 | customer1 | user1 | app1        |
| 2017-01-02 00:00:02 | customer1 | user1 | app1        |
+---------------------+-----------+-------+-------------+
What I'm trying to produce is a DataFrame that includes a count of the number of times a unique user from a customer has visited an application in the last 24 hours. So the result would look like so:
+---------------------+-----------+-------+-------------+----------------------+
| timestamp           | customer  | user  | application | uniqueUserVisitedApp |
+---------------------+-----------+-------+-------------+----------------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1        | 0                    |
| 2017-01-01 12:00:05 | customer1 | user2 | app1        | 1                    |
| 2017-01-01 13:00:05 | customer1 | user2 | app1        | 2                    |
| 2017-01-01 14:00:03 | customer1 | user1 | app1        | 2                    |
| 2017-01-01 23:50:50 | customer1 | user3 | app1        | 2                    |
| 2017-01-01 23:50:51 | customer2 | user4 | app2        | 0                    |
| 2017-01-02 00:00:02 | customer1 | user1 | app1        | 3                    |
+---------------------+-----------+-------+-------------+----------------------+
I can do tumbling windows with the code below, but that's not quite what I'm looking for.
import org.apache.spark.sql.functions._

val data = spark.read.csv("path/to/csv")

val tumblingWindow = data
  .groupBy(col("customer"), col("application"), window(data.col("timestamp"), "24 hours"))
  .agg(countDistinct("user").as("uniqueUsersVisitedApp"))
The result of this is:
+-----------+-------------+-------------------------+-----------------------+
| customer  | application | window                  | uniqueUsersVisitedApp |
+-----------+-------------+-------------------------+-----------------------+
| customer1 | app1        | [2017-01-01 00:00:00... | 2                     |
| customer2 | app2        | [2017-01-01 00:00:00... | 1                     |
| customer1 | app1        | [2017-01-02 00:00:00... | 1                     |
+-----------+-------------+-------------------------+-----------------------+
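(Adding a slide duration to the same window call would give overlapping sliding windows rather than tumbling ones, but rows would still be bucketed into fixed windows instead of getting a 24-hour lookback anchored at each row, so I don't think that is what I need either. A rough sketch, with an arbitrary one-hour slide:)

val slidingWindow = data
  .groupBy(col("customer"), col("application"),
    window(data.col("timestamp"), "24 hours", "1 hour")) // 24-hour windows starting every hour
  .agg(countDistinct("user").as("uniqueUsersVisitedApp"))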
Any help would be appreciated.
I have tried this using the PySpark window functions, creating a subpartition for each date and applying a count over them. I'm not sure how efficient that is. Here is my code snippet:
>>> from pyspark.sql import functions as F
>>> from pyspark.sql.types import TimestampType
>>> from pyspark.sql.window import Window
>>> l = [('2017-01-01 00:00:01','customer1','user1','app1'),('2017-01-01 12:00:05','customer1','user1','app1'),('2017-01-01 14:00:03','customer1','user2','app2'),('2017-01-01 23:50:50','customer1','user1','app1'),('2017-01-02 00:00:02','customer1','user1','app1'),('2017-01-02 12:00:02','customer1','user1','app1'),('2017-01-03 14:00:02','customer1','user1','app1'),('2017-01-02 00:00:02','customer1','user2','app2'),('2017-01-01 16:04:01','customer1','user1','app1'),('2017-01-01 23:59:01','customer1','user1','app1'),('2017-01-01 18:00:01','customer1','user2','app2')]
>>> df = spark.createDataFrame(l, ['timestamp','customer','user','application'])
>>> df = df.withColumn('timestamp', df['timestamp'].cast('timestamp')).withColumn('date', F.to_date(F.col('timestamp')))
>>> df.show()
+-------------------+---------+-----+-----------+----------+
|          timestamp| customer| user|application|      date|
+-------------------+---------+-----+-----------+----------+
|2017-01-01 00:00:01|customer1|user1|       app1|2017-01-01|
|2017-01-01 12:00:05|customer1|user1|       app1|2017-01-01|
|2017-01-01 14:00:03|customer1|user2|       app2|2017-01-01|
|2017-01-01 23:50:50|customer1|user1|       app1|2017-01-01|
|2017-01-02 00:00:02|customer1|user1|       app1|2017-01-02|
|2017-01-02 12:00:02|customer1|user1|       app1|2017-01-02|
|2017-01-03 14:00:02|customer1|user1|       app1|2017-01-03|
|2017-01-02 00:00:02|customer1|user2|       app2|2017-01-02|
|2017-01-01 16:04:01|customer1|user1|       app1|2017-01-01|
|2017-01-01 23:59:01|customer1|user1|       app1|2017-01-01|
|2017-01-01 18:00:01|customer1|user2|       app2|2017-01-01|
+-------------------+---------+-----+-----------+----------+
>>> df.printSchema()
root
 |-- timestamp: timestamp (nullable = true)
 |-- customer: string (nullable = true)
 |-- user: string (nullable = true)
 |-- application: string (nullable = true)
 |-- date: date (nullable = true)
>>> w = Window.partitionBy('customer','user','application','date').orderBy('timestamp')
>>> diff = F.coalesce(F.datediff("timestamp", F.lag("timestamp", 1).over(w)), F.lit(0))
>>> subpartition = F.count(diff < 1).over(w)
>>> df.select("*", (subpartition - 1).alias('count')).drop('date').orderBy('customer','user','application','timestamp').show()
+-------------------+---------+-----+-----------+-----+
|          timestamp| customer| user|application|count|
+-------------------+---------+-----+-----------+-----+
|2017-01-01 00:00:01|customer1|user1|       app1|    0|
|2017-01-01 12:00:05|customer1|user1|       app1|    1|
|2017-01-01 16:04:01|customer1|user1|       app1|    2|
|2017-01-01 23:50:50|customer1|user1|       app1|    3|
|2017-01-01 23:59:01|customer1|user1|       app1|    4|
|2017-01-02 00:00:02|customer1|user1|       app1|    0|
|2017-01-02 12:00:02|customer1|user1|       app1|    1|
|2017-01-03 14:00:02|customer1|user1|       app1|    0|
|2017-01-01 14:00:03|customer1|user2|       app2|    0|
|2017-01-01 18:00:01|customer1|user2|       app2|    1|
|2017-01-02 00:00:02|customer1|user2|       app2|    0|
+-------------------+---------+-----+-----------+-----+
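One thing to note about this approach: because the window is partitioned by date, the count resets at midnight (the 2017-01-02 00:00:02 row gets 0 even though there were visits within the preceding 24 hours). If the frame needs to trail a full 24 hours behind each event, a range frame over epoch seconds may work instead. Below is an untested sketch in Scala, to match the question's code; countDistinct is not supported over a range frame, so size(collect_set(...)) stands in for it, and the frame bounds (or an adjustment for the current row's user) may need tweaking to reproduce the exact counts in the question's expected output.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Sketch: for each row, look back over the previous 24 hours within the same
// customer and application, and count the distinct users seen in that span.
val last24h = Window
  .partitionBy("customer", "application")
  .orderBy(unix_timestamp(col("timestamp"))) // range frames need a numeric ordering expression
  .rangeBetween(-86400, 0)                   // 86400 seconds = 24 hours

val rolling = data.withColumn(
  "uniqueUsersVisitedApp",
  size(collect_set(col("user")).over(last24h))
)

This keeps the full 24-hour history of each (customer, application) pair in a single frame, so it trades the per-date partitioning above for potentially larger window state.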