scala - Access Spark DataFrame field with a Map type
My current schema is as follows:
    root
     |-- product: array (nullable = true)
     |    |-- element: string (containsNull = true)
     |-- items: map (nullable = true)
     |    |-- key: string
     |    |-- value: struct (valueContainsNull = true)
     |    |    |-- _1: string (nullable = true)
     |    |    |-- _2: long (nullable = false)
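For reference, here is a minimal sketch that builds a DataFrame of this shape (the sample values are made up; since the code below treats the map values as 3-tuples, the sketch adds a third long field even though the schema above only prints _1 and _2):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Toy data only: tuples are encoded as structs with fields _1, _2, _3.
    var joined = Seq(
      (Seq("a", "b"), Map("a" -> ("x", 3L, 10L), "c" -> ("y", 50L, 20L)))
    ).toDF("product", "items")

    joined.printSchema()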
I first check whether each element in product is a key in items, and then check the _2 field in the corresponding value to see if it is smaller than a given rank. My code follows:
    def has(product: Seq[String], items: Map[String, (String, Long, Long)]): Double = {
      var count = 0
      for (x <- asin) {
        if (items.contains(x)) {
          val item = items.get(x)
          val iitem = item.get
          val (a, b, c) = iitem
          if (b <= rank) {
            count = count + 1
          }
        }
      }
      return count.toDouble
    }

    def hasId = udf((product: Seq[String], items: Map[String, (String, Long, Long)]) =>
      has(product, items) / items.size.toDouble
    )

    for (rank <- 0 to 47) {
      joined = joined.withColumn("hasId" + rank, hasId(col("product"), col("items")))
    }
I am getting errors saying:

    GenericRowWithSchema cannot be cast to scala.Tuple3

The error appears to be related to:

    val (a, b, c) = iitem
    if (b <= rank)

but I am not able to figure out what I am doing wrong.
When passing a MapType or ArrayType column as a UDF's input, tuple values/keys are passed as org.apache.spark.sql.Rows. You'll have to modify your UDF to expect a Map[String, Row] as its second argument, and "convert" these Row values into tuples using pattern matching:
    def hasId = udf((product: Seq[String], items: Map[String, Row]) =>
      has(product, items.mapValues {
        case Row(s: String, i1: Long, i2: Long) => (s, i1, i2)
      }) / items.size.toDouble
    )
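Side note: Map.mapValues returns a lazy view and is deprecated in Scala 2.13; if that matters in your build, an eager, version-agnostic spelling of the same conversion would be (a sketch, same logic as above):

    val convertedItems = items.map {
      case (k, Row(s: String, i1: Long, i2: Long)) => (k, (s, i1, i2))
    }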
Note: unrelated to your question, but it looks like there are some other mistakes in the code - I assume rank should be passed as a parameter to has? I also made it more idiomatic by removing the usages of mutable vars altogether. I'm partly guessing at what you need:
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Row

    def has(products: Seq[String], items: Map[String, (String, Long, Long)], rank: Long): Double =
      products
        .flatMap(items.get)   // keep only products that appear in items
        .map(_._2)            // take the _2 field of each matching value
        .count(_ <= rank)
        .toDouble

    def hasId(rank: Long) = udf((product: Seq[String], items: Map[String, Row]) => {
      val convertedItems = items.mapValues {
        case Row(s: String, i1: Long, i2: Long) => (s, i1, i2)
      }
      has(product, convertedItems, rank) / items.size.toDouble
    })

    // foldLeft threads the DataFrame through all 48 withColumn calls
    // without a mutable var.
    val result = (0 to 47).foldLeft(joined) { (df, rank) =>
      df.withColumn("hasId" + rank, hasId(rank)(col("product"), col("items")))
    }
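Assuming the toy joined DataFrame sketched in the question, a quick sanity check of a few of the generated columns would be:

    result.select("product", "hasId0", "hasId10", "hasId47").show(truncate = false)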