hadoop - Access an HDFS file from a Spark worker node
I am working on a Spark application that needs to access and update an object stored as a file in HDFS. I'm unable to figure out how I can do that.
If I create a FileSystem object `hdfs` and use it:
boolean fileexists = hdfs.exists(new org.apache.hadoop.fs.path(filepath)); if (fileexists){ javardd<myobject> modelrdd = sc.objectfile(filepath); } i get:
error executor: exception in task 110.0 in stage 1.0 (tid 112) java.lang.nullpointerexception
This part of the code runs on a worker, so I'm assuming it fails because the worker doesn't have access to the Spark context. In that case, how can I access the HDFS file?
This HDFS file resides on the driver node. I could replace HDFS with Hive, storing the data as a byte array in Hive, but accessing the Hive context is not possible from a worker node either.
Adding the full code for better understanding:
// Driver program as posted. Identifiers appear to have been lower-cased by extraction, so presumably it will not compile as shown.
// main() creates a Spark context and a Hive context, repartitions the result of a Hive query by the "id" column,
// and hands a flat-map function to mappartitions(); that function's call() invokes sc.objectfile(...) while iterating rows.
// NOTE(review): `sc` is a static field assigned only inside main(). The post reports a NullPointerException on the
// executor -- presumably caused by using `sc` inside call(); confirm against the full stack trace.
// NOTE(review): `id`, `modelpersistencedir`, `objectpath` and `cacheobjects` are used below but not declared in the
// visible code (`cacheobjects` is declared only in a commented-out line; `modelpath` is declared but never read).
public class myprogram { private static javasparkcontext sc; private static hivecontext hivecontext; private static string objectpersistencedir = "/metadata/objects"; private static org.apache.hadoop.fs.filesystem hdfs; private static string namenodeuri = "hdfs://<mymachineurl>:9000"; // create , maintain cache of objects every run session //private static hashmap<string, myobject> cacheobjects; public static void main(string ... args) { system.out.println("inside constructor: creating spark context , hive context"); system.out.println("starting spark context , sql context"); sc = new javasparkcontext(new sparkconf()); hivecontext = new hivecontext(sc); //cacheobjects= new hashmap<>(); //dataframe loadedobjects= hivecontext.sql("select id, filepath saved_objects name = 'test'"); //list<row> rows = loadedobjects.collectaslist(); //for(row row : rows){ // string key = (string) row.get(0) ; // string value = (string) row.get(1); // javardd<myobject> objectrdd = sc.objectfile(value); // cacheobjects.put(key, objectrdd.first()); //} dataframe partitioneddf = hivecontext.sql('select * mydata'); string partitioncolumnname = "id"; javardd<row> partitionedrecs = partitioneddf.repartition(partitioneddf.col(partitioncolumnname)).javardd(); flatmapfunction<iterator<row>, myobject> flatmapsetup = new flatmapfunction<java.util.iterator<row>, myobject>() { list<myobject> lm_list = new arraylist<>(); myobject object = null; @override public list<myobject> call(java.util.iterator<row> it) throws exception { // every row, create record , update object while (it.hasnext()) { row row = it.next(); if (object == null) { string objectkey = "" + id; //object = cacheobjects.get(objectkey); string modelpath = modelpersistencedir + "/" +'test'+ "/" + id; javardd<myobject> objectrdd = sc.objectfile(objectpath); object = objectrdd.collect().get(0); // object not in cache means not created if(object == null){ if (object == null){ objectdef objectdef = new objectdef('test'); object = new 
myobject(objectdef); } } } /* / update on object */ string objectkey = "" + id ; cacheobjects.put(objectkey, object); // algorithm step 2.6 : save in hive, add list lm_list.add(object); } // while has next ends return lm_list; } // call -- iterator ends };//); //map partition ends //todo_nidhi put objects in collectedobject hive list<myobject> collectedobject = partitionedrecs.mappartitions(flatmapsetup).collect(); }
Comments
Post a Comment