一、Maxwell介绍
Maxwell是一个守护程序,一个应用程序,能够读取MySQL Binlogs然后解析输出为json。支持数据输出到Kafka中,支持表和库过滤。
→ Reference:http://maxwells-daemon.io
→ Download: https://github.com/zendesk/maxwell/releases/download/v1.10.3/maxwell-1.10.3.tar.gz
→ Source: https://github.com/zendesk/maxwell
二、配置MySQL->Maxwell->Kafka->Flume->HDFS
1. MySQL配置要求
配置要求
1 2 3 4 5 |
[mysqld] server-id=1 log-bin=master binlog_format=row binlog_row_image=FULL |
权限要求
1 2 3 4 |
GRANT ALL on maxwell.* to 'maxwell'@'%' identified by 'maxwell'; GRANT ALL on maxwell.* to 'maxwell'@'localhost' identified by 'maxwell'; GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%'; GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'localhost'; |
2. 安装配置Kafka
确认已安装java运行环境,直接解压Kafka即可使用。
1 |
$ tar xvf kafka_2.10-0.10.2.1.tgz -C /usr/local/elk |
解压后,编辑配置文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
$ cat /usr/local/elk/kafka_2.10-0.10.2.1/config/server.properties ############################# Server Basics ############################# broker.id=0 delete.topic.enable=true ############################# Socket Server Settings ############################# listeners=PLAINTEXT://0.0.0.0:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 ############################# Log Basics ############################# log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 ############################# Log Flush Policy ############################# log.flush.interval.messages=10000 log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=6000 |
kafka需要依赖zookeeper,所以需要先启动zookeeper。
1 |
$ nohup /usr/local/elk/kafka_2.10-0.10.2.1/bin/zookeeper-server-start.sh /usr/local/elk/kafka_2.10-0.10.2.1/config/zookeeper.properties & |
启动Kafka Server:(指定JMX_PORT端口,可以通过Kafka-manager获取统计信息)
1 |
$ nohup /usr/local/elk/kafka_2.10-0.10.2.1/bin/kafka-server-start.sh /usr/local/elk/kafka_2.10-0.10.2.1/config/server.properties & |
3. 安装配置Flume
去Apache官网下载Flume二进制安装包,然后解压即可。
1 2 |
tar xvf apache-flume-1.7.0-bin.tar.gz -C /usr/local/ ln -sv /usr/local/apache-flume-1.7.0-bin/ /usr/local/flume |
设置环境变量
1 2 3 4 |
$ cat /etc/profile.d/flume.sh export FLUME_HOME=/usr/local/flume export FLUME_CONF_DIR=$FLUME_HOME/conf export PATH=$PATH:$FLUME_HOME/bin |
查看Flume版本
1 2 3 4 5 |
$ flume-ng version Flume 1.7.0 Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707 Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016 |
创建配置文件和环境变量
1 2 |
$ cp -fr /usr/local/flume/conf/flume-conf.properties.template /usr/local/flume/conf/flume.conf $ cp -fr /usr/local/flume/conf/flume-env.sh.template /usr/local/flume/conf/flume-env.sh |
如果上面的JAVA_HOME设置好了,这里其实不需要设置flume-env.sh,也可以选择配置。
1 2 3 4 5 6 7 8 9 10 11 12 |
$ cat /usr/local/flume/conf/flume-env.sh # Enviroment variables can be set here. export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-3.b12.el7_3.x86_64/jre # Give Flume more memory and pre-allocate, enable remote monitoring via JMX export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote" # as it may result in logging sensitive user information or encryption secrets. export JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true " # Note that the Flume conf directory is always included in the classpath. #FLUME_CLASSPATH="" |
4. 安装配置Maxwell
Maxwell存储在MySQL服务器本身所需要的所有状态,在schema_database选项指定的数据库中。默认情况下, 数据库被命名为maxwell。
1 |
$ cd /usr/local/maxwell/;./bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --port='3306' --producer=stdout |
MySQL创造点数据
1 2 3 4 5 6 7 8 9 10 |
mysql> create database hadoop charset utf8; Query OK, 1 row affected (0.02 sec) mysql> use hadoop; Database changed mysql> create table test(id int,name varchar(10),address varchar(20)); Query OK, 0 rows affected (0.00 sec) mysql> insert into test values(1,'dkey','ShangHai'); Query OK, 1 row affected (0.01 sec) |
然后可以看到Maxwell的输出信息:
1 2 3 4 5 6 |
04:16:48,341 INFO OpenReplicator - starting replication at mysql-bin.000004:6777 04:18:18,654 INFO AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-bin.000004:136974], lastHeartbeat=1497601097500] after applying "create database hadoop charset utf8" to hadoop, new schema id is 2 04:20:24,430 INFO AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-bin.000004:255163], lastHeartbeat=1497601224355] after applying "create table test(id int,name varchar(10),address varchar(20))" to hadoop, new schema id is 3 {"database":"hadoop","table":"test","type":"insert","ts":1497601280,"xid":929,"commit":true,"data":{"id":1,"name":"dkey","address":"ShangHai"}} |
5. 数据输出到HDFS
Kafka创建topic
1 |
$ /usr/local/kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic maxwell --partitions 20 --replication-factor 1 |
查看主题
1 |
$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper=127.0.0.1:2181 maxwell |
查看主题详情
1 2 3 |
$ /usr/local/kafka/bin/kafka-topics.sh --zookeeper=127.0.0.1:2181 --describe --topic maxwell Topic:maxwell PartitionCount:1 ReplicationFactor:1 Configs: Topic: maxwell Partition: 0 Leader: 0 Replicas: 0 Isr: 0 |
提供一份Flume配置文件(从Kafka收集日志到HDFS)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
$ cat /usr/local/flume/conf/mysql-flume-hdfs.conf a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.zookeeperConnect = 127.0.0.1:2181 a1.sources.r1.topic = maxwell a1.sources.r1.groupId = flume a1.sources.r1.channels = c1 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = timestamp a1.sources.r1.kafka.consumer.timeout.ms = 100 # Describe the sink a1.sinks.k1.type = hdfs #a1.sinks.k1.hdfs.path = /mysql/%{topic}/%y-%m-%d a1.sinks.k1.hdfs.path = hdfs://10.10.0.186:8020/mysql/%{topic}/%y-%m-%d a1.sinks.k1.hdfs.rollInterval = 5 a1.sinks.k1.hdfs.rollSize = 0 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.channel = c1 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 1000 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 |
启动Flume
1 |
$ nohup flume-ng agent --conf /usr/local/flume/conf --conf-file /usr/local/flume/conf/mysql-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,console & |
如果启动Flume时报错:ERROR – org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:146)] Failed to start agent because dependencies were not found in classpath. Error follows.java.lang.NoClassDefFoundError: org/apache/hadoop/io/SequenceFile$CompressionType.
可能是因为你的Flume是独立部署,需要依赖Hadoop HDFS的jar包,解决方法也很简单,就是在Flume主机上解压好Hadoop的二进制安装包,然后输出Hadoop环境变量即可,Flume会根据环境变量自动找到相关的依赖jar包。具体可以看:Hadoop实战:Flume输入日志到HDFS报错解决
另外,当Flume-ng正常运行后,写入HDFS时报错:java.lang.NoClassDefFoundError: org/apache/hadoop/io/SequenceFile$CompressionType
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=root, access=WRITE, inode=”/”:hadoop:supergroup:drwxr-xr-x.
这个提示很明显,就是没有写入权限(因为你当前运行flume-ng的用户不是Hadoop用户),解决方案也很简单,就是切换到Hadoop用户执行flume-ng命令即可。或者开启HDFS允许所有用户进行文件写入,默认可能你没有开启。具体可以看:Hadoop实战:Flume输入日志到HDFS报错解决
启动Maxwell
1 2 3 4 5 6 7 |
$ cd /usr/local/maxwell/;./bin/maxwell \ --user='maxwell' \ --password='maxwell' \ --host='127.0.0.1' \ --port='3306' \ --producer=kafka \ --kafka.bootstrap.servers=127.0.0.1:9092 |
三、测试MySQL->Maxwell->Kafka->Flume->HDFS
相关组件现在都已经跑通了,接下来就是测试了,我们在MySQL插入一条数据:
1 2 |
mysql> insert into hadoop.test values(5,'dkey5','Shanghai'); Query OK, 1 row affected (0.00 sec) |
查看Kafka队列
1 2 3 |
$ /usr/local/kafka/bin/kafka-console-consumer.sh -zookeeper=127.0.0.1:2181 --from-beginning --topic maxwell Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. {"database":"hadoop","table":"test","type":"insert","ts":1497607783,"xid":2414,"commit":true,"data":{"id":5,"name":"dkey5","address":"Shanghai"}} |
然后去HDFS查看:
1 2 3 |
[hadoop@hadoop-nn ~]$ hdfs dfs -ls /mysql/maxwell/17-06-19/ Found 1 items -rw-r--r-- 3 hadoop supergroup 148 2017-06-19 03:57 /mysql/maxwell/17-06-19/FlumeData.1497859019506 |
1 2 |
[hadoop@hadoop-nn ~]$ hdfs dfs -cat /mysql/maxwell/17-06-19/FlumeData.1497859019506 {"database":"hadoop","table":"test","type":"insert","ts":1497859014,"xid":372064,"commit":true,"data":{"id":5,"name":"dkey5","address":"Shanghai"}} |
会自动创建相关目录,并生成一个文件。
四、总结
整个MySQL->Maxwell->Flume->HDFS流程算是跑通了,但是此时也仅限于玩一玩而已,包括Flume和Kakfa都得深入学习一下。另外,我们可以看到写入HDFS的数据时json的,可能还需要提取只需要的数据,另外,对于update或delete操作目前还不知道要怎么处理。生产使用难度很大。