hadoop - RDD is not working when trying to persist into database in Apache Spark -
I want to persist an RDD into a MySQL database table. I have used the map function to iterate through the RDD and passed each tuple to a function that does the persistence. Here I want to parallelize the job across the master and slave nodes.
But it doesn't work fine and it doesn't call the function that does the database persistence.
If I use collect(), such as courseSet.collect().map(m => sendCourseInfo(m)) instead of courseSet.map(m => sendCourseInfo(m)), it works fine.
I don't want to use collect() here.
I have searched many articles but was unable to figure it out. Can someone please help me resolve this?
Below is the code,
..... x.toString().split(",")(1), x.toString().split(",")(2), x.toString().split(",")(3)))
  courseSet.map(m => sendCourseInfo(m))
}

def sendCourseInfo(courseData: (Int, String, String, String)): Unit = {
  try {
    DatabaseUtil.setJdbcConfiguration()
    val jdbcConnection: java.sql.Connection = DatabaseUtil.getConnection
    val statement = "{call insert_course (?,?,?,?)}"
    val callableStatement = jdbcConnection.prepareCall(statement)
    callableStatement.setInt(1, courseData._1)
    callableStatement.setString(2, courseData._2)
    callableStatement.setString(3, courseData._3)
    callableStatement.setString(4, courseData._4)
    callableStatement.executeUpdate()
  } catch {
    case e: SQLException => println(e.getStackTrace)
  }
}
You're calling map() on the RDD, which is a transformation and not an action. So, to execute it you need to call an action like,
courseSet.foreach(sendCourseInfo)
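If you want to keep the stored-procedure call, another option is foreachPartition, which also triggers execution but lets each partition reuse a single JDBC connection instead of opening one per record. A minimal sketch, assuming courseSet holds (Int, String, String, String) tuples and using a placeholder connection URL:

import java.sql.DriverManager

courseSet.foreachPartition { partition =>
  // One connection per partition; host, database and credentials are placeholders.
  val connection = DriverManager.getConnection("jdbc:mysql://localhost/coursedb", "user", "password")
  val callableStatement = connection.prepareCall("{call insert_course (?,?,?,?)}")
  try {
    partition.foreach { courseData =>
      callableStatement.setInt(1, courseData._1)
      callableStatement.setString(2, courseData._2)
      callableStatement.setString(3, courseData._3)
      callableStatement.setString(4, courseData._4)
      callableStatement.executeUpdate()
    }
  } finally {
    callableStatement.close()
    connection.close()
  }
}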
Some extra suggestions on what you're doing:
Whatever x is, you're converting it to a String, splitting it, and extracting an element from the split, and you're doing that three times for each element in the RDD/collection. So you can optimize it like this,
x.map(_.toString.split(",")).map(x => (x(1), x(2), x(3)))
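Tying the two suggestions together, a minimal sketch (assuming the first split field is the Int id that sendCourseInfo expects):

val courseSet = x
  .map(_.toString.split(","))
  .map(f => (f(0).toInt, f(1), f(2), f(3)))   // build the (Int, String, String, String) tuple once
courseSet.foreach(sendCourseInfo)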
Next, you have to persist the data in a DB, MySQL in your case. You're using Java's plain JDBC connectivity for that, creating a new connection and statement for each element. Instead, with Spark 2.x you can do this,
import org.apache.spark.sql.SparkSession
import java.util.Properties
...
case class TableSchema(col1: Int, col2: String, col3: String, col4: String)

val props = new Properties()

def main(args: Array[String]): Unit = {
  val ss = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
  import ss.implicits._
  ...
  props.setProperty("user", "username")      // JDBC property key is "user"
  props.setProperty("password", "password")

  val df = rdd.map(_.toString.split(","))
    .map(x => TableSchema(x(0).toInt, x(1), x(2), x(3)))   // toInt so the first field matches col1: Int
    .toDF()

  df.write.jdbc(s"jdbc:mysql://${mysqlHost}/${mysqlDbName}", "tableName", props)
}
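One thing to watch with write.jdbc: the default save mode is ErrorIfExists, so the write fails if the target table already exists. To append rows into an existing table, set the mode explicitly, for example:

import org.apache.spark.sql.SaveMode

df.write.mode(SaveMode.Append).jdbc(s"jdbc:mysql://${mysqlHost}/${mysqlDbName}", "tableName", props)

You will also need the MySQL JDBC driver on the classpath of the driver and executors, for example via spark-submit --packages mysql:mysql-connector-java:<version>.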
Let me know if this helps, cheers.