OpenSearch에서 중복 데이터 저장을 막기 위한 방법을 정리하기 위해 글을 작성하게 되었습니다.
1. 중복 방지 전략
OpenSearch에서 중복 방지의 핵심은 쿼리로 체크하지 않는 것입니다. RDB처럼 UNIQUE 제약 조건이 존재하지 않기 때문에, 유일성을 보장할 수 있는 유일한 수단은 _id입니다.
즉, 중복을 막고 싶다면 어떤 값을 _id로 설계할 것인가를 고려해야 합니다.
1-1. index
같은 _id로 index를 호출하면 기존 문서는 교체 됩니다. 내부적으로는 새 버전이 생성되고 _version 이 증가합니다. 중복을 에러로 간주하지 않고, 최신 상태로 갱신 하는 전략입니다. 배치 재실행 시에도 동일 결과가 보장되므로 멱등성이 확보됩니다. 단점은 문서 전체를 교체한다는 점입니다. 일부 필드만 바꾸고 싶을 경우에는 적합하지 않습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
override fun saveAll(documents: List<Document>) {
val operations = documents.map { document ->
BulkOperation.Builder()
.index(
IndexOperation.Builder<Document>()
.index(INDEX_NAME)
.id(document.businessId) // 중복 기준
.document(document)
.build()
)
.build()
}
val response = openSearchClient.bulk(
BulkRequest.Builder().operations(operations).build()
)
if (response.errors()) {
throw RuntimeException("OpenSearch bulk 실패")
}
}
중복 기준이 단일 필드가 아닌 여러 필드 조합일 경우, 해당 값을 조합해 _id로 사용합니다. 이 방식은 별도의 조회 없이 _id만으로 유니크가 보장되며, 성능도 좋고 배치에서 안정적입니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
override fun saveAll(documents: List<Document>) {
val operations = documents.map { document ->
val compositeId = "${document.businessId}_${document.transactionDate}"
BulkOperation.Builder()
.index(
IndexOperation.Builder<Document>()
.index(INDEX_NAME)
.id(compositeId)
.document(document)
.build()
)
.build()
}
val response = openSearchClient.bulk(
BulkRequest.Builder().operations(operations).build()
)
if (response.errors()) {
throw RuntimeException("OpenSearch bulk 실패")
}
}
1-2. create
create는 RDB의 INSERT와 동일한 개념입니다. 이미 같은 _id가 존재하면 409 Conflict 가 발생합니다. 중복을 오류로 감지하고 싶을 때 사용합니다. 배치를 재실행하면 이전 데이터가 모두 409를 반환합니다. 따라서 409는 중복으로 간주하고 필터링하는 처리가 필요할 수 있습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
override fun saveAll(documents: List<Document>) {
val operations = documents.map { document ->
BulkOperation.Builder()
.create(
CreateOperation.Builder<Document>()
.index(INDEX_NAME)
.id(document.businessId)
.document(document)
.build()
)
.build()
}
val response = openSearchClient.bulk(
BulkRequest.Builder().operations(operations).build()
)
val realErrors = response.items()
.filter { it.error() != null && it.status() != 409 }
if (realErrors.isNotEmpty()) {
throw RuntimeException("OpenSearch bulk 실패: ${realErrors.size}건")
}
}
1-3. docAsUpsert
문서가 존재하면 해당 필드만 업데이트하고, 없으면 새로 생성하는 방식입니다. index와 차이점은 기존 문서의 다른 필드를 유지한다는 점인데, 내부적으로 GET → merge → reindex 과정을 거치기 때문에 index보다 약간 느립니다. 하지만 부분 업데이트가 필요한 경우에는 가장 적합합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
override fun saveAll(documents: List<Document>) {
val operations = documents.map { document ->
BulkOperation.Builder()
.update(
UpdateOperation.Builder<Document, Document>()
.index(INDEX_NAME)
.id(document.businessId)
.action { action ->
action.doc(document).docAsUpsert(true)
}
.build()
)
.build()
}
val response = openSearchClient.bulk(
BulkRequest.Builder().operations(operations).build()
)
if (response.errors()) {
throw RuntimeException("OpenSearch bulk 실패")
}
}
2. 정리
OpenSearch에서 중복 방지는 쿼리로 사전 조회하는 방식이 아닌, _id 설계를 통해 해결하는 문제입니다. 이 외에도 외부 버전을 활용한 동시성 제어, 낙관적 락, 시퀀스 번호 기반 제어 등 보다 정교한 전략들도 존재합니다. 시스템의 데이터 특성과 일관성 요구 수준에 따라 적절한 방식을 선택합니다.