一 简介
目的:
需要定时将大量数据从mysql采集到ElasticSearch,这里使用Logstash作为数据采集器,使用Jdbc input plugin插件来提取数据到logstash
Jdbc input plugin作用
- 将具有JDBC接口的任何数据库中的数据提取到Logstash中
- 可以使用cront语法定期查询或者一次性的将数据加载到Logstash中
- 结果集中的每一行都成为一个事件,结果集中的列被转换为事件中的字段
二 安装
是jdcb集成插件的一个组件,只需安装logstash即可,由于此插件没有和JDBC驱动程序库一起打包,因此需要使用jdbc_driver_library配置选项将所有的jdbc驱动程序传递给插件
环境要求:
- elasticsearch-7.13.2
- kibana-7.13.2-darwin-x86_64
- logstash-7.13.2
- logstash-input-jdbc 插件
- mysql-connector-java-8.0.25 (驱动)
- mysql数据库
三 使用
用法
编写logstash-mysql.conf
input{
jdbc {
# 驱动类名
jdbc_driver_library => "/Users/www/elk/7.13.2/mysql-connector-java-8.0.25/mysql-connector-java-8.0.25.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_default_timezone => "Asia/Shanghai"
# mysql 数据库链接,shop_trades为数据库名,zeroDateTimeBehaviro防止因时间格式为00-00导致报错
jdbc_connection_string => "jdbc:mysql://192.168.3.53:3355/shop_trades?zeroDateTimeBehaviro=convertToNull"
# 连接数据库用户名
jdbc_user => "cishop"
# 连接数据库密码
jdbc_password => "*****"
# 是否启用分页读取
jdbc_paging_enabled => "true"
jdbc_page_size => "1000"
# 设置监听间隔 各字段含义(由左至右) 分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
# 是否记录上次执行的结果
record_last_run => "true"
# 使用其它字段追踪,而不是用时间
use_column_value => "true"
tracking_column => "id"
last_run_metadata_path => "/Users/www/elk/7.13.2/logstash-7.13.2/log-record/order_statics.txt"
# 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
clean_run => false
# 直接写sql语句用这个
statement => "SELECT * FROM `leiyuan` WHERE id > :sql_last_value"
type => "order_statics" # 如果数据库字段含有type,此数据会被替换掉,不建议这么使用
add_field => {table_name => "order_statics"}
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
# stdout {codec => rubydebug }
stdout {codec => json_lines }
elasticsearch {
hosts => ["http://192.168.3.132:9200"]
index => "mysql-shop_trades-order_statics"
document_id => "%{id}"
}
}
执行单个配置文件
# 校验是否有语法错误
bin/logstash -f config/logstash-mysql.conf --config.test_and_exit // 第一次校验配置是否正确
# 执行
bin/logstash -f config/logstash-mysql.conf
# 启动多个logstash文件:需要配置pipelines.yml
对多配置文件的引用
[root@VM235 config]# less pipelines.yml |grep -v "#"
- pipeline.id: mysql
pipeline.workers: 1
pipeline.batch.size: 125
path.config: "/opt/ci123/elk/logstash-6.7.0/config/config.d/mysql.d/*.conf"
- pipeline.id: application
pipeline.workers: 1
pipeline.batch.size: 125
path.config: "/opt/ci123/elk/logstash-6.7.0/config/config.d/application.conf"
# 校验
1. 开启config/logstash.yml中的 config.test_and_exit: true
2. bin/logstash
# (画外音:pipelines.yml启动不成功,或找不到pipelines.yml时,很有可能是语法错误,yml对语法邀请非常严格,需仔细检查)
# 启动
bin/logstash --config.reload.automatic
显示以下结果说明,数据正在通过定时脚本导入
[2021-07-05T16:43:00,041][INFO ][logstash.inputs.jdbc ][main][leiyuan] (0.001561s) SELECT * FROM `leiyuan` WHERE id > 0
{"@timestamp":"2021-07-05T08:43:00.213Z","dated":"2021-07-05T01:28:30.000Z","@version":"1","type":"jdbc","name":"第一名","id":1}
[2021-07-05T16:44:00,063][INFO ][logstash.inputs.jdbc ][main][leiyuan] (0.002067s) SELECT * FROM `leiyuan` WHERE id > 1
[2021-07-05T16:45:00,066][INFO ][logstash.inputs.jdbc ][main][leiyuan] (0.001203s)
{"@timestamp":"2021-07-05T08:52:00.286Z","dated":"2021-07-03T23:19:18.000Z","@version":"1","type":"jdbc","name":"第二名","id":2}
{"@timestamp":"2021-07-05T08:52:00.299Z","dated":"2021-06-30T23:15:00.000Z","@version":"1","type":"jdbc","name":"第三名","id":3}
[2021-07-05T16:53:00,312][INFO ][logstash.inputs.jdbc ][main][leiyuan] (0.002697s) SELECT * FROM `leiyuan` WHERE id > 3
[2021-07-05T16:53:00,312][INFO ][logstash.inputs.jdbc ][main][leiyuan] (0.002697s) SELECT * FROM `leiyuan` WHERE id > 3
其他文件: