Elasticsearch Enrich processor

Elasticsearch 7.5부터 이미 인덱싱되어 있는 인덱스들의 데이터를 새로 인덱싱되는 데이터에 원하는대로 추가하는 Enrich 작업 가능

주요 활용 사례

  • IP address를 통해서 연관된 디바이스나 서비스 정보들 추가
  • 상품 주문 데이터의 Product ID를 통해서 상세 제품 정보 데이터 추가
  • 이메일 주소를 통해서 상세 컨택 포인트 정보 추가
  • GPS의 정보를 통해서 우편 번호 정보 추가

 

주의 사항

  • Enrich processor가 여러 작업을 수행하며 수집 파이프라인의 속도에 영향을 줄 수 있으므로, 프로덕션 환경에 배포하기 전 테스트 실행해 벤치마킹할 것을 권장
  • 실시간 데이터를 추가하는 데는 수집 프로세서를 사용하지 않는 것이 좋으며, 수집 프로세서는 자주 변경되지 않는 참조 데이터에서 가장 잘 동작

 

 

Ingest node

elasticsearch에 데이터를 인덱싱하기 전에 다양한 전처리를 할 수 있는 메커니즘을 제공하는 노드 타입으로써

Enrich processor 설정 시 해당 노드 타입에서 프로세스 실행

 

모든 노드 타입의 default가 ingest 가능으로 설정되어 있으며, elasticsearch.yml 파일에서 disable 가능

node.ingest: false

 

 

Ingest Pipeline

다양한 프로세서(Processors)로 파이프라인(Ingest Pipline)을 구성해서 순차적으로 데이터를 처리한 후 es에 인덱싱

  • 데이터의 전처리 위해 전처리 processor가 지정된 pipeline 정의
  • elasticsearch 7.8부터 ingest data를 쉽게 관리하기 위해 kibana의 management에 Ingest Node Pipelines 추가됨
    • 모든 데이터 파이프라인을 하나의 테이블에서 볼 수 있음
    • JSON 에디터를 이용해 파이프라인 생성, 삭제, 편집 가능

 

How the enrich processor works

Ingest pipeline이 인덱싱 전의 데이터 변환

  • Ingest pipeline의 정의된 processor들의 인덱싱 되는 데이터들을 대상으로한 변환 작업이 끝나면 target 인덱스에 해당 document 추가됨
  • 대부분 processor들 해당 문서의 변환 작업만 수행하지만, enrich processor는 들어오는 문서들에 새로운 데이터를 추가하는 작업 수행 

 

 

Enrich Pipeline 구성

 

1. prerequisites 확인

security 사용 시 권한/롤 설정 필요

  1. read index privileges for any indices used
  2. The enrich_user built-in roleAdd enrich data

 

2. Source Index 생성

source index의 document는 incoming 문서에 추가하려는 enrich data가 포함되어야 하며, 
문서 및 인덱스 API를 사용해 source index 관리 가능 (일반 인덱스와 동일)

 

3. Enrich Policy 생성

source 인덱스를 대상으로 Enrich Policy 정의 및 생성

PUT /_enrich/policy/my-policy
{
  "match": {
    "indices": "users",
    "match_field": "email",
    "enrich_fields": ["first_name", "last_name", "city", "zip", "state"]
  }
}
  • policy type : enrich policy 정의
    • geo_match: geo_shape_query를 기반 (지리 정보 관련된 incoming 문서에 적용 시 사용)
    • match: term 쿼리 기반
    • rangeMatch: range 쿼리 기반 (숫자, 날짜, IP주소 관련 incoming 문서 매치 시 사용)
  • indices: 같은 match_field를 공유하는 source index 여러개 정의 가능
  • match_field: incoming 문서에 사용될 source 인덱스의 필드
  • enrich_field: incoming 문서에 추가될 필드 정의. source 인덱스에 반드시 존재해야 함
  • query (optional): enrich index에서 매칭 시 필터링 쿼리 정의 (default: match_all

 

주의사항

생성된 enrich policy는 업데이트나 변경 불가하므로, 변경 필요 시 아래 작업 수행

  1. 신규 enrich policy 생성 및 실행
  2. 사용 중인 enrich processor의 기존 enrich policy를 신규 policy로 대체 
  3. delete api 사용해 기존 enrich policy 삭제

 

 

4. Enrich Policy 실행

enrich policy를 execute API로 실행해 enrich 인덱스 생성

PUT /_enrich/policy/my-policy/_execute

 

enrich 인덱스에는 source 인덱스에 있는 문서가 포함되며, enrich 인덱스는 항상 .enrich-*로 시작하고 읽기 전용이며 강제 병합(force merge) 됨

 

 

5. enrich processor 정의 및 ingest pipeline에 추가

PUT _ingest/pipeline/my_pipeline_id
{
  "description" : "describe pipeline",
  "processors" : [
    {
      "enrich" : {
        "policy_name": "enrich-policy",
        "field" : "enrich_field",
        "target_field": "enrich_target_field",
        "max_matches": "1"
      }
    }
  ]
}
  • enrich processor
    • policy_name: 위에서 정의한 enrich policy
    • field: 새로 인덱싱되는 data에서 매칭될 필드 정의
    • target_field: enrich data가 포함될 필드 이름 정의. 해당 필드에 enrich data가 json array 형식으로 추가
    • max_matches: target_field에 추가될 json array 길이 제한 

 

6. Ingest and enrich documents.

Index template에 위에서 생성한 pipeline 추가해 적용

PUT _template/index_template
{
  "settings" : {
    "index": {
      "default_pipeline": "my_pipeline_id",
       ...
    }
  }
}

 

 

 

 

참고 문서 : https://www.elastic.co/guide/en/elasticsearch/reference/7.x/enrich-setup.html#create-enrich-source-index

 

'Elasticsearch' 카테고리의 다른 글

Elasticsearch 디스크 불균등 이슈 해결하기  (0) 2024.03.10
nested list field size 집계  (1) 2023.12.23
keyword list 필드 string으로 합치기  (1) 2023.12.23
NGram analyzer  (0) 2023.12.14
ElasticSearch(oss) vs OpenSearch  (0) 2023.10.28

+ Recent posts