봉식이와 캔따개

Spark Streaming으로 Kafka Topic 값 받아오기 본문

~2022 흔적들/Spark

Spark Streaming으로 Kafka Topic 값 받아오기

봉식이누나 2020. 2. 5. 11:05
반응형

 

 

오늘은 Spark Streaming 으로 Kafka를 통해 집어넣은 값을 뽑아내보라는... 이야기를 듣고 진행해보았습니다.

 

Spark도 잘 모르는데 하려니까 너무 막막해요...

 

 

 

2020/01/29 - [프로그래밍/Etc] - [Kafka] java로 kafka producer 작성하기

 

[Kafka] java로 kafka producer 작성하기

kafka 글이 벌써 세개... 카테고리를 만들어야할지 고민이 된다. 오늘은 java로 kafka producer를 작성해보겠다. 목표 : 1초에 한번 씩 랜덤한 숫자를 내보내는 producer 작성하기!! 1. project 생성하기 우선 Mav..

bong-sik.tistory.com

위 글과 이어지는 내용입니다.

 

 

 

 

 

일단 Maven Project를 생성하고 Spark Streaming을 사용하기 위해 의존성 추가를 해줍니다.

 

 

의존성추가

 

 

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

 

의존성 추가할때 저 둘의 버전을 맞춰주어야해요. 버전을 맞추지 않으면 에러가 뜹니다 ㅠㅠ

 

 

 

 

카프카 토픽에서 값을 가져와서 프린트 어쩌구 저쩌구

 

 

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

public class Source {

	@SuppressWarnings("serial")
	public static void main(String[] args) throws InterruptedException {
		System.setProperty("hadoop.home.dir", "D:\\program\\hadoop-2.7.1"); // HADOOP_HOME 경로 잡아주기
		
		SparkConf conf = new SparkConf().setAppName("kafka-spark").setMaster("local[2]").set("spark.driver.allowMultipleContexts", "true");
		JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(100));
		
		Map<String,Object> kafkaParams = new HashMap<String,Object>();
		kafkaParams.put("bootstrap.servers", "localhost:9092");
		kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		kafkaParams.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
		kafkaParams.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		kafkaParams.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
		kafkaParams.put("group.id","spark_id");
		kafkaParams.put("auto.offset.reset","latest");
		kafkaParams.put("enable.auto.commit",false);
		
		Collection<String> topics = Arrays.asList("spark"); // ""안에 kafka topic명 입력
		
		JavaInputDStream<ConsumerRecord<String,String>> stream = KafkaUtils.createDirectStream(
				ssc,LocationStrategies.PreferConsistent(),ConsumerStrategies.<String,String>Subscribe(topics,kafkaParams));
		
		stream.mapToPair(
				new PairFunction<ConsumerRecord<String,String>, String, String>(){
					public Tuple2<String,String> call(ConsumerRecord<String,String> record){
						return new Tuple2<String,String>(record.key(),record.value());
					}
				});
		stream.map(raw->raw.value()).print();
		
		ssc.start();
		ssc.awaitTermination();

	}

}

 

 

혹시 하둡 웅앵 하는 에러가 뜬다면

System.setProperty("hadoop.home.dir", "D:\\program\\hadoop-2.7.1")

에서 하둡 home 경로를 잘못 잡아주었거나 하둡이 설치돼있지 않아서 입니다.

 

왜 알고있냐면 ㅎㅎ........ ㅠㅠ......

 

 

실행해봅시당!

 

 

 

 

 

어쩌구 저쩌구....하면서 1초에 한번씩 값을 가져옵니다 ㅎㅎ 성공~

 

 

 

 

반응형

'~2022 흔적들 > Spark' 카테고리의 다른 글

[error] Only one SparkContext may be running in this JVM  (0) 2020.02.11
Comments