Spark-部署springboot项目
更新时间 2021-09-18 14:42:57    浏览 0   

TIP

本文主要是介绍 Spark-部署springboot项目 。

# 部署spring boot项目到spark上

# 先说一下开发环境大概情况:

IDE:IntelliJ IDEA

使用spring boot和gradle进行项目的构建

jdk版本 1.8,这里注意一下,开发环境打jar包时的jdk版本要和spark配置里的jdk的版本一致。

# 步骤

1、完成coding后,把当前module打成jar包,具体可参考 这篇文章 (opens new window),自行修改build.gradle打包。

成功后从目录build/libs/${module_name}.jar下取出${module_name}.jar放到部署了spark的机器上。

2、对于spring boot的项目,它的启动类一般是org.springframework.boot.loader.JarLauncher。具体可以打开${module_name}.jar查看META-INF目录下的MANIFEST.MF文件,Main-Class这一项后面的值即为启动class。这里要记下。

3、在部署了spark的机器上,在${module_name}.jar包所在的目录下新建一个start.sh文件。(这个jar包也可以放在HDFS上,我这里是放在了本地)。编辑start.sh文件如下

#!/bin/bash

spark-submit --master yarn \
    --num-executors 1 \
    --executor-memory 2G \
    --driver-memory 2G \
    --executor-cores 2 \
    --total-executor-cores 2 \
    --class org.springframework.boot.loader.JarLauncher \
    ${module_name}.jar \

这里spark-submit涉及的参数比较简单,可根据自己实际情况进行修改。其中--class要指定项目的启动类,在spring boot里,即为上述类名。

4、执行启动文件即可:bash start.sh

# 可能遇到的问题及解决思路

1、打包环境和spark部署环境的jdk版本不一致。

这种情况下spark-submit的时候一般会报

java.lang.UnsupportedClassVersionError unsupported major.minor version 52.0这个error。

可以在打包的时候指定jdk版本,比如在当前module和当前project的build.gradle文件里修改

sourceCompatibility=1.8
targetCompatibility=1.8

或者修改部署spark的机器上的JAVA_HOME。

# 【----------------------------】

# Spark任务提交与SpringBoot项目集成


# 原理

利用官方提供的SparkLauncher java接口来使用java代码提交Spark任务到Spark集群

# 实现

  • 一个SparkPI 的计算demo,并打包成jar
  • 新建SpringBoot项目,项目加入spark-core依赖,否则无法找到SparkLuncher

# 关键配置

 	<properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <sala-version>2.11</sala-version>
        <spark-version>2.4.4</spark-version>
        <scope-type>compile</scope-type>
    </properties>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${sala-version}</artifactId>
        <version>${spark-version}</version>
        <scope>${scope-type}</scope>
   </dependency>

# 新建个sparkController,内部代码如下:

package com.ly.spark.oneline.controller;

import org.apache.spark.launcher.SparkLauncher;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;

@RestController
public class SparkLauncherController {

    @GetMapping(value = "/sparkPI")
    public String submitTast(){
        HashMap env = new HashMap();
        //hadoop、spark环境变量读取
        env.put("HADOOP_CONF_DIR" ,  System.getenv().getOrDefault("HADOOP_CONF_DIR","/usr/local/hadoop/etc/overriterHaoopConf"));
        env.put("JAVA_HOME", System.getenv().getOrDefault("JAVA_HOME","/usr/local/java/jdk1.8.0_151"));
        //创建SparkLauncher启动器
        SparkLauncher handle = new SparkLauncher(env)
                .setSparkHome("/home/spark/spark-2.4.4-bin-hadoop2.7")
                .setAppResource("/home/sino/spark-model-1.0/lib/spark-model-1.0.jar")
                .setMainClass("com.sinovatio.spark.JobStarter")
                .setMaster("yarn")
                .setDeployMode("client")
                .setConf("spark.yarn.queue","sino")
                .setConf("spark.app.id", "luncher-test")
                .setConf("spark.driver.memory", "1g")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.instances", "2")
                .setConf("spark.executor.cores", "2")
                .setConf("spark.default.parallelism", "12")
                .setConf("spark.driver.allowMultipleContexts","true")
                .setVerbose(true);

        try {
            //任务提交
            Process process = handle.launch();
            //日志跟踪子线程
            InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input");
            Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
            inputThread.start();

            InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error");
            Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
            errorThread.start();

            System.out.println("Waiting for finish...");
            int exitCode = process.waitFor();
            System.out.println("Finished! Exit code:" + exitCode);
            return "status: "+exitCode;

        }catch (Exception e){
            e.printStackTrace();
            return "status: "+1;
        }

    }
    @GetMapping(value = "/hello")
    public String hello(){
        return "this is hello page!";
    }

}
    

# 子线程日志监控类:

package com.ly.spark.oneline.controller;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class InputStreamReaderRunnable implements Runnable {
 
    private BufferedReader reader;
 
    private String name;
 
    public InputStreamReaderRunnable(InputStream is, String name) {
        this.reader = new BufferedReader(new InputStreamReader(is));
        this.name = name;
    }
 
    public void run() {
        System.out.println("InputStream " + name + ":");
        try {
            String line = reader.readLine();
            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

  • SpringBoot项目打包,与sparkPi项目一起放到hadoop集群里某台机器上
  • 启动该SpringBoot项目,访问该rest接口: http://ip:port/sparkPI 实现任务提交,成功并返回状态码0,否则任务失败

以上参考官网与 :https://blog.csdn.net/u011244682/article/details/79170134

# 参考文章

  • https://blog.csdn.net/u011054678/article/details/79075881
  • https://blog.csdn.net/feiying0canglang/article/details/114173420
更新时间: 2021-09-18 14:42:57
  0
手机看
公众号
讨论
左栏
全屏
上一篇
下一篇
扫一扫 手机阅读
可分享给好友和朋友圈