Elasticsearch Enrich processor
Elasticsearch 7.5부터 이미 인덱싱되어 있는 인덱스들의 데이터를 새로 인덱싱되는 데이터에 원하는대로 추가하는 Enrich 작업 가능
주요 활용 사례
- IP address를 통해서 연관된 디바이스나 서비스 정보들 추가
- 상품 주문 데이터의 Product ID를 통해서 상세 제품 정보 데이터 추가
- 이메일 주소를 통해서 상세 컨택 포인트 정보 추가
- GPS의 정보를 통해서 우편 번호 정보 추가
주의 사항
- Enrich processor가 여러 작업을 수행하며 수집 파이프라인의 속도에 영향을 줄 수 있으므로, 프로덕션 환경에 배포하기 전 테스트 실행해 벤치마킹할 것을 권장
- 실시간 데이터를 추가하는 데는 수집 프로세서를 사용하지 않는 것이 좋으며, 수집 프로세서는 자주 변경되지 않는 참조 데이터에서 가장 잘 동작
Ingest node
elasticsearch에 데이터를 인덱싱하기 전에 다양한 전처리를 할 수 있는 메커니즘을 제공하는 노드 타입으로써
Enrich processor 설정 시 해당 노드 타입에서 프로세스 실행
모든 노드 타입의 default가 ingest 가능으로 설정되어 있으며, elasticsearch.yml 파일에서 disable 가능
node.ingest: false
Ingest Pipeline
다양한 프로세서(Processors)로 파이프라인(Ingest Pipline)을 구성해서 순차적으로 데이터를 처리한 후 es에 인덱싱
- 데이터의 전처리 위해 전처리 processor가 지정된 pipeline 정의
- elasticsearch 7.8부터 ingest data를 쉽게 관리하기 위해 kibana의 management에 Ingest Node Pipelines 추가됨
- 모든 데이터 파이프라인을 하나의 테이블에서 볼 수 있음
- JSON 에디터를 이용해 파이프라인 생성, 삭제, 편집 가능
How the enrich processor works

Ingest pipeline이 인덱싱 전의 데이터 변환
- Ingest pipeline의 정의된 processor들의 인덱싱 되는 데이터들을 대상으로한 변환 작업이 끝나면 target 인덱스에 해당 document 추가됨
- 대부분 processor들 해당 문서의 변환 작업만 수행하지만, enrich processor는 들어오는 문서들에 새로운 데이터를 추가하는 작업 수행
Enrich Pipeline 구성
1. prerequisites 확인
security 사용 시 권한/롤 설정 필요
- read index privileges for any indices used
- The enrich_user built-in roleAdd enrich data
2. Source Index 생성
source index의 document는 incoming 문서에 추가하려는 enrich data가 포함되어야 하며,
문서 및 인덱스 API를 사용해 source index 관리 가능 (일반 인덱스와 동일)
3. Enrich Policy 생성
source 인덱스를 대상으로 Enrich Policy 정의 및 생성
PUT /_enrich/policy/my-policy
{
"match": {
"indices": "users",
"match_field": "email",
"enrich_fields": ["first_name", "last_name", "city", "zip", "state"]
}
}
- policy type : enrich policy 정의
- geo_match: geo_shape_query를 기반 (지리 정보 관련된 incoming 문서에 적용 시 사용)
- match: term 쿼리 기반
- rangeMatch: range 쿼리 기반 (숫자, 날짜, IP주소 관련 incoming 문서 매치 시 사용)
- indices: 같은 match_field를 공유하는 source index 여러개 정의 가능
- match_field: incoming 문서에 사용될 source 인덱스의 필드
- enrich_field: incoming 문서에 추가될 필드 정의. source 인덱스에 반드시 존재해야 함
- query (optional): enrich index에서 매칭 시 필터링 쿼리 정의 (default: match_all
주의사항
생성된 enrich policy는 업데이트나 변경 불가하므로, 변경 필요 시 아래 작업 수행
- 신규 enrich policy 생성 및 실행
- 사용 중인 enrich processor의 기존 enrich policy를 신규 policy로 대체
- delete api 사용해 기존 enrich policy 삭제
4. Enrich Policy 실행
enrich policy를 execute API로 실행해 enrich 인덱스 생성
PUT /_enrich/policy/my-policy/_execute

enrich 인덱스에는 source 인덱스에 있는 문서가 포함되며, enrich 인덱스는 항상 .enrich-*로 시작하고 읽기 전용이며 강제 병합(force merge) 됨
5. enrich processor 정의 및 ingest pipeline에 추가
PUT _ingest/pipeline/my_pipeline_id
{
"description" : "describe pipeline",
"processors" : [
{
"enrich" : {
"policy_name": "enrich-policy",
"field" : "enrich_field",
"target_field": "enrich_target_field",
"max_matches": "1"
}
}
]
}
- enrich processor
- policy_name: 위에서 정의한 enrich policy
- field: 새로 인덱싱되는 data에서 매칭될 필드 정의
- target_field: enrich data가 포함될 필드 이름 정의. 해당 필드에 enrich data가 json array 형식으로 추가
- max_matches: target_field에 추가될 json array 길이 제한
6. Ingest and enrich documents.
Index template에 위에서 생성한 pipeline 추가해 적용
PUT _template/index_template
{
"settings" : {
"index": {
"default_pipeline": "my_pipeline_id",
...
}
}
}
'Elasticsearch' 카테고리의 다른 글
Elasticsearch 디스크 불균등 이슈 해결하기 (0) | 2024.03.10 |
---|---|
nested list field size 집계 (1) | 2023.12.23 |
keyword list 필드 string으로 합치기 (1) | 2023.12.23 |
NGram analyzer (0) | 2023.12.14 |
ElasticSearch(oss) vs OpenSearch (0) | 2023.10.28 |