Kafka – Introdução

Introdução

O Apache Kafka é um sistema de código aberto com envio distribuído de mensagens que permite que você crie aplicativos em tempo real usando dados de streaming.

Você pode enviar dados de streaming, como sequências de cliques de sites, transações financeiras e logs de aplicativo, para o cluster do Kafka. Ele reserva os dados e os distribui para aplicativos de processamento de streams incorporados às estruturas, como Apache Spark Streaming, Apache Storm ou Apache Samza.

Desde leitura e escrita direta entre diversos bancos e mecanismos de pesquisas (SQL, MongoDB, RethinkDB, ElasticSearch) até materialização de dados on streaming com KTable por exemplo, é possível utilizar o Kafka de diversas maneiras para mover e transformar grande volume de dados.

Conceitos

Mensagens

Mensagem é o principal recurso do Kafka. Todos os eventos do Kafka podem ser resumidos em mensagens, sendo consumidas e produzidas através de tópicos. Uma mensagem pode ser desde uma simples String com “Hello World!” ou até mesmo um JSON contendo um objeto do seu domínio.

O Kafka permite definir Schemas para mensagens, como por exemplo utilizando o Avro. Como num exemplo de um JSON contendo um objeto do seu domínio, o Schema pode auxiliar impedindo que mensagens contendo conteúdos inválidos sejam trafegadas no tópico.

Uma mensagem pode também ser composta por uma chave (key/value), que é utilizada para sharding e compactação dentro do Kafka. Assim em um ambiente distribuído, é garantido a ordem das mensagens uma vez que mensagens com a mesma chaves são direcionadas para uma única partição do Kafka.

Tópicos

Um tópico é como categorizamos grupos de mensagens dentro do Kafka. Todas as mensagens enviadas para o Kafka permanecem em um tópico. Como comentado sobre Event Sourcing, mensagens são imutáveis e ordenadas.

Para manter a ordenação em um ecossistema de Kafka, os tópicos possuem partições e fatores de replicação. Um tópico pode possuir n partições, mas ao receber uma nova mensagem o Kafka automaticamente direciona aquela mensagem para uma partição específica dependendo de sua chave (key). Assim mensagens de uma mesma chave estarão apenas em uma única partição, garantindo assim a leitura ordenada de todas as mensagens de um tópico.

Producer

Um Kafka Producer é responsável por enviar uma mensagem para um tópico específico. De forma simples, você pode produzir uma mensagem em um tópico.

Uma vez que uma mensagem é produzida em um tópico o próprio Kafka organiza a mensagem em uma partição, garantindo sempre a ordem das mensagens produzidas, como citado anteriormente.

Consumer

Temos os tópicos, e as mensagens dentro dos tópicos. Com o Kafka Consumer é possível ler essas mensagens de volta. Importante entender que, ao ler uma mensagem com o consumer, a mensagem não é retirada do tópico.

Você pode ter vários Kafka Consumers conectados em um mesmo tópico, e cada um terá a posição onde parou de ler. Assim você pode ter um tópico produzindo como no exemplo acima, pontuações de um jogador, e ter diversos consumers em pontos diferentes do tópico realizando ações diferentes. Você também pode escolher ter vários consumers lendo o mesmo tópico e na mesma partição, para escalar sua aplicação por exemplo, neste caso estes consumers fariam parte de um Consumer Group, e compartilharão sempre a posição final de leitura entre eles (offsets).

Apache Zookeeper

O Zookeeper é um serviço centralizado para, entre outras coisas, coordenação de sistemas distribuídos. O Kafka é um sistema distribuído, e consequentemente delega diversas funções de gerenciamento e coordenação para o Zookeeper.

Eles possuem uma dependência muito forte, mas isso não é tão ruim. O Kafka pode fazer o que ele intencionalmente tem que saber fazer de melhor, delegando essas demais funcionalidades para quem sabe fazer isso bem, sem precisar reinventar a roda.

Kafka Brokers / Clusters

O Broker é o coração do ecossistema do Kafka. Um Kafka Broker é executado em uma única instância em sua máquina. Um conjunto de Brokers entre diversas máquinas formam um Kafka Cluster.

Uma das principais características do Kafka é a escalabilidade e resiliência que ele oferece. Você pode rodar o Kafka local na sua máquina onde sua própria máquina teria um Kafka Broker formando um Kafka Cluster, como pode subir n instâncias de Kafka Brokers e todas estarem no mesmo Kafka Cluster. Com isso é possível escalar sua aplicação, e replicar os dados entre os Brokers.

Code Playgrounds – Introdução e soluções

Introdução

Code Playground é uma ferramenta para editar e testar aplicações Web, utilizando HTML, CSS e JavaScript de forma online.

Com esse tipo de ferramenta é possível configurar pré-compiladores HTML, CSS e JavaScript, por exemplo:

  • HTML – Hml, Markdown, Slim e Pug
  • CSS – LESS, SCSS, Sass, Stylus e PostCSS
  • Javascript – CoffeeScript, LiveScript, TypeScript e Babel

Soluções

Abaixo, seguem algumas boas soluções:

  • Codepen – http://codepen.io/
  • JSBin – http://jsbin.com/
  • Plunker – https://plnkr.co/
  • CSSDeck – http://cssdeck.com/
  • Dabblet – http://dabblet.com/
  • Liveweave – http://liveweave.com/
  • JSFiddle – http://jsfiddle.net/

Conclusão

Entre os que mais se destacam, encontram-se o Codepen (possui mais pré-processadores Javascript), CSSDeck (bastante utilizado). Porém, assim como ocorrem com editores (Visual Studio Code, Atom, Sublime e Vim), a escolha depende da melhor adequação da ferramenta ao desenvolvedor.

Desenvolvendo uma aplicação Spring Data REST e H2

Desenvolvendo uma aplicação Spring Data, REST e H2

Criando a aplicação

Acesse o site http://start.spring.io/ e configure uma aplicação conforme imagem abaixo.

Baixe e descompacte o arquivo (spring-rest-data-h2.zip) em algum diretório de trabalho.

Abra o Eclipse e importe o projeto (Maven Project).

Criação da classe controller

No Eclipse, clique em File / New / Java Class e defina-a conforme abaixo.

Código

package br.com.whs.springrestdatah2.controller;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import br.com.whs.springrestdatah2.Model.User;
import br.com.whs.springrestdatah2.repository.UserRepository;

@RestController
@RequestMapping("/user")
public class UserController {
        @Autowired
        private UserRepository userRepo;
    
        @RequestMapping(value = "/list", method = RequestMethod.GET)
        public List<User> findAll(){
                return userRepo.findAll();
        }

        @RequestMapping(value = "/find-by-id/{id}", method = RequestMethod.GET)
        public ResponseEntity<?> findById(@PathVariable final Long id){
                User user = userRepo.findOne(id);
                if( user == null ){
                        String msg = "{\"message\":\"User with id " + id + " not found.\"}";
                        return new ResponseEntity<String>(msg,HttpStatus.NOT_FOUND);
                }else{
                        return new ResponseEntity<User>(user,HttpStatus.OK);
                }
        }

        @RequestMapping(value = "/find-by-login/{loginName}", method = RequestMethod.GET)
        public ResponseEntity<?> findByLoginName(@PathVariable final String loginName){
                User user = userRepo.findByLoginName(loginName);
                if( user == null ){
                        String msg = "{\"message\":\"User with login " + loginName + " not found.\"}";
                        return new ResponseEntity<String>(msg,HttpStatus.NOT_FOUND);
                }else{
                        return new ResponseEntity<User>(user,HttpStatus.OK);
                }
        }
        @RequestMapping(value = "/create", method = RequestMethod.POST)
        public ResponseEntity<User> create(@RequestBody final User user){
                userRepo.save(user);
                return new ResponseEntity<User>(userRepo.findByLoginName(user.getLoginName()),HttpStatus.OK);
        }

        @RequestMapping(value = "/update/{id}", method = RequestMethod.PUT)
        public ResponseEntity<?> update(@PathVariable("id") long id, @RequestBody User user) {
            User userData = userRepo.findOne(id);
            if (userData == null) {
                String msg = "{\"message\":\"User with id " + id + " not found.\"}";
                return new ResponseEntity<String>(msg,HttpStatus.NOT_FOUND);
            }
            userData.setLoginName(user.getLoginName());
            userData.setFullName(user.getFullName());
            userData.setPassword(user.getPassword());
            userRepo.save(userData);
            return new ResponseEntity<User>(userData, HttpStatus.OK);
        }
    
        @RequestMapping(value = "/delete/{id}", method = RequestMethod.DELETE)
        public ResponseEntity<?> delete(@PathVariable("id") long id) {
            User user = userRepo.findOne(id);
            if (user == null) {
                String msg = "{\"message\":\"User with id " + id + " not found.\"}";
                return new ResponseEntity<String>(msg,HttpStatus.NOT_FOUND);
            }
            userRepo.delete(id);
            return new ResponseEntity<User>(HttpStatus.NO_CONTENT);
        }
}

Criando a interface UserRepository

Código

package br.com.whs.springrestdatah2.repository;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Component;

import br.com.whs.springrestdatah2.Model.User;

@Component
public interface UserRepository extends JpaRepository<User, Long>{
    public User findByLoginName(String loginName);
}

 

Criando a classe Model (User.java)

Código

package br.com.whs.springrestdatah2.Model;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;

@Entity
public class User {

    @Id
    @GeneratedValue
    private Long id;
    private String loginName;
    private String fullName;
    private String password;
public Long getId() {
    return id;
}

public void setId(Long id) {
    this.id = id;
}

public String getLoginName() {
    return loginName;
}

public void setLoginName(String loginName) {
    this.loginName = loginName;
}

public String getFullName() {
    return fullName;
}

public void setFullName(String fullName) {
    this.fullName = fullName;
}

public String getPassword() {
    return password;
}

public void setPassword(String password) {
    this.password = password;
}

}

 

Rodando a aplicação

Essa é uma aplicação Spring Boot, onde o framework baixará e utilizará os recursos necessários, conforme a configuração (definida na Criação da Aplicação). Neste caso, será utilizado Tomcat e banco de dados H2 (ambos embedded)

Procedimento

  1. Abra a classe SpringRestDataH2Application.java
  2. Clique no menu Run / Run as / Java Aplication
  3. Espere até que apareça “Tomcat started on port(s): 8080 (http)”
  4. Abra um navegador e forneça a url de listagem (http://localhost:8080/user/list)
  5. Deverá aparecer apenas 2 colchetes ([]), pois o List está vazio

 

Postman

Para facilitar as chamadas de POST (/user/create), PUT (/user/update) e DELETE (/user/delete), vou utilizar o Postman.

Método POST

Método PUT

Método DELETE

Código-Fonte (github)

https://github.com/lfchaim/spring-rest-data-h2