Kafka Connect to Vast S3#

Kafka Connect is an open-source framework that enables scalable and reliable streaming of data between Apache Kafka and other data systems. It provides a standardized way to build connectors that move large datasets in and out of Kafka, allowing for easy integration with databases, key-value stores, search indexes, and file systems.

Installation#

Prerequisites#

  • Vast cluster

  • View (S3)

  • S3 bucket (only needed for Kafka Connect)

  • S3 credentials (only needed for Kafka Connect)

  • Host

    • Docker

    • docker-compose

    • python3.9

Setting up Kafka (including ZooKeeper) and Kafka Connect#

Place this docker-compose.yml file on your client host (if you are not interested in Kafka Connect, you can drop the kafka-connect service):

version: '3'  
services:  
  zookeeper:  
    image: confluentinc/cp-zookeeper:6.2.0  
    container_name: zookeeper  
    environment:  
      ZOOKEEPER_CLIENT_PORT: 2181  
      ZOOKEEPER_TICK_TIME: 2000

  kafka1:  
    image: confluentinc/cp-kafka:6.2.0  
    container_name: kafka1  
    depends_on:  
      - zookeeper  
    ports:  
      - "9092:9092"  
    environment:  
      KAFKA_BROKER_ID: 1  
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181  
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://${HOST_IP}:9092  
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT  
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT  
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3

  kafka2:  
    image: confluentinc/cp-kafka:6.2.0  
    container_name: kafka2  
    depends_on:  
      - zookeeper  
    ports:  
      - "9093:9093"  
    environment:  
      KAFKA_BROKER_ID: 2  
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181  
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29093,PLAINTEXT_HOST://${HOST_IP}:9093  
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT  
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT  
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3

  kafka3:  
    image: confluentinc/cp-kafka:6.2.0  
    container_name: kafka3  
    depends_on:  
      - zookeeper  
    ports:  
      - "9094:9094"  
    environment:  
      KAFKA_BROKER_ID: 3  
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181  
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29094,PLAINTEXT_HOST://${HOST_IP}:9094  
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT  
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT  
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3

  kafka-connect:  
    image: confluentinc/cp-kafka-connect:6.2.0  
    container_name: kafka-connect  
    depends_on:  
      - kafka1  
      - kafka2  
      - kafka3  
    ports:  
      - "8083:8083"  
    environment:  
      CONNECT_BOOTSTRAP_SERVERS: kafka1:29092,kafka2:29093,kafka3:29094  
      CONNECT_REST_PORT: 8083  
      CONNECT_GROUP_ID: compose-connect-group  
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs  
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets  
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status  
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter  
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter  
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter  
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter  
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect  
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO  
      CONNECT_LOG4J_LOGGERS: org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR  
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3  
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3  
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3  
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components  
    command:  
      - bash  
      - -c  
      - |  
        confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest  
        /etc/confluent/docker/run
  • Keep an eye on CONNECT_GROUP_ID: compose-connect-group. This is the consumer group used by the Kafka Connect workers themselves; it is defined at the worker level and is not the same as the group you declare in each consume request.

Run the following commands (in the same directory as the docker-compose.yml file):

export HOST_IP=$(hostname -I | awk '{print $1}')
docker-compose up -d
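
Once the containers are up, you can optionally verify that the brokers are reachable from Python. This is a minimal sketch, assuming the kafka-python package from the next section is already installed and that <YOUR_CLIENT_IP> is the HOST_IP exported above:

from kafka import KafkaConsumer

# Connect to all three brokers through the advertised host listeners
consumer = KafkaConsumer(
    bootstrap_servers=['<YOUR_CLIENT_IP>:9092', '<YOUR_CLIENT_IP>:9093', '<YOUR_CLIENT_IP>:9094']
)

# topics() returns the set of topics currently known to the cluster;
# an empty set right after startup is fine - the point is that the call succeeds
print("Brokers reachable, topics:", consumer.topics())
consumer.close()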

Kafka#

We will use a simple Kafka producer and consumer written in Python.

Before running, install the kafka-python package:

python3.9 -m pip install kafka-python
  • This command may vary depending on your environment (Python version, virtualenv, and so on).

Creating new topic#

You can skip this step, since topics are created automatically when messages are produced to a non-existent topic. However, with kafka.admin you can control the number of partitions and the replication factor.

Creating new topic

from kafka.admin import KafkaAdminClient, NewTopic

# Kafka Admin Configuration  
admin_client = KafkaAdminClient(  
    bootstrap_servers=['<YOUR_CLIENT_IP>:9092','<YOUR_CLIENT_IP>:9093','<YOUR_CLIENT_IP>:9094'],  
    client_id='python-admin-client'  
)

# Define a new topic with partitions and replication factor  
topic_name = "my-new-topic"  
num_partitions = <YOUR_NUMBER_OF_PARTITIONS>  
replication_factor = <YOUR_REPLICATION_FACTOR>

topic = NewTopic(  
    name=topic_name,  
    num_partitions=num_partitions,  
    replication_factor=replication_factor  
)

try:  
    # Create the topic  
    admin_client.create_topics(new_topics=[topic], validate_only=False)  
    print(f"Topic '{topic_name}' created successfully with {num_partitions} partitions and replication factor {replication_factor}.")  
except Exception as e:  
    print(f"Error creating topic: {e}")  
finally:  
    admin_client.close()

Produce messages to topic#

from kafka import KafkaProducer  
import json  

num_of_messages =  # Complete: the number of messages to send to the topic (can also be 1)  
data = {"name": "something"}  
producer = KafkaProducer(bootstrap_servers='<YOUR_CLIENT_IP>:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))  

for i in range(num_of_messages):  
    producer.send('<YOUR_TOPIC_NAME>', data)

producer.flush()

print("Messages sent to Kafka topic")

The messages are now in the topic, waiting to be read.
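
Before consuming, you can optionally confirm that the messages landed by checking the end offsets of the topic. A minimal sketch with kafka-python, assuming the same topic and broker address as above:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='<YOUR_CLIENT_IP>:9092')

topic = '<YOUR_TOPIC_NAME>'
# Build a TopicPartition for every partition of the topic
partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]

# end_offsets() returns the next offset to be written per partition,
# i.e. how many messages each partition holds when counting from offset 0
for tp, offset in consumer.end_offsets(partitions).items():
    print(f"{tp.topic}[{tp.partition}] end offset: {offset}")

consumer.close()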

Consume messages from topic#

from kafka import KafkaConsumer

consumer = KafkaConsumer(  
    '<YOUR_TOPIC_NAME>',  
    bootstrap_servers='<YOUR_CLIENT_IP>:9092',  
    auto_offset_reset='earliest',  
    enable_auto_commit=True,  
    group_id='<YOUR_CONSUMER_GROUP>',  
    session_timeout_ms=60000,  
    heartbeat_interval_ms=20000  
)

for message in consumer:  
    print(f"Received message: {message.value.decode('utf-8')}")

This code runs in an infinite loop and prints your messages.
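
If you prefer the consumer to exit once the topic is drained instead of blocking forever, kafka-python accepts a consumer_timeout_ms parameter that stops the iteration after a quiet period. A hedged sketch (the 10-second timeout is an arbitrary choice):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    '<YOUR_TOPIC_NAME>',
    bootstrap_servers='<YOUR_CLIENT_IP>:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='<YOUR_CONSUMER_GROUP>',
    consumer_timeout_ms=10000  # stop iterating after 10 seconds with no new messages
)

for message in consumer:
    print(f"Received message: {message.value.decode('utf-8')}")

print("No more messages, exiting")
consumer.close()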

Kafka Connect#

Sanity check#

You can review the connector plugins available to your Kafka Connect worker with:

curl -X GET http://localhost:8083/connector-plugins

You should get a list in return; one of the entries should be the S3 sink connector (the first item in my response):

[  
    {  
        "class": "io.confluent.connect.s3.S3SinkConnector",  
        "type": "sink",  
        "version": "10.5.17"  
    },  
    {  
        "class": "io.confluent.connect.storage.tools.SchemaSourceConnector",  
        "type": "source",  
        "version": "6.2.0-ccs"  
    },  
    {  
        "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",  
        "type": "sink",  
        "version": "6.2.0-ccs"  
    },  
    {  
        "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",  
        "type": "source",  
        "version": "6.2.0-ccs"  
    },  
    {  
        "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",  
        "type": "source",  
        "version": "1"  
    },  
    {  
        "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",  
        "type": "source",  
        "version": "1"  
    },  
    {  
        "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",  
        "type": "source",  
        "version": "1"  
    }  
]
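
The same check can be scripted with the Python standard library if you prefer. A small sketch, run on the Docker host:

import json
import urllib.request

# Query the Kafka Connect REST API for the installed connector plugins
with urllib.request.urlopen("http://localhost:8083/connector-plugins") as resp:
    plugins = json.load(resp)

classes = [p["class"] for p in plugins]
print("\n".join(classes))

# Fail loudly if confluent-hub did not install the S3 sink connector
assert "io.confluent.connect.s3.S3SinkConnector" in classes, "S3 sink connector is missing"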

Create your s3 sink instance#

Create a configuration file (you can call it “vast.json” like I did):

Config file#

vast.json

{  
  "name": "<YOUR_CONNECTOR_NAME>",  
  "config": {  
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",  
    "tasks.max": "1",  
    "topics": "<YOUR_TOPIC_NAME>",  
    "s3.bucket.name": "<YOUR_BUCKET_NAME>",  
    "s3.region": "us-east-1",  
    "s3.part.size": "5242880",  
    "flush.size": "1000",  
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",  
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",  
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",  
    "schema.compatibility": "NONE",  
    "store.url": "<http://<YOUR_ENDPOINT_URL>",  
    "aws.access.key.id": "<YOUR_KEY_ID>",  
    "aws.secret.access.key": "<YOUR_ACCESS_KEY>",  
    "s3.path.style.access": "true",  
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",  
    "key.converter.schemas.enable": "false",  
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",  
    "value.converter.schemas.enable": "false"  
  }  
}

I disabled schemas because I wanted more flexibility in the message format, but you can change this as you like.
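
For reference, if you keep value.converter.schemas.enable set to true, the JsonConverter expects every message to carry an explicit schema/payload envelope. A minimal sketch of producing such a message (the schema fields here are illustrative, matching the simple payload used earlier):

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers='<YOUR_CLIENT_IP>:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# With schemas enabled, the JsonConverter requires "schema" and "payload" fields
enveloped = {
    "schema": {
        "type": "struct",
        "name": "record",
        "optional": False,
        "fields": [{"field": "name", "type": "string", "optional": False}]
    },
    "payload": {"name": "something"}
}

producer.send('<YOUR_TOPIC_NAME>', enveloped)
producer.flush()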

Critical parameters#

  • flush.size - Defines the number of records that must be accumulated before the connector writes a batch to S3.

  • rotate.interval.ms - Defines the maximum amount of time, in milliseconds, before a batch is written to S3, regardless of the number of records accumulated.

  • s3.region - the region is not meaningful for a Vast cluster, but the parameter must be supplied.

Create the connector by sending the configuration to the Kafka Connect REST API:

curl -X POST -H "Content-Type: application/json" --data @vast.json http://localhost:8083/connectors

Let’s ensure the connector is running

curl -X GET http://localhost:8083/connectors/<YOUR_CONNECTOR_NAME>/status

The response should be:

{  
    "name": "<YOUR_CONNECTOR_NAME>",  
    "connector": {  
        "state": "RUNNING",  
        "worker_id": "kafka-connect:8083"  
    },  
    "tasks": [  
        {  
            "id": 0,  
            "state": "RUNNING",  
            "worker_id": "kafka-connect:8083"  
        }  
    ],  
    "type": "sink"  
}
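
If you would rather wait for the connector programmatically than re-run curl, a small polling sketch against the same status endpoint could look like this (the 30 attempts and the 2-second sleep are arbitrary choices):

import json
import time
import urllib.request

connector = "<YOUR_CONNECTOR_NAME>"
status_url = f"http://localhost:8083/connectors/{connector}/status"

# Poll until the connector and all of its tasks report RUNNING
for _ in range(30):
    with urllib.request.urlopen(status_url) as resp:
        status = json.load(resp)
    states = [status["connector"]["state"]] + [t["state"] for t in status["tasks"]]
    print("states:", states)
    if all(s == "RUNNING" for s in states):
        print("Connector is up")
        break
    time.sleep(2)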

At this point the connector is running and listening to the target topic. Let’s examine the logic.

Running scenario#

We want to write messages to the target topic and see them in our s3 bucket.

  • Use the producer above to write messages. Set the number of messages to more than 1000, since that is the flush.size declared in the configuration ("flush.size": "1000"), so the batch is written to S3 immediately.

Once the topic reaches the threshold of messages, Kafka Connect should start writing to S3.
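
For example, a run of the producer above that comfortably exceeds the threshold could look like this (1500 is an arbitrary number above flush.size=1000):

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers='<YOUR_CLIENT_IP>:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 1500 > flush.size (1000), so the connector commits at least one full batch to S3
for i in range(1500):
    producer.send('<YOUR_TOPIC_NAME>', {"name": f"something-{i}"})

producer.flush()
print("Produced 1500 messages")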

Kafka Connect in action#

At this point, the data should be written to your bucket.

Let’s examine the kafka-connect logs:

docker-compose logs kafka-connect

At the end of the log (assuming you haven’t performed further actions) you should see:

kafka-connect    | [2024-11-19 11:22:14,015] INFO Assigned topic partitions: [your-topic-name-3-0] (io.confluent.connect.s3.S3SinkTask)  
kafka-connect    | [2024-11-19 11:23:06,941] INFO Creating S3 output stream. (io.confluent.connect.s3.storage.S3Storage)  
kafka-connect    | [2024-11-19 11:23:06,942] INFO Create S3OutputStream for bucket 'temp-db' key 'topics/your-topic-name-3/partition=0/your-topic-name-3+0+0000000000.json' (io.confluent.connect.s3.storage.S3OutputStream)  
kafka-connect    | [2024-11-19 11:23:06,955] INFO Starting commit and rotation for topic partition your-topic-name-3-0 with start offset {partition=0=0} (io.confluent.connect.s3.TopicPartitionWriter)  
kafka-connect    | [2024-11-19 11:23:06,992] INFO Files committed to S3. Target commit offset for your-topic-name-3-0 is 1000 (io.confluent.connect.s3.TopicPartitionWriter)  
kafka-connect    | [2024-11-19 11:23:06,992] INFO Creating S3 output stream. (io.confluent.connect.s3.storage.S3Storage)  
kafka-connect    | [2024-11-19 11:23:06,994] INFO Create S3OutputStream for bucket 'temp-db' key 'topics/your-topic-name-3/partition=0/your-topic-name-3+0+0000001000.json' (io.confluent.connect.s3.storage.S3OutputStream)
  • The exact log lines will vary according to your parameters.

At this point you should see the files in your S3 bucket.

Verify files are on s3#

S3 listing#

Create a configuration file:

vast.s3cfg

access_key = #  
secret_key = #  
host_base = #  
host_bucket = #  
use_https = #

Run the following command

s3cmd --config vast.s3cfg ls s3://<YOUR_BUCKET_NAME>/path/to/your/folder

The response should list all the files written to S3.

You can also download the folder recursively with:

s3cmd --config vast.s3cfg get --recursive s3://<YOUR_BUCKET_NAME>/path/to/your/folder

and explore the files locally.
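
If you prefer Python over s3cmd, the same listing can be done with boto3 against the Vast S3 endpoint. A hedged sketch, assuming boto3 is installed (python3.9 -m pip install boto3) and reusing the endpoint and credentials from vast.json:

import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://<YOUR_ENDPOINT_URL>",
    aws_access_key_id="<YOUR_KEY_ID>",
    aws_secret_access_key="<YOUR_ACCESS_KEY>",
)

bucket = "<YOUR_BUCKET_NAME>"
# The DefaultPartitioner writes under topics/<topic>/partition=<n>/ (see the log lines above)
prefix = "topics/<YOUR_TOPIC_NAME>/"

# List every object the connector has written so far
response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])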