Welcome to Salem Houali's Oracle Developer Notes

Next‑Gen Ingestion Pipelines: Oracle Database 26ai to Kafka (KRaft mode)

Introduction
Data ingestion from Oracle Database 26ai into Apache Kafka (KRaft mode) relies on a streamlined, connector‑based architecture that captures relational changes or table snapshots from Oracle and publishes them into Kafka topics without requiring ZooKeeper. Oracle 26ai provides modernized JDBC drivers, JSON-native views, and enhanced metadata capabilities that simplify integration with Kafka Connect.

☑️Context: We will push data from a table in Oracle Database version 23.26.0.0.0 to a Kafka topic.
✔️Kafka is running in KRaft mode (no ZooKeeper needed)
✔️We create a table that serves as the source for data ingestion into Kafka
We will walk through several steps to complete the demo.

Note: Most of the steps are automated in shell scripts.

☑️Environment:
✔️Virtual machine running Oracle Linux 9
✔️Oracle Database version 23.26.0.0.0 installed and operational

========================================
ORACLE LINUX ENVIRONMENT
========================================

---- OS VERSION ----
NAME="Oracle Linux Server"
VERSION="9.7"

---- ORACLE DATABASE VERSION ----
Running SQL*Plus version check...
Oracle AI Database 26ai Free Release 23.26.0.0.0 - Develop, Learn, and Run for Free

---- LISTENER STATUS ----

LSNRCTL for Linux: Version 23.26.0.0.0 - Production on 03-MAY-2026 15:21:08

Copyright (c) 1991, 2025, Oracle. All rights reserved.

Connecting to (DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=ol9m1)(PORT=1521)))
STATUS of the LISTENER
------------------------
Alias LISTENER
Version TNSLSNR for Linux: Version 23.26.0.0.0 - Production
Start Date 01-MAY-2026 16:17:40
Uptime 1 days 23 hr. 3 min. 27 sec
Trace Level off
Security ON: Local OS Authentication
SNMP OFF
Default Service FREE
Listener Parameter File /opt/oracle/product/26ai/dbhomeFree/network/admin/listener.ora
Listener Log File /opt/oracle/diag/tnslsnr/ol9m1/listener/alert/log.xml
Listening Endpoints Summary...
(DESCRIPTION=(ADDRESS=(PROTOCOL=tcp)(HOST=ol9m1)(PORT=1521)))
(DESCRIPTION=(ADDRESS=(PROTOCOL=ipc)(KEY=EXTPROC1521)))
Services Summary...
Service "467a4b81d8554e29e0632b011facf11b" has 1 instance(s).
Instance "FREE", status READY, has 1 handler(s) for this service...
Service "FREE" has 1 instance(s).
Instance "FREE", status READY, has 1 handler(s) for this service...
Service "FREEXDB" has 1 instance(s).
Instance "FREE", status READY, has 1 handler(s) for this service...
Service "freepdb1" has 1 instance(s).
Instance "FREE", status READY, has 1 handler(s) for this service...
The command completed successfully

---- ORACLE RPM PACKAGES ----
oracle-indexhtml-9-4.0.1.el9_2.noarch
oraclelinux-release-9.7-1.0.6.el9.x86_64
oracle-logos-90.4-1.0.1.el9.x86_64
oracle-backgrounds-90.4-1.0.1.el9.noarch
oraclelinux-release-el9-1.0-26.el9.x86_64
oracle-ai-database-preinstall-26ai-1.0-1.el9.x86_64
oracle-ai-database-free-26ai-23.26.0-1.x86_64

---- ENVIRONMENT VARIABLES ----
ORACLE_HOME = /opt/oracle/product/26ai/dbhomeFree


First, all scripts are created in a dedicated folder:
/opt/kafka-admin
Note: If you want to run the demo yourself, you can download the scripts: demo_oracle26ai_kafka_kraft

Main script description:

install-kafka.sh
Purpose: Install Apache Kafka 3.0.0 in KRaft mode on Oracle Linux 9.
What this script does:
1. Installs Java 11 (required for Kafka 3.0.0)
2. Downloads Kafka 3.0.0 (Scala 2.13)
3. Extracts it into /opt/kafka
4. Creates /var/lib/kafka/data
5. Generates a KRaft Cluster ID
6. Formats the metadata log directory
7. Prepares Kafka to run in ZooKeeper‑less mode

create-service.sh
Purpose: Run Kafka and Kafka Connect as systemd services in KRaft mode.
1. Assumes Kafka (KRaft mode) is already installed under /opt/kafka
2. Runs on Oracle Linux 9
3. Supports automatic restarts and uses a dedicated kafka user for safety

Note: Run this script as a privileged user

cr_kafka_connect_kraft.sh
Purpose: Kafka Connect (KRaft Mode) Automation Script

What this script does
1. Creates Connect internal topics (KRaft mode)
2. Validates Kafka availability
3. Starts Kafka Connect distributed worker
4. Prepares plugin directory

Note: Run this script as a privileged user

#!/bin/bash
#
##############################################################################
#
# Script Name : install-kafka.sh
# Created By : Salem Houali
# Created On : 2026-04-26
# ========================================================================
#
# Purpose:
# Install Apache Kafka 3.0.0 in KRaft mode on Oracle Linux 9.
#
# What this script does:
# 1. Installs Java 11 (required for Kafka 3.0.0)
# 2. Downloads Kafka 3.0.0 (Scala 2.13)
# 3. Extracts it into /opt/kafka
# 4. Creates /var/lib/kafka/data
# 5. Generates a KRaft Cluster ID
# 6. Formats the metadata log directory
# 7. Prepares Kafka to run in ZooKeeper‑less mode
#
# Notes:
# - Run this script as root or a privileged user.
# - Kafka will run under the dedicated 'kafka' user.
#
##############################################################################
set -euo pipefail
# -----------------------------------------------------------------------------
# Variables
# -----------------------------------------------------------------------------
KAFKA_VERSION="3.0.0"
SCALA_VERSION="2.13"
KAFKA_TGZ="kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
DOWNLOAD_URL="https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/${KAFKA_TGZ}"
KAFKA_USER="kafka"
KAFKA_HOME="/opt/kafka"
KAFKA_DATA="/var/lib/kafka/data"
KRAFT_CONFIG_DIR="${KAFKA_HOME}/config/kraft"
SERVER_PROPERTIES="${KRAFT_CONFIG_DIR}/server.properties"
TMP_DIR="/opt/kafka-install/tmp"
mkdir -p "${TMP_DIR}"
# -----------------------------------------------------------------------------
# Install Dependencies
# -----------------------------------------------------------------------------
echo "=== Installing dependencies ==="
dnf install -y java-11-openjdk java-11-openjdk-devel wget tar
# -----------------------------------------------------------------------------
# Create Kafka User
# -----------------------------------------------------------------------------
echo "=== Creating Kafka user ==="
id -u "${KAFKA_USER}" &>/dev/null || useradd -r -s /sbin/nologin "${KAFKA_USER}"
# -----------------------------------------------------------------------------
# Create Directories
# -----------------------------------------------------------------------------
echo "=== Creating Kafka directories ==="
mkdir -p "${KAFKA_HOME}"
mkdir -p "${KAFKA_DATA}"
mkdir -p "${KRAFT_CONFIG_DIR}"
# -----------------------------------------------------------------------------
# Download Kafka
# -----------------------------------------------------------------------------
echo "=== Downloading Kafka ${KAFKA_VERSION} ==="
wget -O "${TMP_DIR}/${KAFKA_TGZ}" "${DOWNLOAD_URL}"
# -----------------------------------------------------------------------------
# Extract Kafka
# -----------------------------------------------------------------------------
echo "=== Extracting Kafka ==="
tar -xzf "${TMP_DIR}/${KAFKA_TGZ}" -C "${KAFKA_HOME}" --strip-components=1
# -----------------------------------------------------------------------------
# Create server.properties
# -----------------------------------------------------------------------------
echo "=== Creating server.properties ==="
cat > "${SERVER_PROPERTIES}" << 'EOF'
process.roles=broker,controller
node.id=1
listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
inter.broker.listener.name=PLAINTEXT
controller.listener.names=CONTROLLER
controller.quorum.voters=1@localhost:9093
log.dirs=/var/lib/kafka/data
num.partitions=3
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
EOF
# -----------------------------------------------------------------------------
# Generate Cluster ID
# -----------------------------------------------------------------------------
echo "=== Generating Cluster ID ==="
CLUSTER_ID="$(${KAFKA_HOME}/bin/kafka-storage.sh random-uuid)"
echo "Cluster ID: ${CLUSTER_ID}"
# -----------------------------------------------------------------------------
# Format Storage
# -----------------------------------------------------------------------------
echo "=== Formatting storage for KRaft ==="
"${KAFKA_HOME}/bin/kafka-storage.sh" format \
-t "${CLUSTER_ID}" \
-c "${SERVER_PROPERTIES}"
# -----------------------------------------------------------------------------
# Permissions
# -----------------------------------------------------------------------------
echo "=== Setting permissions ==="
chown -R "${KAFKA_USER}:${KAFKA_USER}" "${KAFKA_HOME}"
chown -R "${KAFKA_USER}:${KAFKA_USER}" /var/lib/kafka
# -----------------------------------------------------------------------------
# Completion Message
# -----------------------------------------------------------------------------
echo "=== Kafka installation complete ==="
echo "Start Kafka with:"
echo " ${KAFKA_HOME}/bin/kafka-server-start.sh ${SERVER_PROPERTIES}"
☑️ Execution Output
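Before starting the broker, you can confirm that the format step actually wrote the KRaft metadata. A minimal sketch, assuming the log directory /var/lib/kafka/data configured above:

```shell
KAFKA_DATA="/var/lib/kafka/data"

# kafka-storage.sh format writes meta.properties (cluster.id, node.id) into log.dirs
if [ -f "${KAFKA_DATA}/meta.properties" ]; then
  META=$(cat "${KAFKA_DATA}/meta.properties")
else
  META="meta.properties not found - storage not formatted yet"
fi
echo "$META"
```

If the file is missing, re-run the kafka-storage.sh format step before attempting to start the broker.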

☑️Create and Run Kafka & Kafka Connect as a systemd services
Run create-service.sh script

#!/bin/bash
#
################################################################################
#
# Script name : create-service.sh
# Created by: Salem Houali
# Created on : 2026-05-02
# ===================================================================
#
# Purpose: Run Kafka and Kafka Connect as systemd services in KRaft mode
# 1. Assumes Kafka (KRaft mode) is already installed under /opt/kafka
# 2. Runs on Oracle Linux 9
# 3. Supports automatic restarts and uses a dedicated kafka user for safety
#
# Note: Run this script as a privileged user
################################################################################
#
set -e
echo "=== Creating kafka (KRaft) systemd file ==="
sudo tee /etc/systemd/system/kafka.service > /dev/null << 'EOF'
[Unit]
Description=Apache Kafka (KRaft Mode)
After=network.target
[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure
RestartSec=5
Environment="KAFKA_HEAP_OPTS=-Xms1G -Xmx1G"
[Install]
WantedBy=multi-user.target
EOF
echo "=== Reload and enable systemd service ==="
systemctl daemon-reload
systemctl enable --now kafka
echo "=== Check status ==="
systemctl status kafka > kafka_status.txt
echo "=== Creating kafka connect systemd file ==="
sudo tee /etc/systemd/system/kafka-connect.service > /dev/null << 'EOF'
[Unit]
Description=Kafka Connect Distributed Worker
After=network.target kafka.service
Requires=kafka.service
[Service]
Type=simple
User=kafka
Group=kafka
Environment="KAFKA_HEAP_OPTS=-Xms1G -Xmx1G"
ExecStart=/opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
ExecStop=/bin/kill -TERM $MAINPID
Restart=on-failure
RestartSec=5
LimitNOFILE=100000
[Install]
WantedBy=multi-user.target
EOF
echo "=== Reload and enable systemd service ==="
systemctl daemon-reload
systemctl enable --now kafka-connect
echo "=== Check kafka-connect status ==="
systemctl status kafka-connect > kafka_connect_status.txt
echo "=== Verify both services ==="
systemctl status kafka
systemctl status kafka-connect
echo "=== Check Connect REST API ==="
curl localhost:8083
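systemctl reporting the unit as active does not guarantee the broker is already serving clients. As a sketch (assuming the default localhost:9092 listener and KAFKA_HOME=/opt/kafka from the install script), you can ask the broker for its supported API versions, which only succeeds when it is actually answering:

```shell
KAFKA_HOME="/opt/kafka"
BOOTSTRAP="localhost:9092"

# kafka-broker-api-versions.sh opens a real client connection to the broker
if "${KAFKA_HOME}/bin/kafka-broker-api-versions.sh" --bootstrap-server "$BOOTSTRAP" >/dev/null 2>&1; then
  BROKER_STATUS="reachable"
else
  BROKER_STATUS="NOT reachable"
fi
echo "Broker on ${BOOTSTRAP}: ${BROKER_STATUS}"
```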

☑️Check the services (Kafka)

☑️Check the services (Kafka-connect)

☑️Create Connect internal topics / Validate Kafka availability
Run cr_kafka_connect_kraft.sh

#!/bin/bash
#
#########################################################################
#
# Script name : cr_kafka_connect_kraft.sh
# Created by: Salem Houali
# Created on : 2026-04-26
# ==================================================================
# Purpose: Kafka Connect (KRaft Mode) Automation Script
#
# What this script does
# 1. Creates Connect internal topics (KRaft mode)
# 2. Validates Kafka availability
# 3. Starts Kafka Connect distributed worker
# 4. Prepares plugin directory
#
# Note: Run this script as a privileged user
##########################################################################
#

set -e

KAFKA_HOME="/opt/kafka"
BOOTSTRAP="localhost:9092"
CONNECT_CONFIG="$KAFKA_HOME/config/connect-distributed.properties"
PLUGINS_DIR="/opt/kafka/plugins"

echo "=== Kafka Connect (KRaft Mode) Automation ==="

# Back up the original file first
cp "$CONNECT_CONFIG" "${CONNECT_CONFIG}_bkp"

# replace the current file

echo "=== connect-distributed.properties ==="

sudo tee /opt/kafka/config/connect-distributed.properties > /dev/null << 'EOF'
# Example: use this minimal configuration

bootstrap.servers=localhost:9092

group.id=connect-cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/opt/kafka/plugins

rest.port=8083
rest.host.name=localhost
rest.advertised.host.name=localhost

offset.flush.interval.ms=10000

EOF

# ---------------------------------------------------------
# 1. Validate Kafka is running
# ---------------------------------------------------------
echo "[1/6] Checking Kafka broker availability..."

if ! systemctl is-active --quiet kafka; then
echo "ERROR: Kafka broker not reachable on $BOOTSTRAP"
exit 1
fi

echo "Kafka broker is reachable."

# ---------------------------------------------------------
# 2. Create Connect internal topics
# ---------------------------------------------------------
echo "[2/6] Creating Kafka Connect internal topics..."

create_topic() {
  local topic=$1
  local partitions=$2

  if "$KAFKA_HOME"/bin/kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --list | grep -q "^${topic}$"; then
    echo "Topic '$topic' already exists."
  else
    "$KAFKA_HOME"/bin/kafka-topics.sh --create \
      --topic "$topic" \
      --bootstrap-server "$BOOTSTRAP" \
      --replication-factor 1 \
      --partitions "$partitions" \
      --config cleanup.policy=compact
    echo "Created topic: $topic"
  fi
}

create_topic "connect-configs" 1
create_topic "connect-offsets" 25
create_topic "connect-status" 5

# ---------------------------------------------------------
# 3. Prepare plugin directory
# ---------------------------------------------------------
echo "[3/6] Ensuring plugin directory exists at $PLUGINS_DIR..."

sudo mkdir -p "$PLUGINS_DIR"
sudo chown -R kafka:kafka "$PLUGINS_DIR"
sudo chown -R kafka:kafka /opt/kafka

echo "Plugin directory ready."

# ---------------------------------------------------------
# 4. Validate Connect worker config
# ---------------------------------------------------------
echo "[4/6] Validating Connect worker configuration..."

if [[ ! -f "$CONNECT_CONFIG" ]]; then
echo "ERROR: Connect worker config not found at $CONNECT_CONFIG"
exit 1
fi

grep -q "plugin.path" "$CONNECT_CONFIG" || {
echo "plugin.path=$PLUGINS_DIR" | sudo tee -a "$CONNECT_CONFIG"
echo "Added plugin.path to worker config."
}

echo "Connect worker configuration validated."

# ---------------------------------------------------------
# 5. Start Kafka Connect
# ---------------------------------------------------------
echo "[5/6] Starting Kafka Connect distributed worker..."

nohup $KAFKA_HOME/bin/connect-distributed.sh "$CONNECT_CONFIG" \
> /var/log/kafka-connect.log 2>&1 &

sleep 5

if ! curl -s http://localhost:8083/ >/dev/null; then
echo "ERROR: Kafka Connect is DOWN"
exit 1
fi

echo "Kafka Connect is UP and started successfully."

# ---------------------------------------------------------
# 6. Validate REST API
# ---------------------------------------------------------
echo "[6/6] Validating Kafka Connect REST API..."

curl -s http://localhost:8083/ | jq .

echo "Kafka Connect REST API is healthy."
echo "=== Kafka Connect setup complete ==="

☑️Validate that the Kafka KRaft setup is fully operational.
✅Test workflow: create a topic, start a producer, start a consumer.

✅Create a topic

[root@ol9m1 bin]# ./kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create \
--topic test-topic \
--partitions 1 \
--replication-factor 1
Created topic test-topic.
[root@ol9m1 bin]#

✅ Validate the topic (test-topic) is created

[root@ol9m1 bin]#
[root@ol9m1 bin]# ./kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list
__consumer_offsets
connect-configs
connect-offsets
connect-status
test-topic
[root@ol9m1 bin]#

✅ Start a producer in a terminal and enter some text
[root@ol9m1 bin]#
[root@ol9m1 bin]# ./kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic
>validate if Kafka in KRAFT mode is working..
>Kafka in Kraft-mode works!!!
>This ends the demo.
>

✅ Start a consumer and validate the text entered
[root@ol9m1 bin]# ./kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic \
--from-beginning
validate if Kafka in KRAFT mode is working..
Kafka in Kraft-mode works!!!
This ends the demo.


Let's proceed with the next step: install the JDBC Connector plugin.
☑️Download the Confluent JDBC Connector (use the latest version compatible with Kafka 3.x/4.x)
☑️Choose an ingestion style: JDBC Source

| Option | What it does | When to use it |
|--------|--------------|----------------|
| JDBC Source | Polls tables for new/changed rows | Simple, low–medium volume, no hard CDC |
| Debezium Oracle | Log‑based CDC (inserts/updates/deletes) | True CDC, audit, high volume |
| GoldenGate → KC | Oracle's enterprise CDC into Kafka | Mission‑critical, Oracle‑blessed path |

cd /opt/kafka/plugins
wget https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.7.4/kafka-connect-jdbc-10.7.4.jar

☑️Add the Oracle JDBC driver
Since the database is already installed and functional, we just need to copy ojdbc11.jar into the plugin directory alongside the connector jar:

cp /opt/oracle/product/26ai/dbhomeFree/jdbc/lib/ojdbc11.jar /opt/kafka/plugins

(Both jars are later moved to /opt/kafka/libs.)
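After copying the jars, restart the Connect worker (systemctl restart kafka-connect) so it rescans plugin.path; the worker's REST API then lists every plugin it can see. A sketch assuming the worker on localhost:8083 (the fallback string only covers the case where Connect is not up):

```shell
CONNECT_URL="http://localhost:8083"

# List installed connector plugins; io.confluent.connect.jdbc.JdbcSourceConnector
# should appear once the jar is on plugin.path and the worker has restarted
PLUGINS=$(curl -s --max-time 5 "${CONNECT_URL}/connector-plugins" || echo "connect-unreachable")
echo "$PLUGINS" | grep -o "JdbcSourceConnector" || echo "JDBC plugin not listed (is Connect running?)"
```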

☑️Deploy the connector
Important:
✔️Test your JDBC connection first
✔️Specify incrementing mode and the incrementing column name

Connector name: oracle26ai-customers-source

Run the deploy-oracle-connector.sh script
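The deploy script reads a file named oracle26ai-customers-source.json, which is not reproduced in this post. The following is a minimal sketch of what it might contain, assuming incrementing mode on the ID column, the hr/hr credentials, and the freepdb1 service used throughout this demo (with topic.prefix set to oracle26ai-, rows from CUSTOMERS land in the oracle26ai-CUSTOMERS topic):

```shell
# Hypothetical connector config - adjust connection.url, user, and password
cat > oracle26ai-customers-source.json << 'EOF'
{
  "name": "oracle26ai-customers-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:oracle:thin:@//localhost:1521/freepdb1",
    "connection.user": "hr",
    "connection.password": "hr",
    "table.whitelist": "CUSTOMERS",
    "mode": "incrementing",
    "incrementing.column.name": "ID",
    "topic.prefix": "oracle26ai-",
    "poll.interval.ms": "5000",
    "tasks.max": "1"
  }
}
EOF
echo "oracle26ai-customers-source.json written"
```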

#!/bin/bash
#
#########################################################################
#
#      Script name : deploy-oracle-connector.sh
#      Created by: Salem Houali
#      Created on : 2026-04-26
#      ==================================================================
#     Purpose: Oracle connector deployment automation Script
#
#           What this script does
#      1. Checks if the connector exists
#      2. Creates it if missing
#      3. Updates it if already deployed
#
#        Note: Run this script as a privileged user
##########################################################################

set -e

CONNECT_URL="http://localhost:8083"
CONNECTOR_NAME="oracle26ai-customers-source"
CONFIG_FILE="oracle26ai-customers-source.json"
echo "Checking if connector '${CONNECTOR_NAME}' exists..."
EXISTS=$(curl -s -o /dev/null -w "%{http_code}" \
  ${CONNECT_URL}/connectors/${CONNECTOR_NAME})
if [ "$EXISTS" -eq 200 ]; then
  echo "Connector exists. Updating configuration..."
  curl -s -X PUT \
    -H "Content-Type: application/json" \
    --data @"${CONFIG_FILE}" \
    ${CONNECT_URL}/connectors/${CONNECTOR_NAME}/config
  echo -e "\nUpdated."
else
  echo "Connector does not exist. Creating..."
  curl -s -X POST \
    -H "Content-Type: application/json" \
    --data @"${CONFIG_FILE}" \
    ${CONNECT_URL}/connectors
  echo -e "\nCreated."
fi

Move the JDBC connector and Oracle JDBC jars to /opt/kafka/libs:

drwxr-xr-x. 9 kafka kafka     132 May  2 12:15 ..
-rwxr-xr-x. 1 kafka kafka  277230 May  2 14:02 kafka-connect-jdbc-10.7.4.jar
-rwxr-xr-x. 1 kafka kafka 7625330 May  2 14:02 ojdbc11.jar

Run the following script ./deploy-oracle-connector.sh

Check the status of the connector
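The status command itself is not shown in the post; a sketch, assuming the Connect worker on localhost:8083 and the connector name used above (the fallback JSON is only printed when Connect is unreachable):

```shell
CONNECT_URL="http://localhost:8083"
CONNECTOR_NAME="oracle26ai-customers-source"

# A healthy deployment reports "state":"RUNNING" for the connector and each task
STATUS=$(curl -s --max-time 5 "${CONNECT_URL}/connectors/${CONNECTOR_NAME}/status" \
  || echo '{"error":"connect unreachable"}')
echo "$STATUS"
```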

Now let's run cr_demo_kafka.sh, which creates the CUSTOMERS table used for data ingestion into Kafka.

#!/bin/bash
#
# Script Name : cr_demo_kafka.sh
# Purpose : Test message ingestion into Kafka from Oracle Database 26ai
# Author : Salem Houali
# Created On : 2026-05-02
#
# Create HR.CUSTOMERS table + load sample data + spool report
# Notes :
# - Ensure you can connect to your freepdb1 instance
# - Generates the customer report in /opt/kafka-admin and its log in /opt/kafka-admin/tmp
# - Designed for Oracle 26ai / FreePDB1
#

# -----------------------------
# CONFIGURATION
# -----------------------------
ORACLE_USER="hr"
ORACLE_PASS="hr"
ORACLE_CONN="localhost:1521/freepdb1"

REPORT_DIR="/opt/kafka-admin"
REPORT_FILE="${REPORT_DIR}/cust_report.txt"
LOG_FILE="/opt/kafka-admin/tmp/hr_demo_kafka.log"

mkdir -p "$REPORT_DIR" "$(dirname "$LOG_FILE")"

echo "======================================================" | tee "$LOG_FILE"
echo " HR CUSTOMERS DEMO – Kafka JDBC Source Prep" | tee -a "$LOG_FILE"
echo " Started: $(date)" | tee -a "$LOG_FILE"
echo "======================================================" | tee -a "$LOG_FILE"

# -----------------------------
# RUN SQL BLOCK
# -----------------------------
sqlplus -s "${ORACLE_USER}/${ORACLE_PASS}@${ORACLE_CONN}" <<EOF >> "$LOG_FILE" 2>&1

WHENEVER SQLERROR EXIT SQL.SQLCODE;

PROMPT Dropping CUSTOMERS table if exists...

BEGIN
EXECUTE IMMEDIATE 'DROP TABLE CUSTOMERS PURGE';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -942 THEN
RAISE;
END IF;
END;
/

PROMPT Creating CUSTOMERS table...

CREATE TABLE CUSTOMERS (
ID NUMBER(10,0),
FIRST_NAME VARCHAR2(50),
LAST_NAME VARCHAR2(50),
EMAIL VARCHAR2(100),
PHONE_NUMBER VARCHAR2(20),
CREATED_AT TIMESTAMP DEFAULT SYSTIMESTAMP,
UPDATED_AT TIMESTAMP,
STATUS VARCHAR2(20)
);

PROMPT Inserting sample data from employees (Finance + Executive)...

INSERT INTO CUSTOMERS (
ID,
FIRST_NAME,
LAST_NAME,
EMAIL,
PHONE_NUMBER
)
SELECT
EMPLOYEE_ID,
FIRST_NAME,
LAST_NAME,
EMAIL,
PHONE_NUMBER
FROM
EMPLOYEES
WHERE
DEPARTMENT_ID IN (90, 100);

COMMIT;

PROMPT Generating report...

SET PAGESIZE 100
SET LINESIZE 2000
SET FEEDBACK OFF
SET HEADING OFF
SET TRIMSPOOL ON

COLUMN id FORMAT 999
COLUMN FIRST_NAME FORMAT A15
COLUMN LAST_NAME FORMAT A15
COLUMN EMAIL FORMAT A15
COLUMN PHONE_NUMBER FORMAT A20
COLUMN CREATED_AT FORMAT A30
COLUMN UPDATED_AT FORMAT A30
COLUMN STATUS FORMAT A10

SPOOL ${REPORT_FILE}

SELECT ID, FIRST_NAME, LAST_NAME, EMAIL, PHONE_NUMBER, CREATED_AT, UPDATED_AT, STATUS
FROM CUSTOMERS;

SPOOL OFF

PROMPT Done.

EXIT
EOF

# -----------------------------
# FINAL STATUS
# -----------------------------
if [ $? -eq 0 ]; then
echo "SQL execution completed successfully." | tee -a "$LOG_FILE"
echo "Customer report generated at: $REPORT_FILE" | tee -a "$LOG_FILE"
else
echo "ERROR: SQL execution failed. Check log: $LOG_FILE" | tee -a "$LOG_FILE"
exit 1
fi

echo "Finished: $(date)" | tee -a "$LOG_FILE"
echo "======================================================" | tee -a "$LOG_FILE"

exit 0

☑️Execution output

☑️Let's create a topic and test whether we can push data into it
☑️Create the oracle26ai-CUSTOMERS topic

cd /opt/kafka/bin

./kafka-topics.sh \
--create \
--topic oracle26ai-CUSTOMERS \
--bootstrap-server localhost:9092 \
--partitions 1 \
--replication-factor 1

☑️Start a consumer 

./kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic oracle26ai-CUSTOMERS \
--from-beginning

☑️Add one row to the table and check the consumer window again

-- insert one new row and check the message in the consumer terminal
INSERT INTO customers (
  id,
  first_name,
  last_name,
  email,
  phone_number,
  status
) VALUES (
  300,
  'Salem',
  'Salem',
  'SMAIL',
  '1.500.0302.8922',
  'active'
);
commit;

☑️Connect as HR and run the ins_data.sql script

[oracle@ol9m1 kafka-admin]$ sqlplus hr/hr@freepdb1;

SQL*Plus: Release 23.26.0.0.0 - Production on Sat May 2 20:42:35 2026
Version 23.26.0.0.0

Copyright (c) 1982, 2025, Oracle. All rights reserved.

Last Successful login time: Sat May 02 2026 20:42:05 -04:00

Connected to:
Oracle AI Database 26ai Free Release 23.26.0.0.0 - Develop, Learn, and Run for Free
Version 23.26.0.0.0

SQL>
SQL>
SQL> show user;
USER is "HR"
SQL>
SQL> @ins_data.sql;

1 row created.

Commit complete.

SQL>

☑️Check the consumer again

The row inserted via the ins_data.sql script appears in the consumer window.
This ends the demo 😊
