apache kafka - PySpark on Dataproc stops with SocketTimeoutException


We are trying to run a Spark job on a Dataproc cluster using PySpark 2.2.0, but the Spark job stops after a seemingly random amount of time with the following error message:

    17/07/25 00:52:48 ERROR org.apache.spark.api.python.PythonRDD: Error while sending iterator
    java.net.SocketTimeoutException: Accept timed out
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
        at java.net.ServerSocket.implAccept(ServerSocket.java:545)
        at java.net.ServerSocket.accept(ServerSocket.java:513)
        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:702)

The error can take a couple of minutes to happen, or it can take 3 hours. From personal experience, the Spark job usually runs for 30 minutes to 1 hour before hitting the error.

Once the Spark job hits the error, it just stops. No matter how long we wait, it outputs nothing. On the YARN ResourceManager, the application status is still labeled "RUNNING", and we must Ctrl+C to terminate the program. At that point, the application is labeled "FINISHED".

I run the Spark job using the command /path/to/spark/bin/spark-submit --jars /path/to/jar/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar spark_job.py on the master node's console. The JAR file is necessary because the Spark job streams messages from Kafka (running on the same cluster as the Spark job) and pushes messages back to the same Kafka under a different topic.
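For context, the core of spark_job.py looks roughly like the sketch below; the topic names, broker/ZooKeeper addresses, group ID, and producer helper are placeholders rather than our actual code:

    # Rough sketch of spark_job.py (assumed structure; names are placeholders).
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    from kafka import KafkaProducer  # kafka-python, used on the workers to publish results

    sc = SparkContext(appName="kafka-passthrough")
    ssc = StreamingContext(sc, 5)  # 5-second micro-batches

    # Receiver-based Kafka stream (served by the 0-8 assembly JAR on the submit command).
    stream = KafkaUtils.createStream(
        ssc,
        "worker-0:2181,worker-1:2181,worker-2:2181",  # zookeeper.connect (placeholder hosts)
        "spark-consumer-group",                       # consumer group ID (placeholder)
        {"input-topic": 1})                           # topic -> number of receiver threads

    def publish_partition(records):
        # One producer per partition; "output-topic" is a placeholder name.
        producer = KafkaProducer(bootstrap_servers="worker-0:9092")
        for _, value in records:
            producer.send("output-topic", value.encode("utf-8"))
        producer.flush()

    # The real job transforms the messages before publishing; here they are passed through.
    stream.foreachRDD(lambda rdd: rdd.foreachPartition(publish_partition))

    ssc.start()
    ssc.awaitTermination()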

I've looked at other answers on this site (primarily this and this), and while they have been helpful, I haven't been able to track down anything in the logs that might state what caused the executors to die. So far, I've monitored the nodes during the task through the YARN ResourceManager and gone through the logs located in the /var/logs/hadoop-yarn directory on every node. The only "clue" I could find in the logs is the line org.apache.spark.executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM written in the dead executor's logs.

As a last-ditch effort, we attempted to increase the cluster's memory size in the hope that the issue would go away, but it hasn't. Originally, the cluster was running on 1 master and 2 workers, each with 4 vCPUs and 15 GB of memory. We created a new Dataproc cluster, this time with 1 master and 3 workers, the workers each having 8 vCPUs and 52 GB of memory (the master has the same specs as before).

What we would like to know is:
1. Where/how can we see the exception that is causing the executors to be terminated?
2. Is this an issue with how Spark is configured?
3. The Dataproc image version is "preview". Could that possibly be the cause of the error?
And ultimately,
4. How do we resolve this issue? What other steps can we take?

This Spark job needs to continuously stream from Kafka for an indefinite amount of time, so we would like this error fixed rather than just prolonging the time it takes for the error to occur.

Here are some screenshots from the YARN ResourceManager to demonstrate what we are seeing:

[Screenshot: Cluster metrics]

[Screenshot: Executor summary]

The screenshots are from before the Spark job stopped with the error.

And this is the Spark configuration file located at /path/to/spark/conf/spark-defaults.conf (we did not change any of Dataproc's default settings):

    spark.master yarn
    spark.submit.deployMode client
    spark.yarn.jars=local:/usr/lib/spark/jars/*
    spark.eventLog.enabled true
    spark.eventLog.dir hdfs://highmem-m/user/spark/eventlog

    # Dynamic allocation on YARN
    spark.dynamicAllocation.enabled true
    spark.dynamicAllocation.minExecutors 1
    spark.executor.instances 10000
    spark.dynamicAllocation.maxExecutors 10000
    spark.shuffle.service.enabled true
    spark.scheduler.minRegisteredResourcesRatio 0.0

    spark.yarn.historyServer.address highmem-m:18080
    spark.history.fs.logDirectory hdfs://highmem-m/user/spark/eventlog

    spark.executor.cores 2
    spark.executor.memory 4655m
    spark.yarn.executor.memoryOverhead 465

    # Overkill
    spark.yarn.am.memory 4655m
    spark.yarn.am.memoryOverhead 465

    spark.driver.memory 3768m
    spark.driver.maxResultSize 1884m
    spark.rpc.message.maxSize 512

    # Add ALPN for Bigtable
    spark.driver.extraJavaOptions
    spark.executor.extraJavaOptions

    # Disable Parquet metadata caching as its URI re-encoding logic does
    # not work for GCS URIs (b/28306549). The net effect of this is that
    # Parquet metadata will be read both driver side and executor side.
    spark.sql.parquet.cacheMetadata=false

    # User-supplied properties.
    #Mon Jul 24 23:12:12 UTC 2017
    spark.executor.cores=4
    spark.executor.memory=18619m
    spark.driver.memory=3840m
    spark.driver.maxResultSize=1920m
    spark.yarn.am.memory=640m
    spark.executorEnv.PYTHONHASHSEED=0

I'm not quite sure where the user-supplied properties came from.
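One way to check which of these values actually wins at runtime (e.g. whether the user-supplied spark.executor.memory=18619m overrides 4655m) is to dump the resolved configuration from inside the job itself; a small sketch, assuming sc is the SparkContext already created in spark_job.py:

    # Print the resolved executor/driver settings as seen by the running job.
    for key, value in sorted(sc.getConf().getAll()):
        if key.startswith("spark.executor") or key.startswith("spark.driver"):
            print(key, "=", value)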

Edit:
Additional information about the clusters: we use the ZooKeeper, Kafka, and Jupyter initialization action scripts found at https://github.com/googlecloudplatform/dataproc-initialization-actions, applied in the order ZooKeeper -> Kafka -> Jupyter (unfortunately I don't have enough reputation to post more than 2 links at the moment).

Edit 2:
Following @Dennis's insightful questions, we ran the Spark job while paying particular attention to which executors have higher on-heap storage memory used. What I noticed is that the executors of worker #0 have higher storage memory usage compared to the other executors. The stdout file for the executors of worker #0 is empty. These 3 lines are repeated many times over in stderr:

    17/07/27 16:32:01 INFO kafka.utils.VerifiableProperties: Verifying properties
    17/07/27 16:32:01 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to
    17/07/27 16:32:01 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to
    17/07/27 16:32:04 INFO kafka.utils.VerifiableProperties: Verifying properties
    17/07/27 16:32:04 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to
    17/07/27 16:32:04 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to
    17/07/27 16:32:07 INFO kafka.utils.VerifiableProperties: Verifying properties
    17/07/27 16:32:07 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to
    17/07/27 16:32:07 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to
    17/07/27 16:32:09 INFO kafka.utils.VerifiableProperties: Verifying properties
    17/07/27 16:32:09 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to
    17/07/27 16:32:09 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to
    17/07/27 16:32:10 INFO kafka.utils.VerifiableProperties: Verifying properties
    17/07/27 16:32:10 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to
    17/07/27 16:32:10 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to
    17/07/27 16:32:13 INFO kafka.utils.VerifiableProperties: Verifying properties
    17/07/27 16:32:13 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to
    17/07/27 16:32:13 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to
    17/07/27 16:32:14 INFO kafka.utils.VerifiableProperties: Verifying properties
    17/07/27 16:32:14 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to
    17/07/27 16:32:14 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to
    17/07/27 16:32:15 INFO kafka.utils.VerifiableProperties: Verifying properties
    17/07/27 16:32:15 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to
    17/07/27 16:32:15 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to
    17/07/27 16:32:18 INFO kafka.utils.VerifiableProperties: Verifying properties
    17/07/27 16:32:18 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to
    17/07/27 16:32:18 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to

It seems to repeat every 1~3 seconds.

As for the stdout and stderr of the other executors on the other worker nodes, they are empty.
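Incidentally, instead of eyeballing the executors tab, the per-executor storage memory can also be pulled from Spark's monitoring REST API on the driver UI; a rough sketch (the host/port and the decision to hit the driver directly rather than through the YARN proxy are assumptions about our setup):

    import requests

    # Spark's monitoring REST API, served by the driver UI (port 4040 by default).
    base = "http://localhost:4040/api/v1"
    app_id = requests.get(base + "/applications").json()[0]["id"]

    for ex in requests.get("%s/applications/%s/executors" % (base, app_id)).json():
        # memoryUsed/maxMemory roughly correspond to the storage memory column in the UI.
        print(ex["id"], ex["hostPort"], ex["memoryUsed"], ex["maxMemory"], ex["isActive"])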

Edit 3:
As mentioned in @Dennis's comments, we had kept the Kafka topic the Spark job consumes from at a replication factor of 1. I also found that I had forgotten to add worker #2 to zookeeper.connect in the Kafka config file, and had forgotten to give the consumer streaming messages from Kafka in Spark a group ID. I've fixed both places (and remade the topic with a replication factor of 3) and observed that the workload now focuses on worker #1. Following the suggestions from @Dennis, I ran sudo jps after SSH-ing into worker #1 and got the following output:

[Removed this section to save character space; the error messages from the failed calls to jmap didn't hold any useful information]

Edit 4:
This is what I'm seeing in the stdout files of worker #1's executors:

    2017-07-27 22:16:24
    Full thread dump OpenJDK 64-Bit Server VM (25.131-b11 mixed mode):
    ===Truncated===
    Heap
     PSYoungGen      total 814592K, used 470009K [0x000000063c180000, 0x000000069e600000, 0x00000007c0000000)
      eden space 799744K, 56% used [0x000000063c180000,0x0000000657e53598,0x000000066ce80000)
      from space 14848K, 97% used [0x000000069d780000,0x000000069e5ab1b8,0x000000069e600000)
      to   space 51200K, 0% used [0x0000000698200000,0x0000000698200000,0x000000069b400000)
     ParOldGen       total 574464K, used 180616K [0x0000000334400000, 0x0000000357500000, 0x000000063c180000)
      object space 574464K, 31% used [0x0000000334400000,0x000000033f462240,0x0000000357500000)
     Metaspace       used 49078K, capacity 49874K, committed 50048K, reserved 1093632K
      class space    used 6054K, capacity 6263K, committed 6272K, reserved 1048576K

and

    2017-07-27 22:06:44
    Full thread dump OpenJDK 64-Bit Server VM (25.131-b11 mixed mode):
    ===Truncated===
    Heap
     PSYoungGen      total 608768K, used 547401K [0x000000063c180000, 0x000000066a280000, 0x00000007c0000000)
      eden space 601088K, 89% used [0x000000063c180000,0x000000065d09c498,0x0000000660c80000)
      from space 7680K, 99% used [0x0000000669b00000,0x000000066a2762c8,0x000000066a280000)
      to   space 36864K, 0% used [0x0000000665a80000,0x0000000665a80000,0x0000000667e80000)
     ParOldGen       total 535552K, used 199304K [0x0000000334400000, 0x0000000354f00000, 0x000000063c180000)
      object space 535552K, 37% used [0x0000000334400000,0x00000003406a2340,0x0000000354f00000)
     Metaspace       used 48810K, capacity 49554K, committed 49792K, reserved 1093632K
      class space    used 6054K, capacity 6263K, committed 6272K, reserved 1048576K

When the error happened, an executor on worker #2 received SIGNAL TERM and was labeled dead. At that time, it was the only dead executor.

Strangely, the Spark job picked up again after 10 minutes or so. Looking at the Spark UI, only the executors from worker #1 are active and the rest are dead. This is the first time this has happened.

Edit 5:
Again following @Dennis's suggestions (thank you, @Dennis!), this time I ran sudo -u yarn jmap -histo <pid>. These are the top 10 memory-hogging classes of CoarseGrainedExecutorBackend after about 10 minutes:

     num     #instances         #bytes  class name
    ----------------------------------------------
       1:        244824      358007944  [B
       2:        194242      221184584  [I
       3:       2062554      163729952  [C
       4:        746240       35435976  [Ljava.lang.Object;
       5:           738       24194592  [Lorg.apache.spark.unsafe.memory.MemoryBlock;
       6:        975513       23412312  java.lang.String
       7:        129645       13483080  java.io.ObjectStreamClass
       8:        451343       10832232  java.lang.StringBuilder
       9:         38880       10572504  [Z
      10:        120807        8698104  java.lang.reflect.Field

Also, I've encountered a new type of error that caused an executor to die. It produced some failed tasks highlighted in the Spark UI, and I found the following in the executor's stderr:

    17/07/28 00:44:03 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 6821.0 (TID 2585)
    java.lang.AssertionError: assertion failed
        at scala.Predef$.assert(Predef.scala:156)
        at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
        at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
        at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:367)
        at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:366)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:366)
        at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:361)
        at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:736)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:342)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
    17/07/28 00:44:03 ERROR org.apache.spark.executor.Executor: Exception in task 0.1 in stage 6821.0 (TID 2586)
    java.lang.AssertionError: assertion failed
        [identical stack trace to the one above]
    17/07/28 00:44:03 ERROR org.apache.spark.util.Utils: Uncaught exception in thread stdout writer for /opt/conda/bin/python
    java.lang.AssertionError: assertion failed: Block rdd_5480_0 is not locked for reading
        at scala.Predef$.assert(Predef.scala:170)
        at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
        at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:720)
        at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:516)
        at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
        at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
    17/07/28 00:44:03 ERROR org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for /opt/conda/bin/python,5,main]
    java.lang.AssertionError: assertion failed: Block rdd_5480_0 is not locked for reading
        [identical stack trace to the one above]

Edit 6:
This time, I took the jmap histogram after 40 minutes of running:

     num     #instances         #bytes  class name
    ----------------------------------------------
       1:         23667      391136256  [B
       2:         25937       15932728  [I
       3:        159174       12750016  [C
       4:           334       10949856  [Lorg.apache.spark.unsafe.memory.MemoryBlock;
       5:         78437        5473992  [Ljava.lang.Object;
       6:        125322        3007728  java.lang.String
       7:         40931        2947032  java.lang.reflect.Field
       8:         63431        2029792  com.esotericsoftware.kryo.Registration
       9:         20897        1337408  com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField
      10:         20323         975504  java.util.HashMap

These are the results of ps ux:

    USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
    yarn       601  0.8  0.9 3008024 528812 ?      Sl   16:12   1:17 /usr/lib/jvm/java-8-openjdk-amd64/bin/java -Dproc_nodema
    yarn      6086  6.3  0.0  96764 24340 ?        R    18:37   0:02 /opt/conda/bin/python -m pyspark.daemon
    yarn      8036  8.2  0.0  96296 24136 ?        S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
    yarn      8173  9.4  0.0  97108 24444 ?        S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
    yarn      8240  9.0  0.0  96984 24576 ?        S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
    yarn      8329  7.6  0.0  96948 24720 ?        S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
    yarn      8420  8.5  0.0  96240 23788 ?        R    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
    yarn      8487  6.0  0.0  96864 24308 ?        S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
    yarn      8554  0.0  0.0  96292 23724 ?        S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
    yarn      8564  0.0  0.0  19100  2448 pts/0    R+   18:37   0:00 ps ux
    yarn     31705  0.0  0.0  13260  2756 ?        S    17:56   0:00 bash /hadoop/yarn/nm-local-dir/usercache/<user_name>/app
    yarn     31707  0.0  0.0  13272  2876 ?        Ss   17:56   0:00 /bin/bash -c /usr/lib/jvm/java-8-openjdk-amd64/bin/java
    yarn     31713  0.4  0.7 2419520 399072 ?      Sl   17:56   0:11 /usr/lib/jvm/java-8-openjdk-amd64/bin/java -server -Xmx6
    yarn     31771  0.0  0.0  13260  2740 ?        S    17:56   0:00 bash /hadoop/yarn/nm-local-dir/usercache/<user_name>/app
    yarn     31774  0.0  0.0  13284  2800 ?        Ss   17:56   0:00 /bin/bash -c /usr/lib/jvm/java-8-openjdk-amd64/bin/java
    yarn     31780 11.1  1.4 21759016 752132 ?     Sl   17:56   4:31 /usr/lib/jvm/java-8-openjdk-amd64/bin/java -server -Xmx1
    yarn     31883  0.1  0.0  96292 27308 ?        S    17:56   0:02 /opt/conda/bin/python -m pyspark.daemon

The PID of CoarseGrainedExecutorBackend is 31780 in this case.
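As a side note, a small helper like the following could log that process's resident memory over time instead of taking point-in-time snapshots (psutil is an assumption here; it is not installed on the workers by default):

    import time
    import psutil  # assumed to be installed on the worker

    pid = 31780  # PID of CoarseGrainedExecutorBackend from the ps ux output above
    proc = psutil.Process(pid)

    # Append the executor JVM's resident set size to a log file every 30 seconds.
    with open("/tmp/executor_rss.log", "a") as log:
        while proc.is_running():
            rss_mb = proc.memory_info().rss / (1024.0 * 1024.0)
            log.write("%s %.1f MiB\n" % (time.strftime("%H:%M:%S"), rss_mb))
            log.flush()
            time.sleep(30)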

Edit 7:
Increasing the heartbeat interval (spark.executor.heartbeatInterval) in the Spark settings did not change anything, which makes sense in hindsight.
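For reference, the change amounted to something like the following when building the context (the exact values shown are illustrative, not the ones we tried; spark.network.timeout has to stay larger than the heartbeat interval):

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setAppName("kafka-passthrough")
            .set("spark.executor.heartbeatInterval", "60s")  # default is 10s
            .set("spark.network.timeout", "300s"))           # must remain larger than the heartbeat interval
    sc = SparkContext(conf=conf)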

I created a short bash script that reads from the Kafka console consumer for 5 seconds and writes the messages to a text file. The text file is uploaded to Hadoop, and Spark streams from there. We used this method to test whether the timeout was related to Kafka (a sketch of the Hadoop-input variant follows the list of results below).

  • Streaming from Hadoop and outputting to Kafka from Spark caused the SocketTimeout
  • Streaming from Kafka directly and not outputting to Kafka from Spark caused the SocketTimeout
  • Streaming from Hadoop and not outputting to Kafka from Spark caused the SocketTimeout
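For the "streaming from Hadoop" cases, the job simply swaps the Kafka input stream for a text file stream over the directory the bash script uploads into; roughly (the HDFS path is a placeholder, and publish_partition is the hypothetical producer helper from the earlier sketch):

    # Variant used for the "streaming from Hadoop" tests: watch an HDFS directory
    # for new text files instead of consuming from Kafka.
    stream = ssc.textFileStream("hdfs://highmem-m/user/<user_name>/kafka_dump")

    # The rest of the pipeline (with or without the Kafka producer output) stays the same.
    stream.foreachRDD(lambda rdd: rdd.foreachPartition(publish_partition))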

So we moved on with the assumption that Kafka had nothing to do with the timeout.

We installed Stackdriver Monitoring to see memory usage as the timeout occurred. Nothing interesting came from the metrics; memory usage looked relatively stable throughout (hovering around 10~15% at the busiest nodes).

We guessed that perhaps the communication between the worker nodes was causing the issue. Right now, our amount of data traffic is low enough that one worker can handle the workload with relative ease.

Running the Spark job on a single-node cluster while streaming from Kafka brokers on a different cluster seemed to have stopped the SocketTimeout... except that the AssertionError documented above still occurs.

Per @Dennis's suggestion, I created a new cluster (also single-node), this time without the Jupyter initialization script, which means Spark runs on Python v2.7.9 (without Anaconda). On the first run, Spark encountered the SocketTimeoutException in just 15 seconds. The second time, it ran for just over 2 hours before failing with the same AssertionError. The third run lasted about 40 minutes and then ran into the SocketTimeoutException. I'm starting to wonder if the problem is in Spark's internals.

