Kafka Source Connect
Kafka Connect는 아파치 카프카와 다른 시스템간에 데이터를 확장 하고, 안전한 방법으로 스트리밍하기 위한 도구이입니다. Kafka Source Connect는 특히 데이터 소스로부터 실시간으로 데이터를 읽어 Kafka 토픽으로 전송하는 역할을 합니다. 이는 데이터베이스, 로그 파일, 메시지 큐와 같은 다양한 데이터 소스를 지원하며, 데이터 파이프라인 구축과 데이터 통합 프로젝트에 필수적인 컴포넌트입니다.
Kafka 에 대한 자세한 설명은 관련한 전문 서적과 다른 블로그 자료들이 많으니
이번 글에서는 Kafka Connect 에 대한 설명보다는
기동 및 사용을 위한 방법에 대해 알아보겠습니다.
( Standalone, Distributed 모드 중 Distributed 모드로 진행. )
■ 설치 ( install )
kafka를 설치하면 기본적으로 포함되어 있습니다.
■ 설정 ( property )
1. kafka connect 설정을 위해 아래 파일을 수정합니다.
path : /usr/local/kafka/config
filename : connect-distributed.properties
2. connect-distributed.properties 수정
- plugin이 저장될 폴더를 지정합니다.
# bootstrap 서버 설정
bootstrap.servers={kafkabrokerIP}:{port} ...
# plugin 설정
plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
별도로 http 포트를 바꾸거나, 디테일한 설정을 하지 않는다면 위처럼 두개 옵션만 변경해주면 됩니다.
3. connect library를 다운 받아서, 정의한 plugin 폴더에 넣습니다. ( 본 예제는 mysql )
download link : Debezium공식바로가기
4. 다운받은 .tar.gz 파일을 plugin 폴더에 압축해제 합니다.
tar -zxvf debezium-connector-mysql-2.4.1.Final-plugin.tar.gz
■ 실행 ( sh )
- kafka 가 기동되어 있다면, connect 실행 명령어로 실행합니다.
# kafka connect 실행
./bin/connect-distributed.sh -daemon ./config/connect-distributed.properties
■ Connect 등록 ( curl or call API )
■ Connect 확인 ( curl or call API )
# kafka connect 실행 확인
curl -s "http://{ip}:8083"
{"version":"3.4.1","commit":"8a516edc2755df89","kafka_cluster_id":"ljnNv_CtTwKxlsfllEEdXA"}
# 사용가능 플러그인 확인
curl -X GET -s "http://{ip}:8083/connector-plugins"
[{"class":"io.confluent.connect.jdbc.JdbcSinkConnector","type":"sink","version":"10.7.4"},{"class":"io.confluent.connect.jdbc.JdbcSourceConnector","type":"source","version":"10.7.4"},{"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"2.4.1.Final"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"3.4.1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"3.4.1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"3.4.1"}]
# 실행된 kafka connector 확인
curl -X GET -s "http://{ip}:8083/connectors"
["mysql-source-connect-1"]
# kafka connector 설정확인
curl -X GET -s "http://{ip}:8083/connectors/mysql-source-connect-1/config"
# kafka connector 상태확인
curl -X GET -s "http://{ip}:8083/connectors/mysql-source-connect-1/status"
# kafka connector 삭제
curl -X DELETE -s "http://{ip}:8083/connectors/mysql-source-connect-1"
■ 결과
- http://{}:8083 을 입력시
정상적으로 등록된 Connector 리스트를 확인 가능합니다.
'IT-Infra > Kafka' 카테고리의 다른 글
kafka - 컨슈머와 컨슈머 그룹 (0) | 2023.12.31 |
---|