作者:Eddy  历史版本:1  最后编辑:Eddy  更新时间:2025-08-08 15:26
适用于
v3.3.8+;
该方案是延时同步,列表数据会根据logstash设置的数据同步时间出现延时,即数据不会马上在列表中展示;
软件及配置准备
- Elasticsearch 
6.8.1; - Logstash 
6.8.1; ibps-logstash-client.jar,用于接收IBPS推送出来的logstash配置文件并添加到logstash的pipelines.yml配置文件中;- RabbitMQ3.x,配置文件推送使用的是MQ广播,
暂时只支持RabbitMQ,它与ibps使用的是同一个MQ同一个用户同一个虚拟主机路径; - 数据库驱动文件,本文以
mysql为例; - ik分词配置文件
logstash.ik.json; 
软件安装
Elasticsearch安装
http://doc.bpmhome.cn/docs/ibps_v3_deploy/ibps_v3_deploy-1bkv0uf532kaj
Logstash安装
http://doc.bpmhome.cn/docs/ibps_v3_deploy/ibps_v3_deploy-1bkv11f1sq13t
RabbitMQ3安装
http://doc.bpmhome.cn/docs/ibps_v3_deploy/ibps_v3_deploy-1bkinltsmcie9
ibps-logstash-client.jar安装
如果logstash有多台服务器,那么需要修改源码工程
ibps-provider-logstash-client中com.lc.ibps.logstash.client.rabbitmq.consumer.RabbitLogstashQueueConsumer的注解@Queue,广播消息需要同步配置到每一个logstash服务器中并保存对应的数据同步配置文件;
该程序是ibps提供的一个客户端程序,直接打包源码工程
ibps-provider-logstash-client即可;配置参数(通过环境变量方式设置,以
windows为例)- logstash管道配置文件路径,LOGSTASH_PIPELINES=D:\docker\logstash\conf.6.8.1\pipelines.yml
 - logstash同步文件目录(即ibps在数据模版功能中生成的logstash同步mysql数据至elasticsearch的配置文件存放的目录),LOGSTASH_PIPELINES_DIR=D:\docker\logstash\conf.6.8.1\logstash\mysql
 - RabbitMQ主机IP,SPRING_RABBITMQ_HOST=192.168.3.220
 - RabbitMQ用户名,SPRING_RABBITMQ_USERNAME=ibpsoptimize
 - RabbitMQ密码,SPRING_RABBITMQ_PASSWORD=ibpsoptimize
 - RabbitMQ虚拟主机路径,SPRING_RABBITMQ_VIRTUAL_HOST=/ibpsoptimize
 
客户端的启动脚本
start.bat、start.sh;start.bat
@echo off set LOGSTASH_PIPELINES=D:\\docker\\logstash\\conf.6.8.1\\pipelines.yml set LOGSTASH_PIPELINES_DIR=D:\\docker\\logstash\\conf.6.8.1\\logstash\\mysql set SPRING_RABBITMQ_HOST=192.168.3.118 set SPRING_RABBITMQ_USERNAME=ibpsoptimize set SPRING_RABBITMQ_PASSWORD=ibpsoptimize set SPRING_RABBITMQ_VIRTUAL_HOST=/ibpsoptimize java -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Duser.timezone=GMT+8 -XX:SurvivorRatio=8 -Xms128m -Xmx128m -Xmn80m -jar ibps-logstash-client.jar pausestart.sh
#!/bin/bash shome=$(cd `dirname $0`; pwd) # set variables export LOGSTASH_PIPELINES=/opt/docker/logstash-es/logstash_conf/pipelines.yml export LOGSTASH_PIPELINES_DIR=/opt/docker/logstash-es/logstash_conf/logstash/mysql export SPRING_RABBITMQ_HOST=192.168.3.220 export SPRING_RABBITMQ_USERNAME=ibpsoptimize export SPRING_RABBITMQ_PASSWORD=ibpsoptimize export SPRING_RABBITMQ_VIRTUAL_HOST=/ibpsoptimize # run client java -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Duser.timezone=GMT+8 -XX:SurvivorRatio=8 -Xms128m -Xmx128m -Xmn80m -jar ${shome}/ibps-logstash-client.jar >/dev/null 2>&1 &
IBPS启用Elasticsearch
- 启用全局配置
com.lc.db.elasticsearch.enabled:true; - 启用业务列表的
ES检索配置,以员工列表为例- 在platform服务中添加配置项
elasticsearch.com.lc.ibps.org.party.repository.impl.PartyEmployeeRepositoryImpl.query.enabled: true; - 配置项规则
elasticsearch.[class full name].[method].enabled; - 规则代码
com.lc.ibps.base.framework.repository.IRepository.isElasticsearchOpenning(String) 
 - 在platform服务中添加配置项
 - 编写json配置,配置es排序文件
 
文件存放在Po类同一个下
{
    "query": {
        "parameters": [
            {
                "key": "Q^user_id_^NE",
                "value": "-1"
            },
            {
                "key": "Q^status_^NE",
                "value": "deleted"
            }
        ],
        "sorts": [
            {
                "field": "create_time_",
                "order": "DESC"
            }
        ]
    }
}- 手动创建logstash同步配置文件
 
input {
  jdbc {    
    jdbc_driver_library => "/usr/share/logstash/pipeline/mysql-connector-java-5.1.36.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.3.240:3306/ibps_boot_v3_optimize_05?useUnicode=true&characterEncoding=utf-8&autoReconnect=true"
    jdbc_user => "root"
    jdbc_password => "root"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "100000"
    schedule => "* * * * *"
    statement => "SELECT    * FROM IBPS_PARTY_EMPLOYEE LEFT JOIN ( SELECT ID_ USER_ID_, ACCOUNT_, IS_SUPER_ FROM IBPS_PARTY_USER ) A ON USER_ID_ =
 ID_ LEFT JOIN ( SELECT ID_ ORG_ID_, NAME_ ORG_NAME_ FROM IBPS_PARTY_ORG ) B ON ORG_ID_ = GROUP_ID_ WHERE update_time_ >= :sql_last_value order by upd
ate_time_ asc"
    type => "jdbc"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "update_time_"
    record_last_run => true
    lowercase_column_names => true
    last_run_metadata_path => "/usr/share/logstash/data/sync_point_of_ibps_party_employee"
  }
}
filter {
    ruby {
        code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
    }
    ruby {
        code => "event.set('@timestamp',event.get('timestamp'))"
    }
    mutate {
        remove_field => ["timestamp"]
    }
    ruby {
        code => "event.set('create_time_', event.get('create_time_').time.localtime + 8*60*60)" 
    }
    ruby {
        code => "event.set('update_time_', event.get('update_time_').time.localtime + 8*60*60)" 
    }
}
output {
    elasticsearch {
        hosts => ["192.168.3.220:9200"]
        index => "ibps_party_employee"
        document_id => "%{id_}"
        document_type => "ibps_party_employee"
        template_overwrite => true
        template => "/usr/share/logstash/pipeline/logstash.ik.json"
    }
    stdout {
        codec => json_lines
    }
}
- 添加pipelines.yml文件引入
 
- pipeline.id: ibps_party_employee
  path.config: "/usr/share/logstash/pipeline/mysql/ibps_party_employee.conf"