最近在做 Flink SQL 任务方面的开发,有这样一种情况,用户自己上传自定义的 UDF Jar 包,这里你可以理解为是用户自己定义的函数 Jar包,然后在写的 Flink SQL 任务的时候,需要能够用到 Jar 包中定义的 UDF。最开始想的是将 UDF Jar 包放到 HDFS 上面,每次用的时候,下载下来,直接配置一下 Flink 提交作业时的相关参数就可以了,但这中间也走了一些弯路,这里记录一下,也防止大家再次走坑。Flink 命令行 Jar 参数配置我们使用 Flink On Yarn Per Job 模式运行任务,提交任务使用 flink run 命令来进行提交作业,具体提交命令如下:./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
-m 指定运行模式在 Yarn上运行。有时候我们程序中有需要用到自己定义的 Jar 包任务,查了官方文档,倒是查到了 -C 的使用说明,官方的英文注释如下:Adds a URL to each user code classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple times for specifying more than one URL.官网说实话对于 -C 这个参数解释其实非常少,这里我也在网上找了一下资料,看了一些其他网上其他仁兄的博客,整体的解释整理如下:1. -C 后面指定的 URL 必须是一个能够在提交作业客户端,JobMaster 和 TaskExecutor 都被访问到的位置,记住要都能访问。2. -C 后面指定的 URL 从 client 端的提交到 JobMaster 的分发到 TaskExecutor 的访问的过程中,不会发生文件移动的动作,在1.4.2和1.5.0和1.6.0的版本中都是这样。3. URL 支持的协议包括 file,ftp,gopher,http,https,jar,mailto,netdoc,亦即java中URL类支持的协议类型。注意:不能放在hdfs上。还有一个参数是 -yt , -yt 这个参数后面配置一个你本地的目录,这个目录存放的是你需要上传到每个 Task ClassPath 下的 jar包。当指定了-yt 值后,客户端会将目录中的jar上传到hdfs中本应用的lib目录中,在tm下载之后,会存在于tm的classpath中。使用方法如下:./bin/flink run -m yarn-cluster -yt 你的本地目录
如果你的实时任务不适用 HDFS 来存储用户的自定义 Jar包时,你可以将其放到共享存储上面,或者放到一个 HTTP 客户端的都能访问下载的地方,注意这个 HTTP 协议是都要能够访问到的地方。我们这边不想弄别的存储,就想使用HDFS,我们自己使用自定义任务实践过,的确发现 -C直接指定HDFS行不通。其实HDFS 上面的文件也支持 HTTP 接口的,但是由于存在主备切换,导致访问 HDFS 的路径就变了,由于很难一直去监控HDFS集群准备切换,所以想了其他的办法。解决方法我们这边整体流程是,首先,用户上传的自定义 Jar 包每次上传我们放到 HDFS上面,每次使用的时候,从HDFS上面下载到本地的一个临时目录,这个目录可以结合具体的用户来取,具体看你怎么使用。然后在使用的时候,我会通过 -yt 指定需要使用 jar包所在的本地目录,由于 -yt指定的目录里面的jar会最终到 Task运行的 classpath中,所以任务运行就可以直接进行加载。但是由于本地 flink run命令在提交作业的时候,会在本地预执行 jar 包里面的代码来形成 JobGraph,所以你还需要指定一下本地的使用到的 Jar包路径,否则会报错。这里使用 -C指定本地使用的 Jar包 ,你可以使用多次 -C 来指定本地使用的 Jar包。具体命令如下:flink run -m yarn-cluster -yt /user/yourname/udfjars/ -C /user/yourname/udfjars/hello_world.jar -c xxxx yourtask.jar
其他其他方面,发现在创建 RemoteStreamEnvironment的时候,可以指定使用到的 Jar文件:所以在创建 StreamExecutionEnvironment,可以指定你所需要的Jar:String [] udfJars = new String[](“hdfs://a.jar”);
RemoteStreamEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( host, port, udfJars)
不过这种方法我没有测试过,你如果可以的话,可以测试测试。我是Lake,如果你觉得我的文章对你有帮助的话,欢迎你点赞转发或者关注我,你的一个小小的鼓励,就是我前进的最大动力。往期文章导读:1. Flink 实时计算-深入理解 Checkpoint 和 Savepoint2. Flink 实时计算 – 维表 Join 的实现3. 实时计算Flink – 1.9版本特性学习和Blink SQL Parser 功能使用
本文出自快速备案,转载时请注明出处及相应链接。