Kafka to VastDB with Trino Lookup

Download Kafka to your client

wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz
tar -xzf kafka_2.12-2.8.0.tgz
cd kafka_2.12-2.8.0/bin

From one shell prompt, create a topic and start a producer. This producer will stream all system journal messages into the topic. Note that this command will hold the terminal unless you background it, and that the IPs shown are the VAST VIPs associated with the Kafka-enabled view.

./kafka-topics.sh --create --topic leon --bootstrap-server 172.200.202.9:9092  --partitions 1
journalctl -n all -f | ./kafka-console-producer.sh --topic leon --bootstrap-server 172.200.202.9:9092,172.200.202.10:9092,172.200.202.11:9092,172.200.202.12:9092
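
To sanity-check that messages are actually landing on the topic, you can tail it from another shell with the console consumer (a quick check, using the same bootstrap servers as above):

# Read the first five messages from the topic and exit
./kafka-console-consumer.sh --topic leon \
  --bootstrap-server 172.200.202.9:9092 \
  --from-beginning --max-messages 5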

Open a new shell on your client.

Create a file called vast.properties with the following properties. Note that you will need to change endpoint, data_endpoints, access_key_id, and secret_access_key to values from your cluster. The user that owns the access/secret key pair must be attached to an identity policy that grants query access to the VAST system.

## Update these fields ##
# "endpoint" should be a load-balanced DNS entry or one of the VIPs prefixed by "http://"
# it should not contain a trailing / or anything else.
endpoint=http://172.200.202.9
 
# "data_endpoints" should be be a load-balanced DNS entry or one or more of the VIPs
# prefixed by "http://" it should not contain a trailing / or anything else.
# Multiple VIPs can be used with commas between them, eg: http://x.x.x.x,http://y.y.y.y
data_endpoints=http://172.200.202.9
 
# Access and secret keys -- make sure the user was added to an identity policy
# granting them access to the catalog.
access_key_id=220IKZ22SDDQ4C341Z4R
secret_access_key=80lcqf6IxeqSUuNfNwuSpIbveetwslaHrhNoU56A
 
## Don't change these fields ##
connector.name=vast
region=us-east-1
 
num_of_splits=32
num_of_subsplits=8
 
vast.http-client.request-timeout=60m
vast.http-client.idle-timeout=60m
 
enable_custom_schema_separator=true
custom_schema_separator=|

Start up Trino in Docker. This version of the image (tag 429) is for VAST 5.1 and above.

docker run \
	--name trino \
	-p 8080:8080 -d \
	-v ./vast.properties:/etc/trino/catalog/vast.properties:ro \
	--platform linux/amd64 \
	vastdataorg/trino-vast:429
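
Before connecting, it is worth confirming the container came up cleanly; Trino prints a SERVER STARTED banner in its server log once it is ready to accept queries:

# Wait a few seconds after "docker run", then check the server log
docker logs trino 2>&1 | grep "SERVER STARTED"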

Jump into the Trino CLI inside the container.

docker exec -it trino trino
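
Once inside the CLI, a quick way to confirm that vast.properties was picked up is to list the catalogs; vast should appear alongside the built-in system catalog:

trino> SHOW CATALOGS;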

List the schemas. If you can't see the Kafka broker and the other tables, something is wrong with your identity policies or with the list-bucket option in the view policies. If in doubt, ask a friend.

 SHOW SCHEMAS FROM vast;

Outputs:

                     Schema
-------------------------------------------------
csnow-db|vts
information_schema
insight-engine-database|langchain
kafka-broker|kafka_topics
leon|leon
system_schema
vast-audit-log-bucket|vast_audit_log_schema
vast-big-catalog-bucket|vast_big_catalog_schema
(8 rows)
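
Each Kafka topic is exposed as a table inside the kafka-broker|kafka_topics schema, so you can list the tables to confirm the leon topic is there:

trino> SHOW TABLES FROM vast."kafka-broker|kafka_topics";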

Show the first message in the leon topic. The value is a raw, encoded blob, so it isn't human-readable yet.

trino> select * from vast."kafka-broker|kafka_topics".leon limit 1;

Outputs:

 key  |                      value                      |    timestamp_broker     |   timestamp_producer    | headers
------+-------------------------------------------------+-------------------------+-------------------------+---------
 NULL | 2d 2d 20 4c 6f 67 73 20 62 65 67 69 6e 20 61 74 | 2025-03-21 08:11:11.352 | 2025-03-21 08:11:11.307 | []
      | 20 53 61 74 20 32 30 32 35 2d 30 31 2d 32 35 20 |                         |                         |
      | 30 34 3a 34 39 3a 33 34 20 55 54 43 2e 20 2d 2d |                         |                         |
(1 row)
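
To see the column types behind that output, describe the table; the value column holds the raw message payload (the hex dump above suggests varbinary), which is why it needs decoding before it is readable:

trino> DESCRIBE vast."kafka-broker|kafka_topics".leon;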

Decode the message so we can read it (this works as long as the payload is text):

trino> select from_utf8(value) AS decoded_json from vast."kafka-broker|kafka_topics".leon limit 5;

Outputs:

                                                                               decoded_json
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
-- Logs begin at Sat 2025-01-25 04:49:34 UTC. --                                                                                                                                  	>
Jan 25 04:49:34 cosmos-var-cb3-c1 kernel: Linux version 4.18.0-348.23.1.el8_5.x86_64 (mockbuild@dal1-prod-builder001.bld.equ.rockylinux.org) (gcc version 8.5.0 20210514 (Red Hat 8.5.>
Jan 25 04:49:34 cosmos-var-cb3-c1 kernel: Command line: BOOT_IMAGE=(hd0,msdos1)/vmlinuz-4.18.0-348.23.1.el8_5.x86_64 root=/dev/mapper/root_vg01-lv_01 ro panic=60 log_buf_len=10M nvme>
Jan 25 04:49:34 cosmos-var-cb3-c1 kernel: x86/split lock detection: disabled                                                                                                      	>
Jan 25 04:49:34 cosmos-var-cb3-c1 kernel: x86/fpu: Supporting XSAVE feature 0x001: 'x87 floating point registers'                                                                 	>
(5 rows)
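
Because from_utf8 returns ordinary varchar, you can filter on the decoded payload directly. As a sketch, this pulls only messages that mention the kernel (the '%kernel%' pattern is just an illustration; substitute any keyword):

-- Decode each message and keep only those containing the keyword
SELECT from_utf8(value) AS decoded_text
FROM vast."kafka-broker|kafka_topics".leon
WHERE from_utf8(value) LIKE '%kernel%'
LIMIT 5;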

Something more complex: a keyword frequency distribution query that counts how many WARNs and ERRORs are on the topic. (The ELSE 'INFO' branch below is unreachable because the WHERE clause already restricts to matching lines; it is just a defensive default.)

WITH decoded_lines AS (
  SELECT
    line
  FROM
    vast."kafka-broker|kafka_topics".leon,
    UNNEST(regexp_split(from_utf8(value), '\n')) AS t(line)
),
tagged_lines AS (
  SELECT
    CASE
      WHEN regexp_like(line, '(?i)error') THEN 'ERROR'
      WHEN regexp_like(line, '(?i)warn') THEN 'WARN'
      WHEN regexp_like(line, '(?i)critical') THEN 'CRITICAL'
      ELSE 'INFO'
    END AS level
  FROM
    decoded_lines
  WHERE
    regexp_like(line, '(?i)(error|warn|critical)')
)
SELECT
  level,
  COUNT(*) AS occurrences
FROM
  tagged_lines
GROUP BY
  level
ORDER BY
  occurrences DESC;

Outputs:

 level | occurrences
-------+-------------
 WARN  |         128
 ERROR |          57
(2 rows)

This next query decodes the Kafka blobs, splits each blob into individual lines, searches each line for error, warn, or critical (case-insensitively), and displays the broker timestamp, the matched level, and the matching line.

WITH decoded_lines AS (
  SELECT
    timestamp_broker,
    line
  FROM
    vast."kafka-broker|kafka_topics".leon,
    UNNEST(regexp_split(from_utf8(value), '\n')) AS t(line)
),
filtered_lines AS (
  SELECT
    timestamp_broker,
    line,
    CASE
      WHEN regexp_like(line, '(?i)error') THEN 'ERROR'
      WHEN regexp_like(line, '(?i)warn') THEN 'WARN'
      WHEN regexp_like(line, '(?i)critical') THEN 'CRITICAL'
      ELSE 'INFO'
    END AS level
  FROM
    decoded_lines
  WHERE
    regexp_like(line, '(?i)(error|warn|critical)')
)
SELECT
  timestamp_broker,
  level,
  line AS message
FROM
  filtered_lines
ORDER BY
  timestamp_broker DESC
LIMIT 5;

Outputs:

    timestamp_broker     | level |                                                                      message                                                                      >
-------------------------+-------+----------------------------------------------------------------------------------------------------------------------------------------------------->
2025-03-21 08:11:20.564 | ERROR | Mar 13 17:02:36 cosmos-var-cb3-c1 dockerd[2545]: time="2025-03-13T17:02:36.677268080Z" level=error msg="Handler for POST /v1.45/containers/e40125ff6>
2025-03-21 08:11:20.392 | WARN  | Mar 11 10:00:36 cosmos-var-cb3-c1 systemd[1]: run-docker-runtime\x2drunc-moby-9fbc20cd5ae833a8dcff6a8b01064ecad8448102df8c5d2022cdcd927e418e23-runc.>
2025-03-21 08:11:19.822 | ERROR | Mar 05 15:30:33 cosmos-var-cb3-c1 dnf[1850333]:   - Curl error (7): Couldn't connect to server for https://packages.microsoft.com/rhel/8/prod/repoda>
2025-03-21 08:11:19.822 | ERROR | Mar 05 15:30:33 cosmos-var-cb3-c1 dnf[1850333]: Error: Failed to download metadata for repo 'packages-microsoft-com-prod': Cannot download repomd.xm>
2025-03-21 08:11:19.822 | ERROR | Mar 05 15:30:33 cosmos-var-cb3-c1 dnf[1850333]: Errors during downloading metadata for repository 'packages-microsoft-com-prod':                	>
(5 rows)
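
As a variation on the same pattern, you can bucket the matches by hour of the broker timestamp to see when problems cluster. This is a sketch reusing the decoded_lines CTE from above:

WITH decoded_lines AS (
  SELECT
    timestamp_broker,
    line
  FROM
    vast."kafka-broker|kafka_topics".leon,
    UNNEST(regexp_split(from_utf8(value), '\n')) AS t(line)
)
SELECT
  date_trunc('hour', timestamp_broker) AS hour,
  COUNT(*) AS occurrences
FROM
  decoded_lines
WHERE
  regexp_like(line, '(?i)(error|warn|critical)')
GROUP BY
  date_trunc('hour', timestamp_broker)
ORDER BY
  hour DESC;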