Kafka Connect는 Apache Kafka와 다른 데이터 시스템 간에 데이터를 확장 가능하고 안정적으로 스트리밍할 수 있는 도구입니다. 이를 통해 대용량 데이터 세트를 Kafka 안팎으로 이동시키는 커넥터를 빠르게 정의할 수 있습니다. Kafka Connect는 전체 데이터베이스를 수집하거나 모든 애플리케이션 서버의 메트릭을 Kafka topic으로 수집하여, 저지연으로 스트림 처리를 가능하게 합니다. 또한, export 커넥터를 통해 Kafka topic에서 Elasticsearch와 같은 2차 인덱스 또는 오프라인 분석을 위한 Hadoop과 같은 배치 시스템으로 데이터를 전달할 수 있습니다.
이 포스트에서는 Kafka Connect의 작동 방식과 중요 용어 및 핵심 개념을 설명합니다.
Kafka Connect란?
Kafka Connect는 데이터베이스, 키-값 저장소, 검색 인덱스 및 파일 시스템 간의 간단한 데이터 통합을 위한 중앙 집중식 데이터 허브 역할을 하는 Apache Kafka의 무료 오픈 소스 컴포넌트입니다. Kafka Connect를 사용하여 Apache Kafka와 다른 데이터 시스템 간에 데이터를 스트리밍하고 대용량 데이터셋을 Kafka 안팎으로 이동시키는 커넥터를 빠르게 만들 수 있습니다.
Kafka Connect의 이점
Kafka Connect는 다음과 같은 이점을 제공합니다:
- 데이터 중심 파이프라인: Connect는 의미 있는 데이터 추상화를 사용하여 데이터를 Kafka로 가져오거나 Kafka로부터 데이터를 내보냅니다.
- 유연성과 확장성: Connect는 스트리밍 및 배치 지향 시스템을 단일 노드(독립형) 또는 조직 전체 서비스(분산형)로 실행할 수 있습니다.
- 재사용성과 확장성: Connect는 기존 커넥터를 활용하거나 필요에 맞게 확장할 수 있으며, 프로덕션까지의 시간을 단축시킵니다.
Kafka Connect는 Kafka와의 데이터 스트리밍에 중점을 두어 고품질, 신뢰성 및 고성능 커넥터 플러그인을 작성하기 쉽게 합니다. 또한, 다른 프레임워크로는 달성하기 어려운 보장을 프레임워크가 제공할 수 있게 합니다. Kafka 및 스트림 처리 프레임워크와 결합하면 ETL 파이프라인의 필수 구성 요소가 됩니다.
Kafka Connect의 작동 방식
Kafka Connect 프레임워크는 전체 데이터베이스를 수집하거나 모든 애플리케이션 서버의 메트릭을 Kafka topic으로 수집하여 저지연으로 스트림 처리를 가능하게 합니다. 예를 들어, export 커넥터는 Kafka topic에서 Elasticsearch와 같은 2차 인덱스 또는 오프라인 분석을 위한 Hadoop과 같은 배치 시스템으로 데이터를 전달할 수 있습니다.
Kafka Connect를 독립형 프로세스로 배포하여 단일 머신에서 작업을 실행하거나(예: 로그 수집) 전체 조직을 지원하는 분산형, 확장 가능, 내결함성 서비스를 제공할 수 있습니다. Kafka Connect는 진입 장벽이 낮고 운영 오버헤드가 적습니다. 개발 및 테스트를 위한 독립형 환경으로 시작하여 대규모 조직의 데이터 파이프라인을 지원하는 전체 프로덕션 환경으로 확장할 수 있습니다.
Kafka Connect 개념
이 섹션에서는 다음과 같은 Kafka Connect 개념을 설명합니다:
- 커넥터(Connectors): 태스크를 관리하여 데이터 스트리밍을 조정하는 고수준 추상화입니다. 커넥터는 데이터 소스와 싱크 간의 데이터 흐름을 담당합니다.
- 태스크(Tasks): Kafka로 데이터를 복사하는 방법을 구현하는 단위입니다. 각 태스크는 데이터를 Kafka로 복사하거나 Kafka로부터 복사하는 실제 작업을 수행합니다.
- 작업자(Workers): 커넥터와 태스크를 실행하는 프로세스입니다. 작업자는 독립형 모드 또는 분산형 모드로 실행되어 커넥터와 태스크를 관리하고 실행합니다.
- 컨버터(Converters): Connect와 데이터를 송수신하는 시스템 간의 데이터를 변환하는 코드입니다. 컨버터는 데이터 형식을 변환하여 서로 다른 시스템 간의 데이터 교환을 가능하게 합니다.
- 변환(Transforms): 커넥터에 의해 생성되거나 커넥터로 전송되는 각 메시지를 변경하는 간단한 로직입니다. 이를 통해 데이터 형식을 변경하거나 특정 데이터를 필터링할 수 있습니다.
- 데드 레터 큐(Dead Letter Queue): 커넥터 오류를 처리하는 방식입니다. Connect는 오류가 발생한 메시지를 별도의 큐에 저장하여 나중에 분석하거나 재처리할 수 있도록 합니다.
커넥터 (Connectors)
Kafka Connect는 두 가지 유형의 커넥터를 포함합니다:
- 소스 커넥터(Source Connector): 소스 커넥터는 전체 데이터베이스를 수집하고 테이블 업데이트를 Kafka 토픽으로 스트리밍합니다. 또한, 소스 커넥터는 모든 애플리케이션 서버의 메트릭을 수집하여 Kafka 토픽에 저장함으로써, 저지연으로 스트림 처리를 가능하게 합니다.
- 싱크 커넥터(Sink Connector): 싱크 커넥터는 Kafka 토픽에서 데이터를 가져와 Elasticsearch와 같은 2차 인덱스 또는 Hadoop과 같은 배치 시스템으로 전달하여 오프라인 분석을 가능하게 합니다.
Confluent는 관계형 데이터베이스나 HDFS와 같은 일반적으로 사용되는 시스템으로 데이터를 스트리밍할 수 있는 여러 사전 구축된 커넥터를 제공합니다. connectors 목록은 여기에서 확인할 수 있습니다.
Kafka Connect의 커넥터는 데이터가 어디로부터 어디로 복사되어야 하는지를 정의합니다. 커넥터 인스턴스는 Kafka와 다른 시스템 간의 데이터 복사를 관리하는 논리적 작업입니다. 커넥터에 의해 구현되거나 사용되는 모든 클래스는 커넥터 플러그인에 정의됩니다. 커넥터 인스턴스와 커넥터 플러그인은 모두 "커넥터"라고 불릴 수 있지만, 문맥에 따라 어느 것을 가리키는지 항상 명확해야 합니다 (예를 들어, "커넥터 설치"는 플러그인을 의미하고, "커넥터 상태 확인"은 커넥터 인스턴스를 의미합니다).
태스크 (Tasks)
태스크는 Connect의 데이터 모델에서 주요 역할을 합니다. 각 커넥터 인스턴스는 데이터를 복사하는 여러 태스크 세트를 조정합니다. 커넥터가 단일 작업을 여러 태스크로 나눌 수 있도록 함으로써, Kafka Connect는 병렬 처리와 확장 가능한 데이터 복사를 최소한의 구성으로 지원합니다. 태스크 자체는 상태를 저장하지 않습니다. 대신 태스크의 상태는 Kafka의 특별한 토픽인 config.storage.topic
과 status.storage.topic
에 저장되며, 관련된 커넥터에 의해 관리됩니다. 태스크는 언제든지 시작, 중지 또는 재시작될 수 있어 탄력적이고 확장 가능한 데이터 파이프라인을 제공합니다.
태스크 리밸런싱 (Task Rebalancing)
커넥터가 클러스터에 처음 제출되면, 작업자들은 클러스터 내의 모든 커넥터와 그들의 태스크를 리밸런싱하여 각 작업자가 대략 동일한 양의 작업을 담당하도록 합니다. 이 리밸런싱 절차는 커넥터가 필요한 태스크 수를 늘리거나 줄일 때, 또는 커넥터의 구성이 변경될 때에도 사용됩니다. 작업자가 실패하면 태스크는 활성 작업자들 사이에서 다시 리밸런싱됩니다. 태스크가 실패할 경우, 태스크 실패는 예외적인 경우로 간주되므로 리밸런싱이 트리거되지 않습니다. 따라서 실패한 태스크는 프레임워크에 의해 재시작되지 않으며, REST API를 사용하여 재시작해야 합니다.
작업자 (Workers)
커넥터와 태스크는 논리적 작업 단위이며, 프로세스에서 실행되도록 스케줄링되어야 합니다. Kafka Connect는 이러한 프로세스를 작업자(workers)라고 부르며, 두 가지 유형의 작업자가 있습니다: 독립형 작업자와 분산형 작업자입니다.
독립형 작업자 (Standalone Workers)
독립형 모드는 가장 간단한 모드로, 단일 프로세스가 모든 커넥터와 태스크를 실행하는 책임을 집니다. 단일 프로세스이기 때문에 최소한의 구성만 필요합니다. 독립형 모드는 시작하기에 편리하며, 개발 중이거나 단일 프로세스만 필요한 특정 상황(예: 호스트에서 로그 수집)에 적합합니다. 그러나 단일 프로세스이기 때문에 확장성이 제한되며, 단일 프로세스에 추가하는 모니터링을 제외하고는 내결함성이 없습니다.
분산형 작업자 (Distributed Workers)
분산형 모드는 Kafka Connect에 대한 확장성과 자동 내결함성을 제공합니다. 분산형 모드에서는 동일한 group.id를 사용하여 여러 작업자 프로세스를 시작하며, 이들은 모든 가용 작업자에 걸쳐 커넥터와 태스크의 실행을 조정합니다. 작업자를 추가하거나, 작업자를 종료하거나, 작업자가 예기치 않게 실패하면, 나머지 작업자들은 이를 인식하고, 업데이트된 가용 작업자 집합에 따라 커넥터와 태스크를 재분배하도록 조정합니다. 이는 소비자 그룹의 리밸런싱과 유사합니다. 내부적으로, Connect 작업자들은 소비자 그룹을 사용하여 조정하고 리밸런싱을 수행합니다.
모든 작업자가 동일한 group.id를 가지면 동일한 Connect 클러스터에 속하게 됩니다. 예를 들어, 작업자 A가 group.id=connect-cluster-a를 가지고 있고, 작업자 B가 동일한 group.id를 가지고 있으면, 작업자 A와 작업자 B는 connect-cluster-a
라는 클러스터를 형성합니다.
컨버터 (Converters)
컨버터는 Kafka로 읽고 쓰는 동안 특정 데이터 형식을 지원하기 위해 Kafka Connect 배포에 필요합니다. 태스크는 데이터를 바이트에서 Connect 내부 데이터 형식으로, 또는 그 반대로 변환하기 위해 컨버터를 사용합니다.
기본적으로, Confluent Platform은 다음과 같은 컨버터를 제공합니다:
- AvroConverter (
io.confluent.connect.avro.AvroConverter
): Schema Registry와 함께 사용 - ProtobufConverter (
io.confluent.connect.protobuf.ProtobufConverter
): Schema Registry와 함께 사용 - JsonSchemaConverter (
io.confluent.connect.json.JsonSchemaConverter
): Schema Registry와 함께 사용 - JsonConverter (
org.apache.kafka.connect.json.JsonConverter
): 구조화된 데이터에 사용, Schema Registry 없이 사용 - StringConverter (
org.apache.kafka.connect.storage.StringConverter
): 간단한 문자열 형식 - ByteArrayConverter (
org.apache.kafka.connect.converters.ByteArrayConverter
): 변환을 하지 않는 "패스스루" 옵션 제공
컨버터는 커넥터와 분리되어 있어, 커넥터 간에 컨버터를 재사용할 수 있습니다. 예를 들어, 동일한 Avro 컨버터를 사용하여 JDBC 소스 커넥터가 Avro 데이터를 Kafka에 쓰고, HDFS 싱크 커넥터가 Kafka에서 Avro 데이터를 읽을 수 있습니다. 즉, 동일한 컨버터를 사용하여 JDBC 소스가 ResultSet을 반환하여 HDFS에 parquet 파일로 최종 기록되는 경우에도 동일한 컨버터를 사용할 수 있습니다.
다음 그래픽은 JDBC 소스 커넥터를 사용하여 데이터베이스에서 읽고, Kafka에 쓰고, 마지막으로 HDFS 싱크 커넥터를 사용하여 HDFS에 쓰는 동안 컨버터가 어떻게 사용되는지 보여줍니다.
Connect에서 사용할 수 있는 내장 원시 컨버터는 다음과 같습니다:
- DoubleConverter (
org.apache.kafka.connect.converters.DoubleConverter
): DOUBLE 값을 직렬화하고 역직렬화합니다. 바이트에서 Connect 형식으로 변환할 때, 컨버터는 선택적 FLOAT64 스키마를 반환합니다. - FloatConverter (
org.apache.kafka.connect.converters.FloatConverter
): FLOAT 값을 직렬화하고 역직렬화합니다. 바이트에서 Connect 형식으로 변환할 때, 컨버터는 선택적 FLOAT32 스키마를 반환합니다. - IntegerConverter (
org.apache.kafka.connect.converters.IntegerConverter
): INTEGER 값을 직렬화하고 역직렬화합니다. 바이트에서 Connect 형식으로 변환할 때, 컨버터는 선택적 INT32 스키마를 반환합니다. - LongConverter (
org.apache.kafka.connect.converters.LongConverter
): LONG 값을 직렬화하고 역직렬화합니다. 바이트에서 Connect 형식으로 변환할 때, 컨버터는 선택적 INT64 스키마를 반환합니다. - ShortConverter (
org.apache.kafka.connect.converters.ShortConverter
): SHORT 값을 직렬화하고 역직렬화합니다. 바이트에서 Connect 형식으로 변환할 때, 컨버터는 선택적 INT16 스키마를 반환합니다.
변환 (Transforms)
커넥터는 개별 메시지에 대해 간단하고 가벼운 수정을 할 수 있도록 변환을 구성할 수 있습니다. 이는 사소한 데이터 조정 및 이벤트 라우팅에 편리하며, 여러 변환을 커넥터 구성에 연쇄적으로 연결할 수 있습니다. 그러나 더 복잡한 변환과 많은 메시지에 적용되는 작업은 Confluent Platform의 ksqlDB와 Kafka Streams를 사용하여 구현하는 것이 가장 좋습니다.
변환은 하나의 레코드를 입력으로 받아 수정된 레코드를 출력하는 간단한 함수입니다. Kafka Connect가 제공하는 모든 변환은 간단하지만 유용한 수정 작업을 수행합니다. 사용자 정의 논리로 Transformation 인터페이스를 구현하여 이를 Kafka Connect 플러그인으로 패키징하고, 모든 커넥터와 함께 사용할 수 있습니다.
소스 커넥터와의 변환 사용
소스 커넥터와 함께 변환을 사용할 때, Kafka Connect는 커넥터가 생성한 각 소스 레코드를 첫 번째 변환에 전달합니다. 변환은 수정 작업을 수행하고 새로운 소스 레코드를 출력합니다. 이 업데이트된 소스 레코드는 체인의 다음 변환으로 전달되어 새로운 수정된 소스 레코드를 생성합니다. 이러한 과정은 남은 모든 변환에 대해 계속됩니다. 최종적으로 업데이트된 소스 레코드는 이진 형식으로 변환되어 Kafka에 기록됩니다.
싱크 커넥터와의 변환 사용
변환은 싱크 커넥터와도 함께 사용할 수 있습니다. Kafka Connect는 Kafka에서 메시지를 읽고 이진 표현을 싱크 레코드로 변환합니다. 변환이 있는 경우, Kafka Connect는 레코드를 첫 번째 변환에 전달하여 수정 작업을 수행하고 새로운, 업데이트된 싱크 레코드를 출력합니다. 업데이트된 싱크 레코드는 체인의 다음 변환으로 전달되어 새로운 싱크 레코드를 생성합니다. 이러한 과정은 남은 모든 변환에 대해 계속되며, 최종적으로 업데이트된 싱크 레코드는 싱크 커넥터로 전달되어 처리됩니다.
데드 레터 큐 (Dead Letter Queue)
데드 레터 큐(DLQ)는 싱크 커넥터에만 적용됩니다. Confluent Cloud 싱크 커넥터의 경우 DLQ 주제가 자동으로 생성됩니다.
유효하지 않은 레코드는 여러 이유로 발생할 수 있습니다. 예를 들어, 레코드가 JSON 형식으로 직렬화되어 싱크 커넥터에 도착했는데, 싱크 커넥터 구성은 Avro 형식을 기대하는 경우가 있습니다. 유효하지 않은 레코드를 싱크 커넥터가 처리할 수 없는 경우, 오류는 커넥터의 errors.tolerance
구성 속성에 따라 처리됩니다.
errors.tolerance
의 유효한 값은 두 가지입니다:
- none (기본값)
- all
errors.tolerance
가 none으로 설정된 경우, 오류나 유효하지 않은 레코드가 발생하면 커넥터 태스크가 즉시 실패하고 커넥터는 실패 상태로 전환됩니다. 이 문제를 해결하려면 Kafka Connect 작업자 로그를 검토하고 다음을 수행해야 합니다:
- 실패 원인을 조사합니다.
- 문제를 해결합니다.
- 커넥터를 재시작합니다.
errors.tolerance
가 all로 설정된 경우, 모든 오류나 유효하지 않은 레코드가 무시되고 처리가 계속됩니다. 오류는 Connect 작업자 로그에 기록되지 않습니다. 레코드가 실패했는지 확인하려면 내부 메트릭을 사용하거나 소스의 레코드 수를 계산하고 처리된 레코드 수와 비교해야 합니다.
오류 처리 기능은 모든 유효하지 않은 레코드를 특수 토픽으로 라우팅하고 오류를 보고합니다. 이 주제에는 싱크 커넥터가 처리할 수 없는 레코드의 DLQ가 포함됩니다.
데드 레터 큐 주제 생성
DLQ를 생성하려면 다음 구성 속성을 싱크 커넥터 구성에 추가합니다:
errors.tolerance = all
errors.deadletterqueue.topic.name = <dead-letter-topic-name>
다음 예는 DLQ가 활성화된 GCS 싱크 커넥터 구성을 보여줍니다:
{
"name": "gcs-sink-01",
"config": {
"connector.class": "io.confluent.connect.gcs.GcsSinkConnector",
"tasks.max": "1",
"topics": "gcs_topic",
"gcs.bucket.name": "<my-gcs-bucket>",
"gcs.part.size": "5242880",
"flush.size": "3",
"storage.class": "io.confluent.connect.gcs.storage.GcsStorage",
"format.class": "io.confluent.connect.gcs.format.avro.AvroFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"schema.compatibility": "NONE",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-gcs-sink-01"
}
}
DLQ 주제에 실패한 레코드가 포함되어 있어도 그 이유를 보여주지는 않습니다. 실패한 레코드 헤더 정보를 포함하려면 다음 구성 속성을 추가할 수 있습니다:
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.context.headers.enable
매개 변수가 true로 설정되면 레코드 헤더가 DLQ에 추가됩니다(기본값은 false입니다). 그런 다음 Confluent Platform의 kcat(이전의 kafkacat) 유틸리티를 사용하여 레코드 헤더를 확인하고 레코드가 실패한 이유를 파악할 수 있습니다. 오류는 Connect Reporter에도 전송됩니다. 원본 레코드 헤더와의 충돌을 피하기 위해 DLQ 컨텍스트 헤더 키는 _connect.errors
로 시작합니다.
헤더가 활성화된 동일한 예제 구성은 다음과 같습니다:
{
"name": "gcs-sink-01",
"config": {
"connector.class": "io.confluent.connect.gcs.GcsSinkConnector",
"tasks.max": "1",
"topics": "gcs_topic",
"gcs.bucket.name": "<my-gcs-bucket>",
"gcs.part.size": "5242880",
"flush.size": "3",
"storage.class": "io.confluent.connect.gcs.storage.GcsStorage",
"format.class": "io.confluent.connect.gcs.format.avro.AvroFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"schema.compatibility": "NONE",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-gcs-sink-01",
"errors.deadletterqueue.context.headers.enable": true
}
}