hadoop - RDD is not working when trying to persist into a database in Apache Spark -


I want to persist an RDD into a MySQL database table. I have used a map function to iterate through the RDD and passed each tuple to a function that performs the persistence. Here I want to parallelize the job across the master and slave nodes.

But it doesn't work: the function that performs the database persistence is never called.

If I use collect(), such as courseSet.collect().map(m => sendCourseInfo(m)) instead of courseSet.map(m => sendCourseInfo(m)), it works fine.

I don't want to use collect() here.

I have searched many articles but am unable to figure this out. Can you please help me resolve this?

Below is the code:

.....
      x.toString().split(",")(1),
      x.toString().split(",")(2),
      x.toString().split(",")(3)))
    courseSet.map(m => sendCourseInfo(m))
}

def sendCourseInfo(courseData: (Int, String, String, String)): Unit = {
    try {
      DatabaseUtil.setJdbcConfiguration()

      val jdbcConnection: java.sql.Connection = DatabaseUtil.getConnection

      val statement = "{call insert_course (?,?,?,?)}"
      val callableStatement = jdbcConnection.prepareCall(statement)
      callableStatement.setInt(1, courseData._1)
      callableStatement.setString(2, courseData._2)
      callableStatement.setString(3, courseData._3)
      callableStatement.setString(4, courseData._4)

      callableStatement.executeUpdate
    } catch {
      case e: SQLException => println(e.getStackTrace)
    }
}

You're calling map() on the RDD, which is a transformation, not an action. Transformations are lazy, so to actually execute the persistence you need to call an action instead, like:

courseSet.foreach(sendCourseInfo)
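This runs sendCourseInfo on the executors for every record. If opening a JDBC connection per record becomes a bottleneck, a variant worth considering is foreachPartition, which lets you reuse one connection per partition. A minimal sketch, reusing DatabaseUtil and the stored procedure from your question:

courseSet.foreachPartition { records =>
  // One connection per partition instead of one per record
  DatabaseUtil.setJdbcConfiguration()
  val connection = DatabaseUtil.getConnection
  try {
    records.foreach { courseData =>
      val stmt = connection.prepareCall("{call insert_course (?,?,?,?)}")
      stmt.setInt(1, courseData._1)
      stmt.setString(2, courseData._2)
      stmt.setString(3, courseData._3)
      stmt.setString(4, courseData._4)
      stmt.executeUpdate()
      stmt.close()
    }
  } finally {
    connection.close()
  }
}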

A few extra suggestions on what you're doing:

Whatever x is, you're converting it to a string, splitting it, and extracting one field from the split. You're doing that three times for each element in the RDD/collection, so you can optimize it:

x.map(_.tostring.split(",")).map(x=>(x(1),x(2),x(3))) 

Next, you have to persist the data in a DB, MySQL in your case. You're using Java's plain JDBC connectivity for that, creating a new connection and statement for each element. Instead, with Spark 2.x you can do something like this:

import org.apache.spark.sql.SparkSession
import java.util.Properties

...

case class TableSchema(col1: Int, col2: String, col3: String, col4: String)
val props = new Properties()

def main(args: Array[String]): Unit = {

  val ss = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
  import ss.implicits._

  ...

  props.setProperty("user", "username")
  props.setProperty("password", "password")

  val df = rdd.map(_.toString.split(","))
    .map(x => TableSchema(x(0).toInt, x(1), x(2), x(3)))   // first column is Int, so convert it
    .toDF()

  df.write.jdbc(s"jdbc:mysql://${mysqlHost}/${mysqlDbName}", "tableName", props)

}
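Note that df.write.jdbc fails by default if the target table already exists (SaveMode.ErrorIfExists); if you want to append rows to an existing table instead, a small variation is:

import org.apache.spark.sql.SaveMode

df.write
  .mode(SaveMode.Append)   // append rows instead of erroring when the table exists
  .jdbc(s"jdbc:mysql://${mysqlHost}/${mysqlDbName}", "tableName", props)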

Let me know if this helps. Cheers.

