[Logstash] Jdbc input plugin


Jdbc input 插件

一 简介

目的:

需要定时将大量数据从mysql采集到ElasticSearch,这里使用Logstash作为数据采集器,使用Jdbc input plugin插件来提取数据到logstash

Jdbc input plugin作用

  • 将具有JDBC接口的任何数据库中的数据提取到Logstash中
  • 可以使用cront语法定期查询或者一次性的将数据加载到Logstash中
  • 结果集中的每一行都成为一个事件,结果集中的列被转换为事件中的字段

 

二 安装

是jdcb集成插件的一个组件,只需安装logstash即可,由于此插件没有和JDBC驱动程序库一起打包,因此需要使用jdbc_driver_library配置选项将所有的jdbc驱动程序传递给插件

环境要求:

  1. elasticsearch-7.13.2
  2. kibana-7.13.2-darwin-x86_64
  3. logstash-7.13.2
  4. logstash-input-jdbc 插件
  5. mysql-connector-java-8.0.25 (驱动)
  6. mysql数据库

 

三 使用

用法

编写logstash-mysql.conf

input{
     jdbc {
        # 驱动类名
        jdbc_driver_library => "/Users/www/elk/7.13.2/mysql-connector-java-8.0.25/mysql-connector-java-8.0.25.jar"
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver" 
        jdbc_default_timezone => "Asia/Shanghai"
         # mysql 数据库链接,shop_trades为数据库名,zeroDateTimeBehaviro防止因时间格式为00-00导致报错
         jdbc_connection_string => "jdbc:mysql://192.168.3.53:3355/shop_trades?zeroDateTimeBehaviro=convertToNull"
         # 连接数据库用户名
         jdbc_user => "cishop"
         # 连接数据库密码
         jdbc_password => "*****"
         # 是否启用分页读取
         jdbc_paging_enabled => "true"
         jdbc_page_size => "1000" 
         # 设置监听间隔  各字段含义(由左至右) 分、时、天、月、年,全部为*默认含义为每分钟都更新
         schedule => "* * * * *"
         # 是否记录上次执行的结果
         record_last_run => "true"
        # 使用其它字段追踪,而不是用时间
         use_column_value => "true"
         tracking_column => "id"
         last_run_metadata_path => "/Users/www/elk/7.13.2/logstash-7.13.2/log-record/order_statics.txt"
         # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
         clean_run => false
         # 直接写sql语句用这个
         statement => "SELECT * FROM `leiyuan` WHERE id > :sql_last_value"
         type => "order_statics" # 如果数据库字段含有type,此数据会被替换掉,不建议这么使用
         add_field => {table_name => "order_statics"}
       }
}
filter {
  json {
    source => "message"
    remove_field => ["message"]
  }
}

output {
  # stdout {codec => rubydebug }
  stdout {codec => json_lines }
  elasticsearch {
    hosts => ["http://192.168.3.132:9200"]
    index => "mysql-shop_trades-order_statics"
    document_id => "%{id}"
  }
}

执行单个配置文件

# 校验是否有语法错误
bin/logstash -f config/logstash-mysql.conf --config.test_and_exit // 第一次校验配置是否正确

# 执行
bin/logstash -f config/logstash-mysql.conf

# 启动多个logstash文件:需要配置pipelines.yml

 

对多配置文件的引用


[root@VM235 config]# less pipelines.yml |grep -v "#"

 - pipeline.id: mysql
   pipeline.workers: 1
   pipeline.batch.size: 125
   path.config: "/opt/ci123/elk/logstash-6.7.0/config/config.d/mysql.d/*.conf"

 - pipeline.id: application
   pipeline.workers: 1
   pipeline.batch.size: 125
   path.config: "/opt/ci123/elk/logstash-6.7.0/config/config.d/application.conf"
   

 

# 校验
1. 开启config/logstash.yml中的 config.test_and_exit: true
2. bin/logstash
# (画外音:pipelines.yml启动不成功,或找不到pipelines.yml时,很有可能是语法错误,yml对语法邀请非常严格,需仔细检查)

# 启动
bin/logstash  --config.reload.automatic

 

显示以下结果说明,数据正在通过定时脚本导入

[2021-07-05T16:43:00,041][INFO ][logstash.inputs.jdbc     ][main][leiyuan] (0.001561s) SELECT * FROM `leiyuan` WHERE id > 0
{"@timestamp":"2021-07-05T08:43:00.213Z","dated":"2021-07-05T01:28:30.000Z","@version":"1","type":"jdbc","name":"第一名","id":1}
[2021-07-05T16:44:00,063][INFO ][logstash.inputs.jdbc     ][main][leiyuan] (0.002067s) SELECT * FROM `leiyuan` WHERE id > 1
[2021-07-05T16:45:00,066][INFO ][logstash.inputs.jdbc     ][main][leiyuan] (0.001203s) 
{"@timestamp":"2021-07-05T08:52:00.286Z","dated":"2021-07-03T23:19:18.000Z","@version":"1","type":"jdbc","name":"第二名","id":2}
{"@timestamp":"2021-07-05T08:52:00.299Z","dated":"2021-06-30T23:15:00.000Z","@version":"1","type":"jdbc","name":"第三名","id":3}
[2021-07-05T16:53:00,312][INFO ][logstash.inputs.jdbc     ][main][leiyuan] (0.002697s) SELECT * FROM `leiyuan` WHERE id > 3
[2021-07-05T16:53:00,312][INFO ][logstash.inputs.jdbc     ][main][leiyuan] (0.002697s) SELECT * FROM `leiyuan` WHERE id > 3

 

 

 


其他文件:

发表评论