<?php
namespace app\common\logic;
//require '/vendor/autoload.php';
use app\common\exception\InvalidArgumentsException;
use Elasticsearch\ClientBuilder;
use Elasticsearch\Common\Exceptions\ElasticCloudIdParseException;
use think\facade\Log;
class Elastic
{
private $client;
static private $instance;
private function __construct(){
$this->client = ClientBuilder::create()->setHosts(config('es.hosts'))->setBasicAuthentication(config('es.user'),config('es.passwd'))->build();
}
/**
* 私有化克隆函数,防止类外克隆对象
*/
private function __clone ()
{}
static public function getInstance()
{
//判断$client是否是Singleton的对象,不是则创建
if (!self::$instance instanceof self) {
self::$instance = new self();
}
return self::$instance;
}
/**
* 批量插入文档
* @param array $params
* @return bool
*/
public function addAllDos(array $params,$index='chat')
{
foreach ($params as $param){
$params1['body'][] = [
'index' => [ #创建或替换
'_index' => $index,
'_id' => $param['id'],
],
];
$params1['body'][] = $param;
}
try {
$this->client->bulk($params1);
} catch (\Exception $e) {
Log::write($e->getMessage(), "插入失败");
return false;
}
return true;
}
/**
* 更新文档(更新一条数据)
* @param int $id
* @param array $body
* @param string $index
* @return bool
*/
public function updateDoc(int $id, array $body, string $index): bool
{
$params = [
'index' => $index,
'id' => $id,
'body' => [
'doc' => $body
]
];
try {
$this->client->update($params);
} catch (\Exception $e) {
Log::write($e->getMessage(), "更新文档{$id}失败");
return false;
}
return true;
}
/**
* 删除文档(删除一条数据)
* @param int $id
* @param string $index
* @return bool
*/
public function deleteDoc(int $id, string $index): bool
{
$params = [
'index' => $index,
'id' => $id
];
try {
$this->client->delete($params);
} catch (\Exception $e) {
Log::write($e->getMessage(), "删除文档{$id}失败");
return false;
}
return true;
}
/**
* 检测索引是否存在,只能单个索引名称检测
* @param string $index 索引名称
* @return bool
*/
public function indexExists($index)
{
$response = $this->client->indices()->exists(['index' => $index]);
return $response;
}
/**
* query
'query' => [
多字段匹配
'multi_match' => [
'query' => $keywords,
'fields' => ['title', 'content', 'keyword'],
'type' => 'most_fields' // most_fields 多字段匹配度更高 best_fields 完全匹配占比更高
],
单个字段匹配
'match' => [
'title' => $keywords
],
完全匹配
'match_phrase' => [
'title' => $keywords
],
联合查询
'bool' => [
'should' => [ // 相当于or
[
'match_phrase' => [
'title' => $keywords
]
],
[
'match_phrase' => [
'content' => $keywords
]
],
[
'match_phrase' => [
'keyword' => $keywords
]
],
],
'must' => [ // 相当于and
[
'match' => [
'title' => $keywords
]
],
],
'should' => [ // 相当于or
[
'match' => [
'title' => $keywords
]
],
[
'match' => [
'content' => $keywords
]
],
[
'match' => [
'keyword' => $keywords
]
],
],
'must_not' => [ // 相当于not
[
'match' => [
'content' => $keywords
]
]
],
'filter' => [ // 过滤器 gt 大于;gte 大于等于;lt 小于;lte 小于等于
'range' => [
'id' => ['lt' => 20598, 'gt' => 20590]
]
],
],
],
'highlight' => [ // 搜索词高亮设置
// 'pre_tags' => "<p class='key' style='color: red;'>", // 自定义高亮样式
// 'post_tags' => "</p>",
'fields' => [ // 设置高亮的字段
'title' => (object)[],
'content' => (object)[],
'keyword' => (object)[],
]
],
*/
/**
* 获取索引文档数据列表
* @param $index 表名
* @param array $where 实例 ['title'=>'test title','name'=>'jjjj']
* @param string $orderField
* @param string $sort
* @param int $limit
* @param int $page
* @return mixed
*/
public function getList($index, $query=[],$orderField='id',$sort='desc', $limit = 10, $page = 1)
{
$offset = ((int)$page - 1) * (int)$limit;
$params = [
'index' => $index,
'body' => [
'query' => $query,
'sort' => [[$orderField => ['order' => $sort]]],
'from' => $offset,
'size' => $limit,
//'aggs'=>$aggs,
]
];
//聚合查询
if (isset($query['aggs'])){
$params['body']['aggs'] = $query['aggs'];
$params['body']['size'] = 0;
unset($query['aggs']);
$params['body']['query'] = $query;
}
try {
$response = $this->client->search($params);
$data = [];
$total = 0;
if (isset($response['hits']['total']['value']) && !isset($response['aggregations'])) {
if ( $response['hits']['total']['value'] >0 ){
//循环数据
foreach ( $response['hits']['hits'] as $value ){
$data[] = $value['_source'];
}
}
$total = $response['hits']['total']['value'];
}
//聚合查询的结果
if ( isset($response['aggregations']) ) {
/* foreach ($response['aggregations']['group_']['buckets'] as $key=>$value) {
}*/
return ['code'=>200,'msg'=>'','data'=>$response['aggregations']];
}
return ['code'=>200,'msg'=>'','data'=>$data,'total'=>$total];
}catch (\Exception $e){
throw new InvalidArgumentsException("es查询错误".$e->getMessage(),400);
}
//$response['hits']['hits']是具体的数据列表
return $response['hits'];
}
/**
* 创建表 doc
* @param string $index_name
* @param array $body
* @return array
*/
public function create_mappings($index_name = 'chat',$body=[]) {
类型 text、integer、float、double、boolean、date
$body = [
'id' => ['type' => 'long'],
'from_id' => ['type' => 'text'],
'from_name' => ['type' => 'text'],
'from_avatar' => ['type' => 'text'],
'to_id' => ['type' => 'text'],
'to_name' => ['type' => 'text'],
'seller_code' => ['type' => 'text',],
'content' => ['type' => 'text','index'=>true,'analyzer'=>'ik_max_word'],
'read_flag' => ['type' => 'integer'],
'content_type' => ['type' => 'integer'],
'client_id' => ['type' => 'long'],
'remark' => ['type' => 'text','index'=>true,'analyzer'=>'ik_max_word'],
'create_time' => ['type' => 'long'],
];
$params = [
'index' => $index_name, //索引名称
'body' => [
'settings' => [ // 设置配置
'number_of_shards' => 5, //主分片数
'number_of_replicas' => 1 //主分片的副本数
],
'mappings' => [ // 设置映射
'_source' => [ // 存储原始文档
'enabled' => 'true'
],
'properties' => $body // 配置数据结构与类型
],
]
];
$response = $this->client->indices()->create($params);
return $response;
}
}
//创建index的命令脚本
<?php
namespace app\command;
use app\common\logic\Elastic;
use think\console\Command;
use think\console\Input;
use think\console\Output;
class CreateEsIndex extends Command
{
protected function configure()
{
// 指令配置
$this->setName('createesindex');
// 设置参数
}
protected function execute(Input $input, Output $output)
{
// 指令输出
//$output->writeln('createesindex');
类型 text、long,integer、float、double、boolean、date,keyword
$body = [
'id' => ['type' => 'long'],
'from_id' => ['type' => 'keyword'],
'from_name' => ['type' => 'text','fielddata'=>true,'fields'=>['raw'=>['type'=>'keyword']]],
'image' => ['type' => 'text'],
'to_id' => ['type' => 'keyword'],
'to_name' => ['type' => 'text'],
'code' => ['type' => 'text',],
'content' => ['type' => 'text','index'=>true,'analyzer'=>'ik_max_word'],
'status' => ['type' => 'integer'],
'content_type' => ['type' => 'integer'],
'client_code' => ['type' => 'long'],
'create_time' => ['type' => 'long'],
];
$params = [
'index' => 'chat', //索引名称
'body' => [
'settings' => [ // 设置配置
'number_of_shards' => 5, //主分片数
'number_of_replicas' => 0 //主分片的副本数
],
'mappings' => [ // 设置映射
/* '_source' => [ // 存储原始文档
'enabled' => 'true'
],*/
'properties' => $body // 配置数据结构与类型
],
]
];
$response = Elastic::getInstance()->indices()->create($params);
return $response;
}
}
一些逻辑查询
$query = [
'bool'=>[
'must'=>[
],
'filter' =>
[
'range'=>[
'create_time'=>[
'gt' => (int)$start_time , 'lt' =>(int)$end_time
]
],
],
// 'minimum_should_match'=>1,
],
];
if ($param['keyword']){
$query['bool']['must'][] = [
'match_phrase'=>[
'content'=>$param['keyword'],
],
];
}
//查询发送者或者接收者
if ($customerCode){
$query['bool']['should'][] = [
'match'=>[
'to_id'=>$customerCode,
],
];
$query['bool']['should'][] = [
'match'=>[
'from_id'=>$customerCode,
],
];
//or查询至少满足一个条件
$query['bool']['minimum_should_match']=1;
}
//page小于0向上翻页,否则是展示当前limit条信息
if (isset($param['id']) ){
if ($page>0){
$query['bool']['must'][] = [
'range'=>[
'id'=>['gte'=>$param['id']],
],
];
}else{
$page = -$page;
$query['bool']['must'][] = [
'range' => [
'id'=>['lt'=>$param['id']],
],
];
}
}
//全文搜索
if (!$all){
$query['bool']['must'][] = [
'match'=>[
'seller_code'=>$sellerCode,
],
];
}
分页聚合查询
$size = $param['page'] *$param['limit'];
$query = [
'bool'=>[
'must'=>[
[
'match_phrase'=>[
'from_name'=>$param['name'],
],
],
[
'match_phrase'=>[
'to_id'=>$param['code'],
],
],
],
// 'minimum_should_match'=>1,
],
'aggs'=>[
'group_from_name'=>[
'terms'=>[
'field'=>'from_name.raw',
'size'=>$size
],
'aggs'=>[
'agency_names'=>[
'top_hits'=>[
'size'=>1,
'_source'=>[
'include'=>['from_id','from_name','from_avatar']
],
],
],
],
],
],
];
循环入es
$chat = new Chat();
$chat->chunk(300,function ($chatlogs) use($output){
foreach ($chatlogs as $key=>$chatlog){
//写入es
$chatlogs[$key]['create_time'] = strtotime($chatlog['create_time']);
}
Elastic::getInstance()->addAllDos($chatlogs);
});