背景接着上文后续,本文主要介绍使用flume采集sftp服务器指定目录数据到kafka集群。原生flume还是不支持sftp-source,老规矩,还是使用第三方插件,地址: https://github.com/keedio/flume-ftp-source如图所示:需要使用额外的依赖jar:commons-net-3.3.jar、jsch-0.1.54.jar。sftp服务器搭建linux系统默认就是一个sftp服务器,可以不做任何配置,仅仅增加一个用户甚至直接使用root用户就行。不过,安全性差,仅用于测试。如果生产环境考虑安全性,需要配置/etc/ssh/sshd_config,重启sshd服务。详见文章: https://www.cnblogs.com/felixzh/p/15384466.htmlflume-ftp-source插件下载源码编译对应flume1.9.0的插件jargit clone https://github.com/keedio/flume-ftp-source.git
mvn clean package –DskipTests –Dflume.version=1.9.0将打包结果jar和上述依赖jar拷贝到flume/lib文件夹废话少说,直接上配置文件[root@felixzh apache-flume-1.9.0-bin]# catsftp-flume-kafka.conf
agent.sources = sftp-source
agent.sinks = k1
agent.channels = ch
agent.sources.sftp-source.type =org.keedio.flume.source.ftp.source.Source
agent.sources.sftp-source.client.source =sftp
agent.sources.sftp-source.name.server =felixzh1
agent.sources.sftp-source.user = root
agent.sources.sftp-source.password =felixzh
agent.sources.sftp-source.port = 22
#sftp服务器需要采集的目录
agent.sources.sftp-source.working.directory= /home/bigdata/testData
#flume节点状态数据目录
agent.sources.sftp-source.folder = /tmp/
agent.sources.sftp-source.file.name =ftp2-status-file.ser
agent.sources.sftp-source.run.discover.delay=5000
agent.sources.sftp-source.flushlines = true
agent.sources.sftp-source.search.recursive= true
agent.sources.sftp-source.processInUse =false
agent.sources.sftp-source.processInUseTimeout= 30
agent.sources.sftp-source.strictHostKeyChecking= no
agent.sources.sftp-source.knownHosts =~/.ssh/known_hosts
agent.sinks.k1.type =org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.topic = sftp-flume
agent.sinks.k1.brokerList = felixzh1:9092
agent.sinks.k1.batchsize = 200
agent.sinks.kafkaSink.requiredAcks=1
agent.sinks.k1.serializer.class =kafka.serializer.StringEncoder
agent.sinks.kafkaSink.zookeeperConnect=felixzh1:2181
agent.channels.ch.type = memory
agent.channels.ch.capacity = 10000
agent.channels.ch.transactionCapacity =10000
agent.channels.hbaseC.keep-alive = 20
agent.sources.sftp-source.channels = ch
agent.sinks.k1.channel = ch配置参数列表m : stands for parameter is mandatory for above source
o : optional
x : not available创建kafka测试topic[root@felixzh1 kafka_2.12-2.7.1]#bin/kafka-topics.sh –zookeeper felixzh1:2181 –topic sftp-flume –create–partitions 1 –replication-factor 1
Created topic sftp-flume.Sftp服务器准备测试数据启动flume任务[root@felixzh apache-flume-1.9.0-bin]#./bin/flume-ng agent -n agent -c conf/ -f sftp-flume-kafka.conf-Dflume.root.logger=INFO,console消费kafka测试topic[root@felixzh1 kafka_2.12-2.7.1]#bin/kafka-console-consumer.sh –bootstrap-server felixzh1:9092 –topicsftp-flume –from-beginning
本文出自快速备案,转载时请注明出处及相应链接。