🎁 본 글은 실무로 '배우는 빅데이터기술' 책을 따라해보고 실행하여보는 과정을 기록한 글이다.
🎁 빅데이터 처리의 전체적인 흐름과 과정을 학습하기 쉬우며 빅데이터에 관심있는 사람들에게 추천한다.
http://server01.hadoop.com:9870/
설치한 하둡 HDFS 를 URL로 접속. (클라우데라 매니저를 설치해서 UI를 사용하여 볼 수 있다.)
이전 글에서는 flume(수집) -> kafka -> kafka(topic) 으로 데이터를 보내는것을 진행 해 보았으며
이번 글에서는 kafka -> Consumer ->HDFS(or Redis) 로 데이터를 보내는 것을 진행 해 볼 것이다.
클라우데라에서 다음 내용을 변경한다.
flume - 상태 - 구설 파일에서 다음의 내요으로 바꾸어 줄 것이다.
SmartCar_Agent.sources = SmartCarInfo_SpoolSource DriverCarInfo_TailSource
SmartCar_Agent.channels = SmartCarInfo_Channel DriverCarInfo_Channel
HDFS Sink 정보를 설정하기 위한 변수 선언
SmartCar_Agent.sinks = SmartCarInfo_HdfsSink DriverCarInfo_KafkaSink
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.type = spooldir
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.spoolDir = /home/pilot-pjt/working/car-batch-log
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.deletePolicy = immediate
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.batchSize = 1000
1.timeInterceptor : timestamp를 활용하기 위한 변수
2.typeInterceptor : 로그유형에 해당하는 상수값을 정의하기 위한 변수
3.collectDayInterceptor : 수집일자를 추가하기 위한 변수
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors = timeInterceptor typeInterceptor collectDayInterceptor filterInterceptor
#일반적으로 날짜와 시간을 나타내는 형식
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.timeInterceptor.type = timestamp
#설정은 스마트카 에이전트의 시간 인터셉터에서 기존 시간 정보를 보존하는 것을 의미합니다. 이 설정은 새로운 시간 정보가 추가되더라도 기존의 시간 정보를 덮어쓰지 않도록 보장합니다.
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.timeInterceptor.preserveExisting = true
#설정은 스마트카 에이전트의 타입 인터셉터를 정적 타입으로 설정하는 것을 의미합니다. 이를 통해 특정 데이터 타입을 고정시키는 설정을 합니다.
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.typeInterceptor.type = static
#인터셉터가 작동할 때 참조할 데이터의 키를 logType으로 설정합니다.
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.typeInterceptor.key = logType
#logType 키에 대한 value를 car-batch-log로 설정합니다.
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.typeInterceptor.value = car-batch-log
#com.wikibook.bigdata.smartcar.flume.CollectDayInterceptor$Builder는 Java에서 특정 빌더 패턴 클래스를 나타내며, 이를 통해 CollectDayInterceptor가 생성됩니다. 이 인터셉터는 데이터 이벤트가 처리될 때마다 수집된 날짜 정보를 추가하거나 수정하는 역할을 수행할 수 있습니다.
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.collectDayInterceptor.type = com.wikibook.bigdata.smartcar.flume.CollectDayInterceptor$Builder
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.type = regex_filter
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.regex = ^\\d{14}
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.excludeEvents = false
SmartCar_Agent.channels.SmartCarInfo_Channel.type = memory
SmartCar_Agent.channels.SmartCarInfo_Channel.capacity = 100000
SmartCar_Agent.channels.SmartCarInfo_Channel.transactionCapacity = 10000
#스마트카 에이전트의 데이터 싱크(sink) 중 하나가 HDFS(Hadoop Distributed File System)를 사용하도록 지정하는 것을 의미합니다. 싱크는 데이터가 최종적으로 저장되는 위치를 나타내며, 이 경우 데이터를 HDFS에 저장하게 됩니다.
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.type = hdfs
#스마트카 에이전트가 HDFS에 데이터를 저장할 경로를 지정
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.path = /pilot-pjt/collect/%{logType}/wrk_date=%Y%m%d
#스마트카 에이전트가 HDFS에 데이터를 저장할 때 파일 이름의 접두사를 동적으로 설정하도록 지정하는 것입니다. 이 설정을 통해 저장되는 파일 이름에 logType 값이 포함되도록 합니다.
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.filePrefix = %{logType}
#스마트카 에이전트가 HDFS에 데이터를 저장할 때 파일 이름의 접미사를 .log로 지정
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.fileSuffix = .log
#파일이 순차적으로 추가되는 데이터 스트림 형태로 저장됨을 의미합니다. 데이터가 계속해서 쓰여지는 데이터 소스에서 유용합니다.
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.fileType = DataStream
#스마트카 에이전트가 HDFS에 데이터를 저장시 파일 형태 지정
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.writeFormat = Text
#데이터를 HDFS에 저장할 때 한 번에 처리하는 데이터의 행(row) 수를 나타냅니다
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.batchSize = 10000
#HDFS에 데이터를 저장할 때 watting 시간을 지정하는 것으로 0이면 즉시 저장
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.rollInterval = 0
#롤링 횟수를 0으로 설정하면, 특정 횟수마다 파일을 롤링하지 않고 즉시 파일에 쓰여집니다.
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.rollCount = 0
#스마트카 에이전트가 HDFS에 데이터를 저장하는 동안의 대기 시간을 지정하는 것입니다. 여기서는 대기 시간을 100초로 설정하였습니다.
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.idleTimeout = 100
#스마트카 에이전트가 HDFS에 데이터를 저장하는 동안의 호출 시간 제한을 지정하는 것입니다. 여기서는 호출 시간 제한을 600,000밀리초(600초)로 설정
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.callTimeout = 600000
#HDFS에 데이터를 저장할 때 파일 크기가 일정 임계값을 초과하면 새로운 파일을 생성
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.rollSize = 67108864
#스마트카 에이전트가 HDFS에 데이터를 저장할 때 사용하는 스레드 풀의 크기를 지정
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.threadsPoolSize = 10
#스마트카 에이전트의 데이터 소스가 데이터를 전송할 채널을 지정하는 것입니다. 여기서는 데이터 소스가 SmartCarInfo_Channel이라는 채널로 데이터를 전송하도록 설정
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel
#스마트카 에이전트의 HDFS 싱크가 데이터를 가져올 채널을 지정하는 것입니다. 여기서는 HDFS 싱크가 SmartCarInfo_Channel이라는 채널로부터 데이터를 가져오도록 설정되었습니다.
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.channel = SmartCarInfo_Channel
SmartCar_Agent.sources.DriverCarInfo_TailSource.type = exec
SmartCar_Agent.sources.DriverCarInfo_TailSource.command = tail -F /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log
SmartCar_Agent.sources.DriverCarInfo_TailSource.restart = true
SmartCar_Agent.sources.DriverCarInfo_TailSource.batchSize = 1000
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors = filterInterceptor2
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.type = regex_filter
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.regex = ^\\d{14}
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.excludeEvents = false
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.topic = SmartCar-Topic
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.brokerList = server02.hadoop.com:9092
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.requiredAcks = 1
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.batchSize = 1000
SmartCar_Agent.channels.DriverCarInfo_Channel.type = memory
SmartCar_Agent.channels.DriverCarInfo_Channel.capacity= 100000
SmartCar_Agent.channels.DriverCarInfo_Channel.transactionCapacity = 10000
SmartCar_Agent.sources.DriverCarInfo_TailSource.channels = DriverCarInfo_Channel
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.channel = DriverCarInfo_Channel
이후 라이브러리를 하나 넣어줄 것인데 플럼의 라이브러리 폴더로 이동한다. (server02 에서 진행)
cd /opt/cloudera/parcels/CDH/lib/flume-ng/lib
업로드 할 파일을 옮겨서 넣어 준다.
이후 클라우데라 매니저에서 플럼을 재시작하자.
이후 server02 에서 파일을 실행 시켜보자.
cd /home/pilot-pjt/working
java -cp bigdata.smartcar.loggen-1.0.jar com.wikibook.bigdata.smartcar.loggen.CarLogMain 20240612 100 &
터미널을 하나 더 실행하여 확인해보자.
cd /home/pilot-pjt/working/SmartCar
tail -f SmartCarStatusInfo_20240612.txt
파일을 복사 하여 플럼이 동작하게하자.
mv /home/pilot-pjt/working/SmartCar/SmartCarStatusInfo_20240612.txt /home/pilot-pjt/working/car-batch-log/
동작을 확인 해보자
cd /var/log/flume-ng/
tail -f /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log
HDFS에 생성된 파일 확인을 해보자
hdfs dfs -ls -R /pilot-pjt/collect/car-batch-log/
파일의 내용 조회 (자신의 파일 이름으로 해야 한다.)
hdfs dfs -tail /pilot-pjt/collect/car-batch-log/wrk_date=20240612/car-batch-log.1718154908177.log
Hbase 설치
클라우데라 매니저 - 클러스터 - 서비스 추가
완료 후 HBase 구성으로 이동
HBase Thrift Http 검색 후 HBase 서비스 전체 체크를 하고 변경 내용을 저장 해 준다.
이후 재시작.
HBase 테이블 생성 (Server 02 에서 실행)
hbase shell
다음과 같이 실행 하여 테이블을 만들어 보자
smartcar_test_table 이라는 이름의 테이블을 만드는데 cf 라는 이름은 컬럼의 집합의 이름을 의미한다.
create 'smartcar_test_table','cf'
put 명령으로 컬럼을 만들어 데이터를 삽입 해보자
model 이라는 컬럼을 만들어 데이터를 넣고
no 컬럼을 만들어 데이터를 넣는다.
비 정형 데이터 베이스이기 때문에 이와 같이 저장 할 수 있다.
put 'smartcar_test_table','row-key1','cf:model','Z0001'
put 'smartcar_test_table','row-key1','cf:no','12345'
get 명령으로 데이터를 확인 해보자
get 'smartcar_test_table','row-key1'
테이블 삭제는 다음과 같이 진행한다.
disable 'smartcar_test_table'
drop 'smartcar_test_table'
exit
HBase 모니터링
아래 주소에서 확인 할 수 있다.
'빅데이터' 카테고리의 다른 글
[빅데이터] Storm 설치 및 사용법 (1) | 2024.06.12 |
---|---|
[빅데이터] Redis 설치 및 사용법 (0) | 2024.06.12 |
[빅데이터] VirtualBox 서버 백업 (0) | 2024.06.11 |
[빅데이터] 하둡이란? namenode, balancer, yarn, zookeeper (0) | 2024.06.11 |
[빅데이터] 카프카(kafka) 설치 및 사용 (0) | 2024.06.11 |