Elasticsearch Query DsL查询


Query DsL查询

一 Elasticsearch简介

Elasticsearch 是一个开源的搜索引擎,Elasticsearch 使用 Java 编写的,它的内部使用 Lucene 做索引与搜索,但是它的目的是使全文检索变得简单, 通过隐藏 Lucene 的复杂性,取而代之的提供一套简单一致的 RESTful API。

  • 一个分布式的实时文档存储,每个字段 可以被索引与搜索
  • 一个分布式实时分析搜索引擎
  • 能胜任上百个服务节点的扩展,并支持 PB 级别的结构化或者非结构化数据

 

二 安装并运行

已经在其他文档中详细介绍,此次仅做简单步骤介绍

# 安装:
$   wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.13.3-linux-x86_64.tar.gz
$   tar -xzf elasticsearch-7.13.3-linux-x86_64.tar.gz 
$   cd elasticsearch-7.13.3/
# 运行
sh bin/elasticsearch

# 访问
$ curl http://192.168.3.14:9200/ 
{
  "name" : "87DNZWU",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "e3A3l85MSZuZlRhxj6IB2w",
  "version" : {
    "number" : "6.7.0",
    "build_flavor" : "default",
    "build_type" : "zip",
    "build_hash" : "8453f77",
    "build_date" : "2019-03-21T15:32:29.844721Z",
    "build_snapshot" : false,
    "lucene_version" : "7.7.0",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}

 

三 Query DSL 基本结构

查询表达式(Query DSL)是一种非常灵活又富有表现力的查询语言, Elasticsearch 使用它可以以简单的 JSON 接口来展现 Lucene 功能的绝大部分

// 查询
GET /_search  // 查找整个ES中所有索引的内容
{
  "query": {}, //具体的查询语句对象
  "from": 0,   //从第几条数据开始返回
  "size": 100, //返回的条数 默认ES最多返回10000条
  "highlight": { //高亮
    "pre_tags": {}, //高亮内容的前面标签 一般都是html比如<b> <p>这种
    "post_tags": {},//高亮内容的后面标签 一般都是html比如</b> </p>这种
    "fields": { //需要高亮的字段
    }
  },
  "sort": [{ //排序
    "FIELD": { //排序的字段(需要填上具体的字段名)
      "order": "desc"
    }
  }],
  "_source": "{field}" //指定返回的字段
}

// 结果
{
    "took": 350,  // 整个搜索请求消耗了多少毫秒
    "timed_out": false, // 表示本次查询是否超时,如果为true也会返回结果,只是数据可能不完整
    "_shards": { //  显示查询中参与的分片信息,成功多少分片失败多少分片等
        "total": 5, 
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 5, // 匹配到的文档总数
        "max_score": 1, // 为文档中所有_score的最大值
        "hits": [
            {
                "_index": "mysql-shop_trades-order_item_label_binds",
                "_type": "doc",
                "_id": "591935",
                "_score": 1,
                "_source": {
                    "id": 591935,
                    "updated_at": "2021-05-20T06:26:09.000Z",
                    "@version": "1",
                    "bind_item_label_id": 729,
                    "label_type": "brand",
                    "created_at": "2021-05-20T06:26:09.000Z",
                    "@timestamp": "2021-07-07T07:31:36.262Z",
                    "is_deleted": 0,
                    "table_name": "order_item_label_binds",
                    "bar_code": "6907925004486"
                }
            }
        ]
    }
}

 

四 指定索引搜索

 

上述查询会搜索ES中的所有索引,但通常只需要去固定一个或几个索引中搜索,搜索全部无疑会造成资源的浪费,在ES中可以通过以下几种方法来指定索引

  1. 指定一个固定的索引,ops-coffee-nginx-2019.05.15为索引名字
GET /mysql-shop_trades-order_statics/_search

以上表示在mysql-shop_trades-order_statics索引下查找数据

  1. 指定多个固定索引,多个索引名字用逗号分割
GET /mysql-shop_trades-order_statics,mysql-shop_trades-order_item_labels/_search

  1. 用*号匹配,在匹配到的所有索引下查找数据
GET /mysql-shop_trades-*/_search

这里也可以用逗号分割多个匹配索引

五 DSL查询

1、筛选字段

// 筛选_source的数据,单个字段
GET /_search
{
  "_source": "bar_code",  
  "query": {}
}

// 筛选_source的数据,多个字段
{
  "_source": {
    "includes": ["store_id", "sku_id"]
  },
  "query": {}
}

// 对字段进行转换
{
  "docvalue_fields": [
    {
      "field": "updated_at",
      "format": "yyyy-MM-dd HH:mm:ss"
    },
    {
      "field": "num",
      "format": "long" // 没有作用,懵逼...
    }
  ], 
  "query": {}
}

 

2、多条件查询 (where)

  1. constant_score:装另一个查询的查询,固定分数查询,支持filter查询,不支持match查询:
    {
        "constant_score": {
            "filter": {
                "match": {
                    "name": "小米"
                }
            },
            "boost": 10
        }
    }
    

     

  2. bool:主要与其他关键字组合使用,多条件的查询必须要用bool包在外层,然后再根据具体的业务来拼接。

{
  "query": {
    "bool": {
      "should": [{}], //满足其中一个对象查询条件就行 像sql里的or
      "must": [{}],   //必须满足所有对象的查询条件 就像sql里的and
      "must_not": [{}] //必须不满足所有对象的查询条件 就像sql里的and !=
    }
  }
}

 

  1. must: 类似于SQL中的AND,必须包含
  2. must_not: 类似于SQL中的NOT,必须不包含
  3. should: 满足这些条件中的任何条件都会增加评分_score,不满足也不影响,should只会影响查询结果的_score值,并不会影响结果的内容
  4. filter: 与must相似,但不会对结果进行相关性评分_score,大多数情况下我们对于日志的需求都无相关性的要求,所以建议查询的过程中多用filter

3、group by:

ES本身没有group关键词搜索,但支持聚合查询,,需要使用关键字aggs

// 单个字段 group by
{
  "query":{},//这里省略你的查询条件
  "aggs": {
    "age_group": {//这个是指你要返回字段名
      "terms": { //这里还可以用其它关键词 这里terms才能实现group by效果
        "field": "age",//groupby的字段
        "size":1 //返回的条数 相当于group by limit
      }
    }
  }
}

// 多字段group by (如 group by sku_id,store_id)
// 方法一:script
{
  "query":{},
  "aggs": {
    "age_group": {
      "terms": {
        "script":{
          "source": """ 's' + doc['store_id'] + '_s' + doc['sku_id'] """,
          "lang": "painless"
        },
        "size": 10
      }
    }
  }
}

// 方法二:copy to
1. 设置mapping中的多个字段,copy_to 为同一个字段(skuId_storeId)
2. 搜索新字段
{
  "query":{},
  "aggs": {
    "list": {
      "terms": { 
        "field": "skuId_storeId
        "size":1
      }
    }
  }
}

// 方法三:multi_terms (使用高版本,目前6.7不支持)
{
  "aggs": {
    "genres_and_products": {
      "multi_terms": {
        "terms": [{
          "field": "genre" 
        }, {
          "field": "product"
        }]
      }
    }
  }
}

4、order by

order by:注意日期格式和数值格式才支持排序;文本不支持,如果要排序, 需把字段设置为not analysis

// 单排序
{
    "query": {
        "sort": {
            "id": "desc"
        }
    }
}

// avg按照平均值排序
{
    "query": {
        "sort": [
            {
                "id": "desc"
            },
            {
                "price": {
                    "order": "asc",
                    "mode": "avg"
                }
            }
        ]
    }
}

5、count(distinct)

{
  "query":{},
  "aggs": {
    "total_sku_id": {
      "cardinality":{ "field": "sku_id"}
    },
     "total_entity_store_id": { // 非数字类型,无法使用field排序,可以对field增加fieldData = true,或者对field.keyword排序,建议使用后者,高效内存消耗低
      "cardinality":{ "field": "entity_store_id.keyword"}
    }
  }
}

 

6、SUM

{
  "query":{},
  "aggs": {
    "total_pay_num": {
      "sum": {"field": "num"}
    },
     "total_cost_fee": {
      "sum": {"field": "cost_fee"}
    }
  }
}

 

7、distinct :

select distinct(id) from table

{
  "query":{},
  "collapse": {
      "field": "id" //你需要distinct的字段
   }, 
}

 

8、limit

1. 分页:
    1. form:从第几个开始查询,最开始是0
    2. size:即limit
    3. 使用size,size最大可获取数量是xx个
2. 获取所有数据的三种方式
    1. scroll深度滚动需要根据scroll_id和循环取,取完后,需删除scroll,减小内存开销(深度滚动高效,用于处理大量数据,不适合实时获取)
    2.调整索引index.max_result_window的大小,默认10000 (大小与堆内存成正比,这个限制内存)
    3.search_after:请求需增加前一个结果的排序,(实时游标,可根据索引更新和删除而改变 )
    4。 如果是group by查询获取所有数据, 获取需要使用到cardinality查询预估总数,再使用partition、num_partitions分区依次获取数据

9、搜索关键字

  1. match:自定字段,根据字段关键字进行搜索,会分割关键词,匹配到含有一个多多个词的匹配
  2. query_string:全文搜索
  3. match_phrase:不分割关键词 {"match_phrase": {"name":"婴幼儿奶粉"}}
  4. term: 类似SQL where field = x,主要用于数字匹配;如果要匹配文本,会自动分词,不能精准查询,需把字段设置成not analysis
    {
      "query": {
        "term": {"bind_item_label_id": 729}
      }
    }
    

     

  5. terms: 类似SQL where field in (x,x),主要用于数字匹配,
    {
      "query": {
        "terms": {"bind_item_label_id": [703,729]}
      }
    }
    

     

  6. range:: 查询价格在1000-2000的商品
{
    "query": {
        "range": {
            "price": {
                "gte": 1000,
                "lte": 2000
            }
        }
    }
}

 

  1. filter:判断文档是否满足条件
{
    "query": {
        "bool": {
            "filter": {
                "term": {
                    "price": 1999
                }
            }
        }
    }
}

Elasticsearch:Aggregation

  1. metric:度量聚合,主要针对number类型的数据,需要es做较多的计算工作(类似SQL的SUM、MAX、AVG、MIN、Cardinality、stats<属于多值分析>等)
  2. bucket:桶聚合,划分不同步的桶,将数据分配到不同的桶,(类似SQL中的group by)
  3. Pipeline Aggregation:管道分析类型,对其他聚合结果进行二次聚合
  4. Matrix Aggregation:矩阵分析类型,支持对多个字段的操作并提供一个结果矩阵

term aggregation

  • size 可以通过size返回top size的文档,该术语聚合针对顶层术语(不包含嵌套词根),其搜索过程是将请求向所有分点发送请求,每个分片节点返回size条数据,然后聚合所有分片的结果(会对各分片返回的同样词根的数数值进行相加),最终从中挑选size条记录返回给客户端。从这个过程也可以看出,其结果并不是准确的,而是一个近似值。
  • Shard Size 为了提高该聚合的精确度,可以通过shard_size参数设置协调节点向各个分片请求的词根个数,然后在协调节点进行聚合,最后只返回size个词根给到客户端,shard_size >= size,如果shard_size设置小于size,ES会自动将其设置为size,默认情况下shard_size建议设置为(1.5 * size + 10)。
// 单个字段 group by
{
  "query":{},//这里省略你的查询条件
  "aggs": {
    "age_group": {//这个是指你要返回字段名
      "terms": { //这里还可以用其它关键词 这里terms才能实现group by效果
        "field": "age",//groupby的字段
        "size":1 //返回的条数 相当于group by limit
      }
    }
  }
}

// 返回结果格式
{
  ...
  "aggregations" : {
      "list" : {
        "doc_count_error_upper_bound" : 0, // 该值表示未进入最终术语列表的术语的最大潜在文档计数
        "sum_other_doc_count" : 90 // 该值表示未进入最终术语列表的术语的最大潜在文档计数
        "buckets" : [ // 返回doc_count排名最前的10个,受size参数的影响
          {
            "key" : "1",
            "doc_count" : 24,
            "total_refund_fee" : {
              "value" : 0.0
            },
            "total_cost_fee" : {
              "value" : -14976.0
            },
          }
        ]
      }
    }
    }
}

 

 

match_phrase :查询分析文本,创建词组查询

https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-match-query-phrase.html#query-dsl-match-query-phrase

 

 

举例子

GET mysql-shop_trades-order_item_label_binds/_search/?scroll=1m
{
  "docvalue_fields": [
    {
      "field": "updated_at",
      "format": "yyyy-MM-dd HH:mm:ss"
    }
  ], 
  "size": 1000,
  "sort": {"id":"desc"},
  "query": {
    "bool": {
      "must": [
        {"match": {"is_deleted": 0}},
        {"match": {"label_type": "brand"}},
        {
          "constant_score": {
            "filter": {
              "terms": {
                "bind_item_label_id": [703, 2, 729]
                }
              }
            }
        }
      ]
    }
    }
}




GET mysql-shop_trades-order_item_label_binds/_search/?scroll=1m
{
  "_source": "bar_code", 
  "query": {
    "bool": {
      "filter": [
        {"match": {"is_deleted": 0}},
        {"match_phrase": {"label_type": "brand"}},
        {"terms": {"bind_item_label_id": [703, 2, 729]}}
      ]
    }
  },
  "aggs": {
    "bar_code_group": {
      "terms": {
        "field": "bar_code.keyword",
        "size": 10 
      }
    }
  }
}


GET mysql-shop_trades-order_item_label_binds,mysql-shop_trades-order_statics/_search
{
  "query": {
    "bool": {
      "filter": [
        {"match_phrase": {"sys_name": "yiqigou"}},
        {"range": {"num": {"lte": 2000}}},
        {"range": {"return_num": {"gte": -1000}}},
        {"range": {"total_price": {"lte": 1000000}}},
          {"match": {"id": 60}},
        {"term": {"order_type": 0}},
        {"term": {"item_type": 0}},
        {"range": {"date": {
          "gte": "2020-01-21 00:00:00",
          "lte": "2021-07-22 00:00:00",
          "format": "yyyy-MM-dd HH:mm:ss",
          "time_zone": "+08:00"
        }}}
      ],
      "must_not": [
        {"terms": {"store_id": [165]}}
      ]
    }
  }
}

 

 

设置fieldData

// 第一步,创建索引 (如果已经有索引,直接看第二步)
PUT mysql-shop_trades-order_statics2
{
  "mappings": {
    "_doc": {
      "properties": {
        "entity_store_id": { 
          "type": "text",
          "fields": {
            "keyword": { 
              "type": "keyword"
            }
          }
        }
      }
    }
  }
}

// 第二步 设置fieldData为true
PUT mysql-shop_trades-order_statics/_mapping/_doc
{
  "properties": {
    "entity_store_id": {
      "type":     "text",
      "fielddata": true
    }
  }
}

// 第三步 可以查看该索引的Mapping结构,fieldData是否加上去
{
  "mapping": {
    "doc": {
      "properties": {
        "entity_store_id": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          },
          "fielddata": true
        }
      }
    }
  }
}

延伸

 

 

设置 max_result_window

PUT /mysql-shop_trades-order_statics/_settings
{
  "index": {
    "max_result_window": 100000
  }
}

 

Spring Boot 入门

一、开发基础

  • Java基础(两到三小时过一遍)
  • Java开发环境配置必须使用JDK1.8
  • IDE安装(优先使用IntelliJ IDEA)

二、名词解释

  • Spring:JAVA开发应用框架
  • Spring Boot:用来简化Spring应用的初始搭建以及开发过程的配置框架
  • Maven:Java项目构建工具,成熟的项目
  • Gradle:更简洁的Java项目构建工具,吸收了旧构建工具的优点。
  • JPA:是Sun官方提出的Java持久化规范,即数据库操作规范。
  • Hibernate:Hibernate是一个ORM框架,是JPA的默认实现方式,一般说JPA都是指Hibernate。
  • Mybatis:Mybatis是一个轻便的ORM框架。
  • Spring data jpa:是Spring基于ORM框架、JPA规范的基础上封装的一套JPA应用框架

三、新项目流程

  1. 新建gradle项目
    • File->new->Prroject->Spring Initializr

    • 填写Group、Artifact选择Gradle Project项目生成

    • 可以直接在生成项目的时候选择对应需要安装的插件,如:web、jpa、mybatis等,也可以在项目初始化完成之后在build.gradle中添加/配置

  2. 配置build.gradle(位于根目录)

    plugins {
        id 'org.springframework.boot' version '2.1.3.RELEASE'
        id 'java'
    }
    
    apply plugin: 'io.spring.dependency-management'
    
    group = 'com.duomai'
    version = '0.0.1-SNAPSHOT'
    sourceCompatibility = '1.8' // JDK最大兼容版本
    
    repositories {
        mavenCentral()
    }
    
    dependencies {
        // Spring Boot JPA 组件
        implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
        // Spring Boot Web组件
        implementation 'org.springframework.boot:spring-boot-starter-web'
        // Mybatis插件,注意暂时使用**1.1.1**版本,高版本的运行好像有问题
        implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:1.1.1'
        // Mybatis分页插件
        implementation group: 'com.github.pagehelper', name: 'pagehelper-spring-boot-starter', version: '1.2.10'
        runtimeOnly 'org.springframework.boot:spring-boot-devtools'
        runtimeOnly 'mysql:mysql-connector-java'
        testImplementation 'org.springframework.boot:spring-boot-starter-test'
    }
    
    

    修改了build.gradle后,idea会自动安装/更新依赖包。
    参考:gradle官网Spring Boot Web服务搭建Spring Boot Mysql使用Spring Boot JPA使用

  3. 项目基础配置(位于src/resources/application.properties

    #运行配置
    server.port=9000
    #数据格式配置
    spring.jackson.time-zone=GMT+8 // 设置接口返回时区为东八区
    spring.jackson.date-format=yyyy-MM-dd HH:mm:ss // 自动将接口返回中的日期格式转换为标准格式
    #数据库连接配置
    spring.datasource.url=jdbc:mysql://192.168.0.235:3355/shop_balances?serverTimezone=Asia/Shanghai&tinyInt1isBit=false // serverTimezone选择Mysql东八区,tinyInt1isBit禁止Mysql自动将tinyint(1)类型数据映射为boolean类型
    spring.datasource.username=cishop
    spring.datasource.password=fuyuan1906
    #log配置
    logging.path=E:/java/demo/balance_card/log
    logging.level.com.favorites=DEBUG
    #logging.level.org.springframework.web=INFO
    logging.level.org.hibernate=ERROR
    #mybatis设置
    mybatis.type-aliases-package=com.duomai.balance_card.Model.Mapper
    mybatis.configuration.map-underscore-to-camel-case=true
    logging.level.com.duomai.balance_card.Model.Mapper=DEBUG
    #pagehelper插件设置
    pagehelper.helperDialect=mysql
    pagehelper.reasonable=false
    pagehelper.supportMethodsArguments=true
    pagehelper.params=pageNum=page;pageSize=limit
    #jpa 设置
    spring.jpa.properties.hibernate.hbm2ddl.auto=update
    spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5InnoDBDialect
    spring.jpa.show-sql= true
    
  4. 项目启动
    • IDEA启动,运行[Prefix]Application.java文件即可

    • 命令行启动

      • Maven项目
        在springboot的应用的根目录下运行 mvn spring-boot:run

      • Grdale项目
        在springboot的应用的根目录下运行 gradle bootRungradlew bootRun
        (前者是使用本地的gradle版本运行,后者是使用代码仓库中的gradle运行)

    • 打包构建可执行文件运行

  5. 开发环境热更新设置
    热更新不是很好用,有一定的延迟时间

四、项目文件分层解析

// 业务代码 src/main
java 
    com
        duomai
            balance_card
                [Prefix]Application.java // 项目启动文件,可以做一些全局设置,如时区设置、Mapper扫描等
                Config // 配置类,用于注册一些全局配置,如拦截器注册等
                Middleware // 中间件,实现AOP功能
                Controller // 控制器,主要做路由功能
                    xxxController.java
                    BaseErrorController.java // 路由匹配失败时使用的控制器
                Service // 业务代码
                Model // 目录主要用于实体与数据访问层
                    Entity // 数据表实体类
                    Repository // JPA数据仓库
                    Mapper // Mybatis映射文件
                    Provider // Mapper的Sql生成器
                Library // 库类,存放公共类文件/纯定义文件等
                    ApiReturnDefines.java // 接口返回定义
                    ExceptionErrprDefines.java // 异常监听层定义
                Helper // 辅助函数类文件
                OutPut // 接口输出层
                    ApiResult.java // 最终的接口输出格式
                    AiReturn.java // 快速生成ApiResult类,供外部调用
                Exception // 统一的异常处理
                    ControllerHandler // 路由层异常监听
                    SqlHandler // 数据库层异常监听
// 配置项
resources
    appliaction.properties // 项目配置文件

五、控制器中间件

  1. 一般使用Spring过滤器或拦截器实现AOP切面编程

  2. 过滤器和拦截器的对比

    • 作用域不同
      过滤器依赖于servlet容器,只能在 servlet容器,web环境下使用。
      拦截器依赖于spring容器,可以在spring容器中调用,不管此时Spring处于什么环境。
    • 细粒度的不同
      过滤器的控制比较粗,只能在请求进来时进行处理,对请求和响应进行包装。
      拦截器提供更精细的控制,可以分为controller对请求处理之前、渲染视图之后、请求处理之后三个切面。
    • 中断链执行的难易程度不同
      拦截器可以 preHandle方法内返回 false 进行中断。
      过滤器就比较复杂,需要处理请求和响应对象来引发中断,需要额外的动作,比如将用户重定向到错误页面。
  3. 拦截器的使用
    • 编写自定义拦截器(Middleware/ControllerInterceptor.java
    public class ControllerInterceptor implements HandlerInterceptor {
        private Logger logger = LoggerFactory.getLogger(ControllerInterceptor.class);
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
            logger.info("preHandle....");
            // token校验等
            String token = request.getHeader("token");
            //     Common.sendJson(response, ApiReturn.fail(1001, "token验证失败"));
            //      return false;
            return true;
        }
    
        @Override
        public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
            logger.info("postHandle...");
        }
    
        @Override
        public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
            // 接口日志记录等
            logger.info("afterCompletion...");
        }
     ```
    
    **说明**:
    preHandle:对客户端发过来的请求进行前置处理,如果方法返回true,继续执行后续操作,如果返回false,执行中断请求处理,请求不会发送到Controller。可以在这里校验一些权限信息,如token等,校验失败直接以JSON格式返回请求。
    
    postHandler:在请求进行处理后执行,也就是在Controller方法调用之后处理,前提是preHandle方法返回true。具体来说,postHandler方法会在DispatcherServlet进行视图返回渲染前被调用。
    
    afterCompletion: 该方法在整个请求结束之后执行,前提依然是preHandle方法的返回值为true。
    
    • 注册拦截器(Config/InterceptorConfig.java
    @Configuration
    public class InterceptorConfig implements WebMvcConfigurer {
        @Override
        // 核心方法
        public void addInterceptors(InterceptorRegistry registry) {
            registry.addInterceptor(ControllerInterceptor())
                    //配置拦截规则
                    .addPathPatterns("/**")
                    .order(1);
    
            // 多个拦截器按上述方法持续注册即可,同时也可以设置order值,从小到大执行。
        }
    
        @Bean
        public HandlerInterceptor ControllerInterceptor() {
            return new ControllerInterceptor();
        }
    }
    

六、Controller层

  1. 定义控制器文件
    @RestController
    在类文件头部定义,标明为控制器文件,且输出格式为JSON
    
  2. 路由和参数
    1. 定义路由名称,接收方法
      例:@RequestMapping(value = "/get", method = {RequestMethod.GET, RequestMethod.POST})
      
        可选参数:
      
        value:路由名称
      
        method:指定请求的method类型, GET、POST、PUT、DELETE、PATCH等,可多选
      
        consumes:指定处理请求的提交内容类型(Content-Type),例如application/json, text/html
      
        produces: 指定返回的内容类型,仅当request请求头中的(Accept)类型中包含该指定类型才返回
      
        params: 指定request中必须包含某些参数值
      
        headers: 指定request中必须包含某些指定的header值
      
    2. 参数接收
      例:@RequestParam(value = "fields", required = false, defaultValue = "*") String fields
      
          value:参数名称
      
          defaultValue:默认值
      
          required:是否是必要参数
      
    3. 自定义错误路由
      1. 在Controller层中添加BaseErrorController.java文件,用于监听路由匹配失败的情况
        @Controller
        public class BaseErrorController implements ErrorController {
            @Override
            public String getErrorPath() {
                System.out.print("错误页面");
                return "error/error";
        }
        
            @RequestMapping(value = "/error")
            public void error() throws Exception {
                throw new Exception("路由匹配失败");
            }
        }
        
      2. 在Exception文件夹中添加ControllerHandler.java,用于捕获路由报错并输出。
        @RestControllerAdvice
        public class ControllerHandler {
            // 缺少必选参数
            @ExceptionHandler({MissingServletRequestParameterException.class})
            @ResponseBody
            public ApiResult requestMissingServletRequest(MissingServletRequestParameterException e){
                return ApiReturn.fail(ExceptionErrorDefines.RequestMissingServletRequest, e.getMessage());
            }
        }
        未解决:抛出异常后访问404页面运行环境会报错,但是页面正常  
        

        参考:https://www.jianshu.com/p/393f70b55b1b

  3. Service层调用

    1. Service类成员注入
      • 使用@Autowired修饰符进行依赖注入
        @Autowired
        private final CardService cardService;
        
      • 用构造函数来做注入类成员(推荐使用)
        private StoreBalanceCardsRepository cardsRepository;
        public CardController(StoreBalanceCardsRepository cardsRepository) {
            this.cardsRepository = cardsRepository;
        }
        **注**:
        IntelliJ IDEA使用依赖注入会有IDE报错,但不影响实际编译运行,如需去除报错提示,需要在Dao层(Respository/Mapper)类开头添加注解 `@Repository`
        
    2. 调用
      cardService.get(id, fields);
      

七、Service层

  1. 定义Service文件
    @Service
    在类文件头部定义,标明为Service文件
    
  2. 注入Model层操作文件
    private final StoreBalanceCardsRepository storeBalanceCardsRepository;
    private final StoreBalanceCardsMapper storeBalanceCardsMapper;
    
        CardService(StoreBalanceCardsRepository storeBalanceCardsRepository, StoreBalanceCardsMapper storeBalanceCardsMapper) {
            this.storeBalanceCardsRepository = storeBalanceCardsRepository;
            this.storeBalanceCardsMapper = storeBalanceCardsMapper;
        }
    
  3. 调用Model层文件

八、Model层

  1. Repository

    • Spring中概念,概念类似于数据仓库,是Spring data jpa的实现。居于业务层和数据层之间,将两者隔离开来,在它的内部封装了数据查询和存储的逻辑。

    • Repository和传统意义上的DAO的区别:
      Repository蕴含着真正的OO概念,即一个数据仓库角色,负责所有对象的持久化管理。DAO则没有摆脱数据的影子,仍然停留在数据操作的层面上。Repository是相对对象而言,DAO则是相对数据库而言,虽然可能是同一个东西,但侧重点完全不同。

  2. Mapper

    存放Mybatis数据库关系映射方法

  3. Provider
    为Mapper层提供的SQL生成器,即将SQL的生成与映射解耦。

  4. Entity

    • 根据表结构自动生成实体类
      **注意**:
      i、多次自动生成不会覆盖,如需更新需要把旧的实体类文件删除
      
      ii、有需要的话可以在实体类文件右击generate一键生成构造函数、set get方法、@Autowired等
      
      iii、自动生成的实体类中datetime类型的字段会被转为Java的timestamp类型数据,存储的时候也使用timestamp类型即可
      
      iv、生成出来的实体类catalog = "" 报错,是IDE的报错,不影响使用
      

      参考文档:https://blog.csdn.net/chenju05244554/article/details/1009142081

  5. Hibernate 和 Mybatis 的对比

    • Hibernate优势:
      i、DAO层开发比MyBatis简单,Mybatis需要维护SQL和结果映射。
      
      ii、对对象的维护和缓存要比MyBatis好,对增删改查的对象的维护要方便。
      
      iii、数据库移植性很好,MyBatis的数据库移植性不好,不同的数据库需要写不同SQL。
      
      iv、有更好的二级缓存机制,可以使用第三方缓存。MyBatis本身提供的缓存机制不佳。
      
    • Mybatis优势:
      i、MyBatis可以进行更为细致的SQL优化,可以减少查询字段。
      
      ii、MyBatis容易掌握,而Hibernate门槛较高。
      
    • 选用Mybatis的原因
      i、Hibernate无法满足动态获取部分字段的需求,即使是使用Hibernate提供的原始SQL也无法实现
      
      ii、Hibernate的JPA查询只适用于一些简单的情况,如遇到复杂的SQL,Repository中的方法名会很长。这时候又将回到Hibernate的自定义SQL查询,即原生SQL。
      
      iii、Hibernate的维护成本比MyBatis高很多,MyBatis的SQL生成完全取决于开发者,所以SQL修改、维护、优化会比较便利。
      

九、MyBatis使用

  1. 引入MyBatis以及pagehelper分页插件
    dependencies {
        implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:1.1.1'
        implementation group: 'com.github.pagehelper', name: 'pagehelper-spring-boot-starter', version: '1.2.10'
    }
    
  2. application.properties 添加相关配置
    #mybatis设置
    mybatis.type-aliases-package=com.duomai.balance_card.Model.Mapper // 项目中Mapper存放的包
    mybatis.configuration.map-underscore-to-camel-case=true // 自动将sql字段下划线转为驼峰,可以保证取出的数据格式就是数据库中存储的格式
    logging.level.com.duomai.balance_card.Model.Mapper=DEBUG // 开启DEBUG模式,用于开发环境,记录执行的SQL。
    #pagehelper插件设置
    pagehelper.helperDialect=mysql // 指定分页插件使用哪种数据库
    pagehelper.reasonable=false // 分页合理化参数,默认值为false。当该参数设置为 true 时,pageNum<=0 时会查询第一页, pageNum>pages(超过总数时),会查询最后一页。默认false 时,直接根据参数进行查询。
    pagehelper.supportMethodsArguments=true // 支持通过 Mapper 接口参数来传递分页参数,默认值false,分页插件会从查询方法的参数值中,自动根据上面 params 配置的字段中取值,查找到合适的值时就会自动分页,设置后无需手动启用分页插件。
    pagehelper.params=pageNum=page;pageSize=limit //  自定义Mapper 接口参数来传递分页参数的参数名
    
  3. 添加Mapper类扫描的两种方式
    • 在启动类中添加对mapper包扫描@MapperScan(推荐使用)
      @SpringBootApplication
      @MapperScan("com.duomai.balance_card.Model.Mapper")
      
    • 在具体的Mapper类上面添加注解 @Mapper

  4. Mapper类SQL用例

    public interface StoreBalanceCardsMapper {
        // select用例
        @SelectProvider(type = StoreBalanceCardsSqlBuilder.class, method = "getById")
        List<Map> getById(@Param("id") int id, @Param("fields") String fields, @Param("page") int page, @Param("limit") int limit);
    
        // 列表
        @SelectProvider(type = StoreBalanceCardsSqlBuilder.class, method = "list")
        Page<Map> list(@Param("fields") String fields, @Param("page") int page, @Param("limit") int limit);
    
        // insert用例
        @InsertProvider(type = StoreBalanceCardsSqlBuilder.class, method = "add")
        @Options(useGeneratedKeys=true, keyProperty="storeBalanceCard.id", keyColumn="id")
        int add(@Param("storeBalanceCard") StoreBalanceCards storeBalanceCard);
    
        // update用例
        @UpdateProvider(type = StoreBalanceCardsSqlBuilder.class, method = "updateName")
        @Options(useGeneratedKeys=true, keyProperty="storeBalanceCard.id", keyColumn="id")
        int updateName(@Param("storeBalanceCard") StoreBalanceCards storeBalanceCard, @Param("limit") int limit);
    
        // delete用例
        @DeleteProvider(type = StoreBalanceCardsSqlBuilder.class, method = "deleteById")
        int delete(@Param("id") int id, @Param("limit") int limit);
    }
    
    • 通过Provider的方式动态获取SQL

    • 需要分页时在具体的方法后多加page、limit参数,自动实现分页

      // Service层调用
      List storeBalanceCard = storeBalanceCardsMapper.list(fields, page, limit);
      // Mapper层实现
      Page<Map> list(@Param("fields") String fields, @Param("page") int page, @Param("limit") int limit);
      
    • 分页组件仅可用于查询,不可用于更新/删除,更新/删除需要另外实现

  5. Providerle类SQL用例

    public class StoreBalanceCardsSqlBuilder {
        private static final String STORE_BALANCE_CARDS = "store_balance_cards";
        public static String getById(int id, String fields) {
            return new SQL(){ { 
                SELECT(fields);
                FROM(STORE_BALANCE_CARDS);
                WHERE("store_balance_cards.id = #{id}");
            } }.toString();
        }
    
        public static String list(String fields) {
            return new SQL(){ {
                SELECT(fields);
                FROM(STORE_BALANCE_CARDS);
            } }.toString();
        }
    
        public static String add(@Param("storeBalanceCard") StoreBalanceCards storeBalanceCard) {
            return new SQL(){ {
                INSERT_INTO(STORE_BALANCE_CARDS);
                VALUES("sys_name", "#{storeBalanceCard.sysName}");
                VALUES("store_id", "#{storeBalanceCard.storeId}");
                VALUES("entity_store_id", "#{storeBalanceCard.entityStoreId}");
                VALUES("name", "#{storeBalanceCard.name}");
                VALUES("type", "#{storeBalanceCard.type}");
                VALUES("item_id", "#{storeBalanceCard.itemId}");
                VALUES("photo", "#{storeBalanceCard.photo}");
                VALUES("state", "#{storeBalanceCard.state}");
                VALUES("upgrade", "#{storeBalanceCard.upgrade}");
                VALUES("recharge_discount", "#{storeBalanceCard.rechargeDiscount}");
                VALUES("updated_at", "#{storeBalanceCard.updatedAt}");
                VALUES("created_at", "#{storeBalanceCard.createdAt}");
            } }.toString();
        }
    
        public static String updateName(@Param("storeBalanceCard") StoreBalanceCards storeBalanceCard, @Param("limit") int limit) {
            return new SQL(){ {
                UPDATE(STORE_BALANCE_CARDS);
                SET("name=#{storeBalanceCard.name}");
                if (storeBalanceCard.getUpdatedAt() != null) {
                    SET("updated_at=#{storeBalanceCard.updatedAt}");
                }
                WHERE("store_id = #{storeBalanceCard.storeId}");
            } }.toString() + " limit " + limit;
        }
    
        public static String deleteById(int id, int limit) {
            return new SQL(){ {
                DELETE_FROM(STORE_BALANCE_CARDS);
                WHERE("store_balance_cards.id = #{id}");
            } }.toString() + " limit " + limit;
        }
    }
    
  6. 分页组件详解

十、接口输出

  1. 接口统一输出格式

    state: // 状态位
    msg: // 接口输出提示信息
    data: { // 总的接口输出数据,可以为空
        data: // 接口数据
        other: // 其他返回数据,如total/page_total等
        ...:
    }
    
  2. 正常接口输出/异常监听输出统一使用OutPut/ApiReturn方法

十一、异常处理

  1. 项目运行异常统一监听
    现有:ControllrHandler 路由异常监听、SqlHandler 数据库操作异常监听
    需要持续添加
  2. 统一使用接口输出类进行返回,杜绝直接返回报错信息。

十二、单元测试

  1. 添加单元测试类

    src\test\java\com\duomai\balance_card\BalanceCardApplicationTests.java

  2. demo

    @RunWith(SpringRunner.class)
    @SpringBootTest
    @AutoConfigureMockMvc
    public class BalanceCardApplicationTests {
    
        @Autowired
        private MockMvc mockMvc;
    
        @Test
        @SuppressWarnings("unchecked")
        public void getCard() throws Exception {
            String card_id = "1";
            String fields = "store_id,item_id,recharge_discount,photo,type,name";
            String res = this.mockMvc.perform(get("/card/get")
                    .param("id", card_id).param("fields", fields)
                    )
                    .andDo(print())
                    .andExpect(status().isOk())
                    .andReturn()
                    .getResponse()
                    .getContentAsString();
           // 接口返回不为空
           assertThat(res).isNotNull();
           // 校验接口返回格式是否完整
           Map<String, Object> api_res = Common.jsonToMap(res);
           assertThat(api_res).isNotNull();
           assertThat(api_res).hasSize(3);
           assertThat(api_res).containsKeys("state", "msg", "data");
           // 校验接口返回state是否正确
           int state = (int) api_res.get("state");
           Map<String, Object> api_data = (HashMap<String, Object>) api_res.get("data");
           assertThat(state).isEqualTo(ApiReturnDefines.SUCCESS);
           // 校验data数据是否正确
           assertThat(api_data).containsOnlyKeys("data");
           ArrayList<Map> data = (ArrayList) api_data.get("data");
           String[] fieldsAll = fields.split(",");
           assertThat(data).hasSize(1);
           assertThat(data.get(0)).containsKeys((Object[]) fieldsAll);
        }
    }
    
  3. 详解
    • 使用 @RunWith(SpringRunner.class)@SpringBootTest 定义测试类
    • 添加注解 @AutoConfigureMockMvc ,使用 Spring MockMvc 模拟Spring的HTTP请求并将其交给控制器,实际上并没有真正地启动服务器,仅仅是Mock,节省了启动服务器的开销。
    • 使用 AssertJ 库类来验证接口返回内容
      在demo中,使用断言验证了接口返回格式、state状态、data数据格式等基本内容。

    参考文档:https://spring.io/guides/gs/testing-web/

十三、项目打包

  1. 不同的构建文件
  • 普通 jar 包 : 会将源码编译后以工具包(即将class打成jar包)的形式对外提供,此时,你的 jar 包不一定要是可执行的,只要能通过编译,可以被别的项目以 import 的方式调用。
  • 可执行 jar 包 : 能通过 java -jar 的命令运行。
  • 普通 war 包 : war 是一个 web 模块,其中包括 WEB-INF,是可以直接运行的 WEB 模块。做好一个 web 应用后,打成包部署到容器中。
    • 可执行 war 包 : 普通 war 包 + 内嵌容器 。
  1. 构建可执行 jar 包

    • IDE打包
      右侧Gradle -> Tasks -> build -> build
    • 命令行打包
      进入项目根目录,执行gradle build
  2. 运行可执行 jar 包
    java -jar build/libs/balance_card-0.0.1.jar
    运行时可带参数,同application.properties中的参数名
    例:

    java -Djava.security.egd=file:/dev/./urandom -jar /opt/ci123/www/html/java/balance_card/build/libs/${jarName}.jar --server.port=80
    

十四、容器化部署

  1. 创建docker镜像
  • Dockfile 编写

    FROM java:8
    MAINTAINER duomai
    # 设置镜像源
    COPY sources.list /etc/apt/sources.list
    # 安装扩展
    RUN apt-get update && apt-get install -y \
    wget \
    curl \
    vim \
    git \
    less
    # 配置系统时间
    RUN /bin/cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
    && echo 'Asia/Shanghai' > /etc/timezone
    VOLUME /tmp
    # 设置工作目录
    ADD . /opt/ci123/www/html/java
    WORKDIR /opt/ci123/www/html/java
    # 指定输出端口
    EXPOSE 80

    • 快速构建镜像

      sh docker/build.sh [version]

  1. 环境变量配置
  • 配置

    #数据库连接配置 application.properties
    spring.datasource.url=jdbc:mysql://${APP_DB_HOST:192.168.0.235}:${APP_DB_PORT:3355}/${APP_DB_DATABASE:shop_balances}
    spring.datasource.username=${APP_DB_USER:cishop}
    spring.datasource.password=${APP_DB_PASSWORD:fuyuan1906}

    1. 启动容器( docker/run.sh )
  • 启动参数

    -p // 宿主机运行端口
    -d // 宿主机项目地址,缺省为执行run.sh的上一级目录
    -n // 容器名称
    -v // 镜像版本号
    -j // jar包版本,缺省为1.0

  • 环境变量

    # 运行配置
    APP_SERVER_PORT // 容器运行jar包的端口
    # 数据库配置
    APP_DB_HOST
    APP_DB_PORT
    APP_DB_DATABASE
    APP_DB_USER
    APP_DB_PASSWORD
    # 日志配置
    APP_LOG_PATH // 日志输出地址
    APP_LOG_SPRING_WEB_LEVEL // 对应logging.level.org.springframework.web,指org.springframework.web这个包下的日志输出等级,默认为ERROR,开发环境可配置为DEBUG
    APP_LOG_MYBATIS_LEVEL // 对应logging.level.com.duomai.balance_card.Model.Mapper,指balance_card.Model.Mapper包下的日志输出等级,默认为ERROR,开发环境可配置为DEBUG,开启DEBUG之后可在日志中查看DB操作

  • 启动容器

    docker run -d \
    -e APP_SERVER_PORT=8080 \
    -e APP_DB_HOST=192.168.0.235 \
    -e APP_DB_PORT=3355 \
    -e APP_DB_DATABASE=shop_balances \
    -e APP_DB_USER=cishop \
    -e APP_DB_PASSWORD=fuyuan1906 \
    -e APP_LOG_PATH=/opt/ci123/www/html/java/balance_card/log \
    -e APP_LOG_SPRING_WEB_LEVEL=DEBUG \
    -e APP_LOG_MYBATIS_LEVEL=DEBUG \
    -p $port:80 \
    -v $dir:/opt/ci123/www/html/java/balance_card \
    --restart=always \
    --name $name \
    harbor.oneitfarm.com/duomai/java-balance_card:$version \
    sh /opt/ci123/www/html/java/balance_card/docker/start.sh -j $jarName

RabbitMq 镜像队列集群搭建

建议食用本文前请先阅读【RabbitMq 普通集群搭建】

一、镜像队列集群概念

​ 镜像队列是基于普通集群模式的扩展,普通集群模式下如果某一个节点宕机,该节点下的队列操作将完全失效。而在镜像队列模式下,队列的数据将被复制到所有节点(或者配置过的节点)中,从而保证了一个节点宕机,其余节点也可以正常消费此消息。但此模式下也必然会带来性能下降、内存/磁盘消耗增加、网络IO负担增加等问题,所以镜像队列适用于对高可用要求比较高的系统。

  1. 每个镜像队列由一个主队列和一个或多个镜像组成。每个镜像队列都有自己的主节点,主队列存放于主节点上。对队列产生的操作将首先应用于队列的主节点,然后传播到镜像节点。包括发布队列、向消费者传递消息、跟踪来自消费者的确认等行为。
  2. 发布到集群中的消息将被复制到所有的镜像队列中,消费者连接任意节点消费队列实际上都将被连接至主队列的节点上。如果主队列已经确认消费了消息,则其余镜像队列中的消息将被丢弃。
  3. 如果主队列所在的节点发生异常,默认情况下最“老”的镜像队列将被选举为主队列,当然也可以制定不同的选举策略。

二、配置镜像队列

  1. 将队列配置成镜像队列需要通过创建policy来实现。policy包含策略键ha-mode和其对应的键值ha-params(可选)组成。
  • exactly模式
    • ha-paramscount,表示队列的总数量。
    • count表示主队列+镜像队列的总数量,如果count为1,则表示只存在于主队列。如果 count为2则表示存在主队列和一个镜像队列,以此类推。如果count值大于集群中节点的总数则表示所有节点都将同步一份镜像队列。如果某个镜像队列的节点宕机,则会寻找一个剩余未同步的节点来同步镜像队列。
  • all模式
    • 不需要ha-params
    • 此模式下所有节点都将同步镜像队列,如果有新节点加入,则新节点也会进行同步。官方建议同步镜像队列的节点数为N/2 + 1,其中N表示节点总数。同步到所有节点会增加所有集群节点的负载,包括网络I/O、磁盘I/O和磁盘空间的使用等。
  • nodes模式
    • ha-paramsnode names,节点的名称。
    • 此模式下将在指定的节点上同步镜像队列,如果声明队列时其余节点均不在新,则只会在声明连接的那个节点上创建队列。
  1. 配置policy
  • rabbitmqctl命令配置

    set_policy [-p vhost] [--priority priority] [--apply-to apply-to] name pattern definition

    • name:策略名称
    • pattern:策略匹配符,正则表达式。当与给定资源匹配时,将应用该策略。
    • definition:策略内容定义,JSON字符串。
    • priority:策略的优先级,整数。数字越大,优先级越高。默认值为0。
    • apply-to:策略应用的对象,支持queues,exchanges,all,默认值为all
      // exactly模式,匹配前缀为two的资源
      rabbitmqctl set_policy -p cluster ha-two ^two. '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
      
      // all模式,匹配所有前缀
      rabbitmqctl set_policy -p cluster ha-all ^ '{"ha-mode":"all"}'
      
      // nodes模式,匹配所有前缀
      rabbitmqctl set_policy -p cluster ha-nodes ^ '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
      
  • management管理后台配置

    Admin -> Policies -> Add / update a policy

  1. 新镜像队列同步设置
  • ha-sync-mode:manual

    默认模式,新队列镜像将不同步现有消息,只接收新消息。当消费者消费完所有仅存在于主队列上的消息后,新的队列镜像将随着时间的推移成为和主节点队列相同的精确镜像队列。

  • ha-sync-mode: automatic

    当新队列镜像加入时,队列将自动同步所有消息。

三、额外说明

  1. 独占队列不会被复制为镜像队列。
  2. 主节点队列失效后,其中一个镜像队列将被选举为主队列并带来如下影响:
    1. 与主机点连接的客户端将全部断开。
    2. 运行时间最长的镜像队列将被选举为主队列,如果镜像队列尚未开始同步,则队列上的消息将丢失。
    3. 新的主队列会认为之前所有的消费者的连接都已经断开,它将重新发送旧队列中没有收到ack的消息。这可能出现客户端已经发送过ack, 但是服务端在接收到之前就已宕机的情况,从而导致发送两遍相同的消息,因此所有未确认的消息都将使用redelivered标志重新发送。
    4. 如果消费者连着的是镜像队列节点,并且消费者在启动时设置了x-cancel-on-ha-failover参数,则消费者将收到一个服务端消费取消的通知,如果未设置此参数,则消费者将无法感知主节点已宕机。
    5. 如果使用自动ack机制则消息将丢失。
  3. 如果停止了某个包含主节点队列的节点,则其他节点的镜像队列将被选举为主节点队列。在重新启动此节点后,该节点只会被当做是一个新加入集群的节点,不会重新成为主节点队列。
  4. 在主节点队列宕机并且其他镜像队列尚未同步的极端情况下,rabbitMq集群将拒绝任何镜像队列选举为主队列,整个队将不可用且被关闭。如果在镜像队列尚未同步的情况下也需要将某个镜像队列选举为主队列,需要配置policyha-promote-on-shutdownalways(默认为when-synced),并且ha-promote-on-failure不可配置为hen-synced(默认值为always)。

四、集群测试

  1. 启动节点、创建用户、vhostexchangequeue,启动消费者。(快速启动,参考【RabbitMq 使用docker搭建集群】篇)

    1. 节点:
      1. 节点一:rabbit@clusterRabbit1
      2. 节点二:rabbit@clusterRabbit2
    2. 用户:api_managementadministrator标签,开放虚拟主机cluster所有权限)

    3. 策略:

      rabbitmqctl set_policy -p cluster ha-all ^ '{"ha-mode":"all"}'
      
    4. vhostcluster

    5. exchangecluster(直连交换机)

    6. queue

      1. 节点一 rabbit@clusterRabbit1
        1. 队列一:
        • nameclusterRabbit1Queue1
        • routing_keyclusterRabbit1key
        1. 队列二:
        • nameclusterRabbit1Queue2
        • routing_keyclusterRabbitCommonKey
      2. 节点二 rabbit@clusterRabbit2
        1. 队列三:
        • nameclusterRabbit2Queue1
        • routing_keyclusterRabbit2key
        1. 队列四:
        • nameclusterRabbit2Queue2
        • routing_keyclusterRabbitCommonKey(与队列二 相同)
    7. consumer
      1. 消费者一:
      • 连接节点:节点一 rabbit@clusterRabbit1
      • 消费队列:队列一 clusterRabbit1Queue1
        1. 消费者二:
      • 连接节点:节点一 rabbit@clusterRabbit1
      • 消费队列:队列二 clusterRabbit1Queue2
        1. 消费者三:
      • 连接节点:节点二 rabbit@clusterRabbit2
      • 消费队列:队列三 clusterRabbit2Queue1
        1. 消费者四(非此节点的队列):
      • 连接节点:节点一 rabbit@clusterRabbit1
      • 消费队列:队列三 clusterRabbit2Queue1
        1. 消费者五:
      • 连接节点:节点二 rabbit@clusterRabbit2
      • 消费队列:队列四 clusterRabbit2Queue2
  2. 连接节点二 rabbit@clusterRabbit2,再次声明队列名和队列一同名的队列clusterRabbit1Queue1。【同名队列再次声明】

  • 结果:声明不成功,节点二中没有生成新的clusterRabbit1Queue1队列,而节点一中clusterRabbit1Queue1队列多出了新绑定的routing_key:clusterRabbit2key,这意味着在同一集群中不同节点之间的队列名是唯一的,在一个节点中可以操作另一个节点的队列数据。
  1. 生产者连接节点一 rabbit@clusterRabbit1,发布clusterRabbit1key消息。【发布此节点队列消息】
  • 消费者一 成功接收到消息。
  1. 生产者连接节点二 rabbit@clusterRabbi2,发布clusterRabbit1key消息。【发布非此节点队列消息】
  • 消费者一 成功接收到消息。
  1. 生产者连接节点一 rabbit@clusterRabbit1,发布clusterRabbitCommonKey消息。【多个节点队列绑定相同消息】
  • 消费者二 成功接收到消息。
  • 消费者五 成功接收到消息。
  1. 生产者连接节点一 rabbit@clusterRabbit1,发布clusterRabbit2key消息。【消费非此节点队列消息】
  • 消费三、四 轮询接收到消息。
  1. 关闭节点二 rabbit@clusterRabbit2,连接节点一 rabbit@clusterRabbit1,发布clusterRabbit2key消息【投递消息给集群中意外退出的节点】
  • 连接节点二 rabbit@clusterRabbit2的消费者全部断开,消费者四并未断开。
  • 节点一rabbit@clusterRabbit1 晋升成为队列三、队列四的主节点。
  • 消费者四 可以继续成功接收到消息。
  • 再次启动节点二rabbit@clusterRabbit2,没有再次成为队列三、队列四的主节点,只是复制了队列三、队列四的镜像队列。
  1. 修改消费者四,在ack之前sleep(20),发布clusterRabbit2key消息,并立即关闭节点二 rabbit@clusterRabbit2
  • 和上面一样,消费者四并未断开。
  • 消费者收到两条相同的消息,说明节点二在收到 ack之前宕机后,此消息会被当做未消费的消息重新放入队列后消费。

使用pm2增删改常驻进程脚本

业务中使用pm2来管理MQ的消费者(需常驻),使用pm2有个好处就是,如果进程意外退出了,pm2可以将它自动重启。下面介绍常见的使用方法:新增、删除、修改。

一、新增一个常驻进程

1.1 先创建一个json配置文件,比如:


{ /** * docs: http://pm2.keymetrics.io/docs/usage/application-declaration/#attributes-available */ name: "cront/Trade/deliveyNotice", args: "cront/Trade/deliveyNotice", script: "/opt/ci123/www/html/api_shop/webroot/index.php", exec_interpreter: "/opt/ci123/php/bin/php", exec_mode: "fork", max_memory_restart: "100M", out_file: "/tmp/api_shop/Delivery.log", error_file: "/tmp/api_shop/Delivery.log", instances: 1 }

我们使用代码位置(cront/Trade/deliveyNotice)作为name,方便管理。args是在执行script时加在其后面的参数。

1.2 启动脚本


pm2 start /path/to/your/config.json

start后接上一步写的json配置文件。

二、删除常驻进程

2.1 首先找到我们要删除/关闭的进程在pm2里的id是多少。


// e.g. pm2 list|grep cront/Trade/deliveyNotice pm2 list|grep xxx

pm2 list可以查看所有pm2管理的进程,我们根据关键字把需要删除的进程筛选出来,找到id。

2.2 从pm2中删除进程


pm2 delete id

id是上一步找到的进程id(进程在pm2中的id,不是pid)。

三、修改常驻进程,并重启

pm2有restart和reload命令。

一般认为restart的意思是先关闭旧进程,再重启一个新的进程。
而reload的意思是先重启一个新的进程,再关闭旧进程。


pm2 restart /path/to/your/config.json #或者 pm2 reload /path/to/your/config.json

restart和reload都不能真正实现进程的平滑重启。只有将消费进程改造后,才可以实现平滑重启。

四、其他

4.1 pm2也支持将所有config写在一个json文件里;

4.2 使用pm2管理常驻进程的初衷,是想借助pm2的进程重启功能;其他一些工具比如supervisor也有类似功能。

4.3 pm2管理非node进程时,只能使用fork模式,不能使用cluster模式;

4.4 官方对配置文件的说明: Process File

RabbitMQ-匿名队列

匿名队列

创建RabbitMQ队列的时候,如果没有指定队列名,系统会自动创建一个随机字符串作为队列名,比如:

这是RabbitMQ规定的。匿名队列不是没有名字,而是名字是由系统自动、随机生成的。

匿名队列 vs 具名队列

匿名队列的好处是方便,但是队列名在RabbitMQ里有它的独特用处。当多个Consumer同时订阅一个队列上的同一种消息(比如:Direct交换机,Consumer的binding_key相同),RabbitMQ会在这些Consumer之间轮流投递消息。

总结起来,各自优缺点如下:

  1. 需要消息持久化时,队列名必不可少(虽然匿名队列本质上也有名字,但是很难记对不?故障恢复的时候怎么办呢?);
  2. 匿名队列更适合用在临时环境,一般匿名+队列自动删除同时使用,解决临时队列使用后的维护问题;
  3. 具名队列在迁移Consumer时更具优势,可以先在新服务器上运行Consumer,再关闭旧服务器上的Consumer;
  4. 匿名队列不需要取名字,写法上要简洁写。

总体上,如果业务里要使用队列,建议首先考虑具名队列。如果队列太多怎么办呢?建议使用消息总线,免去维护的烦恼。

RabbitMQ入门

学习东西有三个境界:了解、熟练使用、理解原理。大神一般都是第四个境界:根据原理造轮子。对于RabbitMQ,我目前大概在第二个境界。以下介绍均按照自己理解来,个人不保证完全正确,而且也认为肯定有些不准或错误的地方,建议仅作参考,欢迎指正。

一、RabbitMQ是一款开源企业级消息代理软件,实现了AMQP协议(和MQTT协议)。

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议。
MQTT,即Message Queuing Telemetry Transport,是一种轻量级的、灵活的网络协议,常用于物联网。
官网:https://www.rabbitmq.com/
文档:https://www.rabbitmq.com/documentation.html

二、主要概念

  1. Virtual Host:
    你可以参照windows概念:一机多用户。为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP提供了一个虚拟主机(virtual hosts - vhosts)的概念。Virtual Host是隔离的最高级别。

  2. Exchange: 交换机
    交换机用来发送AMQP实体消息。接收生产者的消息,将消息路由到[0, n](n>0,整数)个队列。消息内容可以是文本,也可以是二进制数据。不同类型的交换机有不同的消息路由策略。每个队列创建的时候,必须指定一个Exchange,这个Exchange是下面描述中的某一种: (以下中文名可能和网上的不一样)

    2.1 Direct exchange: 直连交换机。
    根据消息携带的路由键(routing key)将消息投递给对应队列(完全匹配),可以发送给多个队列(消息复制)。

    2.2 Fanout exchange: 扇形交换机。
    将消息发给绑定到该交换机的所有队列,忽略(队列绑定到exchange时绑定的)路由键。可以处理广播消息。

    2.3 Topic exchange: 主题交换机。

    a. 根据消息携带的路由键,将消息发送给相应队列。和Direct不同的是,主题交换机的匹配规则可以是模糊匹配。
    
    b. "#"(井号)表示任意数量(0个or多个)单词。当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
    
    c. "*"(星号)表示一个单词。当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
    

    2.4 Headers exchange: 消息头交换机/首部交换机。

    a. 和http请求一样,每个RabbitMQ的消息也有消息头,消息头内部有很多键值对,这些键值对可以自己定义。
    
    b. 头交换机使用一个或多个消息头(x-match可以为any或all)来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
    
    c. x-match为any时,任意一个消息头符合即可。相应地,all表示必须满足所有定义的消息头。
    
  3. queue: 队列
    队列需要连接到某具体的交换机(在代码中, channel创建时需要指定交换机,queue再通过channel发送/接收消息,channel是建立在tcp连接上的可复用逻辑通道,避免频繁的tcp连接)。

  4. channel: 通道
    4.1 见queue中的描述。
    4.2 channel可以减少tcp频繁创建、断开的开销。
    4.3 binding_key: 绑定
    4.4 在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。
    4.5 routing_key在某些交换机下会被忽略,相应地binding_key也会被忽略(binding_key的作用就是和routing_key做匹配)。
    4.6 可以理解成binding_key就是生产者指定的"routing_key"。

  5. 消息消费模式
    5.1 订阅-发布
    5.2 消费者主动拉取

  6. 部分特性
    6.1 消息持久化。消息持久化需要以下指标同时持久化:

    a. 交换机持久化
    
    b. 队列持久化
    
    c. 消息设置持久化(Delivery Mode => 2)
    
    d. 如果消息、队列设置了过期时间,过期后的消息会被发送到“死信队列”(如果设置了的话),否则丢弃。
    
    e. 注意上面一条,持久化的消息也可能因为设置过期时间而被丢弃。
    

    6.2 过期时间说明:Message的TTL("expiration")是从数据到达Queue中后开始计时的,而不是Message被创建时计时。Queue的TTL("x-expires")定义是Queue被自动删除前可以处于未使用状态的时间。如果你设置了Queue为自动删除,那么"x-expires"这个参数就有效。

    官方解释:
    [1] Auto-delete:queue that has had at least one consumer is deleted when last consumer unsubscribes

    [2] Expiry time can be set for a given queue by setting the x-expires argument to queue.declare, or by setting the expires policy. This controls for how long a queue can be unused before it is automatically deleted. Unused means the queue has no consumers

    一条消息什么时候过期是由两个因素决定的:Message中的"expiration",以及Queue中的"x-message-ttl",他们都是一种消息过期策略,消息具体过期时间以 min("x-expires", "x-message-ttl")为准。

    6.3 消费者ack机制:

    a. 在确认消息已被获取后,才删除消息;
    
    b. 自动确认ack机制: 如果消息不重要,自动ack的消息可以增加消息的消费速度。
    

    6.4 生产者confirm机制:

    a. 确认消息成功投递(到队列)后,执行回调函数;
    
    b. 不论是持久化,还是生产者confirm,对RabbitMQ吞吐量都有一定影响;
    

下单接口优化

背景

对于比较关键的下单接口,性能直接影响到商户的使用体验。目前,我们的下单接口大部分耗时都在4s-5s之间,少数耗时在2-3s。具体耗时多少,也和实际覆盖到的流程有关,不同分支流程,需要处理的业务逻辑往往不一样,调用接口数量也不一样。

思路

4s-5s耗时算比较多的,考虑到有些商户网络比较差,客户端从下单到接收到返回数据的实际耗时会更多,实际体验也会更差。

缩减耗时,可以从mysql优化入手,但是我们的sql已经挺快了(我的意思是,它的优化空间并不是很大,继续优化的难度会越来越大),网络耗时的优化也难以有很明显的效果。

从ztrace的监控面板上可以看到,几乎所有请求都是串联的,只要改为并发,就可以大大减小接口总耗时。这么多的接口,只要拿5、6个做并发,就可以节省接近1s的时间,所有优化空间很大。

特别是一些诸如“获取用户信息”、“获取商户信息”、“获取商品信息”之类的接口,它们对执行顺序没有特别要求,完全可以在接口开始时就做并发。

调用链跟踪系统的一个巨大优点就是直观,每个部分的耗时都可以显示出来,并且进行对比。

操刀

PHP有对并发访问的支持:curl_multi_init,但是要用到我们项目里,要尽量减小对原流程的干涉,所以做了一些独特的设计。

cURL批处理的用法,是先初始化一个批处理句柄:curl_multi_init,然后通过curl_multi_add_handle把普通的curl句柄加入批处理句柄,再统一执行(并发)。

下面是一个例子,来自:php.net

<?php
// 创建一对cURL资源
$ch1 = curl_init();
$ch2 = curl_init();

// 设置URL和相应的选项
curl_setopt($ch1, CURLOPT_URL, "http://www.example.com/");
curl_setopt($ch1, CURLOPT_HEADER, 0);
curl_setopt($ch2, CURLOPT_URL, "http://www.php.net/");
curl_setopt($ch2, CURLOPT_HEADER, 0);

// 创建批处理cURL句柄
$mh = curl_multi_init();

// 增加2个句柄
curl_multi_add_handle($mh,$ch1);
curl_multi_add_handle($mh,$ch2);

$running=null;
// 执行批处理句柄
do {
    usleep(10000);
    curl_multi_exec($mh,$running);
} while ($running > 0);

// 关闭全部句柄
curl_multi_remove_handle($mh, $ch1);
curl_multi_remove_handle($mh, $ch2);
curl_multi_close($mh);

?>

我直接找了一个相关库,并把它clone到了公司github上,保留了授权(感谢Apache License,感谢作者,附上原地址:jmathai/php-multi-curl,内部项目地址:shop/php-multi-curl

为了少改业务流程,我设计了一个方法:preExec,它的参数和execApi一样。
它的作用是生成请求的curl句柄,并将其加入到批处理(还没执行),然后计算请求的hash,设置$ci->preExec[$request_hash](可以通过它去获取这次请求的结果)。

如何计算请求hash:请求hash的计算元素,包括请求url、请求params、请求method。

function requestHash($url, $p, $method)
{
    $url = strtolower(trim(trim($url), '&?/\\'));
    $phash = $this->arrayHash($p);
    $method = strtolower(trim($method)) == 'post' ? 'post' : 'get';
    return md5($url . $phash . $method);
}
function arrayHash($p, $filter = array('mdstr', 'expire'))
{
    $para_sort = array();
    foreach ($p as $k => $v) {
        if ($filter && in_array($v, $filter, true)) continue;
        // array object bool int float null resource...
        $para_sort[$k] = is_array($v) ? $this->arrayHash($v)
            : (is_object($v) ? $this->arrayHash((array)$v) : strval($v));
    }
    ksort($para_sort);
    return md5(http_build_query($para_sort));
}

当我们去主动获取请求结果时,如果当前请求已经执行完成,会立即返回请求结果,其他还没执行完的请求继续执行,不堵塞业务流程。如果当前请求没执行完,会堵塞直到当前请求执行结束。

执行execApi时,先判断是否有设置$ci->preExec[$request_hash],如果没设置,就是一次普通的curl访问,和以前的流程一样(单次请求,堵塞直到获取到结果);如果有设置,那就执行curl批处理(所有的并发请求都在这时开始执行。如果已经执行过了就不再执行),并通过$ci->preExec[$request_hash]去获取当前请求的结果。获取到结果后,会将$ci->preExec[$request_hash]删除。

所以,只要我们在流程开始前执行preExec,那执行到execApi的时候就会自动做并发,并获取结果。

在流程1和流程2,execApi分别执行普通curl访问、并发curl访问。

DEBUG流程

下面讲一些遇到的坑。

开始做并发时很顺利,也成功实现了并发。但是当我把它和调用链跟踪结合起来的时候,一大堆问题就来了。

为了准确跟踪接口调用时间,我们需要记录每个curl的执行开始时刻,和执行过程的耗时。

每创建一个curl句柄后,我们都会向curl批处理会话中添加这个单独的curl句柄,但是这个时候curl是没开始执行的。如果我们把创建curl的时刻作为url开始访问的时刻,记录到的接口生命周期会比实际的长很多(提前记录了)。

这时我想到了通过记录curl结束时间和curl耗时,反推开始时间。而curl有个回调函数,可以在接收到数据的时候执行:CURLOPT_WRITEFUNCTION(有没有curl开始执行的回调函数呢?很遗憾没有!)

于是,我使用了CURLOPT_WRITEFUNCTION来记录curl调用的结束时刻。但是在235上,设置CURLOPT_WRITEFUNCTION后总是得不到返回数据(curl_exec总是返回TRUE),我发现CURLOPT_RETURNTRANSFER设置失效了!

** 235上的libcurl版本是:7.19.7**

CURLOPT_WRITEFUNCTION 回调函数名。该函数应接受两个参数。第一个是 cURL resource;第二个是要写入的数据字符串。数 据必须在函数中被保存。 函数必须准确返回写入数据的字节数,否则传输会被一个错误所中 断。

CURLOPT_RETURNTRANSFER TRUE 将curl_exec()获取的信息以字符串返回,而不是直接输出。

一顿折腾,在Stack Overflow上找到一点提示 ready go

When I placed that line before any of the other options, it was simply ignored.

我尝试在设置CURLOPT_WRITEFUNCTION后再次设置CURLOPT_RETURNTRANSFER,这次跑通了。

BUT!我自己的电脑上又出问题了,设置了CURLOPT_WRITEFUNCTIONCURLOPT_RETURNTRANSFER就失效,CURLOPT_RETURNTRANSFER生效的时候,就不执行CURLOPT_WRITEFUNCTION。在需要二选一的时候,我选择了放弃。

** 我电脑上的libcurl版本是:7.55.0**

我放弃了CURLOPT_WRITEFUNCTION。找到了一个替代回调函数CURLOPT_HEADERFUNCTION,这个函数会在接收到返回的header信息时调用。虽然它不能精确表示结束时间,但是马马虎虎也能用。最重要的是,这个函数始终都会被调用。

CURLOPT_HEADERFUNCTION 设置一个回调函数,这个函数有两个参数,第一个是cURL的资源句柄,第二个是输出的 header 数据。header数据的输出必须依赖这个函数,返回已写入的数据大小。

这时,调用链跟踪和并发成功结合起来了,监控的数据基本正确,也能正常运行。

如果看到ztrace上面,一个并发前只需要100ms+的接口,在并发后要800ms+,请相信我,curl_getinfo($ch, CURLINFO_TOTAL_TIME)返回的就是这个值,不是程序有bug。具体原因期待大神解释。

调试过程中还遇到一个libcurl的bug,这个bug只在 7.55.0 上有(没错,我电脑上的就是这个版本:broken_heart:)。这个bug就是curl_getinfo($ch, CURLINFO_TOTAL_TIME)会在某些时候返回一个超大的浮点数,比如4295.xx。而经过我观察,只需要减掉4295就行了(究竟对不对,就看我猜的对不对了:pray:)

结语

好了,上面是修改ApiShopClient的整个过程,主要时间花在了适配调用链跟踪上面,从这个debug流程也能看到,php在涉及到底层的时候有点力不从心(大神忽略掉这句)。

调用链跟踪系统-介绍

背景资料

Dapper,大规模分布式系统的跟踪系统

Zipkin

如果要查看zipkin资料,官网的英文文档是最正确的,讲解也很系统;

在阅读文章前,建议先至少看一下Dapper的翻译;

一、介绍

调用链跟踪系统进行了比较大的升级,主要改变有:

  1. 对接zipkin日志标准(仍保留了些许不同);
  2. 使用zipkin的数据库和界面;
  3. 大幅降低对正常业务流程的性能影响;
  4. 将数据压缩后再传递给收集器;
  5. rabbitmq使用单独的数据通道,不再和业务共用一个;
  6. 采用文件日志和mysql记录结合的方式,分别记录调用链的业务数据和性能数据。 业务数据100%记录,性能数据采样记录(5%);
  7. 支持业务数据的上下文透明传输

下面详细讲解系统的构成、设计考量、使用和特点。

二、结构

分布式跟踪系统整体结构

浅绿色的是数据最终落地的地方。

名词解释:临时存储

临时存储究竟是干嘛的呢?在zipkin里,一个Span被分割成两部分,一个是Client/Producer,另一个是Server/Consumer。Span的含义更接近于一次RPC,它横跨了两个服务节点,所以在两个节点分别上报信息,最后再合并成一个Span。

后端收到两部分Span的时间不一样,临时存储就用来存储先到达的Span,等另一半到达后再取出来做合并。

名词解释:收集器

从MQ接收上报的Span,做Span的合并,并存储到持久存储(mysql)

名词解释:压缩

压缩成二进制,再通过MQ传递。实际应用中,是先将Span转换为数组,将key替换成a-z的单字母,再转成json后使用zlib压缩(需要zlib扩展)。

和压缩相关的还有一个叫 msgpack 的东西,但是它需要单独安装PHP扩展,所以放弃了。

三、设计考量

考量一:降低对业务的影响

只在少数地方嵌入跟踪代码。实际操作中,在项目入口(index.php)、rpc请求、输出这三个地方做了埋点。

耗时操作在请求以外执行。我们知道PHP没有异步,但是PHP有个非常有用的函数register_shutdown_function,我们把耗时操作全放里面,这样可以尽量减少对线上业务的影响。如果是fastcgi模式,我们会优先使用fastcgi_finish_request这个函数。http://php.net/manual/zh/function.register-shutdown-function.php#108212

考量二:开关/采样率/消息生产速率控制

一共设计了两个开关(业务日志开关、Span上报开关),默认情况下这两个开关都是开启的。在此基础上,增加了采样率控制,仅当Span上报开关开启,且当前Span需要采样时,Span才会通过MQ传递给后端。业务日志开关控制biz-ztrace日志的记录。

需要注意的时,即便把开关全部关闭,traceid也会生成、传递,但不会有日志记录,也不会有Span上报。

对于消息速率控制,在实际中采用的是“过量丢弃”,阈值是10000。只要队列里的消息超过这个值,即不再接收消息,直接抛弃。

考量三:动态增加消费能力

随着系统扩大,采集的数据增多,动态增强数据的消费能力非常重要。目前的做法是通过加开消费者来实现。之所以能直接增加消费者,是因为它们使用的同一个临时存储,我们只是把消息的消费瓶颈,转移到了临时存储的IO瓶颈上。

想象一下,我们有两个消费者A、B,分别使用两个不同的临时存储,Client端的Span被A接收到,Server端的Span被B接收到,由于它们的临时存储隔开的,A、B都无法做Span的合并!

几乎所有的调用链跟踪系统实现方案,都绕不开这个问题,可能是一个Span被分发到不同处理实例,也可能是同一Trace的不同Span被分发到不同处理实例——这都会带来很多麻烦。

解决方法也很多,比如根据traceid做hash,再转发给不同的后端实例,这样同一个trace下的Span都会被,且只会被转发给同一处理实例。

但是,仅仅做hash是不够的,还必须借助“一致性Hash”算法。下面的例子解释了原因。

假设已经有一个处理实例CA,spanA、SpanB都属于同一个trace。CA已经接收到spanA了,这时我们新增了一个CB(和CA使用不同临时存储),如果hash结果表明spanB应该转发给CB(这完全可能发生),那又会出现CA、CB都无法合并这个Span的情况。

关于一致性Hash,我们后面再讲。

考量四:业务日志、性能日志分开

之所以要分开,是因为业务日志很多(随着采集端的增加,数据增长更多),我们还没处理过这么多的数据。

目前,业务日志记录了$_GET、$_POST、$_SERVER和apiReturn,收集端有seller和api。每天产生的日志共2.5G左右,每月日志总量大概在25G。日志总量和采集端数量呈正相关。

四、使用

下面的例子都有判断$trace_open;,并且使用了 try {} catch () {},这样做是为了减少可能出现的异常,必须要添加。

记录业务日志

业务日志是记录在文件里的,用的最多。

过期数据处理

我们把太久远的数据称为“过期数据”,因为它们被再次使用的概率很低。我们用的最多的是近期的数据,近期数据才是最重要的,业务日志和性能统计数据都适用这个道理。

我们处理过期数据的方式很简单——打包压缩、删除,这是目前能找到的最划算的处理方法。具体处理逻辑:

文件日志:保留一个月的压缩日志,保留一周内的原始日志。
数据库:保留两个月的数据。

global $trace_open;
if ($trace_open) {
    try {
        $tracer = \Tricolor\ZTracker\Core\GlobalTracer::tracer();
        $tracer->log('get', $_GET);
    } catch (\Exception $e) {}
}

记录跟踪日志

跟踪日志会出现在ui里(如果被采样了的话)

global $trace_open;
if ($trace_open) {
    try {
        $tracer = \Tricolor\ZTracker\Core\GlobalTracer::tracer();
        $tracer->currentSpan()->putTag('store_id', (int)$_POST['store_id']);
    } catch (\Exception $e) {}
}

透明传输业务字段(试验性质)

在调用链上游设置一个值,在调用链下游可以获取到。

之所以说是试验性质,是因为设计里有,也有实现,但实际业务里还没使用过。

global $trace_open;
if ($trace_open) {
    try {
        $tracer = \Tricolor\ZTracker\Core\GlobalTracer::tracer();
        $tracer->currentContext()->set('from', 'mamagou');
        // get
        $tracer->currentContext()->get('from');
    } catch (\Exception $e) {}
}

五、特点

  1. 和zipkin的Span比起来,我们的Span结构略有不同。zipkin里的Sampled字段,在我们这里对应Decision字段,Decision字段除了记录是否采样,还记录了log开关和report开关。

  2. 数据库、UI使用zipkin,减少了一些开发量。zipkin的Span设计能适应较多的采集场景,比如单向数据传递、黑盒模式的远端节点(调用数据库时,数据库就是一个黑盒模型)。

  3. 和淘宝类似,加了业务字段透传功能,业务日志和跟踪日志分开。

Elk 6.1 安装使用说明

ELK 不是一款软件,而是 Elasticsearch、Logstash 和 Kibana 三种软件产品的首字母缩写。这三者都是开源软件,通常配合使用,而且又先后归于 Elastic.co 公司名下,所以被简称为 ELK Stack。根据 Google Trend 的信息显示,ELK Stack 已经成为目前最流行的集中式日志解决方案。

架构

Logstash

介绍

数据收集引擎:数据存储与数据流。它支持动态的从各种数据源搜集数据,并对数据进行过滤、分析、丰富、统一格式等操作,然后存储到用户指定的位置;

安装

1、 官方教程
2、 windows下安装Logstash

效果图

注意事项

1、windows下需要使用NSSM

Elasticsearch

安装

说明:Elastic 需要 Java 8 环境,注意要保证环境变量JAVA_HOME正确设置。
1、下载最新Elasticsearch版本,解压到指定目录。
2、在Unix上运行bin/elasticsearch或者在Windows上运行bin\elasticsearch.bat
3、打开:http://localhost:9200/

效果图

注意事项:

  1. 启动内存不足
# vi ${elasticsearch_HOME}/config/jvm.options
#-Xms2g
#-Xmx2g
-Xms512m
-Xmx512m
  1. 无法以root权限启动
# groupadd es
# useradd es -g es -p es
# chown es:es ${elasticsearch_HOME}/
# sudo su es
# ./bin/elasticsearch

安装elasticsearch-head

git clone git://github.com/mobz/elasticsearch-head.git
cd elasticsearch-head
npm install
npm run start
open http://localhost:9100/

谷歌插件安装链接

说明

Kibana

介绍

Kibana是个数据分析和可视化平台。通常与 Elasticsearch 配合使用,对其中数据进行搜索、分析和以统计图表的方式展示;

安装

kibana官方文档

效果图

注意事项

1、 需要cmd执行,直接运行.bat无效
2、 Logstash 6.1.1 issue with setup.bat file "could not find jruby in"
在7.0版本修复,下载master文件,并替换bin目录文件接口。

参考资料
1. Kibana入门教程
2. Logstash 最佳实践
3. kibana官方文档
4. ELK+Filebeat 集中式日志解决方案详解

mysql 如何恢复数据?

问题:

尝试还原数据库,之后提示 table doesn`t exist。

分析:

类型:MyISAM
数据:Table.frm,Table.MYD,Table.MYI
位置:/data/$databasename/目录中
说明:直接复制到mysql中data目录中,便可以使用

类型:InnoDB
数据文件:存储在/$innodb_data_home_dir/中的ibdata1文件中
结构文件:结构文件存在于/data/table_name.frm中
说明:不可以直接使用,并报错:table doesn`t exist

解决方法:

1、 停止 apache 和 mysql服务
2、 拷贝相应文件到/data/目录,在数据库引擎类型为InnoDB时,拷贝数据文件的同时还需要拷贝ibdata1。
3、 将根目录下的ib_logfile*文件全部删除掉

备注:

1、 正常的数据导出恢复,最好用工具,不要在data文件层面去恢复
2、 测试环境在windows下

参考文档:

1、 mysql 直接从date 文件夹备份表,还原数据库之后提示 table doesn`t exist的原因和解决方法