以java API方式提交spark作業

      網友投稿 1550 2025-04-01

      一、文章背景


      在初期學習Spark的時候是以命令行的方式提交Job到集群環境中運行的,試想當一個作業需要重復去執行的時候且linux腳本不會搞,是不是很尷尬!隨著對Spark的深入了解和查看官網提供的文檔示例,了解到spark提供了以sparkLauncher作為spark job提交的唯一入口,可以用Java API編程的方式提交spark job,可以在IDEA中通過創建sparkLauncher對象,進行參數設置后直接點擊Run 運行包含Job的Main類就能成功提交job進行運行。還可以集成到spring項目中,避免了以拼接cmd命令的方式集成到項目中帶來的安全隱患。

      二、實現樣例

      以java API方式提交spark作業

      2.1 主函數樣例

      public class TestSparkLauncher {

      public static void main(String[] args) throws IOException, InterruptedException {

      // 用于配置運行spark的環境變量

      HashMap env = new HashMap();

      env.put("HADOOP_CONF_DIR", "環境上安裝的hadoop配置文件目錄");

      env.put("JAVA_HOME", "環境上的java home");

      // 用于指定spark運行時使用的配置文件,默認加載的是環境上安裝的spark home下的conf目錄

      env.put("SPARK_CONF_DIR", "自定義的spark配置文件目錄");

      SparkLauncher sparkLauncher = new SparkLauncher(env);

      sparkLauncher.setAppName("spark job 名稱");

      sparkLauncher.setAppResource(" spark jar包在hdfs上的路徑");

      sparkLauncher.setSparkHome("環境上安裝的spark路徑");

      sparkLauncher.setMainClass(" spark jar包的運行主函數名稱");

      sparkLauncher.setDeployMode("spark 運行模式 client 或 cluster 二選一");

      // 提交spark job 獲取process

      Process process = sparkLauncher.launch();

      // client模式下用于輸出運行日志

      InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input");

      Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");

      inputThread.start();

      InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error");

      Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");

      errorThread.start();

      System.out.println("Waiting for finish...");

      // client模式下用于監控spark job 運行結果

      int exitCode = process.waitFor();

      System.out.println("Finished! Exit code:" + exitCode);

      }

      2.2 記錄日志線程樣例

      public class InputStreamReaderRunnable implements Runnable {

      private BufferedReader reader;

      private String name;

      public InputStreamReaderRunnable(InputStream is, String name) {

      this.reader = new BufferedReader(new InputStreamReader(is));

      this.name = name;

      }

      public void run() {

      System.out.println("InputStream_" + name + ":");

      try {

      String line = reader.readLine();

      while (line != null) {

      System.out.println(line);

      line = reader.readLine();

      }

      reader.close();

      } catch (IOException e) {

      e.printStackTrace();

      }

      }

      }

      三、選用這種方式的優劣

      優勢:通過SparkLanuncher.lanunch()方法獲取一個process進程,然后調用進程的process.waitFor()方法等待線程返回結果,獲取的輸出信息一切都在掌握之中;

      劣勢:使用這種方式需要自己管理運行過程中的輸出信息,比較麻煩。

      四、實現過程中遇到的問題

      4.1.運行時找不到java_home

      在用于配置spark的運行時環境變量的env集合中添加java_home配置或者在sparkLauncher對象內setJavaHome

      4.2.開啟kerberos認證后,job提交運行失敗

      在sparkLauncher對象中setConf中以key-value形式配置認證文件及名稱

      4.3.spark版本兼容較差,

      如果日志文件中出現序列化ID不想等的問題,請查看集成的spring項目中的sparkjar包是否與環境安裝的spark版本一致。

      以上為項目實現過程demo以及部分問題總結,不足之處,請多多指教。

      Java API spark

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:規劃求解在哪弄(規劃求解怎么用)
      下一篇:華為IoT Openlab 華為云市場IoT硬件測試資料下載匯總
      相關文章
      亚洲国产天堂久久久久久| 波多野结衣亚洲一级| 亚洲性色AV日韩在线观看| 亚洲区视频在线观看| 久久综合亚洲色一区二区三区| 亚洲av日韩综合一区在线观看| 亚洲中文字幕无码日韩| 亚洲中文字幕无码不卡电影 | 中文字幕专区在线亚洲| 国产乱辈通伦影片在线播放亚洲| 亚洲精品无码永久在线观看| 亚洲欧洲中文日韩av乱码| 亚洲AV日韩精品一区二区三区| 亚洲AV无码成人精品区大在线| 亚洲欧美在线x视频| 在线精品自拍亚洲第一区| 亚洲av午夜精品一区二区三区| 亚洲av午夜精品一区二区三区 | 亚洲精品欧洲精品| 亚洲国产成人综合| 亚洲精品午夜国产va久久| 亚洲精品无码少妇30P| 亚洲AV无码一区二区三区牲色| 色婷婷亚洲一区二区三区| 亚洲国产精品综合久久一线| 中文字幕亚洲一区二区三区 | 久久精品夜色国产亚洲av| 亚洲国产精品久久久久网站| 亚洲韩国在线一卡二卡| 亚洲伊人久久大香线蕉影院| 一本色道久久综合亚洲精品蜜桃冫| 亚洲午夜精品久久久久久app| 亚洲成av人片在线天堂无| 深夜国产福利99亚洲视频| 国产亚洲av片在线观看18女人| 国产亚洲美日韩AV中文字幕无码成人| 亚洲日韩中文无码久久| 日韩亚洲Av人人夜夜澡人人爽| 亚洲国产精品美女| 亚洲成a人无码亚洲成av无码| 亚洲精品第一国产综合境外资源|