以java API方式提交spark作業
一、文章背景

在初期學習Spark的時候是以命令行的方式提交Job到集群環境中運行的,試想當一個作業需要重復去執行的時候且linux腳本不會搞,是不是很尷尬!隨著對Spark的深入了解和查看官網提供的文檔示例,了解到spark提供了以sparkLauncher作為spark job提交的唯一入口,可以用Java API編程的方式提交spark job,可以在IDEA中通過創建sparkLauncher對象,進行參數設置后直接點擊Run 運行包含Job的Main類就能成功提交job進行運行。還可以集成到spring項目中,避免了以拼接cmd命令的方式集成到項目中帶來的安全隱患。
二、實現樣例
2.1 主函數樣例
public class TestSparkLauncher {
public static void main(String[] args) throws IOException, InterruptedException {
// 用于配置運行spark的環境變量
HashMap env = new HashMap();
env.put("HADOOP_CONF_DIR", "環境上安裝的hadoop配置文件目錄");
env.put("JAVA_HOME", "環境上的java home");
// 用于指定spark運行時使用的配置文件,默認加載的是環境上安裝的spark home下的conf目錄
env.put("SPARK_CONF_DIR", "自定義的spark配置文件目錄");
SparkLauncher sparkLauncher = new SparkLauncher(env);
sparkLauncher.setAppName("spark job 名稱");
sparkLauncher.setAppResource(" spark jar包在hdfs上的路徑");
sparkLauncher.setSparkHome("環境上安裝的spark路徑");
sparkLauncher.setMainClass(" spark jar包的運行主函數名稱");
sparkLauncher.setDeployMode("spark 運行模式 client 或 cluster 二選一");
// 提交spark job 獲取process
Process process = sparkLauncher.launch();
// client模式下用于輸出運行日志
InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input");
Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
inputThread.start();
InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error");
Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
errorThread.start();
System.out.println("Waiting for finish...");
// client模式下用于監控spark job 運行結果
int exitCode = process.waitFor();
System.out.println("Finished! Exit code:" + exitCode);
}
2.2 記錄日志線程樣例
public class InputStreamReaderRunnable implements Runnable {
private BufferedReader reader;
private String name;
public InputStreamReaderRunnable(InputStream is, String name) {
this.reader = new BufferedReader(new InputStreamReader(is));
this.name = name;
}
public void run() {
System.out.println("InputStream_" + name + ":");
try {
String line = reader.readLine();
while (line != null) {
System.out.println(line);
line = reader.readLine();
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
三、選用這種方式的優劣
優勢:通過SparkLanuncher.lanunch()方法獲取一個process進程,然后調用進程的process.waitFor()方法等待線程返回結果,獲取的輸出信息一切都在掌握之中;
劣勢:使用這種方式需要自己管理運行過程中的輸出信息,比較麻煩。
四、實現過程中遇到的問題
4.1.運行時找不到java_home
在用于配置spark的運行時環境變量的env集合中添加java_home配置或者在sparkLauncher對象內setJavaHome
4.2.開啟kerberos認證后,job提交運行失敗
在sparkLauncher對象中setConf中以key-value形式配置認證文件及名稱
4.3.spark版本兼容較差,
如果日志文件中出現序列化ID不想等的問題,請查看集成的spring項目中的sparkjar包是否與環境安裝的spark版本一致。
以上為項目實現過程demo以及部分問題總結,不足之處,請多多指教。
Java API spark
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。