Kafka Connect to Vast S3#
Kafka Connect is an open-source framework that enables scalable and reliable streaming of data between Apache Kafka and other data systems. It provides a standardized way to build connectors that move large datasets in and out of Kafka, allowing for easy integration with databases, key-value stores, search indexes, and file systems.
Installation#
Prerequisites#
Vast cluster:
- View (S3)
- S3 bucket (only needed for Kafka Connect)
- S3 credentials (only needed for Kafka Connect)
Host:
- Docker
- docker-compose
- Python 3.9
Setting up Kafka (including ZooKeeper) and Kafka Connect#
Place this docker-compose.yml file on your client host (if you are not interested in Kafka Connect, you can drop the kafka-connect container):
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka1:
    image: confluentinc/cp-kafka:6.2.0
    container_name: kafka1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://${HOST_IP}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
  kafka2:
    image: confluentinc/cp-kafka:6.2.0
    container_name: kafka2
    depends_on:
      - zookeeper
    ports:
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29093,PLAINTEXT_HOST://${HOST_IP}:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
  kafka3:
    image: confluentinc/cp-kafka:6.2.0
    container_name: kafka3
    depends_on:
      - zookeeper
    ports:
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29094,PLAINTEXT_HOST://${HOST_IP}:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
  kafka-connect:
    image: confluentinc/cp-kafka-connect:6.2.0
    container_name: kafka-connect
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka1:29092,kafka2:29093,kafka3:29094
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
    command:
      - bash
      - -c
      - |
        confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest
        /etc/confluent/docker/run
Keep an eye on CONNECT_GROUP_ID: compose-connect-group; this is the Kafka Connect worker group, defined at the worker level (it is not the same as the consumer group you declare in each consume request).
Run the following commands from the same directory as the docker-compose.yml file (the first command exports HOST_IP, which the compose file references):
export HOST_IP=$(hostname -I | awk '{print $1}')
docker-compose up -d
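The Connect worker downloads and installs the S3 connector on startup, so it can take a minute or two before its REST API responds. If you want to script the wait instead of retrying manually, here is a minimal polling sketch, assuming the Python requests package is installed and Connect is reachable on localhost:8083:
import time
import requests

# Poll the Kafka Connect REST API until the worker is up (or we give up).
for attempt in range(30):
    try:
        response = requests.get("http://localhost:8083/connector-plugins", timeout=5)
        if response.status_code == 200:
            print("Kafka Connect is ready")
            break
    except requests.exceptions.ConnectionError:
        pass  # worker is not listening yet
    time.sleep(10)
else:
    print("Kafka Connect did not become ready in time")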
Kafka#
We will use a simple Kafka producer and consumer written in Python.
Before running, install the required Python package:
python3.9 -m pip install kafka-python
This command may differ depending on your environment.
Creating new topic#
You can skip this step, since topics are created automatically when you produce messages to a non-existent topic. However, with kafka.admin you can control the number of partitions and the replication factor.
from kafka.admin import KafkaAdminClient, NewTopic

# Kafka Admin Configuration
admin_client = KafkaAdminClient(
    bootstrap_servers=['<YOUR_CLIENT_IP>:9092', '<YOUR_CLIENT_IP>:9093', '<YOUR_CLIENT_IP>:9094'],
    client_id='python-admin-client'
)

# Define a new topic with partitions and replication factor
topic_name = "my-new-topic"
num_partitions = <YOUR_NUMBER_OF_PARTITIONS>
replication_factor = <YOUR_REPLICATION_FACTOR>

topic = NewTopic(
    name=topic_name,
    num_partitions=num_partitions,
    replication_factor=replication_factor
)

try:
    # Create the topic
    admin_client.create_topics(new_topics=[topic], validate_only=False)
    print(f"Topic '{topic_name}' created successfully with {num_partitions} partitions and replication factor {replication_factor}.")
except Exception as e:
    print(f"Error creating topic: {e}")
finally:
    admin_client.close()
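To confirm the topic exists and has the expected partition count, you can query the cluster metadata. A small sketch using kafka-python, assuming the same broker address and topic name as above:
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers='<YOUR_CLIENT_IP>:9092')

# topics() returns the set of topic names known to the cluster;
# partitions_for_topic() returns the set of partition ids for one topic.
print("my-new-topic" in consumer.topics())
print(consumer.partitions_for_topic("my-new-topic"))
consumer.close()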
Produce messages to topic#
from kafka import KafkaProducer
import json

num_of_messages = <NUMBER_OF_MESSAGES>  # the number of messages to send to the topic (can also be 1)
data = {"name": "something"}

producer = KafkaProducer(
    bootstrap_servers='<YOUR_CLIENT_IP>:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

for i in range(num_of_messages):
    producer.send('<YOUR_TOPIC_NAME>', data)

producer.flush()
print("Messages sent to Kafka topic")
The messages are now in the topic, waiting to be read.
Consume messages from topic#
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    '<YOUR_TOPIC_NAME>',
    bootstrap_servers='<YOUR_CLIENT_IP>:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='<YOUR_CONSUMER_GROUP>',
    session_timeout_ms=60000,
    heartbeat_interval_ms=20000
)

for message in consumer:
    print(f"Received message: {message.value.decode('utf-8')}")
This code runs in an infinite loop and prints your messages.
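Since the producer serializes values as JSON, you can also let the consumer deserialize them for you instead of decoding the raw bytes manually. A variant of the consumer above, assuming the same topic, broker, and consumer group:
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    '<YOUR_TOPIC_NAME>',
    bootstrap_servers='<YOUR_CLIENT_IP>:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='<YOUR_CONSUMER_GROUP>',
    # turn the raw bytes back into a Python dict
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

for message in consumer:
    print(f"Received message: {message.value}")  # message.value is already a dict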
Kafka Connect#
Sanity check#
You can list the connector plugins installed on your Kafka Connect worker with:
curl -X GET http://localhost:8083/connector-plugins
You should get a list in return; one of the entries should be the S3 sink connector (the first item in my response):
[
  {
    "class": "io.confluent.connect.s3.S3SinkConnector",
    "type": "sink",
    "version": "10.5.17"
  },
  {
    "class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
    "type": "source",
    "version": "6.2.0-ccs"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "6.2.0-ccs"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "6.2.0-ccs"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]
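If you prefer to script this sanity check, the same REST call can be made from Python. A small sketch, assuming the requests package is installed:
import requests

plugins = requests.get("http://localhost:8083/connector-plugins").json()
classes = [p["class"] for p in plugins]

# The S3 sink plugin must be present before we create the connector.
assert "io.confluent.connect.s3.S3SinkConnector" in classes, "S3 sink connector plugin not installed"
print("S3 sink connector plugin is installed")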
Create your S3 sink instance#
Create a configuration file (you can call it “vast.json” like I did).
Config file#
vast.json
{
  "name": "<YOUR_CONNECTOR_NAME>",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "<YOUR_TOPIC_NAME>",
    "s3.bucket.name": "<YOUR_BUCKET_NAME>",
    "s3.region": "us-east-1",
    "s3.part.size": "5242880",
    "flush.size": "1000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility": "NONE",
    "store.url": "http://<YOUR_ENDPOINT_URL>",
    "aws.access.key.id": "<YOUR_KEY_ID>",
    "aws.secret.access.key": "<YOUR_ACCESS_KEY>",
    "s3.path.style.access": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
I disabled schemas because I wanted more flexibility, but you can change this as you like.
Critical parameters#
flush.size - the number of records that must be accumulated before the connector writes a batch to S3.
rotate.interval.ms - the maximum amount of time, in milliseconds, before a batch is written to S3, regardless of the number of records accumulated.
s3.region - the value is not a real region when targeting Vast S3, but the parameter must still be supplied.
Submit the configuration to Kafka Connect with the following HTTP request:
curl -X POST -H "Content-Type: application/json" --data @vast.json http://localhost:8083/connectors
Let’s ensure the connector is running
curl -X GET http://localhost:8083/connectors/<YOUR_CONNECTOR_NAME>/status
The response should be:
{
  "name": "<YOUR_CONNECTOR_NAME>",
  "connector": {
    "state": "RUNNING",
    "worker_id": "kafka-connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "kafka-connect:8083"
    }
  ],
  "type": "sink"
}
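If you want to script this check, you can poll the status endpoint until both the connector and its task report RUNNING. A minimal sketch, assuming the requests package and the connector name used above:
import time
import requests

status_url = "http://localhost:8083/connectors/<YOUR_CONNECTOR_NAME>/status"

for attempt in range(12):
    status = requests.get(status_url).json()
    connector_state = status["connector"]["state"]
    task_states = [task["state"] for task in status.get("tasks", [])]
    print(f"connector={connector_state} tasks={task_states}")
    if connector_state == "RUNNING" and task_states and all(s == "RUNNING" for s in task_states):
        break
    time.sleep(5)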
At this point the connector is running and listening to the target topic. Let’s see it in action.
Running scenario#
We want to write messages to the target topic and see them land in our S3 bucket.
Use the producer above to write messages. Set the number of messages higher than 1000, which is the flush.size declared in the configuration ("flush.size": "1000"), so the batch is written to S3 immediately.
Once the topic reaches that threshold of messages, Kafka Connect should start writing.
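As a quick sketch, reusing the producer pattern from above and assuming a flush.size of 1000 (so 1200 messages are enough to trigger at least one commit to S3):
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers='<YOUR_CLIENT_IP>:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 1200 > flush.size (1000), so the connector commits at least one file to S3.
for i in range(1200):
    producer.send('<YOUR_TOPIC_NAME>', {"name": "something", "id": i})

producer.flush()
print("Messages sent to Kafka topic")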
Kafka Connect in action#
At this point, the data should be written to your bucket.
Let’s examine the kafka-connect logs:
docker-compose logs kafka-connect
At the end of the log (if you haven’t performed any further actions) you should see something like:
kafka-connect | [2024-11-19 11:22:14,015] INFO Assigned topic partitions: [your-topic-name-3-0] (io.confluent.connect.s3.S3SinkTask)
kafka-connect | [2024-11-19 11:23:06,941] INFO Creating S3 output stream. (io.confluent.connect.s3.storage.S3Storage)
kafka-connect | [2024-11-19 11:23:06,942] INFO Create S3OutputStream for bucket 'temp-db' key 'topics/your-topic-name-3/partition=0/your-topic-name-3+0+0000000000.json' (io.confluent.connect.s3.storage.S3OutputStream)
kafka-connect | [2024-11-19 11:23:06,955] INFO Starting commit and rotation for topic partition your-topic-name-3-0 with start offset {partition=0=0} (io.confluent.connect.s3.TopicPartitionWriter)
kafka-connect | [2024-11-19 11:23:06,992] INFO Files committed to S3. Target commit offset for your-topic-name-3-0 is 1000 (io.confluent.connect.s3.TopicPartitionWriter)
kafka-connect | [2024-11-19 11:23:06,992] INFO Creating S3 output stream. (io.confluent.connect.s3.storage.S3Storage)
kafka-connect | [2024-11-19 11:23:06,994] INFO Create S3OutputStream for bucket 'temp-db' key 'topics/your-topic-name-3/partition=0/your-topic-name-3+0+0000001000.json' (io.confluent.connect.s3.storage.S3OutputStream)
Of course, this log will vary according to your parameters.
At this point you should see the files in your S3 bucket.
Verify files are on S3#
S3 listing#
Create a configuration file:
vast.s3cfg
access_key = #
secret_key = #
host_base = #
host_bucket = #
use_https = #
Run the following command
s3cmd --config vast.s3cfg ls s3://<YOUR_BUCKET_NAME>/path/to/your/folder
The response should list all the files written to S3.
You can also download the folder with
s3cmd --config vast.s3cfg get --recursive s3://<YOUR_BUCKET_NAME>/path/to/your/folder
and explore the files locally.
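If you prefer Python over s3cmd, the same check can be done with boto3 against the Vast S3 endpoint. A minimal sketch, assuming boto3 is installed and using the same endpoint and credentials as the connector configuration:
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://<YOUR_ENDPOINT_URL>",
    aws_access_key_id="<YOUR_KEY_ID>",
    aws_secret_access_key="<YOUR_ACCESS_KEY>",
)

# List the objects the connector wrote under topics/<topic>/...
response = s3.list_objects_v2(Bucket="<YOUR_BUCKET_NAME>", Prefix="topics/<YOUR_TOPIC_NAME>/")
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])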