Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- Kafka
- 아파치 스파크
- python
- 스파크
- Yolo5
- YOLO
- SPARK
- 내일채움공제만기
- 앱생명주기
- 준지도학습
- face blur
- Swift
- 파이썬
- roboflow
- IOS
- SeSAC
- caffemodel
- iOS부트캠프
- opencv
- 내채공만기
- Apache Spark
- train data
- 얼굴 비식별화
- 아파치 카프카
- 카프카
- Apache Kafka
- Yolo5 custom dataset
- 비식별화
- scenedelegate
- yolov5
Archives
- Today
- Total
봉식이와 캔따개
Spark Streaming으로 Kafka Topic 값 받아오기 본문
반응형
오늘은 Spark Streaming 으로 Kafka를 통해 집어넣은 값을 뽑아내보라는... 이야기를 듣고 진행해보았습니다.
Spark도 잘 모르는데 하려니까 너무 막막해요...
2020/01/29 - [프로그래밍/Etc] - [Kafka] java로 kafka producer 작성하기
위 글과 이어지는 내용입니다.
일단 Maven Project를 생성하고 Spark Streaming을 사용하기 위해 의존성 추가를 해줍니다.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.2.0</version>
</dependency>
의존성 추가할때 저 둘의 버전을 맞춰주어야해요. 버전을 맞추지 않으면 에러가 뜹니다 ㅠㅠ
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;
public class Source {
@SuppressWarnings("serial")
public static void main(String[] args) throws InterruptedException {
System.setProperty("hadoop.home.dir", "D:\\program\\hadoop-2.7.1"); // HADOOP_HOME 경로 잡아주기
SparkConf conf = new SparkConf().setAppName("kafka-spark").setMaster("local[2]").set("spark.driver.allowMultipleContexts", "true");
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(100));
Map<String,Object> kafkaParams = new HashMap<String,Object>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaParams.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaParams.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("group.id","spark_id");
kafkaParams.put("auto.offset.reset","latest");
kafkaParams.put("enable.auto.commit",false);
Collection<String> topics = Arrays.asList("spark"); // ""안에 kafka topic명 입력
JavaInputDStream<ConsumerRecord<String,String>> stream = KafkaUtils.createDirectStream(
ssc,LocationStrategies.PreferConsistent(),ConsumerStrategies.<String,String>Subscribe(topics,kafkaParams));
stream.mapToPair(
new PairFunction<ConsumerRecord<String,String>, String, String>(){
public Tuple2<String,String> call(ConsumerRecord<String,String> record){
return new Tuple2<String,String>(record.key(),record.value());
}
});
stream.map(raw->raw.value()).print();
ssc.start();
ssc.awaitTermination();
}
}
혹시 하둡 웅앵 하는 에러가 뜬다면
System.setProperty("hadoop.home.dir", "D:\\program\\hadoop-2.7.1")
에서 하둡 home 경로를 잘못 잡아주었거나 하둡이 설치돼있지 않아서 입니다.
왜 알고있냐면 ㅎㅎ........ ㅠㅠ......
실행해봅시당!
어쩌구 저쩌구....하면서 1초에 한번씩 값을 가져옵니다 ㅎㅎ 성공~
반응형
'~2022 흔적들 > Spark' 카테고리의 다른 글
[error] Only one SparkContext may be running in this JVM (0) | 2020.02.11 |
---|
Comments