scala - Access Spark DataFrame field with a Map Type


My current schema is as follows:

root
 |-- product: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- items: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- _1: string (nullable = true)
 |    |    |-- _2: long (nullable = false)
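For reference, here is a minimal sketch of how a DataFrame with a schema of this shape can be built - the sample data is made up, and I've used three-field tuples to match the (String, Long, Long) values my code expects, which adds an extra _3 field relative to the schema printed above:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
import spark.implicits._

// Hypothetical sample data: tuples used as map values become structs
// with fields _1, _2, _3 in the resulting schema.
val joined = Seq(
  (Seq("a", "b"), Map("a" -> ("x", 3L, 7L), "c" -> ("y", 10L, 2L)))
).toDF("product", "items")

joined.printSchema()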

I first check whether each element in product is a key in items; if it is, I check the _2 field of the corresponding value to see whether it is smaller than a given rank. My code follows:

def has(product: Seq[String], items: Map[String, (String, Long, Long)]): Double = {
  var count = 0
  for (x <- product) {
    if (items.contains(x)) {
      val item = items.get(x)
      val iitem = item.get
      val (a, b, c) = iitem
      if (b <= rank) {
        count = count + 1
      }
    }
  }
  return count.toDouble
}

def hasId = udf((product: Seq[String], items: Map[String, (String, Long, Long)]) =>
  has(product, items) / items.size.toDouble
)

for (rank <- 0 to 47) {
  joined = joined.withColumn("hasId" + rank, hasId(col("product"), col("items")))
}

I am getting errors saying:

org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple3

The error appears to be related to:

val (a, b, c) = iitem
if (b <= rank)

but I am not able to figure out what I am doing wrong.

When passing a MapType or ArrayType column as a UDF's input, tuple values/keys are passed as org.apache.spark.sql.Rows. You'll have to modify your UDF to expect a Map[String, Row] as its second argument, and "convert" these Row values into tuples using pattern matching:

def hasId = udf((product: Seq[String], items: Map[String, Row]) =>
  has(product, items.mapValues {
    case Row(s: String, i1: Long, i2: Long) => (s, i1, i2)
  }) / items.size.toDouble
)
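As a side note (my own sketch, not part of the fix itself), the same conversion can be written with Row's positional getAs accessor instead of pattern matching - hasIdGetAs is a hypothetical name:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Equivalent conversion reading the struct fields by position;
// the field order must match the struct layout (_1, _2, _3).
def hasIdGetAs = udf((product: Seq[String], items: Map[String, Row]) =>
  has(product, items.mapValues { r =>
    (r.getAs[String](0), r.getAs[Long](1), r.getAs[Long](2))
  }) / items.size.toDouble
)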

NOTE: unrelated to your question, but it looks like there are other mistakes in the code - I assume rank should be passed as a parameter to has? I also made it more idiomatic by removing the usages of mutable vars - altogether, I'm partly guessing that you need:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row

def has(products: Seq[String], items: Map[String, (String, Long, Long)], rank: Long): Double = products
  .flatMap(items.get)
  .map(_._2)
  .count(_ <= rank)
  .toDouble

def hasId(rank: Long) = udf((product: Seq[String], items: Map[String, Row]) => {
  val convertedItems = items.mapValues {
    case Row(s: String, i1: Long, i2: Long) => (s, i1, i2)
  }
  has(product, convertedItems, rank) / items.size.toDouble
})

val result = (0 to 47).foldLeft(joined) {
  (df, rank) => df.withColumn("hasId" + rank, hasId(rank)(col("product"), col("items")))
}
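To sanity-check the result, a usage sketch (assuming the schema above; the column names follow the "hasId" + rank convention used in the foldLeft):

// Inspect a couple of the generated columns.
result.select("product", "hasId0", "hasId47").show(truncate = false)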
