Kafka - Consumindo dados do Postgres

Fala pessoal!

Nesse tutorial estarei ensinando como conectar sua aplicação para escutar sempre que houver alguma alteração em seu banco de dados, recebendo uma mensagem/notificação.

Para auxiliar seu aprendizado, deixei uma aplicação pronta como exemplo. Recomendo que apenas faça o clone da aplicação e siga o passo a passo desse tutorial. Link abaixo:

Além de nossa aplicação e o banco de dados, precisaremos também de serviços externos para realizar essa integração. Para isso utilizaremos o debezium, kafka e kafka connect.

O Debezium é um serviço para transformar as alterações realizadas no banco de dados em eventos. Saiba Mais

O Kafka é um serviço para a transmissão de eventos, sendo o serviço que a aplicação se conecta para receber as mensagens. Saiba Mais

O Kafka Connect é um serviço para realizar as configurações e conexão do Kafka com outros serviços (em nosso caso é o Debezium). Saiba Mais

Para criação desses serviços, basta usar o docker-compose presente no respositório disponibilizado. Lembrando que você deverá possuir o Docker instalado em sua máquina.

Antes de subir os container, é necessário que você crie o arquivo .env baseado no seu .env.example. Você deverá apenas alterar a variável HOST inserindo o IP sua máquina.

Após a criação do .env, rode o seguinte comando para inicializar os serviços: (note que o postgres também será inicializado)

docker-compose -f docker-compose.yml up -d

Com nossos serviços rodando, primeiramente vamos utilizar o postgres. Como será necessário realizar algumas queries em nosso banco, sugiro que utilize o Postbird:

Postbird Exemplo — Basta apenas se conectar com o banco de dados e rodar as queries no campo “query”

Primeiramente vamos alterar a seguinte configuração em nosso banco de dados:
*Pode utilizar outra forma para alterar a configuração do banco de dados, mas rodar a query abaixo através do postbird já é suficiente

ALTER SYSTEM SET wal_level TO logical;

Aproveitando, já vamos criar a nossa tabela user_info no banco de dados:

CREATE TABLE user_info(
id integer not null,
name varchar,
alias varchar,
age integer,
active boolean default true,
created_at timestamp not null default now(),
PRIMARY KEY(id)
);

Para que a alteração de configuração do banco tenha efeito, precisamos restartar o serviço:

docker-compose -f docker-compose.yml restart

Para finalizar as configurações, vamos criar a conexão do kafka connect.

Com todos os serviços do docker-compose rodando, você deverá fazer uma requisição do tipo POST para a URL: http://localhost:8083/connectors contendo o body abaixo:

Após a criação, você deverá obter a seguinte resposta:

Com as configurações finalizadas, chegou a hora de consumir os dados em nossa aplicação.
Vamos então iniciar nossa aplicação:
*Lembrando que todos os serviços devem estar rodando

yarn start

Com nossa aplicação em pé, chegou a hora de inserir dados em nossa tabela para recebermos o dados através do kafka:

INSERT INTO user_info VALUES (1, 'Lucas Yuri', 'luryy', 20);

Após os dados serem inseridos em nossa tabela user_info, no console da nossa aplicação deverá aparecer o seguinte log:

{
before: null,
after: {
id: 3,
name: 'Lucas Yuri',
alias: 'luryy',
age: 20,
active: true,
created_at: 1612453841643182
},
source: {
version: '1.2.5.Final',
connector: 'postgresql',
name: 'kafka-tutorial',
ts_ms: 1612453841643,
snapshot: 'false',
db: 'postgres',
schema: 'public',
table: 'user_info',
txId: 490,
lsn: 23478448,
xmin: null
},
op: 'c',
ts_ms: 1612453842388,
transaction: null
}

Com isso finalizamos mais um tutorial. Agora você já tem uma aplicação consumido através do debezium-kafka as alterações realizadas em seu banco de dados postgres!

Vale resaltar que esse tutorial ensina o básico para o consumo das mensagens gerados, cabe a você explorar os detalhes e otimizar a aplicação!
Sugiro que tente consumir esses dados gerados do postgres (SQL) e tentar inserir em um Mongo(NoSQL) — que foi a minha motivação para usar o kafka-connect.

Caso queira outro artigo contendo os detalhes das configurações dos serviços, deixa aqui nos comentários, e para qualquer dúvida, fico sempre a disposição!

--

--

--

Love podcasts or audiobooks? Learn on the go with our new app.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Lucas Yuri

Lucas Yuri

More from Medium

Comparison and Analysis of Various NoSQL Databases

Intro to Event-Driven Architecture

[TIL-6] Why pagination is slow?

UniversalClient — One client for message brokers and NoSQL DBs