PySpark on Dataproc stops with SocketTimeoutException
We are trying to run a Spark job on a Dataproc cluster using PySpark 2.2.0, but the Spark job stops after a seemingly random amount of time with the following error message:
    17/07/25 00:52:48 ERROR org.apache.spark.api.python.PythonRDD: Error while sending iterator
    java.net.SocketTimeoutException: Accept timed out
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
        at java.net.ServerSocket.implAccept(ServerSocket.java:545)
        at java.net.ServerSocket.accept(ServerSocket.java:513)
        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:702)
The error can take a couple of minutes to happen, or it can take 3 hours. From personal experience, the Spark job usually runs for 30 minutes to 1 hour before hitting the error.
Once the Spark job hits the error, it just stops. No matter how long we wait, it outputs nothing. In the YARN ResourceManager the application status is still labeled "RUNNING", and we must Ctrl+C to terminate the program. At that point, the application is labelled "FINISHED".
We run the Spark job with

    /path/to/spark/bin/spark-submit --jars /path/to/jar/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar spark_job.py

on the master node's console. The jar file is necessary because the Spark job streams messages from Kafka (running on the same cluster as the Spark job) and pushes those messages back into the same Kafka instance under a different topic.
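For context, here is a minimal sketch of the kind of job spark_job.py is. This is not the actual code: the topic names, ZooKeeper/broker addresses, batch interval, consumer group id, and the use of kafka-python as the producer library are all illustrative assumptions.

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils  # provided by the kafka-0-8 assembly jar

    def forward_partition(records):
        # Hypothetical producer; the real job may push to Kafka differently.
        from kafka import KafkaProducer  # kafka-python, an assumed dependency
        producer = KafkaProducer(bootstrap_servers="highmem-m:9092")
        for _, value in records:
            producer.send("output-topic", value.encode("utf-8"))
        producer.flush()

    sc = SparkContext(appName="kafka-forwarder")
    ssc = StreamingContext(sc, batchDuration=5)

    # Receiver-based Kafka stream: ZooKeeper quorum, consumer group id, topic -> thread count.
    stream = KafkaUtils.createStream(
        ssc, "highmem-m:2181", "spark-consumer-group", {"input-topic": 1})
    stream.foreachRDD(lambda rdd: rdd.foreachPartition(forward_partition))

    ssc.start()
    ssc.awaitTermination()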
I've looked at other answers on this site (primarily this and this) and they have been helpful, but I haven't been able to track down anything in the logs that might state what caused the executors to die. So far, I've monitored the nodes during the task through the YARN ResourceManager and gone through the logs located in the /var/log/hadoop-yarn directory on every node. The only "clue" I could find in the logs was

    org.apache.spark.executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

and that line is only written to the dead executor's logs.
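One way to search for that executor-side exception without SSH-ing into each node is to pull all container logs for the job in one place (this assumes YARN log aggregation is enabled; the application id below is just a placeholder):

    # List running applications to find the application id
    yarn application -list -appStates RUNNING

    # Dump all container logs for the job and search around the termination clue
    yarn logs -applicationId application_1234567890123_0001 | grep -i -B5 "RECEIVED SIGNAL TERM"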
As a last-ditch effort, we attempted to increase the cluster's memory size in the hope that the issue would go away, but it hasn't. Originally, the cluster was running as a 1 master, 2 worker cluster with 4 vCPU and 15 GB of memory each. We created a new Dataproc cluster, this time with 1 master and 3 workers, with the workers each having 8 vCPU and 52 GB of memory (the master has the same specs as before).
What we would like to know is:
1. Where/how can we see the exception that is causing the executors to be terminated?
2. Is this an issue with how Spark is configured?
3. The Dataproc image version is "preview". Could that possibly be the cause of the error?
and ultimately,
4. How do we resolve this issue? What other steps can we take?
This Spark job needs to continuously stream from Kafka for an indefinite amount of time, so we would like the error fixed rather than just prolonging the time it takes for the error to occur.
Here are some screenshots from the YARN ResourceManager that demonstrate what we are seeing.
The screenshots are from before the Spark job stopped with the error.
And this is the Spark configuration file located at /path/to/spark/conf/spark-defaults.conf (we did not change any of the default settings provided by Dataproc):
    spark.master yarn
    spark.submit.deployMode client
    spark.yarn.jars=local:/usr/lib/spark/jars/*
    spark.eventLog.enabled true
    spark.eventLog.dir hdfs://highmem-m/user/spark/eventlog

    # Dynamic allocation on YARN
    spark.dynamicAllocation.enabled true
    spark.dynamicAllocation.minExecutors 1
    spark.executor.instances 10000
    spark.dynamicAllocation.maxExecutors 10000
    spark.shuffle.service.enabled true
    spark.scheduler.minRegisteredResourcesRatio 0.0

    spark.yarn.historyServer.address highmem-m:18080
    spark.history.fs.logDirectory hdfs://highmem-m/user/spark/eventlog

    spark.executor.cores 2
    spark.executor.memory 4655m
    spark.yarn.executor.memoryOverhead 465

    # Overkill
    spark.yarn.am.memory 4655m
    spark.yarn.am.memoryOverhead 465

    spark.driver.memory 3768m
    spark.driver.maxResultSize 1884m
    spark.rpc.message.maxSize 512

    # Add ALPN for Bigtable
    spark.driver.extraJavaOptions
    spark.executor.extraJavaOptions

    # Disable Parquet metadata caching as its URI re-encoding logic does
    # not work for GCS URIs (b/28306549). The net effect of this is that
    # Parquet metadata will be read both driver side and executor side.
    spark.sql.parquet.cacheMetadata=false

    # User-supplied properties.
    #Mon Jul 24 23:12:12 UTC 2017
    spark.executor.cores=4
    spark.executor.memory=18619m
    spark.driver.memory=3840m
    spark.driver.maxResultSize=1920m
    spark.yarn.am.memory=640m
    spark.executorEnv.PYTHONHASHSEED=0
I'm not quite sure where the user-supplied properties came from.
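For what it's worth, the same kind of settings can also be passed per job at submit time; a rough sketch (the values simply mirror the user-supplied block above):

    /path/to/spark/bin/spark-submit \
      --conf spark.executor.cores=4 \
      --conf spark.executor.memory=18619m \
      --conf spark.executorEnv.PYTHONHASHSEED=0 \
      --jars /path/to/jar/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar \
      spark_job.py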
Edit:
Additional information about the clusters: we use the zookeeper, kafka, and jupyter initialization action scripts found at https://github.com/GoogleCloudPlatform/dataproc-initialization-actions , applied in the order zookeeper -> kafka -> jupyter (unfortunately I don't have enough reputation to post more than 2 links at the moment).
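The clusters were created roughly along these lines. This is a hedged sketch: the cluster name is inferred from the "highmem-m" host name above, the machine types are inferred from the specs described earlier, and the initialization-action paths are from memory of the public GCS bucket layout, so all of these may differ from what we actually used:

    gcloud dataproc clusters create highmem \
        --image-version preview \
        --master-machine-type n1-standard-4 \
        --worker-machine-type n1-highmem-8 \
        --num-workers 3 \
        --initialization-actions gs://dataproc-initialization-actions/zookeeper/zookeeper.sh,gs://dataproc-initialization-actions/kafka/kafka.sh,gs://dataproc-initialization-actions/jupyter/jupyter.sh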
Edit 2:
In response to @Dennis's insightful questions, we ran the Spark job while paying particular attention to which executors have higher on-heap storage memory usage. What I noticed is that the executors on worker #0 have higher storage memory usage compared to the other executors. The stdout files for the executors of worker #0 are empty. These three lines are repeated many times over in stderr:
    17/07/27 16:32:01 INFO kafka.utils.VerifiableProperties: Verifying properties
    17/07/27 16:32:01 INFO kafka.utils.VerifiableProperties: Property group.id is overridden
    17/07/27 16:32:01 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden
    17/07/27 16:32:04 INFO kafka.utils.VerifiableProperties: Verifying properties
    17/07/27 16:32:04 INFO kafka.utils.VerifiableProperties: Property group.id is overridden
    17/07/27 16:32:04 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden
    17/07/27 16:32:07 INFO kafka.utils.VerifiableProperties: Verifying properties
    17/07/27 16:32:07 INFO kafka.utils.VerifiableProperties: Property group.id is overridden
    17/07/27 16:32:07 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden
    17/07/27 16:32:09 INFO kafka.utils.VerifiableProperties: Verifying properties
    17/07/27 16:32:09 INFO kafka.utils.VerifiableProperties: Property group.id is overridden
    17/07/27 16:32:09 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden
    17/07/27 16:32:10 INFO kafka.utils.VerifiableProperties: Verifying properties
    17/07/27 16:32:10 INFO kafka.utils.VerifiableProperties: Property group.id is overridden
    17/07/27 16:32:10 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden
    17/07/27 16:32:13 INFO kafka.utils.VerifiableProperties: Verifying properties
    17/07/27 16:32:13 INFO kafka.utils.VerifiableProperties: Property group.id is overridden
    17/07/27 16:32:13 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden
    17/07/27 16:32:14 INFO kafka.utils.VerifiableProperties: Verifying properties
    17/07/27 16:32:14 INFO kafka.utils.VerifiableProperties: Property group.id is overridden
    17/07/27 16:32:14 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden
    17/07/27 16:32:15 INFO kafka.utils.VerifiableProperties: Verifying properties
    17/07/27 16:32:15 INFO kafka.utils.VerifiableProperties: Property group.id is overridden
    17/07/27 16:32:15 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden
    17/07/27 16:32:18 INFO kafka.utils.VerifiableProperties: Verifying properties
    17/07/27 16:32:18 INFO kafka.utils.VerifiableProperties: Property group.id is overridden
    17/07/27 16:32:18 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden
It seems to be repeating every 1~3 seconds.
As for the stdout and stderr of the executors on the other worker nodes, they are empty.
Edit 3:
As mentioned in @Dennis's comments, we had kept the Kafka topic that the Spark job consumes from at a replication factor of 1. I also found that I had forgotten to add worker #2 to zookeeper.connect in the Kafka config file, and had forgotten to give the consumer that streams messages from Kafka into Spark a group id. I've fixed those places (and remade the topic with a replication factor of 3) and observed that the workload now focuses on worker #1. Following @Dennis's suggestions, I ran

    sudo jps

after SSH-ing into worker #1 and got the following output:
[removed this section to save character space; the error messages from the failed calls to jmap didn't hold any useful information]
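For reference, recreating the topic with a replication factor of 3 looked roughly like this. This is a sketch: the topic name, partition count, and the kafka-topics.sh install path are assumptions, not the exact values we used.

    /usr/lib/kafka/bin/kafka-topics.sh --create \
        --zookeeper highmem-m:2181 \
        --replication-factor 3 \
        --partitions 3 \
        --topic input-topic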
Edit 4:
Here is what I'm seeing in the stdout files of worker #1's executors:
    2017-07-27 22:16:24
    Full thread dump OpenJDK 64-Bit Server VM (25.131-b11 mixed mode):
    ===Truncated===
    Heap
     PSYoungGen      total 814592K, used 470009K [0x000000063c180000, 0x000000069e600000, 0x00000007c0000000)
      eden space 799744K, 56% used [0x000000063c180000,0x0000000657e53598,0x000000066ce80000)
      from space 14848K, 97% used [0x000000069d780000,0x000000069e5ab1b8,0x000000069e600000)
      to   space 51200K, 0% used [0x0000000698200000,0x0000000698200000,0x000000069b400000)
     ParOldGen       total 574464K, used 180616K [0x0000000334400000, 0x0000000357500000, 0x000000063c180000)
      object space 574464K, 31% used [0x0000000334400000,0x000000033f462240,0x0000000357500000)
     Metaspace       used 49078K, capacity 49874K, committed 50048K, reserved 1093632K
      class space    used 6054K, capacity 6263K, committed 6272K, reserved 1048576K
and
    2017-07-27 22:06:44
    Full thread dump OpenJDK 64-Bit Server VM (25.131-b11 mixed mode):
    ===Truncated===
    Heap
     PSYoungGen      total 608768K, used 547401K [0x000000063c180000, 0x000000066a280000, 0x00000007c0000000)
      eden space 601088K, 89% used [0x000000063c180000,0x000000065d09c498,0x0000000660c80000)
      from space 7680K, 99% used [0x0000000669b00000,0x000000066a2762c8,0x000000066a280000)
      to   space 36864K, 0% used [0x0000000665a80000,0x0000000665a80000,0x0000000667e80000)
     ParOldGen       total 535552K, used 199304K [0x0000000334400000, 0x0000000354f00000, 0x000000063c180000)
      object space 535552K, 37% used [0x0000000334400000,0x00000003406a2340,0x0000000354f00000)
     Metaspace       used 48810K, capacity 49554K, committed 49792K, reserved 1093632K
      class space    used 6054K, capacity 6263K, committed 6272K, reserved 1048576K
When the error happened, an executor on worker #2 received SIGNAL TERM and was labeled as dead. At the time, it was the only dead executor.
Strangely, the Spark job picked up again after 10 minutes or so. Looking at the Spark UI, only the executors from worker #1 are active and the rest are dead. This is the first time this has happened.
Edit 5:
Again following @Dennis's suggestions (thank you, @Dennis!), this time I ran

    sudo -u yarn jmap -histo <pid>

These are the top 10 memory-hogging classes of CoarseGrainedExecutorBackend after 10 minutes:
     num     #instances         #bytes  class name
    ----------------------------------------------
       1:        244824      358007944  [B
       2:        194242      221184584  [I
       3:       2062554      163729952  [C
       4:        746240       35435976  [Ljava.lang.Object;
       5:           738       24194592  [Lorg.apache.spark.unsafe.memory.MemoryBlock;
       6:        975513       23412312  java.lang.String
       7:        129645       13483080  java.io.ObjectStreamClass
       8:        451343       10832232  java.lang.StringBuilder
       9:         38880       10572504  [Z
      10:        120807        8698104  java.lang.reflect.Field
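For reference, the histogram above was collected roughly like this on the worker, with <pid> standing in for whatever pid sudo jps reports for CoarseGrainedExecutorBackend:

    # Find the executor backend pid, then take a class histogram of its heap
    sudo jps
    sudo -u yarn jmap -histo <pid> | head -n 15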
Also, I've encountered a new type of error that caused an executor to die. It produced some failed tasks highlighted in the Spark UI, and I found this in the executor's stderr:
    17/07/28 00:44:03 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 6821.0 (TID 2585)
    java.lang.AssertionError: assertion failed
        at scala.Predef$.assert(Predef.scala:156)
        at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
        at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
        at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:367)
        at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:366)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:366)
        at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:361)
        at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:736)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:342)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
    17/07/28 00:44:03 ERROR org.apache.spark.executor.Executor: Exception in task 0.1 in stage 6821.0 (TID 2586)
    java.lang.AssertionError: assertion failed
        at scala.Predef$.assert(Predef.scala:156)
        at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
        at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
        at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:367)
        at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:366)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:366)
        at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:361)
        at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:736)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:342)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
    17/07/28 00:44:03 ERROR org.apache.spark.util.Utils: Uncaught exception in thread stdout writer for /opt/conda/bin/python
    java.lang.AssertionError: assertion failed: Block rdd_5480_0 is not locked for reading
        at scala.Predef$.assert(Predef.scala:170)
        at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
        at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:720)
        at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:516)
        at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
        at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
    17/07/28 00:44:03 ERROR org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for /opt/conda/bin/python,5,main]
    java.lang.AssertionError: assertion failed: Block rdd_5480_0 is not locked for reading
        at scala.Predef$.assert(Predef.scala:170)
        at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
        at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:720)
        at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:516)
        at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
        at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
Edit 6:
This time, I took a jmap histogram after 40 minutes of running:
     num     #instances         #bytes  class name
    ----------------------------------------------
       1:         23667      391136256  [B
       2:         25937       15932728  [I
       3:        159174       12750016  [C
       4:           334       10949856  [Lorg.apache.spark.unsafe.memory.MemoryBlock;
       5:         78437        5473992  [Ljava.lang.Object;
       6:        125322        3007728  java.lang.String
       7:         40931        2947032  java.lang.reflect.Field
       8:         63431        2029792  com.esotericsoftware.kryo.Registration
       9:         20897        1337408  com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField
      10:         20323         975504  java.util.HashMap
These are the results of ps ux:
    USER       PID %CPU %MEM     VSZ    RSS TTY   STAT START   TIME COMMAND
    yarn       601  0.8  0.9 3008024 528812 ?     Sl   16:12   1:17 /usr/lib/jvm/java-8-openjdk-amd64/bin/java -Dproc_nodema
    yarn      6086  6.3  0.0   96764  24340 ?     R    18:37   0:02 /opt/conda/bin/python -m pyspark.daemon
    yarn      8036  8.2  0.0   96296  24136 ?     S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
    yarn      8173  9.4  0.0   97108  24444 ?     S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
    yarn      8240  9.0  0.0   96984  24576 ?     S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
    yarn      8329  7.6  0.0   96948  24720 ?     S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
    yarn      8420  8.5  0.0   96240  23788 ?     R    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
    yarn      8487  6.0  0.0   96864  24308 ?     S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
    yarn      8554  0.0  0.0   96292  23724 ?     S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
    yarn      8564  0.0  0.0   19100   2448 pts/0 R+   18:37   0:00 ps ux
    yarn     31705  0.0  0.0   13260   2756 ?     S    17:56   0:00 bash /hadoop/yarn/nm-local-dir/usercache/<user_name>/app
    yarn     31707  0.0  0.0   13272   2876 ?     Ss   17:56   0:00 /bin/bash -c /usr/lib/jvm/java-8-openjdk-amd64/bin/java
    yarn     31713  0.4  0.7 2419520 399072 ?     Sl   17:56   0:11 /usr/lib/jvm/java-8-openjdk-amd64/bin/java -server -Xmx6
    yarn     31771  0.0  0.0   13260   2740 ?     S    17:56   0:00 bash /hadoop/yarn/nm-local-dir/usercache/<user_name>/app
    yarn     31774  0.0  0.0   13284   2800 ?     Ss   17:56   0:00 /bin/bash -c /usr/lib/jvm/java-8-openjdk-amd64/bin/java
    yarn     31780 11.1  1.4 21759016 752132 ?    Sl   17:56   4:31 /usr/lib/jvm/java-8-openjdk-amd64/bin/java -server -Xmx1
    yarn     31883  0.1  0.0   96292  27308 ?     S    17:56   0:02 /opt/conda/bin/python -m pyspark.daemon
The PID of the CoarseGrainedExecutorBackend is 31780 in this case.
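As an aside, a quicker way to pick that PID out of the process list (a minor convenience, not something we necessarily relied on):

    # 'ww' avoids truncating the command column so the class name stays greppable
    ps auxww | grep CoarseGrainedExecutorBackend | grep -v grep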
Edit 7:
Increasing the heartbeatInterval in the Spark settings did not change anything, which makes sense in hindsight.
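For reference, the property in question is presumably spark.executor.heartbeatInterval; the value below is just an illustrative placeholder (the default is 10s):

    spark.executor.heartbeatInterval  30s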
I also created a short bash script that reads from the Kafka console consumer for 5 seconds and writes the messages to a text file. The text file is then uploaded to Hadoop, which Spark streams from. We used this method to test whether the timeout was related to Kafka (a rough sketch of the script is shown below).
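A rough reconstruction of that script, under stated assumptions: the topic name, broker address, Kafka install path, and HDFS target directory are placeholders, not the actual values.

    #!/bin/bash
    # Capture ~5 seconds of messages from the console consumer, then stop it.
    OUT="/tmp/kafka_dump_$(date +%s).txt"
    timeout 5s /usr/lib/kafka/bin/kafka-console-consumer.sh \
        --bootstrap-server highmem-w-0:9092 \
        --topic input-topic > "$OUT"

    # Upload the captured messages to HDFS for Spark to stream from.
    hdfs dfs -put "$OUT" /user/spark/streaming_input/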
- Streaming from Hadoop and outputting to Kafka with Spark caused the SocketTimeout
- Streaming from Kafka directly and not outputting to Kafka with Spark caused the SocketTimeout
- Streaming from Hadoop and not outputting to Kafka with Spark caused the SocketTimeout
So we moved on with the assumption that Kafka had nothing to do with the timeout.
We installed Stackdriver Monitoring to see memory usage as the timeout occurred. Nothing interesting came out of the metrics; memory usage looked relatively stable throughout (hovering around 10~15% at the busiest nodes).
We guessed that perhaps the communication between the worker nodes was causing the issue. Right now, our amount of data traffic is low enough that even one worker can handle the workload with relative ease.
Running the Spark job on a single-node cluster while streaming from Kafka brokers on a different cluster seemed to have stopped the SocketTimeout... except that the AssertionError documented above now occurs.
Per @Dennis's suggestion, I created a new cluster (also single-node) without the jupyter initialization script, which means Spark now runs on Python v2.7.9 (without Anaconda). On the first run, Spark encountered the SocketTimeoutException in just 15 seconds. The second run lasted just over 2 hours before failing with the same AssertionError. I'm starting to wonder if this problem is in Spark's internals. The third run lasted about 40 minutes and then ran into the SocketTimeoutException.