봉식이와 캔따개

[Kafka] java로 kafka producer 작성하기 본문

~2022 흔적들/Etc

[Kafka] java로 kafka producer 작성하기

봉식이누나 2020. 1. 29. 17:43
반응형

 

kafka 글이 벌써 세개... 카테고리를 만들어야할지 고민이 된다.

 

 

 

오늘은 java로 kafka producer를 작성해보겠다.

 

 

목표 : 1초에 한번 씩 랜덤한 숫자를 내보내는 producer 작성하기!!

 

 

 

 

 

 

1. project 생성하기

 

 

우선 Maven 프로젝트를 생성해줍니다. 

 

quickstart 선택

 

 

Gruop id와 Artifact id 입력

 

 

 

 

2. dependency 추가

 

 

pom.xml에 kafka dependency를 추가해줍니다

 

 

kafka 의존성 추가

 

 

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>1.1.0</version>
</dependency>
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-streams</artifactId>
	<version>1.1.0</version>
</dependency>
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka_2.11</artifactId>
	<version>0.8.2.1</version>
</dependency>

 

 

 

 

 

3. producer 작성

 

 

1초에 한번씩 1~100 사이의 숫자를 내보내도록 작성하였다.

 

package kafkaProducer.producer;

import java.io.IOException;
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {
	
	private static final String TOPIC_NAME = "spark"; //토픽명
	
	public static void main(String[] args) throws IOException, InterruptedException {
	
	    Random random = new Random();
            
            Properties prop = new Properties();
            prop.put("bootstrap.servers", "localhost:9092"); // server, kafka host
            prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   
            prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
            prop.put("acks", "all");   
            prop.put("block.on.buffer.full", "true");  

            String message = null;

            // producer 생성
            @SuppressWarnings("resource")
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);

            // message 전달
            while(true) {

                message = Integer.toString(random.nextInt(100)); // 1~100 중 랜덤숫자
                producer.send(new ProducerRecord<String,String>(TOPIC_NAME, message));
                Thread.sleep(1000); // 1초

            }

        }
	
}

 

 

 

 

4. consumer 실행시키고 확인해보기

 

※참고로 spark라는 이름의 topic을 미리 생성한 뒤에 진행했습니다.

 

.\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic spark --from-beginning

consumer 실행

 

 

 

 

 

 

짜라라라란~

1초에 한번씩 나오는 모습 확인~

 

반응형
Comments