Kafka - Consumindo dados do Postgres

Lucas Yuri
4 min readFeb 10, 2021

Fala pessoal!

Nesse tutorial estarei ensinando como conectar sua aplicação para escutar sempre que houver alguma alteração em seu banco de dados, recebendo uma mensagem/notificação.

Para auxiliar seu aprendizado, deixei uma aplicação pronta como exemplo. Recomendo que apenas faça o clone da aplicação e siga o passo a passo desse tutorial. Link abaixo:

Além de nossa aplicação e o banco de dados, precisaremos também de serviços externos para realizar essa integração. Para isso utilizaremos o debezium, kafka e kafka connect.

O Debezium é um serviço para transformar as alterações realizadas no banco de dados em eventos. Saiba Mais

O Kafka é um serviço para a transmissão de eventos, sendo o serviço que a aplicação se conecta para receber as mensagens. Saiba Mais

O Kafka Connect é um serviço para realizar as configurações e conexão do Kafka com outros serviços (em nosso caso é o Debezium). Saiba Mais

Para criação desses serviços, basta usar o docker-compose presente no respositório disponibilizado. Lembrando que você deverá possuir o Docker instalado em sua máquina.

Antes de subir os container, é necessário que você crie o arquivo .env baseado no seu .env.example. Você deverá apenas alterar a variável HOST inserindo o IP sua máquina.

Após a criação do .env, rode o seguinte comando para inicializar os serviços: (note que o postgres também será inicializado)

docker-compose -f docker-compose.yml up -d

Com nossos serviços rodando, primeiramente vamos utilizar o postgres. Como será necessário realizar algumas queries em nosso banco, sugiro que utilize o Postbird:

Postbird Exemplo — Basta apenas se conectar com o banco de dados e rodar as queries no campo “query”

Primeiramente vamos alterar a seguinte configuração em nosso banco de dados:
*Pode utilizar outra forma para alterar a configuração do banco de dados, mas rodar a query abaixo através do postbird já é suficiente

ALTER SYSTEM SET wal_level TO logical;

Aproveitando, já vamos criar a nossa tabela user_info no banco de dados:

CREATE TABLE user_info(
id integer not null,
name varchar,
alias varchar,
age integer,
active boolean default true,
created_at timestamp not null default now(),
PRIMARY KEY(id)
);

Para que a alteração de configuração do banco tenha efeito, precisamos restartar o serviço:

docker-compose -f docker-compose.yml restart

Para finalizar as configurações, vamos criar a conexão do kafka connect.

Com todos os serviços do docker-compose rodando, você deverá fazer uma requisição do tipo POST para a URL: http://localhost:8083/connectors contendo o body abaixo:

Após a criação, você deverá obter a seguinte resposta:

Com as configurações finalizadas, chegou a hora de consumir os dados em nossa aplicação.
Vamos então iniciar nossa aplicação:
*Lembrando que todos os serviços devem estar rodando

yarn start

Com nossa aplicação em pé, chegou a hora de inserir dados em nossa tabela para recebermos o dados através do kafka:

INSERT INTO user_info VALUES (1, 'Lucas Yuri', 'luryy', 20);

Após os dados serem inseridos em nossa tabela user_info, no console da nossa aplicação deverá aparecer o seguinte log:

{
before: null,
after: {
id: 3,
name: 'Lucas Yuri',
alias: 'luryy',
age: 20,
active: true,
created_at: 1612453841643182
},
source: {
version: '1.2.5.Final',
connector: 'postgresql',
name: 'kafka-tutorial',
ts_ms: 1612453841643,
snapshot: 'false',
db: 'postgres',
schema: 'public',
table: 'user_info',
txId: 490,
lsn: 23478448,
xmin: null
},
op: 'c',
ts_ms: 1612453842388,
transaction: null
}

Com isso finalizamos mais um tutorial. Agora você já tem uma aplicação consumido através do debezium-kafka as alterações realizadas em seu banco de dados postgres!

Vale resaltar que esse tutorial ensina o básico para o consumo das mensagens gerados, cabe a você explorar os detalhes e otimizar a aplicação!
Sugiro que tente consumir esses dados gerados do postgres (SQL) e tentar inserir em um Mongo(NoSQL) — que foi a minha motivação para usar o kafka-connect.

Caso queira outro artigo contendo os detalhes das configurações dos serviços, deixa aqui nos comentários, e para qualquer dúvida, fico sempre a disposição!

--

--