Author: Eddy  Revision: 1  Updated: 2024-05-13 18:33

Applies to v3.3.8+
This solution uses delayed synchronization: list data lags behind by the sync schedule configured in Logstash, i.e., new data does not appear in the list immediately.

Software and configuration prerequisites

  • Elasticsearch 6.8.1
  • Logstash 6.8.1
  • ibps-logstash-client.jar, used to receive the Logstash configuration files pushed by IBPS and add them to Logstash's pipelines.yml configuration file
  • RabbitMQ 3.x; configuration files are pushed over an MQ broadcast, and only RabbitMQ is supported for now. It must use the same MQ instance, the same user, and the same virtual host as IBPS
  • Database driver file; this guide uses MySQL as the example
  • IK analyzer template file logstash.ik.json (an illustrative sketch appears after the Logstash output configuration below)

Software installation

Elasticsearch installation

http://doc.bpmhome.cn/docs/ibps_v3_deploy/ibps_v3_deploy-1bkv0uf532kaj

Logstash installation

http://doc.bpmhome.cn/docs/ibps_v3_deploy/ibps_v3_deploy-1bkv11f1sq13t

RabbitMQ 3 installation

http://doc.bpmhome.cn/docs/ibps_v3_deploy/ibps_v3_deploy-1bkinltsmcie9

ibps-logstash-client.jar installation

If Logstash is deployed on multiple servers, modify the @Queue annotation on com.lc.ibps.logstash.client.rabbitmq.consumer.RabbitLogstashQueueConsumer in the source project ibps-provider-logstash-client, so that the broadcast message is delivered to every Logstash server and each one saves its own copy of the data sync configuration file (see the illustrative sketch below).
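
A fanout broadcast only reaches every server if each consumer binds its own queue to the exchange. The snippet below is an illustrative Spring AMQP sketch of that pattern, not the IBPS source: the queue and exchange names are made up for this example, whereas in the real client the binding is declared on RabbitLogstashQueueConsumer via the @Queue annotation mentioned above.

// Illustrative sketch (Spring AMQP); queue and exchange names are hypothetical.
// The point: each Logstash host binds its OWN queue to the fanout exchange,
// so the broadcast configuration reaches every server instead of only one.
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class LogstashConfigBroadcastConsumer {

    // "logstash.config.node1" must be unique per Logstash server; leaving @Queue
    // unnamed would create an exclusive auto-named queue, which also works for broadcasts.
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "logstash.config.node1", durable = "true"),
            exchange = @Exchange(value = "logstash.config.fanout", type = ExchangeTypes.FANOUT)))
    public void onConfigPushed(String pipelineConfig) {
        // Persist the pushed sync configuration and register it in pipelines.yml
        // (in the real client this is handled by RabbitLogstashQueueConsumer).
        System.out.println("Received pipeline config:\n" + pipelineConfig);
    }
}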

  • This program is a client provided by IBPS; packaging the source project ibps-provider-logstash-client directly produces the jar

  • Configuration parameters (set as environment variables; the list below uses Windows paths as the example)

    • Logstash pipelines configuration file path: LOGSTASH_PIPELINES=D:\docker\logstash\conf.6.8.1\pipelines.yml
    • Logstash sync file directory (the directory where IBPS stores the configuration files, generated by the data template feature, that sync MySQL data to Elasticsearch through Logstash): LOGSTASH_PIPELINES_DIR=D:\docker\logstash\conf.6.8.1\logstash\mysql
    • RabbitMQ host IP: SPRING_RABBITMQ_HOST=192.168.3.220
    • RabbitMQ username: SPRING_RABBITMQ_USERNAME=ibpsoptimize
    • RabbitMQ password: SPRING_RABBITMQ_PASSWORD=ibpsoptimize
    • RabbitMQ virtual host path: SPRING_RABBITMQ_VIRTUAL_HOST=/ibpsoptimize
  • Client startup scripts start.bat / start.sh

    • start.bat

      @echo off
      rem set variables (adjust paths and connection details for your environment)
      set LOGSTASH_PIPELINES=D:\docker\logstash\conf.6.8.1\pipelines.yml
      set LOGSTASH_PIPELINES_DIR=D:\docker\logstash\conf.6.8.1\logstash\mysql
      set SPRING_RABBITMQ_HOST=192.168.3.118
      set SPRING_RABBITMQ_USERNAME=ibpsoptimize
      set SPRING_RABBITMQ_PASSWORD=ibpsoptimize
      set SPRING_RABBITMQ_VIRTUAL_HOST=/ibpsoptimize
      rem run client
      java -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Duser.timezone=GMT+8 -XX:SurvivorRatio=8 -Xms128m -Xmx128m -Xmn80m -jar ibps-logstash-client.jar
      pause
    • start.sh

      #!/bin/bash
      shome=$(cd `dirname $0`; pwd)
      # set variables
      export LOGSTASH_PIPELINES=/opt/docker/logstash-es/logstash_conf/pipelines.yml
      export LOGSTASH_PIPELINES_DIR=/opt/docker/logstash-es/logstash_conf/logstash/mysql
      export SPRING_RABBITMQ_HOST=192.168.3.220
      export SPRING_RABBITMQ_USERNAME=ibpsoptimize
      export SPRING_RABBITMQ_PASSWORD=ibpsoptimize
      export SPRING_RABBITMQ_VIRTUAL_HOST=/ibpsoptimize
      # run client
      java -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Duser.timezone=GMT+8 -XX:SurvivorRatio=8 -Xms128m -Xmx128m -Xmn80m -jar ${shome}/ibps-logstash-client.jar >/dev/null 2>&1 &
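
After adjusting the variables for your environment, make the script executable and start the client; since output is redirected to /dev/null, you can confirm the JVM is running with standard tools (nothing IBPS-specific):

      chmod +x start.sh
      ./start.sh
      # check that the client process is up
      jps -l | grep ibps-logstash-client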

Enabling Elasticsearch in IBPS

  • Enable the global switch com.lc.db.elasticsearch.enabled:true
  • Enable the ES retrieval switch for a business list, using the employee list as the example
    • Add the configuration item elasticsearch.com.lc.ibps.org.party.repository.impl.PartyEmployeeRepositoryImpl.query.enabled: true to the platform service
    • The configuration key follows the pattern elasticsearch.[class full name].[method].enabled
    • The switch is evaluated by com.lc.ibps.base.framework.repository.IRepository.isElasticsearchOpenning(String); a minimal sketch of how such a key can be resolved appears after the JSON example below
  • Write the JSON configuration file that defines the ES filter conditions and sort order

The file is placed in the same directory (package) as the PO class.

{
    "query": {
        "parameters": [
            {
                "key": "Q^user_id_^NE",
                "value": "-1"
            },
            {
                "key": "Q^status_^NE",
                "value": "deleted"
            }
        ],
        "sorts": [
            {
                "field": "create_time_",
                "order": "DESC"
            }
        ]
    }
}
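
Returning to the switch rule mentioned above: the following is a minimal sketch of how a key following the pattern elasticsearch.[class full name].[method].enabled can be resolved from Spring configuration. It is illustrative only and is not the IBPS implementation; the real check lives in IRepository.isElasticsearchOpenning(String).

import org.springframework.core.env.Environment;

// Illustrative sketch, not the IBPS source: resolve a per-repository, per-method
// Elasticsearch switch whose key follows elasticsearch.[class full name].[method].enabled.
public class ElasticsearchSwitch {

    private final Environment env;

    public ElasticsearchSwitch(Environment env) {
        this.env = env;
    }

    public boolean isEnabled(Class<?> repositoryClass, String method) {
        String key = "elasticsearch." + repositoryClass.getName() + "." + method + ".enabled";
        // defaults to false, so ES retrieval stays off unless explicitly enabled
        return env.getProperty(key, Boolean.class, false);
    }
}

// Example: isEnabled(PartyEmployeeRepositoryImpl.class, "query") reads
// elasticsearch.com.lc.ibps.org.party.repository.impl.PartyEmployeeRepositoryImpl.query.enabled
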
  • Manually create the Logstash sync configuration file
input {
  jdbc {    
    jdbc_driver_library => "/usr/share/logstash/pipeline/mysql-connector-java-5.1.36.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.3.240:3306/ibps_boot_v3_optimize_05?useUnicode=true&characterEncoding=utf-8&autoReconnect=true"
    jdbc_user => "root"
    jdbc_password => "root"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "100000"
    schedule => "* * * * *"
    statement => "SELECT * FROM IBPS_PARTY_EMPLOYEE LEFT JOIN ( SELECT ID_ USER_ID_, ACCOUNT_, IS_SUPER_ FROM IBPS_PARTY_USER ) A ON USER_ID_ = ID_ LEFT JOIN ( SELECT ID_ ORG_ID_, NAME_ ORG_NAME_ FROM IBPS_PARTY_ORG ) B ON ORG_ID_ = GROUP_ID_ WHERE update_time_ >= :sql_last_value ORDER BY update_time_ ASC"
    type => "jdbc"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "update_time_"
    record_last_run => true
    lowercase_column_names => true
    last_run_metadata_path => "/usr/share/logstash/data/sync_point_of_ibps_party_employee"
  }
}

filter {
    ruby {
        code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
    }
    ruby {
        code => "event.set('@timestamp',event.get('timestamp'))"
    }
    mutate {
        remove_field => ["timestamp"]
    }
    ruby {
        code => "event.set('create_time_', event.get('create_time_').time.localtime + 8*60*60)" 
    }
    ruby {
        code => "event.set('update_time_', event.get('update_time_').time.localtime + 8*60*60)" 
    }
}

output {
    elasticsearch {
        hosts => ["192.168.3.220:9200"]
        index => "ibps_party_employee"
        document_id => "%{id_}"
        document_type => "ibps_party_employee"
        template_overwrite => true
        template => "/usr/share/logstash/pipeline/logstash.ik.json"
    }
    stdout {
        codec => json_lines
    }
}
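
The output above applies the index template /usr/share/logstash/pipeline/logstash.ik.json so that string fields are indexed with the IK analyzer. The file shipped with IBPS is not reproduced in this document and may differ; the sketch below only illustrates the general shape of an Elasticsearch 6.x index template that maps string fields to ik-analyzed text (index pattern, analyzer choice, and field mapping are assumptions).

{
  "index_patterns": ["ibps_*"],
  "settings": {
    "number_of_shards": 1,
    "index.refresh_interval": "5s"
  },
  "mappings": {
    "_default_": {
      "dynamic_templates": [
        {
          "strings_as_ik_text": {
            "match_mapping_type": "string",
            "mapping": {
              "type": "text",
              "analyzer": "ik_max_word",
              "search_analyzer": "ik_smart",
              "fields": {
                "keyword": { "type": "keyword", "ignore_above": 256 }
              }
            }
          }
        }
      ]
    }
  }
}
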
  • Add the pipeline entry to pipelines.yml
- pipeline.id: ibps_party_employee
  path.config: "/usr/share/logstash/pipeline/mysql/ibps_party_employee.conf"
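
Restart Logstash so the new entry in pipelines.yml is loaded (depending on the version, automatic config reloading may also pick it up). Once the pipeline runs on its schedule, the sync can be verified with plain Elasticsearch REST calls, assuming ES listens on 192.168.3.220:9200 as configured in the output section above:

# list the index and its document count
curl -s "http://192.168.3.220:9200/_cat/indices/ibps_party_employee?v"
curl -s "http://192.168.3.220:9200/ibps_party_employee/_count?pretty"
# inspect a few synchronized documents
curl -s "http://192.168.3.220:9200/ibps_party_employee/_search?size=3&pretty"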