Kafka - Consumindo dados do Postgres
Fala pessoal!
Nesse tutorial estarei ensinando como conectar sua aplicação para escutar sempre que houver alguma alteração em seu banco de dados, recebendo uma mensagem/notificação.
Para auxiliar seu aprendizado, deixei uma aplicação pronta como exemplo. Recomendo que apenas faça o clone da aplicação e siga o passo a passo desse tutorial. Link abaixo:
Além de nossa aplicação e o banco de dados, precisaremos também de serviços externos para realizar essa integração. Para isso utilizaremos o debezium, kafka e kafka connect.
O Debezium é um serviço para transformar as alterações realizadas no banco de dados em eventos. Saiba Mais
O Kafka é um serviço para a transmissão de eventos, sendo o serviço que a aplicação se conecta para receber as mensagens. Saiba Mais
O Kafka Connect é um serviço para realizar as configurações e conexão do Kafka com outros serviços (em nosso caso é o Debezium). Saiba Mais
Para criação desses serviços, basta usar o docker-compose presente no respositório disponibilizado. Lembrando que você deverá possuir o Docker instalado em sua máquina.
Antes de subir os container, é necessário que você crie o arquivo .env baseado no seu .env.example. Você deverá apenas alterar a variável HOST inserindo o IP sua máquina.
Após a criação do .env, rode o seguinte comando para inicializar os serviços: (note que o postgres também será inicializado)
docker-compose -f docker-compose.yml up -d
Com nossos serviços rodando, primeiramente vamos utilizar o postgres. Como será necessário realizar algumas queries em nosso banco, sugiro que utilize o Postbird:
Primeiramente vamos alterar a seguinte configuração em nosso banco de dados:
*Pode utilizar outra forma para alterar a configuração do banco de dados, mas rodar a query abaixo através do postbird já é suficiente
ALTER SYSTEM SET wal_level TO logical;
Aproveitando, já vamos criar a nossa tabela user_info no banco de dados:
CREATE TABLE user_info(
id integer not null,
name varchar,
alias varchar,
age integer,
active boolean default true,
created_at timestamp not null default now(),
PRIMARY KEY(id)
);
Para que a alteração de configuração do banco tenha efeito, precisamos restartar o serviço:
docker-compose -f docker-compose.yml restart
Para finalizar as configurações, vamos criar a conexão do kafka connect.
Com todos os serviços do docker-compose rodando, você deverá fazer uma requisição do tipo POST para a URL: http://localhost:8083/connectors contendo o body abaixo:
Após a criação, você deverá obter a seguinte resposta:
Com as configurações finalizadas, chegou a hora de consumir os dados em nossa aplicação.
Vamos então iniciar nossa aplicação:
*Lembrando que todos os serviços devem estar rodando
yarn start
Com nossa aplicação em pé, chegou a hora de inserir dados em nossa tabela para recebermos o dados através do kafka:
INSERT INTO user_info VALUES (1, 'Lucas Yuri', 'luryy', 20);
Após os dados serem inseridos em nossa tabela user_info, no console da nossa aplicação deverá aparecer o seguinte log:
{
before: null,
after: {
id: 3,
name: 'Lucas Yuri',
alias: 'luryy',
age: 20,
active: true,
created_at: 1612453841643182
},
source: {
version: '1.2.5.Final',
connector: 'postgresql',
name: 'kafka-tutorial',
ts_ms: 1612453841643,
snapshot: 'false',
db: 'postgres',
schema: 'public',
table: 'user_info',
txId: 490,
lsn: 23478448,
xmin: null
},
op: 'c',
ts_ms: 1612453842388,
transaction: null
}
Com isso finalizamos mais um tutorial. Agora você já tem uma aplicação consumido através do debezium-kafka as alterações realizadas em seu banco de dados postgres!
Vale resaltar que esse tutorial ensina o básico para o consumo das mensagens gerados, cabe a você explorar os detalhes e otimizar a aplicação!
Sugiro que tente consumir esses dados gerados do postgres (SQL) e tentar inserir em um Mongo(NoSQL) — que foi a minha motivação para usar o kafka-connect.
Caso queira outro artigo contendo os detalhes das configurações dos serviços, deixa aqui nos comentários, e para qualquer dúvida, fico sempre a disposição!