ES paginated query

// Offset of the first hit: (pageIndex - 1) * pageSize
sourceBuilder.from((interfaceDTO.getPageIndex() - 1) * interfaceDTO.getPageSize());
// Page size
sourceBuilder.size(interfaceDTO.getPageSize());
// To count past 10,000 hits, set track_total_hits: true; otherwise the reported total is capped at 10,000
sourceBuilder.trackTotalHits(true);

The full query method:
@Override
public RequestResult getRequestLogs(InterfaceDTO interfaceDTO) throws Exception {
    // Parse the query window and enumerate the days it spans (one ES index per day)
    Date queryRequestLogStartTime = DateUtil.toDate(interfaceDTO.getQueryRequestLogStartTime(), "yyyy-MM-dd HH:mm:ss");
    Date queryRequestLogEndTime = DateUtil.toDate(interfaceDTO.getQueryRequestLogEndTime(), "yyyy-MM-dd HH:mm:ss");
    List<Date> betweenDates = DateUtil.getBetweenDates(queryRequestLogStartTime, queryRequestLogEndTime);

    // Optional filters: add a clause only when the caller supplied the field
    BoolQueryBuilder queryBuilder = new BoolQueryBuilder();
    if (StringUtil.isNotEmpty(interfaceDTO.getPartnerId())) {
        queryBuilder.must(QueryBuilders.termQuery("partnerId", interfaceDTO.getPartnerId().toLowerCase()));
    }
    if (StringUtil.isNotEmpty(interfaceDTO.getAppId())) {
        queryBuilder.must(QueryBuilders.termQuery("appId", interfaceDTO.getAppId().toLowerCase()));
    }
    if (interfaceDTO.getServiceId() != null) {
        queryBuilder.must(QueryBuilders.termQuery("serviceId", interfaceDTO.getServiceId()));
    }
    if (interfaceDTO.getId() != null) {
        queryBuilder.must(QueryBuilders.termQuery("interfaceId", interfaceDTO.getId()));
    }
    if (interfaceDTO.getResponseState() != null) {
        queryBuilder.must(QueryBuilders.termQuery("responseState", interfaceDTO.getResponseState()));
    }
    if (interfaceDTO.getRequestDuration() != null) {
        queryBuilder.must(QueryBuilders.rangeQuery("downStreamRequestDuration").gte(interfaceDTO.getRequestDuration()));
    }

    queryBuilder.must(QueryBuilders.rangeQuery("downStreamRequestTime").from(queryRequestLogStartTime.getTime()).to(queryRequestLogEndTime.getTime()));

    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    sourceBuilder.query(queryBuilder);
    sourceBuilder.from((interfaceDTO.getPageIndex() - 1) * interfaceDTO.getPageSize());
    sourceBuilder.size(interfaceDTO.getPageSize());
    sourceBuilder.trackTotalHits(true);
    sourceBuilder.sort("downStreamRequestTime", SortOrder.DESC);
    // Empty include/exclude arrays: return the full _source
    sourceBuilder.fetchSource(new String[] {}, new String[] {});

    // One index per day in the range; lenient IndicesOptions ignore missing indices and expand wildcards to open indices only
    SearchRequest searchRequest = new SearchRequest(getEsIndices(betweenDates));
    searchRequest.indicesOptions(IndicesOptions.fromOptions(true, true, true, false));
    // Deprecated in ES 7.x, where "_doc" is the only mapping type
    searchRequest.types("_doc");
    searchRequest.source(sourceBuilder);

    SearchResponse searchResponse = eslClient.search(searchRequest, ElasticsearchConfig.COMMON_OPTIONS);
    SearchHits hits = searchResponse.getHits();
    TotalHits totalHits = hits.getTotalHits();
    SearchHit[] searchHits = hits.getHits();

    PageUtil<InterfaceLogDTO> interfaceLogs = pageInterfaceLogs(totalHits, searchHits, interfaceDTO);
    return RequestResult.success(interfaceLogs);
}
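
getEsIndices isn't shown above; a minimal sketch of what it could look like, assuming one index per day with a hypothetical request_log_ prefix and a yyyy.MM.dd suffix:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

// Hypothetical helper: builds one index name per day in the queried range.
// The "request_log_" prefix and the yyyy.MM.dd pattern are assumptions.
private String[] getEsIndices(List<Date> betweenDates) {
    SimpleDateFormat fmt = new SimpleDateFormat("yyyy.MM.dd");
    return betweenDates.stream()
            .map(date -> "request_log_" + fmt.format(date))
            .toArray(String[]::new);
}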

private PageUtil<InterfaceLogDTO> pageInterfaceLogs(TotalHits totalHits, SearchHit[] searchHits, InterfaceDTO interfaceDTO) {
    List<InterfaceLogDTO> interfaceLogDTOS = new ArrayList<>();

    for (SearchHit hit : searchHits) {
        // Map the raw _source into the DTO via fastjson
        Map<String, Object> sourceAsMap = hit.getSourceAsMap();
        JSONObject jsonObject = new JSONObject();
        jsonObject.putAll(sourceAsMap);
        InterfaceLogDTO interfaceLogDTO = jsonObject.toJavaObject(InterfaceLogDTO.class);
        interfaceLogDTOS.add(interfaceLogDTO);
    }

    // Ceiling division avoids an extra empty page when the total is an exact multiple of pageSize
    int totalPage = Math.toIntExact((totalHits.value + interfaceDTO.getPageSize() - 1) / interfaceDTO.getPageSize());
    return new PageUtil<>(interfaceDTO.getPageIndex(), interfaceDTO.getPageSize(), Math.toIntExact(totalHits.value), totalPage, interfaceLogDTOS);
}

ES index creation fails with java.lang.IllegalArgumentException: mapping source must be pairs of fieldnames and properties definition

java.lang.IllegalArgumentException: mapping source must be pairs of fieldnames and properties definition.

1. Wrong import: use the CreateIndexRequest from the client package (org.elasticsearch.client.indices) rather than the deprecated one; this fixes the problem outright, as shown below.
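
A minimal sketch with the client-package API, assuming the 7.x high-level REST client (the index name "request_log" is illustrative); note the mapping JSON needs no "_doc" wrapper here:

import java.io.IOException;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.xcontent.XContentType;

public static boolean createIndex(RestHighLevelClient client) throws IOException {
    // org.elasticsearch.client.indices.CreateIndexRequest, not the deprecated
    // org.elasticsearch.action.admin.indices.create.CreateIndexRequest
    CreateIndexRequest request = new CreateIndexRequest("request_log");
    // The mapping JSON starts directly at "properties"; no type wrapper
    request.mapping(
            "{\n" +
            "  \"properties\": {\n" +
            "    \"message\": { \"type\": \"text\" }\n" +
            "  }\n" +
            "}",
            XContentType.JSON);
    CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
    return response.isAcknowledged();
}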

If you keep the deprecated import instead of fixing it, the mapping call needs the "_doc" type.
Taken from the ES official docs:

request.mapping("_doc",
“{\n” +
" “_doc”: {\n" +
" “properties”: {\n" +
" “message”: {\n" +
" “type”: “text”\n" +
" }\n" +
" }\n" +
" }\n" +
“}”,
XContentType.JSON);

This does not take effect and throws the error above.
Reading the source shows that the varargs passed after the type must come in pairs (length % 2 == 0), i.e. alternating field name and field definition. Change it to:

request.mapping("_doc", "datetime", "type=date");

With multiple fields:

request.mapping("_doc", "datetime", "type=date", "string", "type=text");

!!! Remember to pass "_doc" !!!

Batch-importing data into ClickHouse with a shell script

The script:

#!/bin/bash

# ClickHouse connection parameters
CLICKHOUSE_HOST="127.0.0.1"
CLICKHOUSE_PORT="9000"
CLICKHOUSE_DATABASE="xxx"
CLICKHOUSE_USER=""
CLICKHOUSE_PASSWORD=""

# Lines per batch (adjust as needed)
BATCH_SIZE=100000

# Path to the SQL dump file
SQL_FILE="/root/sql/sql.sql"

# Count the total number of lines in the SQL file
TOTAL_LINES=$(wc -l < "$SQL_FILE")
echo "Total lines in SQL file: $TOTAL_LINES"

# Number of batches (ceiling division, so an exact multiple doesn't add an empty batch)
TOTAL_BATCHES=$(((TOTAL_LINES + BATCH_SIZE - 1) / BATCH_SIZE))
echo "Total batches to execute: $TOTAL_BATCHES"

# Import each batch in turn
for i in $(seq 0 $((TOTAL_BATCHES - 1))); do
    # Start and end line numbers for this batch
    START_LINE=$((i * BATCH_SIZE + 1))
    END_LINE=$(((i + 1) * BATCH_SIZE))
    if [ "$END_LINE" -gt "$TOTAL_LINES" ]; then
        END_LINE=$TOTAL_LINES
    fi

    # Extract this batch's lines into a temporary file
    awk "NR >= $START_LINE && NR <= $END_LINE" "$SQL_FILE" > batch_${i}.sql

    # Import the batch into ClickHouse
    echo "Importing batch $i (lines $START_LINE to $END_LINE)..."
    #cat batch_${i}.sql | clickhouse-client -h "$CLICKHOUSE_HOST" --port "$CLICKHOUSE_PORT"
    clickhouse-client -h "$CLICKHOUSE_HOST" --port "$CLICKHOUSE_PORT" --query="INSERT INTO xx.xx FORMAT MySQLDump" < batch_${i}.sql

    # Abort if the import failed
    if [ $? -eq 0 ]; then
        echo "Batch $i import completed successfully."
    else
        echo "Error: Batch $i import failed."
        exit 1
    fi

    # Remove the temporary file
    rm -f batch_${i}.sql
done

echo "Import completed!"

This way, clickhouse-client runs each batch against the database and table named in the query. Note that splitting by line count assumes every statement in the dump fits on a single line; a statement spanning multiple lines would be cut across batches.

Migrating data from DM (Dameng) to ClickHouse

First create the database and table in ClickHouse:

CREATE TABLE "xxx"
(
    "USER_ID" String,
    "DOOR_INDEX_CODE" String,
    "DOOR_NAME" String,
    "IN_OUT_TYPE" String,
    "ORG_INDEX_CODE" String,
    "ORG_INDEX_NAME" String,
    "ACADEMY" String,
    "BIRTHPLACE" String,
    "CENSUS_REGISTER" String,
    "CERT_NO" String,
    "COMPANY_UNIT" String,
    "DORMITOR" String,
    "EDUCATION" String,
    "EMAIL" String,
    "HOUSE_HOLD_REL" String,
    "SFTYPE" String,
    "MOBILE" String,
    "NATION" String,
    "ORGID" String,
    "PERSON_ID" String,
    "PERSON_NAME" String,
    "SEX" String,
    "CERTTYPE" String,
    "EVENT_TIME" String,
    "CARD_NO" String,
    "IS_MEN" String,
    "JOB_NO" String,
    "DEV_INDEX_CODE" String,
    "DEV_NAME" String,
    "DOOR_REGION_INDEX_CODE" String,
    "PIC_URI" String,
    "ORG_NAME" String,
    "EVENT_TYPE" String,
    "RECEIVE_TIME" String,
    "CREATE_TIME" String,
    "CREATE_USER" String,
    "UPDATE_TIME" String,
    "UPDATE_USER" String,
    "ID" String,
    "IS_DELETE" String DEFAULT '0',
    "ORG_ID" String,
    "EVENT_ID" String DEFAULT ''
)
ENGINE = MergeTree()
PRIMARY KEY ("ID")

Import with clickhouse-client:

INSERT INTO qwuser.B_T_PEOPLE_INOUT_LOGS
FROM INFILE '/root/sql/sql.sql' FORMAT MySQLDump
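
To sanity-check the import, the row count can be compared against the source table. A minimal sketch using the ClickHouse JDBC driver (the com.clickhouse:clickhouse-jdbc artifact; host, HTTP port 8123, and credentials here are assumptions):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CountCheck {
    public static void main(String[] args) throws Exception {
        // HTTP interface on the default port 8123; adjust to your setup
        String url = "jdbc:clickhouse://127.0.0.1:8123/qwuser";
        try (Connection conn = DriverManager.getConnection(url, "default", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT count() FROM B_T_PEOPLE_INOUT_LOGS")) {
            if (rs.next()) {
                System.out.println("Rows imported: " + rs.getLong(1));
            }
        }
    }
}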

ClickHouse: start, restart, stop, client CLI mode, and remote connection

Common ClickHouse commands

Start the server:

systemctl start clickhouse-server

Restart the server:

systemctl restart clickhouse-server

Stop the server:

systemctl stop clickhouse-server

Connect with the client CLI in multiline mode:

clickhouse-client -m --password <password>

Connect to a remote server with the client CLI:

clickhouse-client --host 192.168.45.10 --port 9000 --database default --user default --password ""

Run a SQL file with the client CLI:

clickhouse-client --user default --password <password> -d default --multiquery < /root/temp.sql