파란배개 2025. 6. 16. 07:09

데이터 파이프라인 문서 바로가기: 데이터 파이프라인 설계

1. 개요

우리 프로젝트는 매시간 트렌드 키워드를 추출하고, 관련 콘텐츠를 생성하여 사용자에게 제공하는 배치 파이프라인을 목표로 합니다. 실시간 처리가 이상적이지만, MVP 단계에서는 시간 기반 배치로 구현하기로 했습니다. 이를 위해 Spring Scheduler를 사용해 매시 정각에 실행되며, Spring Batch를 도입해 안정적이고 확장 가능한 데이터 처리를 보장합니다.

  • 주요 기능: 실시간 트렌드 키워드 추출, 관련 뉴스 및 YouTube 콘텐츠 검색, LLM 기반 요약 Post 생성
  • 활용 API: Google Trends RSS, Naver Search API, YouTube Data API, Gemini API 등

2. Spring Batch란?

Spring Batch는 대량의 데이터를 처리하기 위한 프레임워크로, 안정성과 확장성을 갖춘 배치 작업을 지원합니다. 로깅/추적, 트랜잭션 관리, 작업 처리 통계 등 기능으로 복잡한 데이터 처리 과정을 체계적으로 관리하며, 오류 처리와 재시작 기능으로 신뢰성을 높입니다.

배치가 실패하여 작업을 재시작하게 된다면 처음부터가 아닌 실패한 지점부터 실행 가능하며, 중복 실행을 막기 위해 Parameters 정보를 저장합니다.

배치 작업이 필요한 상황

  • 대용량의 비즈니스 데이터를 복잡한 작업으로 처리해야하는 경우
  • 특정한 시점에 스케줄러를 통해 자동화된 작업이 필요한 경우
  • 대용량 데이터의 포맷을 변경, 유효성 검사 등의 작업을 트랜잭션 안에서 처리 후 기록해야하는 경우

Batch와 Scheduler의 차이

배치 어플리케이션의 절대적인 목적은 대용량 데이터를 처리하는 것으로, Batch 프레임워크인 Spring Batch에서 스케줄링 기능을 제공하지 않습니다. 스케줄링은 스케줄링 프레임워크인 Quartz에서 제공하기에 둘은 보완적인 관계로 볼 수 있습니다.


Spring Batch의 핵심 구성 요소


Job

배치 계층 구조에서 가장 상위에 있는 개념으로 하나의 배치 작업 자체

  • Job Configuration을 통해 생성되는 객체 단위로서 배치 작업을 어떻게 구성하고 실행할 것인지 전체적으로 설정하고 명세해놓은 객체
  • 우리가 실행할 Step의 모든 정보를 저장
  • 여러 Step을 포함하고 있는 컨테이너로서 반드시 한 개 이상의 Step으로 구성
  • JobBuilder
    • Job을 구성하는 설정 조건에 따라 두 개의 하위 빌더 클래스를 생성하고 실제 Job 생성 위임
    • SimpleJobBuilder, FlowJobBuilder 클래스 존재

Job의 기본 구현체 종류

  • SimpleJob
    • 순차적으로 Step을 실행시키는 Job
    • Step 1 → Step 2 → Step 3
  • FlowJob
    • 특정한 조건과 흐름에 따라 Step을 구성하여 실행시키는 Job
    • Flow 객체를 실행시켜서 작업 진행
    • Step 1 → [Step 2 or Step 1] → Step 3
    • 조건문 분기 예시
@Bean
public Job batchJob() {
    return jobBuilderFactory.get("batchJob")
            .start(step1()) // Flow 시작하는 Step 설정
            .on("COMPLETED") // Step의 실행 결과로 돌려받는 종료 상태 (ExitStatus)를 캐치하여 매칭하는 패턴, TransitionBuilder 반환
            .to(step2()) // 다음으로 이동할 Step 지정 .stop()/fail()/end()/stopAndRestart() -> Flow를 중지/실패/종료 하도록 Flow 종료
            // => Step1이 성공하면 Step2로 가라.
            .from(step1()) // 이전 단계에서 정의한 Step 의 Flow 를 추가적으로 정의함
            .on("FAILED").to(step3()) // => Step1이 실패하면 Step3로 가라.
            //.next(step2()) // 다음으로 이동할 Step 지정
            .end() // build() 앞에 위치하면 FlowBuilder 를 종료하고 SimpleFlow 객체 생성
            .build(); // FlowJob 생성하고 flow 필드에 SimpleFlow 저장
}

JobLauncher

Job를 실행시키는 주체

  • 스프링 부트 배치가 구동되면 JobLauncher Bean 자동 생성됨 (DI 받아 사용)
  • Job과 JobParameter를 사용하여 Job을 실행시키는 객체
  • **public** **interface** **JobLauncher** { JobExecution **run**(Job job, JobParameters jobParameters)}
  • 배치처리를 완료 한 다음에 Client에게 JobExecution을 반환(동기적 실행)
    • 비동기적 실행도 가능
  • 스케줄러와 연동 가능

JobInstance

Job의 논리적 실행단위 객체로써 고유하게 식별 가능한 작업 실행

  • Job:JobInstance = 1:N
  • JobName + JobParameter 가 같을 경우 동일한 JobInstance를 리턴해 Exception 발생 (중복불가)
  • 빌드 시 Job이 자동 실행되는 것을 막기 위해 다음과 같은 설정 추가
spring:
  batch:
    job:
      enabled: false # 자동 실행을 막는다.

실행 엔드포인트 직접 추가

@RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1/batch")
public class BatchJobControllerV1 {

	private final JobLauncher jobLauncher;
	private final Job trendToPostJob;

	@PostMapping("/run/trend")
	public ResponseEntity<String> runTrendToPostJob() {
		LocalDateTime bucketAt = LocalDateTime.now();

		JobParameters params = new JobParametersBuilder()
			.addString("runTime", bucketAt.toString())
			.toJobParameters();

		jobLauncher.run(trendToPostJob, params);
		
		return ResponseEntity.ok("trendToPostJob 실행: " + bucketAt);
	}
}

JobParameters

Job을 실행할 때 함께 포함되어 사용되는 파라미터를 가진 도메인 객체

  • Job Parameter는 key, value로 구성되어진 map을 포함
  • 하나의 Job에 존재하는 여러 개의 JobInstance를 구분하기 위한 용도
  • JobParameters:JobInstance = 1:1
**public** **class** **JobParameters** **implements** **Serializable** {
    **private** **final** Map<String, JobParameter> parameters;

    **public** **JobParameters**() {
        **this**.parameters = **new** LinkedHashMap<>();
    }
    ...
}
  • BATCH_JOB_EXECUTION_PARAM table 예시


JobExecution

JobInstance에 대한 한 번의 시도를 의미하는 객체로서 Job 실행 중에 발생한 정보를 저장하고 있는 객체

  • 시작시간(START_TIME), 종료 시간(END_TIME), 상태(STATUS), 종료 상태(EXIT_CODE , EXIT_MESSAGE) 등의 속성 포함
    • 상태 종류 : 실행중(STARTED), 실패(FAILED), 성공(COMPLETED)
  • JobParameter가 동일한 값으로 Job을 실행할지라도 상태가 FAILED 이라면 하나의 JobInstance 내에서 성공할 때까지 여러 번의 시도 가능
    • COMPLITED이면 재실행이 불가하며 실행시도시 JobInstanceAlreadyCompleteException 발생
  • JobInstance:JobExecution = 1:N

Step

Batch Job을 구성하는 독립적이고 순차적으로 단계를 캡슐화하는 도메인 객체

  • Step은 독립적이고 순차적으로 배치 처리를 수행
  • 트랜잭션은 Step 내에서 처리
  • 단순한 단일 Task 뿐만 아니라 입력(ItemReader) → 처리(ItemProcessor) → 출력(ItemWriter)과 관련된 복잡한 비즈니스 로직을 포함하는 모든 설정을 포함
  • Job:Step = 1:N
@Bean
public Job trendToPostJob() {
	return new JobBuilder("trendToPostJob", jobRepository)
		.incrementer(new RunIdIncrementer())        // JobParameter의 값 자동 증가  
		.start(fetchTrendingKeywordsStep())         // 처음 실행할 Step 설정
		.next(searchSourcesStep())                  // 다음에 실행할 Step 설정
		.next(evaluateNoveltyStep())                      
		.next(generatePostStep())     
		// .preventRestart(true)              // Job의 재시작 가능 여부(default:true)
		// .validator(JobParameterValidator)  // JobParameter 검증
		// .listener(JobExecutionListener)    // 라이프 사이클의 특정 시점에 콜백 제공        
		.build();
}

Step의 기본 구현제 종류

  • TaskletStep
    • 가장 기본이 되는 클래스로서 Tasklet 타입의 구현체 제어
    public Step taskletStep() {
    	return this.stepBuilderFactor.get("step")
    																.tasklet(myTasklet())
    																.build();
    
  • PartitionStep
    • 멀티 스레드 방식으로 Step을 여러 개로 분리해서 실행
  • JobStep
    • Step 내에서 Job 실행
    • Step이 Job의 하위 개념이지만, Step 안에서 Job을 구성 가능
    public Step jobStep() {
    	return this.stepBuilderFactory.get("step")
    															.job(job())
    															.launcher(jobLauncher)
    															.parametersExtractor(jobParametersExtractor())
    															.build();
    
  • FlowStep
    • Step 내에서 Flow 실행

StepExecution

Step에 대한 한 번의 시도를 의미하며 Step 실행 중에 발생한 정보들을 저장하고 있는 객체

  • 시작시간(START_TIME), 종료시간(END_TIME), 상태(STATUS), 커밋수(COMMIT_COUNT), 롤백수(ROLLBACK_COUNT) 등의 속성 포함
  • Step이 시도될 때마다 각 Step별로 StepExecution 생성
  • Job이 재시작되더라도 이미 성공적으로 완료된 Step은 재실행되지 않도 실패한 Step만 실행
  • JobExecution:StepExecution = 1:N
    • Step의 StepExecution이 모두 정상적으로 완료되어야 → JobExecution 성공
    • Step의 StepExecution 중 하나라도 실패하면 → JobExecution 실패
  • StepBuilder
    • 호출하는 메서드에 따라 사용하는 하위 StepBuilder가 다르며 그에 따른 Step이 생성됨
    • StepBuilder 종류
public class StepBuilder extends StepBuilderHelper<StepBuilder> {

    // Tasklet 사용
    public TaskletStepBuilder tasklet(Tasklet tasklet) {
        return new TaskletStepBuilder(this).tasklet(tasklet);
    }

    // Chunk 기반의 Tasklet 사용
    public <I, O> SimpleStepBuilder<I, O> chunk(int chunkSize) {
        return new SimpleStepBuilder<I, O>(this).chunk(chunkSize);
    }
    public <I, O> SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) {
        return new SimpleStepBuilder<I, O>(this).chunk(completionPolicy);
    }

    // PartitionStep
    public PartitionStepBuilder partitioner(String stepName, Partitioner partitioner) {
        return new PartitionStepBuilder(this).partitioner(stepName, partitioner);
    }
    public PartitionStepBuilder partitioner(Step step) {
        return new PartitionStepBuilder(this).step(step);
    }

    // JobStep
    public JobStepBuilder job(Job job) {
        return new JobStepBuilder(this).job(job);
    }

    // FlowStep
    public FlowStepBuilder flow(Flow flow) {
        return new FlowStepBuilder(this).flow(flow);
    }
}

ExecutionContext

StepExecution 또는 JobExecition 객체의 상태를 저장하는 공유 객체

  • Step 간 데이터를 공유하거나 재시작 시 상태 복구에 사용되는 저장소 역할
  • Job 재시작 시 이미 처리한 Row는 건너뛰고 이후로 수행하도록 할 때 상태 정보 활용
  • DB에 직렬화한 값으로 저장됨 {”key”: “value”}
  • 공유 범위
    • Step 범위 - 각 Step의 StepExecution에 저장되며 Step간 공유 불가능
    • Job 범위 - 각 Job의 JobExecution에 저장되며 Job간 공유 불가능 + 해당 Job의 Step간 공유 가능

JobRepository

Batch Job 중의 정보를 저장하는 저장소 역할

  • Job이 언제 수행되었고 언제 끝났으며, 몇 번이 실행되었고 실행에 대한 결과 등의 배치 실행 상태와 이력에 관련된 모든 메타 정보 저장
  • 오류 발생 시 재시작 지원
  • JobLauncher, Job, Step 구현체 내부에서 CRUD 기능 처리
    • Job이 실행되게 되면 JobRepository에 JobExecution과 StepExecution을 생성하게 되며 JobRepository에서 Execution 정보들을 DB에 저장하고 조회하며 사용
  • 두 가지의 방식
    • JDBC 방식: JobRepositoryFactoryBean
      • 내부적으로 AOP 기술을 통해 트랜잭션 처리
      • 트랜잭선 isolation의 기본 값은 SERIALIZEBLE로 최고 수준. 다른 레벨(READ_COMMITED, REPEATABLE_READ)로 지정 가능
      • override
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE"); // isolation 수준, 기본값은 “ISOLATION_SERIALIZABLE”
    factory.setTablePrefix(“SYSTEM_"); // 테이블 Prefix, 기본값은 “BATCH_”, BATCH_JOB_EXECUTION 가 SYSTEM_JOB_EXECUTION 으로 변경됨
    factory.setMaxVarCharLength(1000); // varchar 최대 길이(기본값 2500)
    return factory.getObject(); // Proxy 객체가 생성됨 (트랜잭션 Advice 적용 등을 위해 AOP 기술 적용)
}
  • In-Memory 방식: MapJobRepositoryFactoryBean
    • 성능 등의 이유로 도메인 객체를 굳이 데이터베이스에 저장하고 싶지 않은 경우
    • override
@Override
protected JobRepository createJobRepository() throws Exception {
    MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
    factory.setTransactionManager(transactionManager); // ResourcelessTransactionManager 사용
    return factory.getObject();
}

Tasklet

Step 내에서 실행되는 객체, 주로 단일 task를 수행

  • TaskletStep에 의해 반복적으로 실행(While loop)되며 반환값에 따라 실행/종료
    • RepeatStatus.CONTINUABLE : 반복
    • RepeatStatus.FINISHED : 종료
  • Step:Tasklet = 1:1
**public** **interface** **Tasklet** {
    @Nullable
    RepeatStatus **execute**(StepContribution contribution, ChunkContext chunkContext) **throws** Exception;
}

Task 기반

단일 작업 기반으로 처리하는 방식

  • 주로 Tasklet 구현체를 만들어 사용
  • 대량 처리를 하는 경우 chunk 기반에 비해 더 복잡한 구현 필요
@Bean
public Step taskStep() {
    return stepBuilderFactory.get("taskStep") // StepBuilder를 생성하는 팩토리. Step의 이름을 매개변수로 받음.
            .**tasklet**((contribution, chunkContext) -> { // Tasklet 클래스 설정. 이 메서드를 실행하면 TaskletStepBuilder 반환.
                return RepeatStatus.FINISHED;
            })
            .startLimit(10) // Step의 실행 횟수를 설정. 설정한 만큼 실행되고 초과시 오류 발생. 기본 값은 INTEGER.MAX_VALUE
            .allowStartIfComplete(true) // Step의 성공, 실패와 상관없이 항상 Step을 실행하기 위한 설정. 기본값은 false
//                .listener(StepExecutionListener) // Step 라이프 사이클의 특정 시점에 콜백 제공받도록 StepExecutionListener 설정
            .build();
}

Chunk 기반

데이터를 일정 단위로 묶어 처리하며, 메모리와 성능 최적화 설계

  • ItemReader, ItemProcessor, ItemWriter를 사용하며 Chunk 기반 전용 Tasklet인 ChunkOrientedTasklet 구현제 제공
    • Step 과정에서 Item을 읽어 데이터를 처리한 다음 결과를 처리하는 객체
    • ItemReader와 ItemProcesseor는 Chunk 내의 개별 item을 처리하지만 ItemWriter는 Chunk 모든 items를 일괄 처리
    • ItemReader와 ItemWriter는 필수요소 지만 ItemProcesseor는 선택요소
  • Chunk 단위로 트랜잭션 처리 → JPA 영속성 컨텍스트 사용 가능
    1. Item을 Chunk size 만큼 반복해서 읽은 다음 Chunk에 저장하고 반환
    2. ChunkProvider로부터 받은 Chunk의 아이템 개수만큼 데이터를 가공하고 저장
    3. 읽을 Item이 더 존재하는지 확인해서 존재하면 Chunk 프로세스 반복하고, null일 경우 Repeatstatus.FINISHED 반환하고 프로세스 종료
@Bean
public Step chunkStep() {
    return stepBuilderFactory.get("chunkStep")
            .<String, String>chunk(3)
            .**reader**(new ListItemReader(Arrays.asList("item1","item2","item3")))
            .**processor**(new ItemProcessor<String, String>() {
                @Override
                public String process(String item) throws Exception {
                    return item.toUpperCase();
                }
            })
            .**writer**(list -> {
                list.forEach(item -> System.out.println(item));
            })
            .build();
}

ItemReader

input 데이터를 읽어오는 인터페이스

  • Cursor와 Paging 기능 제공
  • 구현체 종류
    • FlatFileItemReader : txt, csv 등 파일
    • JsonItemReader : json 파일
    • MultiResourceItemReader : 여러 개의 파일 조합
    • JdbcCursorItemReader : JDBC Cursor
    • JpaCursorItemReader : JPA Cursor
    • JdbcPagingItemReader : JDBC Paging
    • JpaPagingItemReader : JPA Paging
    • SynchronizeditemStreamReader : Thread-safe ItemReader
    • CustomItemReader : Custom ItemReader

ItemProcessor

데이터를 출력하기 전에 데이터를 가공, 변형, 필터링 하는 역할

  • 구현체 종류
    • CompositeItemProcessor : ItemProcessor들을 체이닝 실행
    • ClassifierCompositeItemProcessor : 라우팅으로 ItemProcessor들 중 하나를 호출

ItemWriter

Chunk 단위로 데이터를 받아 일괄 출력하는 인터페이스

  • 출력이 완료되면 트랜잭션이 종료되고 새로운 Chunk 단위 프로세스로 이동
  • 구현체 종류
    • FlatFileItemWriter : txt, csv 등 파일
    • JsonFileItemWriter : json 파일
    • JdbcBatchItemWriter : JDBC
      • JDBC Batch 기능을 사용해 bulk update 처리
    • JpaItemWriter : JPA
      • Entity를 하나씩 chunk 크기만큼 update 처리
    • CustomItemWriter : Custom ItemWriter

Listener

Job, Step, Chunk 실행 전후에 발생하는 이벤트를 받아 용도에 맞게 활용하는 인터셉터 개념의 클래스

  • 주로 각 단계별 로그를 남기거나 실행 상태 정보를 참조하기 위해 사용되며 모든 단계별로 지정 가능
  • Job, Step, Skip, Retry 모든 단계를 지원하며 각 작업의 성공/실패와 관계없이 무조건 호출 가능
  • JobExecutionListener
public class CustomJobListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {
        System.out.println("JobExecution.getJobName() : " + jobExecution.getJobInstance().getJobName());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println("JobExecution.getStatus() : " + jobExecution.getStatus());
        long startTime = jobExecution.getStartTime().getTime();
        long endTime = jobExecution.getEndTime().getTime();
        System.out.println("(endTime-startTime) = " + (endTime - startTime));
    }
}
  • StepExecutionListener
@Component
public class CustomStepListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("stepExecution.getStepName() : " + stepExecution.getStepName());
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println("stepExecution.getStatus() : " + stepExecution.getStatus());
        return ExitStatus.COMPLETED;
    }
}

3. Spring Batch를 도입한 이유

우리의 뉴스/트렌드 요약 서비스는 시간 단위로 유입되는 키워드와 컨텐츠를 기반으로 요약 포스트를 만들어야 합니다. 초기에는 간단한 @Scheduled 로직으로 해결하려고 했지만 아래와 같은 문제를 발견했습니다.

  • 작업 단계 간 의존성과 상태 관리가 어려웠고,
  • 특정 API 실패 시 전체 작업 실패하는 문제가 생겼고,
  • 리소스 병목으로 인해 성능 저하가 우려되었습니다.

이러한 문제를 해결하고자, 각 단계를 분리하고 실패 복구가 가능한 구조로 전환하기 위해 Spring Batch를 도입하게 되었습니다.

  • 안정성: 트랜잭션 관리와 오류 처리로 데이터 손실을 방지합니다.
  • 확장성: 대량의 키워드와 콘텐츠를 처리하며, 병렬 Step으로 성능을 최적화합니다.
  • 유연성: RSS, API, LLM 등 다양한 소스와 통합 가능하며, 커스텀 로직을 쉽게 추가할 수 있습니다.
  • 모니터링: JobRepository로 실행 이력을 추적하고, 오류를 분석합니다.
  • Spring 통합: 기존 Spring 기반 프로젝트와 자연스럽게 연동됩니다.

4. 시스템 구성

  • 배치 스케줄러: Spring Scheduler (HourScheduler)
  • 배치 프레임워크: Spring Batch
  • 데이터베이스(JobRepository): 서비스 DB인 MySQL과 통합
  • 캐시: Redis (빠른 응답을 위한 Post 캐싱)
  • 외부 API: Google Trends, Naver News, YouTube Data API

파이프라인 흐름

  1. Step 1: fetchTrendingKeywords
    • Google Trends RSS에서 Top-10 키워드 수집 → DB 저장
  2. Step 2: searchSources (병렬)
    • Naver News와 YouTube API 호출 → 소스 저장
    1. Step 3: evaluateNovelty
    • 키워드 신규성 평가 → 메트릭 업데이트
  3. Step 4: generatePost
    • LLM으로 요약 → Post 생성 및 저장
  4. Step 5: cachePost
    • 캐싱에 필요해도 포스트는 생성되도록 관심사 분리
    • Redis 로 카드뷰 캐시 처리

핵심 포인트

포인트 구현 방법

JobParameter runtime(bucketAt)을 파라미터로 설정하여 수동 재실행시 특정 시간대 지정 가능
Step 간 데이터 공유 ExecutionContext (topKeywordIds, postableKeywordIds)
병렬 처리 SplitFlow (뉴스·유튜브 동시 호출 - API 왕복 시간 절반으로 단축)
조건부 실행 JobExecutionDecider (low_variation 전부 true → 포스트 스킵)
Idempotent preventRestart() 로 동일 bucketAt 중복 실행 차단 + insertIgnoreAll
외부 API 장애 대응 각 Adapter에 @Retry, @CircuitBreaker, @RateLimiter (Resilience4j)
Skip Resilience4j 쉘이 retry를 이미 시도했으므로 건너뛰기만 처리
모니터링 ErrorLogStepListener와 BatchMetricsListener로 상세 로그 확인

5. Spring Batch를 활용하기

5.1 Job 구성

Job: trendToPostJob

@Configuration
@RequiredArgsConstructor
public class BatchConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;
    private final BatchMetricsListener metrics;
    
    /* === Job 정의 === */
    @Bean
    public Job trendToPostJob() {
        return new JobBuilder("trendToPostJob", jobRepository)
            .incrementer(new RunIdIncrementer())
            .listener(metrics)
            .start(fetchTrendingKeywordsStep)   // 1
            .next(searchSourcesFlow)            // 2 (병렬)
            .next(evaluateNoveltyStep)          // 3
            .next(noveltyDecider)               // Decider
                .on("NO_POST_NEEDED").to(cachePostStep)
            .from(noveltyDecider).on("*")
		            .to(generatePostStep)           // 4
                .next(cachePostStep)            // 5
            .end()
            .build();
    }

5.2 Execution context

JobExecutionContext

변수 설명 alias

topKeywordIds Top-N 키워드 PK 리스트 JC_TOP_IDS
topKeywordCount 이번 주기 Top-N 수 JC_TOP_COUNT
postableKeywordIds low_variation = false 키워드 PK JC_POSTABLE_IDS
postableKeywordCount 위 리스트 개수 JC_POSTABLE_COUNT
noPostNeeded 이번 주기 포스트 생성 스킵 여부 JC_NO_POST_NEEDED
newPostIds 새로운 포스트 PK 리스트 JC_NEW_POST_IDS

StepExecutionContext → 모니터링 메트릭용

변수 설명 alias

newsFetched 뉴스 URL 수집 개수 SC_NEWS_FETCHED
newsApiFail 뉴스 API 실패 건수 SC_NEWS_API_FAIL
videoFetched 영상 URL 수집 개수 SC_VIDEO_FETCHED
videoApiFail 영상 API 실패 건수 SC_VIDEO_API_FAIL
noveltyLowVarCount low_variation = true 키워드 수 SC_NOVELTY_SKIPPED
postCreated 실제 생성된 포스트 개수 SC_POST_CREATED
cacheEntryCount Redis에 캐싱된 카드뷰 개수 SC_CACHE_SIZE

5.3 Step & Tasklet 구성

  • step 1: 키워드 수집 Tasklet
@Bean
	public Step fetchTrendingKeywordsStep() {
		return new StepBuilder("fetchTrendingKeywordsStep", jobRepository)
			.tasklet((contrib, ctx) -> {

				// bucketAt : JobParameter 로 넘어왔으면 재사용, 없으면 생성
				LocalDateTime bucketAt = jobEc.containsKey(EC_BUCKET_AT)
					? (LocalDateTime)jobEc.get(EC_BUCKET_AT)
					: LocalDateTime.now().truncatedTo(ChronoUnit.HOURS);

				if (!jobEc.containsKey(EC_BUCKET_AT)) {
					jobEc.put(EC_BUCKET_AT, bucketAt);
				}

				// Google Trends 호출 + 지연(ms) 측정
				long t0 = System.currentTimeMillis();
				List<Keyword> keywords = trendsService.getTrendingKeywordsFromRss();
				long latency = System.currentTimeMillis() - t0;

				// JobExecutionContext 업데이트
				List<Long> keywordIds = keywords.stream().map(Keyword::getId).toList();
				jobEc.put(EC_TOP_IDS, keywordIds);
				jobEc.putInt(EC_TOP_COUNT, keywordIds.size());

				// StepExecutionContext 업데이트
				stepEc.putLong(SC_RSS_LATENCY_MS, latency);

				return RepeatStatus.FINISHED;
			}, transactionManager)
        .listener(err)                           //     상세 로그 Listener
        .listener(metrics)                       //     메트릭 Listener
        .allowStartIfComplete(true)   
        .build();
}
  • step 2: 뉴스/유튜브 병렬 Split Flow
    @Bean
    public Step searchNewsStep() {
        return new StepBuilder("searchNewsStep", jobRepository)
            .tasklet((contrib, ctx) -> {
            
                SearchStats stat = sourceService.searchNews();
                
                stepEc.putInt(SC_NEWS_FETCHED,   stat.fetched());   // Counter
                stepEc.putInt(SC_NEWS_API_FAIL,  stat.failed());    // Counter
    
                return RepeatStatus.FINISHED;
            }, transactionManager)
            .listener(err)
            .listener(metrics)
            .allowStartIfComplete(true)
            .build();
    }
    
    @Bean
    public Step searchVideosStep() {
        return new StepBuilder("searchVideosStep", jobRepository)
            .tasklet((contrib, ctx) -> {
                                             
                SearchStats stat = sourceService.searchYoutube();
    
                stepEc.putInt(SC_VIDEO_FETCHED,  stat.fetched());
                stepEc.putInt(SC_VIDEO_API_FAIL, stat.failed());
    
                return RepeatStatus.FINISHED;
            }, transactionManager)
            .listener(err)
            .listener(metrics)
            .allowStartIfComplete(true)
            .build();
    }
    
    // 병렬 Flow :  뉴스 + 유튜브 Step 동시 실행
    @Bean
    public Flow searchSourcesFlow() {
        return new FlowBuilder<SimpleFlow>("searchSourcesFlow")
            .split(new SimpleAsyncTaskExecutor())
            .add(
                new FlowBuilder<Flow>("searchNewsFlow")
                    .start(searchNewsStep())
                    .build(),
    
                new FlowBuilder<Flow>("searchVideosFlow")
                    .start(searchVideosStep())
                    .build()
            )
            .build();
    }
    
  • SearchStats의 통계 정보 반환 필요 - 추후 : .skipLimit(10)
@Bean
public Step searchNewsStep() {
    return new StepBuilder("searchNewsStep", jobRepository)
        .tasklet((contrib, ctx) -> {
        
            SearchStats stat = sourceService.searchNews();
            
            stepEc.putInt(SC_NEWS_FETCHED,   stat.fetched());   // Counter
            stepEc.putInt(SC_NEWS_API_FAIL,  stat.failed());    // Counter

            return RepeatStatus.FINISHED;
        }, transactionManager)
        .listener(err)
        .listener(metrics)
        .allowStartIfComplete(true)
        .build();
}

@Bean
public Step searchVideosStep() {
    return new StepBuilder("searchVideosStep", jobRepository)
        .tasklet((contrib, ctx) -> {
                                         
            SearchStats stat = sourceService.searchYoutube();

            stepEc.putInt(SC_VIDEO_FETCHED,  stat.fetched());
            stepEc.putInt(SC_VIDEO_API_FAIL, stat.failed());

            return RepeatStatus.FINISHED;
        }, transactionManager)
        .listener(err)
        .listener(metrics)
        .allowStartIfComplete(true)
        .build();
}

// 병렬 Flow :  뉴스 + 유튜브 Step 동시 실행
@Bean
public Flow searchSourcesFlow() {
    return new FlowBuilder<SimpleFlow>("searchSourcesFlow")
        .split(new SimpleAsyncTaskExecutor())
        .add(
            new FlowBuilder<Flow>("searchNewsFlow")
                .start(searchNewsStep())
                .build(),

            new FlowBuilder<Flow>("searchVideosFlow")
                .start(searchVideosStep())
                .build()
        )
        .build();
}
  • step 3: 신규성 평가 Tasklet
@Bean
public Step evaluateNoveltyStep() {
    return new StepBuilder("evaluateNoveltyStep", jobRepository)
        .tasklet((contrib, ctx) -> {

            // Top-10 키워드 ID 가져오기
            @SuppressWarnings("unchecked")
            List<Long> topIds = (List<Long>) jobEc.get(EC_TOP_IDS);

            // 1) Novelty 계산 & keyword_metric_hourly UPDATE
            NoveltyStats ns = noveltyService.evaluateNovelty(topIds);
            int lowVarCnt   = ns.lowVariationCount();

            // 2) low_variation = false 인 키워드만 다음 Step 전달
            jobEc.put(EC_POSTABLE_IDS,    ns.postableIds());
            jobEc.putInt(EC_POSTABLE_COUNT, ns.postableIds().size());

            // 3) StepExecutionContext 업데이트
            stepEc.putInt(SC_NOVELTY_SKIPPED, lowVarCnt);   // Counter

            return RepeatStatus.FINISHED;
        }, transactionManager)
        .listener(err)
        .listener(metrics)
        .build();
}
  • decider: 포스트 생성 필요 여부
@Bean
public JobExecutionDecider noveltyDecider() {
    return (jobExec, stepExec) -> {

        // low-variation 키워드 유무 판정
        @SuppressWarnings("unchecked")
        List<Long> ids = (List<Long>) jobExec.getExecutionContext()
                                             .get(EC_POSTABLE_IDS);

        boolean noPostNeeded = (ids == null || ids.isEmpty());

        jobExec.getExecutionContext().put("noPostNeeded", noPostNeeded);

        return noPostNeeded
               ? new FlowExecutionStatus("NO_POST_NEEDED")
               : FlowExecutionStatus.COMPLETED;
    };
}
  • step 4: 포스트 생성 Tasklet (조건부)
    @Bean
    public Step generatePostStep() {
        return new StepBuilder("generatePostStep", jobRepository)
            .tasklet((contrib, ctx) -> {
    
                // 대상 키워드(id) 목록
                @SuppressWarnings("unchecked")
    						List<Long> ids = (List<Long>)jobEc.get(EC_POSTABLE_IDS);
    
                // Post 생성 (신규 Source ↔ LLM 요약)
                int created = ids.isEmpty() ? 0 : postService.generatePosts(ids);
    
                // StepExecutionContext 업데이트
                stepEc.putInt(SC_POST_CREATED, created);          // Counter
    
                return RepeatStatus.FINISHED;
            }, transactionManager)
            .listener(err)
            .listener(metrics)
            .allowStartIfComplete(true)
            .build();
    }
    
  • .skipLimit(3) 추후 적용
  • step 5: Redis 캐싱 Tasklet
@Bean
public Step cachePostStep() {
    return new StepBuilder("cachePostStep", jobRepository)
        .tasklet((contrib, ctx) -> {

            // Redis 캐싱 (24h TTL)
            CacheStat stat = postService.cacheLatestPosts(Duration.ofHours(24));

            // StepExecutionContext 업데이트
            stepEc.putInt(SC_CACHE_SIZE, stat.size());  // Gauge (batch_cache_size)

            return RepeatStatus.FINISHED;
        }, transactionManager)
        .listener(err)
        .listener(metrics)
        .build();
}

6. 모니터링 시스템

[공통 Tag]:

  • application: trend-batch
  • instance: SPRING_ACTIVE_PROFILES과 동일
  • job: trendToPostJob
  • step: fetchTrendingKeywordsStep, searchNewsStep, searchVideosStep, evaluateNoveltyStep, generatePostStep, cachePostStep,
  • bucket: bucketAt UTC 시각(YYYY-MM-DDTHH)
  • status: COMPLETED, FAILED
  • task: runTrendToPostJob (@Scheduled 메서드 식별자)

Prometheus Actuator가 Spring Batch 메트릭을 기본 제공하므로 Job / Step 실행 정보는 바로 얻을 수 있습니다.

단, ExecutionContext에 보관해둔 lowVariation 수, Post 개수나 각종 모니터링 메트릭은 Listener에서 Gauge/Counter/Summary로 직접 등록해야 합니다.

범주 Metric Type Tag (key="value")

Tasklet 지표 (Listener) batch_news_fetched Summary application, instance, job, step, bucket
  batch_news_api_fail_total Counter application, instance, job, step, bucket
  batch_video_fetched Summary application, instance, job, step, bucket
  batch_video_api_fail_total Counter application, instance, job, step, bucket
  batch_novelty_lowvar Summary application, instance, job, step, bucket
  batch_post_added Summary application, instance, job, step, bucket
  batch_post_cached Gauge application, instance, job, step, bucket
Job-Level Flag batch_no_post_needed Gauge application, instance, job

시스템 설정

  • build.gradle
implementation "org.springframework.boot:spring-boot-starter-actuator"
implementation "io.micrometer:micrometer-registry-prometheus"
  • application.yaml
management:
  endpoints:
    web:
      exposure:
        include: prometheus 
  metrics:
    tags:
      application: trend-batch
      instance: ${SPRING_ACTIVE_PROFILES:test}
    enable:
      scheduling: true
  prometheus:
    metrics:
      export:
        enabled: true
  • prometheus.yaml 에 추가
scrape_configs:
  - job_name: "app"
    metrics_path: "/actuator/prometheus"
    honor_labels: true       # ← 우리가 붙인 tag 우선
    static_configs:
      - targets: ["host.docker.internal:8080"]
  • ExecutionContext → Prometheus: Listener 설정
@Component
@RequiredArgsConstructor
public class BatchMetricsListener implements StepExecutionListener, JobExecutionListener {

	private final MeterRegistry meter;

	@Value("${spring.application.name:trend-batch}")
	private String application;

	@Value("${spring.profiles.active:local}")
	private String profile;

	// Step 종료 :  ExecutionContext → Gauge / Counter
	@Override
	public ExitStatus afterStep(StepExecution stepExec) {

		JobExecution je = stepExec.getJobExecution();
		ExecutionContext ctx = stepExec.getExecutionContext();

		Tags base = Tags.of(
			"application", application,
			"instance", profile,
			"job", je.getJobInstance().getJobName(),
			"step", stepExec.getStepName(),
			"bucket", String.valueOf(je.getExecutionContext().get("bucketAt"))
		);

		// Gauge & Counter 기록
		recordGauge(ctx, "rssLatencyMs", "batch_rss_latency_ms", base);
		recordCounter(ctx, "newsFetched", "batch_news_fetched_total", base);
		recordCounter(ctx, "newsApiFail", "batch_news_api_fail_total", base);
		recordCounter(ctx, "videoFetched", "batch_video_fetched_total", base);
		recordCounter(ctx, "videoApiFail", "batch_video_api_fail_total", base);
		recordCounter(ctx, "noveltyLowVarCount", "batch_novelty_lowvar_total", base);
		recordCounter(ctx, "postCreated", "batch_post_created_total", base);
		recordGauge(ctx, "cacheEntryCount", "batch_cache_size", base);

		return stepExec.getExitStatus();
	}

	// Job 종료 : noPostNeeded 플래그 Gauge
	@Override
	public void afterJob(JobExecution jobExec) {
		boolean skip = Boolean.TRUE.equals(jobExec.getExecutionContext().get("noPostNeeded"));

		Gauge.builder("batch_no_post_needed", () -> skip ? 1.0 : 0.0)
			.tags("application", application,
				"instance", System.getenv().getOrDefault("HOSTNAME", "local"),
				"job", jobExec.getJobInstance().getJobName())
			.register(meter);
	}

	private void recordGauge(ExecutionContext ctx, String key, String name, Tags tags) {
		if (ctx.containsKey(key)) {
			Number number = ctx.get(key, Number.class);
			Gauge.builder(name, number::doubleValue)
				.tags(tags)
				.register(meter);
		}
	}

	private void recordCounter(ExecutionContext ctx, String key, String name, Tags tags) {
		if (ctx.containsKey(key)) {
			Number number = ctx.get(key, Number.class);
			Counter counter = Counter.builder(name)
				.tags(tags)
				.register(meter);
			counter.increment(number.doubleValue());
		}
	}
}
  • Error Log → Listener 설정
@Slf4j
@Component
public class ErrorLogStepListener implements StepExecutionListener {

	@Override
	public void beforeStep(StepExecution stepExecution) {
		log.info("[BEFORE STEP] [{}] started (id={})",
			stepExecution.getStepName(),
			stepExecution.getId());
	}

	@Override
	public ExitStatus afterStep(StepExecution stepExecution) {
		log.info("[AFTER STEP]  [{}] finished (read={}, write={}, skip={}, commit={}, rollback={})",
			stepExecution.getStepName(),
			stepExecution.getReadCount(),
			stepExecution.getWriteCount(),
			stepExecution.getSkipCount(),
			stepExecution.getCommitCount(),
			stepExecution.getRollbackCount());
		return stepExecution.getExitStatus();
	}
}

Grafana: Panel & Query

대시보드 구성 템플릿은 infra/grafana/dashboard/ 폴더 안에 있습니다. (import으로 사용 가능)

<aside> 💡

  • 템플릿(https://grafana.com/grafana/dashboards/)
    • 14430: Spring Boot Statistics
    • 7362: MySQL Overview
    • 1860: Node Exporter Full
    • 13639: Loki 로그 뷰어
      • Query 예시: {app="backend", stream="stdout"} |= "ERROR" | logfmt | line_format "{{.msg}} [{{.method}} {{.status}}]" </aside>

Alert Rules 예시

알람 조건 PromQL 식 for / Severity

포스트 생성 0건 3 h increase(batch_post_created_total{job="$job"}[3h]) == 0 for: 3h, warning
“포스트 스킵” 발생 batch_no_post_needed{job="$job"} == 1 for: 0m, info
뉴스 API 실패율 > 10 % sum(increase(batch_news_api_fail_total[30m])) / sum(increase(batch_news_fetched_total[30m])) > 0.1 for: 10m, warning
스케줄러 지연 > 30 s max_over_time(scheduler_execution_latency_seconds_max[5m]) > 30 for: 5m, critical
Job 연속 실패 3회 sum_over_time(spring_batch_job_status{job="$job",status="FAILED"}[3h]) >= 3 for: 0m, critical

References

https://docs.spring.io/spring-batch/docs/3.0.x/reference/htmlsingle/

https://europani.github.io/spring/2023/02/04/049-spring-batch-basic.html

https://amabile29.tistory.com/53

https://blog.naver.com/seong1542/223174079490

https://hevodata.com/learn/spring-batch-scheduling/

https://docs.spring.io/spring-batch/reference/schema-appendix.html

https://stackoverflow.com/questions/2292667/how-can-we-share-data-between-the-different-steps-of-a-job-in-spring-batch

https://www.youtube.com/watch?v=1xJU8HfBREY

https://www.devkuma.com/docs/prometheus/spring-batch/#pushgateway

 

bemcatu2uno5ce