目录[-]

Flume目前为止没有提供官方的S3 Sink。但是有一个可行的选项HDFS Sink。HDFS Sink 可以使用hadoop-aws.jar来完成S3的写入工作。

首先下载hadoop的包,需要注意的是hadoop-aws、Flume、S3三者之间有很大的版本依存关系,我自己尝试了好几个hadoop版本才成功写入S3。成功的版本是hadoop2.7。flume1.8和flume1.9都是可以的。

hadoop所有发行版本可以在这里下载到https://archive.apache.org/dist/hadoop/common/ 。下载tar包解压,将其jar包路径配置到 FLUME_CLASSPATH 。FLUME_CLASSPATH在Flume的conf路径下的flume-env.sh中:

mv flume-env.sh.template flume-env.sh

向flume-env.sh中添加:

FLUME_CLASSPATH="/opt/hadoop-2.7.7/share/hadoop/common/lib/*:/opt/hadoop-2.7.7/share/hadoop/tools/lib/*:/opt/hadoop-2.7.7/share/hadoop/common/hadoop-common-2.7.7.jar"

然后在flume的conf/路径下创建连接s3配置文件core-site.xml:

<configuration>
    <property>
        <name>fs.s3a.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value>your_s3_access_key</value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value>your_s3_secret_key=</value>
    </property>
    <property>
        <name>fs.s3a.connection.ssl.enabled</name>
        <value>true</value>
    </property>
    <property>
        <name>fs.s3a.endpoint</name>
        <value>s3.ap-south-1.amazonaws.com(change it to you aws region)</value>
    </property>
</configuration>

最后配置Sink:

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = s3a://<bucket.name>/<path>/%Y%m%d   # 路径换成你自己的
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollSize = 1048576
a1.sinks.k1.hdfs.rollCount = 5000
a1.sinks.k1.hdfs.rollInterval = 28800
a1.sinks.k1.hdfs.fileType = DataStream

然后正常启动flume即可,如果出现AWS相关报错,可以尝试切换hadoop的大版本,需要注意的是hadoop2.7及以上的版本才开始支持S3A。

参考: https://medium.com/inspiredbrilliance/upload-files-to-aws-s3-using-apache-flume-c3464f6a2092 https://stackoverflow.com/questions/26028096/use-flume-to-stream-data-to-s3 https://blog.csdn.net/fct2001140269/article/details/93339222