- 《自开发了一款短视频去水印小程序》(2023年06月06日)
- 《es 字符串时间格式的字符串怎么做小时查询》(2024年04月11日)
// Offset of the first hit to return: (pageIndex - 1) * pageSize, i.e. the page index is treated as 1-based
sourceBuilder.from((interfaceDTO.getPageIndex() - 1) * interfaceDTO.getPageSize());
// Number of hits to return for this page
sourceBuilder.size(interfaceDTO.getPageSize());
// track_total_hits=true is required to get the real total once more than ~10,000 documents match;
// without it the reported total stops at the default cap (the author observed 9999)
sourceBuilder.trackTotalHits(true);
/**
 * Searches the request-log indices for entries matching the optional filters carried by
 * {@code interfaceDTO} and returns the requested page.
 *
 * <p>Filters are only applied when the corresponding field is set: partnerId and appId
 * (lower-cased before the term match), serviceId, interfaceId, responseState, and a minimum
 * downStreamRequestDuration. The downStreamRequestTime range (epoch milliseconds) is always
 * applied, and hits are sorted by downStreamRequestTime in descending order.
 *
 * @param interfaceDTO query conditions, time range ("yyyy-MM-dd HH:mm:ss") and paging parameters
 * @return a successful {@code RequestResult} wrapping the page of log entries
 * @throws Exception propagated from date parsing or from the Elasticsearch client
 */
@Override
public RequestResult getRequestLogs(InterfaceDTO interfaceDTO) throws Exception {
    Date rangeStart = DateUtil.toDate(interfaceDTO.getQueryRequestLogStartTime(), "yyyy-MM-dd HH:mm:ss");
    Date rangeEnd = DateUtil.toDate(interfaceDTO.getQueryRequestLogEndTime(), "yyyy-MM-dd HH:mm:ss");
    // The dates covered by the range are later handed to getEsIndices to pick the indices to search.
    List<Date> coveredDates = DateUtil.getBetweenDates(rangeStart, rangeEnd);

    // Every condition is AND-ed together; optional ones are skipped when absent.
    BoolQueryBuilder conditions = QueryBuilders.boolQuery();
    if (StringUtil.isNotEmpty(interfaceDTO.getPartnerId())) {
        String partnerId = interfaceDTO.getPartnerId().toLowerCase();
        conditions.must(QueryBuilders.termQuery("partnerId", partnerId));
    }
    if (StringUtil.isNotEmpty(interfaceDTO.getAppId())) {
        String appId = interfaceDTO.getAppId().toLowerCase();
        conditions.must(QueryBuilders.termQuery("appId", appId));
    }
    if (interfaceDTO.getServiceId() != null) {
        conditions.must(QueryBuilders.termQuery("serviceId", interfaceDTO.getServiceId()));
    }
    if (interfaceDTO.getId() != null) {
        conditions.must(QueryBuilders.termQuery("interfaceId", interfaceDTO.getId()));
    }
    if (interfaceDTO.getResponseState() != null) {
        conditions.must(QueryBuilders.termQuery("responseState", interfaceDTO.getResponseState()));
    }
    if (interfaceDTO.getRequestDuration() != null) {
        conditions.must(
                QueryBuilders.rangeQuery("downStreamRequestDuration").gte(interfaceDTO.getRequestDuration()));
    }
    conditions.must(
            QueryBuilders.rangeQuery("downStreamRequestTime")
                    .from(rangeStart.getTime())
                    .to(rangeEnd.getTime()));

    // Paging, exact total-hit tracking, newest-first ordering, and no include/exclude source filtering.
    SearchSourceBuilder source = new SearchSourceBuilder()
            .query(conditions)
            .from((interfaceDTO.getPageIndex() - 1) * interfaceDTO.getPageSize())
            .size(interfaceDTO.getPageSize())
            .trackTotalHits(true)
            .sort("downStreamRequestTime", SortOrder.DESC)
            .fetchSource(new String[0], new String[0]);

    SearchRequest request = new SearchRequest(getEsIndices(coveredDates));
    request.indicesOptions(IndicesOptions.fromOptions(true, true, true, false));
    request.types("_doc");
    request.source(source);

    SearchResponse response = eslClient.search(request, ElasticsearchConfig.COMMON_OPTIONS);
    SearchHits allHits = response.getHits();
    PageUtil<InterfaceLogDTO> page = pageInterfaceLogs(allHits.getTotalHits(), allHits.getHits(), interfaceDTO);
    return RequestResult.success(page);
}
/**
 * Converts the raw Elasticsearch hits of one page into {@code InterfaceLogDTO}s and wraps them,
 * together with the paging figures, in a {@code PageUtil}.
 *
 * @param totalHits total-hit information of the search; {@code totalHits.value} is used as the record count
 * @param searchHits hits returned for the current page
 * @param interfaceDTO request object supplying the page index and page size
 * @return the page holding the converted log entries, total count and total page count
 * @throws ArithmeticException if the page size is zero or the page count does not fit in an int
 */
private PageUtil<InterfaceLogDTO> pageInterfaceLogs(TotalHits totalHits, SearchHit[] searchHits, InterfaceDTO interfaceDTO) {
    List<InterfaceLogDTO> interfaceLogDTOS = new ArrayList<>(searchHits.length);
    for (SearchHit hit : searchHits) {
        Map<String, Object> sourceAsMap = hit.getSourceAsMap();
        // Route the source map through a JSONObject so it can be bound to the DTO class.
        JSONObject jsonObject = new JSONObject();
        jsonObject.putAll(sourceAsMap);
        interfaceLogDTOS.add(jsonObject.toJavaObject(InterfaceLogDTO.class));
    }
    long pageSize = interfaceDTO.getPageSize();
    // Ceiling division. The previous "total / pageSize + 1" reported one page too many whenever
    // the total was an exact multiple of the page size (e.g. 20 hits / 10 per page gave 3 pages),
    // and reported 1 page for an empty result.
    int totalPage = Math.toIntExact((totalHits.value + pageSize - 1) / pageSize);
    return new PageUtil<>(interfaceDTO.getPageIndex(), interfaceDTO.getPageSize(), Long.valueOf(totalHits.value).intValue(), totalPage, interfaceLogDTOS);
}
java.lang.IllegalArgumentException: mapping source must be pairs of fieldnames and properties definition.
1. 导包错误:需要导入 client 包下的 API,改正导包即可直接解决问题。
2. 如果导入的是已过时(deprecated)的类且不修正导包,则调用 mapping 方法时需要显式传入类型名 "_doc"。
从es官网找到的
request.mapping("_doc",
“{\n” +
" “_doc”: {\n" +
" “properties”: {\n" +
" “message”: {\n" +
" “type”: “text”\n" +
" }\n" +
" }\n" +
" }\n" +
“}”,
XContentType.JSON);
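上述写法不生效,并抛出上面的 IllegalArgumentException。
查看源码发现:mapping(String type, Object... source) 的可变参数 source 必须成对出现(字段名 + 属性定义),即参数个数需满足 source.length % 2 == 0。因此改为: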
request.mapping("_doc", “datetime”, “type=date”);
多个参数
request.mapping("_doc", “datetime”, “type=date”, “string”, “type=text”);
!!! 设置“_doc”!!!
脚本
#!/bin/bash
# Imports a large MySQL dump file into ClickHouse in fixed-size line batches.
# Each batch is extracted to a temporary file and piped into clickhouse-client
# using the MySQLDump input format. The script stops at the first failing batch.

# ClickHouse connection parameters
CLICKHOUSE_HOST="127.0.0.1"
CLICKHOUSE_PORT="9000"
# NOTE: not passed to the client; the INSERT statement below names the table as db.table.
CLICKHOUSE_DATABASE="xxx"
CLICKHOUSE_USER=""
CLICKHOUSE_PASSWORD=""
# Number of lines per batch (adjust as needed)
BATCH_SIZE=100000
# Path of the SQL dump file
SQL_FILE="/root/sql/sql.sql"

if [ ! -r "$SQL_FILE" ]; then
    echo "Error: cannot read SQL file: $SQL_FILE" >&2
    exit 1
fi

# Connection arguments; credentials are only added when they are configured.
CLIENT_ARGS=(-h "$CLICKHOUSE_HOST" --port "$CLICKHOUSE_PORT")
if [ -n "$CLICKHOUSE_USER" ]; then
    CLIENT_ARGS+=(--user "$CLICKHOUSE_USER")
fi
if [ -n "$CLICKHOUSE_PASSWORD" ]; then
    CLIENT_ARGS+=(--password "$CLICKHOUSE_PASSWORD")
fi

# Remove the current temporary batch file on any exit, including failures.
BATCH_FILE=""
cleanup() {
    if [ -n "$BATCH_FILE" ]; then
        rm -f "$BATCH_FILE"
    fi
}
trap cleanup EXIT

# Count lines with awk: unlike "wc -l" it also counts a final line without a trailing newline.
TOTAL_LINES=$(awk 'END { print NR }' "$SQL_FILE")
echo "Total lines in SQL file: $TOTAL_LINES"
# Ceiling division, so an exact multiple of BATCH_SIZE does not produce an empty extra batch.
TOTAL_BATCHES=$(( (TOTAL_LINES + BATCH_SIZE - 1) / BATCH_SIZE ))
echo "Total batches to execute: $TOTAL_BATCHES"

# Import the batches one by one
for ((i = 0; i < TOTAL_BATCHES; i++)); do
    # First and last line number of the current batch
    START_LINE=$((i * BATCH_SIZE + 1))
    END_LINE=$(((i + 1) * BATCH_SIZE))
    if [ "$END_LINE" -gt "$TOTAL_LINES" ]; then
        END_LINE=$TOTAL_LINES
    fi
    # Extract the batch into a temporary file; stop reading once the batch end has been passed.
    BATCH_FILE="batch_${i}.sql"
    awk -v first="$START_LINE" -v last="$END_LINE" 'NR > last { exit } NR >= first' "$SQL_FILE" > "$BATCH_FILE"
    # Load the batch into ClickHouse
    echo "Importing batch $i (lines $START_LINE to $END_LINE)..."
    if clickhouse-client "${CLIENT_ARGS[@]}" --query="INSERT INTO xx.xx FORMAT MySQLDump" < "$BATCH_FILE"; then
        echo "Batch $i import completed successfully."
    else
        echo "Error: Batch $i import failed." >&2
        exit 1
    fi
    # Remove the temporary file of this batch
    rm -f "$BATCH_FILE"
    BATCH_FILE=""
done
echo "Import completed!"
这样,clickhouse-client 会把每个批次的数据导入到 --query 的 INSERT 语句所指定的库和表中。
注意:执行脚本之前,需要先在 ClickHouse 中建好库和表,例如:
-- Target table for the imported dump. Every column is declared as String;
-- "IS_DELETE" defaults to '0' and "EVENT_ID" defaults to the empty string.
-- Uses the MergeTree engine with "ID" as the primary key.
CREATE TABLE "xxx"
(
"USER_ID" String,
"DOOR_INDEX_CODE" String,
"DOOR_NAME" String,
"IN_OUT_TYPE" String,
"ORG_INDEX_CODE" String,
"ORG_INDEX_NAME" String,
"ACADEMY" String,
"BIRTHPLACE" String,
"CENSUS_REGISTER" String,
"CERT_NO" String,
"COMPANY_UNIT" String,
"DORMITOR" String,
"EDUCATION" String,
"EMAIL" String,
"HOUSE_HOLD_REL" String,
"SFTYPE" String,
"MOBILE" String,
"NATION" String,
"ORGID" String,
"PERSON_ID" String,
"PERSON_NAME" String,
"SEX" String,
"CERTTYPE" String,
"EVENT_TIME" String,
"CARD_NO" String,
"IS_MEN" String,
"JOB_NO" String,
"DEV_INDEX_CODE" String,
"DEV_NAME" String,
"DOOR_REGION_INDEX_CODE" String,
"PIC_URI" String,
"ORG_NAME" String,
"EVENT_TYPE" String,
"RECEIVE_TIME" String,
"CREATE_TIME" String,
"CREATE_USER" String,
"UPDATE_TIME" String,
"UPDATE_USER" String,
"ID" String,
"IS_DELETE" String DEFAULT '0',
"ORG_ID" String,
"EVENT_ID" String DEFAULT ''
)
ENGINE = MergeTree()
PRIMARY KEY ("ID")
clickhouse-client 导入
-- Load the dump file /root/sql/sql.sql into qwuser.B_T_PEOPLE_INOUT_LOGS, reading it in the MySQLDump input format.
INSERT INTO qwuser.B_T_PEOPLE_INOUT_LOGS
FROM INFILE '/root/sql/sql.sql' FORMAT MySQLDump
clickhouse常用操作命令
启动Server服务
systemctl start clickhouse-server
重启server
systemctl restart clickhouse-server
停止server
systemctl stop clickhouse-server
client命令行连接-多行模式(-m)
clickhouse-client -m --password <密码>
client命令行远程连接
clickhouse-client --host 192.168.45.10 --port 9000 --database default --user default --password ""
client命令行执行sql文件
clickhouse-client --user default --password 密码 -d default --multiquery < /root/temp.sql