The culprit behind SeaTunnel Kafka Connector "OutOfMemory" found.

The One Line of Code That Ate 12GB of SeaTunnel Kafka Connector's Memory in 5 Minutes

2025/09/12 13:30

What happened?

In Apache SeaTunnel 2.3.9, the Kafka connector implementation carried a memory leak risk. When a streaming job read from Kafka, memory could keep growing until the process hit an OOM (Out Of Memory) error, even with a read rate limit (read_limit.rows_per_second) configured.

What's the key issue?

In real deployments, users observed the following phenomena:

  1. A Kafka-to-HDFS streaming job was running on a SeaTunnel Engine cluster with 8 cores and 12 GB of memory
  2. Although read_limit.rows_per_second=1 was configured, memory usage soared from 200MB to 5GB within 5 minutes
  3. After stopping the job, memory was not released; upon resuming, memory kept growing until OOM
  4. Ultimately, worker nodes restarted

Root Cause Analysis

A code review traced the root cause to the createReader method of the KafkaSource class, which initialized elementsQueue as an unbounded queue:

elementsQueue = new LinkedBlockingQueue<>(); 

This implementation had two critical issues:

  1. Unbounded Queue: A LinkedBlockingQueue created without a capacity argument defaults to a capacity of Integer.MAX_VALUE, so it is effectively unbounded. When the producer is much faster than the consumer, the queue and the memory it holds keep growing.
  2. Ineffective Rate Limiting: Although users configured read_limit.rows_per_second=1, the limit was not applied to the Kafka read path, so fetched data kept accumulating in the in-memory queue.
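
The first issue can be reproduced outside SeaTunnel with plain JDK classes. The following is a minimal sketch, not connector code: the class name QueueGrowthDemo, the helper fillWithoutConsumer, and the 16-byte payloads are all hypothetical, chosen only to show how a queue behaves when nothing drains it. A capacity-limited queue is included purely for contrast.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; it does not exist in SeaTunnel.
final class QueueGrowthDemo {

    // Offers `attempts` small payloads while no consumer drains the queue,
    // then returns how many payloads are still buffered in memory.
    static int fillWithoutConsumer(BlockingQueue<byte[]> queue, int attempts) {
        for (int i = 0; i < attempts; i++) {
            // offer() never blocks: it returns false once a bounded queue is full.
            queue.offer(new byte[16]);
        }
        return queue.size();
    }

    public static void main(String[] args) {
        BlockingQueue<byte[]> unbounded = new LinkedBlockingQueue<>();
        BlockingQueue<byte[]> bounded = new ArrayBlockingQueue<>(1000);

        // A LinkedBlockingQueue built without a capacity reports Integer.MAX_VALUE.
        System.out.println("unbounded capacity: " + unbounded.remainingCapacity());
        System.out.println("unbounded buffered: " + fillWithoutConsumer(unbounded, 100_000));
        System.out.println("bounded buffered:   " + fillWithoutConsumer(bounded, 100_000));
    }
}
```

With no consumer, the unbounded queue keeps every payload it is offered, while the capacity-limited queue stops accepting new ones at its limit.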

Solution

The community resolved this issue via PR #9041. The main improvements include:

  1. Introducing a Bounded Queue: Replacing LinkedBlockingQueue with a fixed-size ArrayBlockingQueue
  2. Configurable Queue Size: Adding a queue.size configuration parameter, allowing users to adjust as needed
  3. Safe Default Value: Setting DEFAULT_QUEUE_SIZE=1000 as the default queue capacity

Core implementation changes:

public class KafkaSource {
    private static final String QUEUE_SIZE_KEY = "queue.size";
    private static final int DEFAULT_QUEUE_SIZE = 1000;

    public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(
            SourceReader.Context readerContext) {
        int queueSize = kafkaSourceConfig.getInt(QUEUE_SIZE_KEY, DEFAULT_QUEUE_SIZE);
        BlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue =
                new ArrayBlockingQueue<>(queueSize);
        // ...
    }
}
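
To illustrate why a bounded queue helps, here is a small standalone sketch of the backpressure it provides. It uses only JDK classes and is not the connector's implementation: BackpressureDemo, maxDepth, and the simulated slow consumer are hypothetical, and it assumes the producing side uses the blocking put() call, which the snippet above does not show.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; it does not exist in SeaTunnel.
final class BackpressureDemo {

    // Runs a fast producer against a slower consumer and returns the
    // largest queue depth the consumer observed.
    static int maxDepth(int capacity, int totalItems) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < totalItems; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int max = 0;
        try {
            for (int consumed = 0; consumed < totalItems; consumed++) {
                max = Math.max(max, queue.size());
                queue.take();
                if (consumed % 100 == 0) {
                    Thread.sleep(1); // simulate a slow downstream sink
                }
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return max;
    }

    public static void main(String[] args) {
        // The observed depth can never exceed the configured capacity.
        System.out.println("max depth: " + maxDepth(10, 1000));
    }
}
```

Because put() waits whenever the queue is full, the producer is slowed to the consumer's pace and the buffered data stays within the configured capacity.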

Best Practice Recommendations

For users of the SeaTunnel Kafka connector, it is recommended to:

  1. Upgrade Version: Use the SeaTunnel version containing this fix
  2. Configure Properly: Set an appropriate queue.size value according to business needs and data characteristics
  3. Monitor Memory: Even with a bounded queue, monitor system memory usage
  4. Understand Rate Limiting: The read_limit.rows_per_second parameter applies to downstream processing, not Kafka consumption
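
When choosing a queue.size value, a back-of-the-envelope estimate of the worst-case buffered data can help. The sketch below is only an approximation under stated assumptions: it assumes each queue element is a batch of records (as the RecordsWithSplitIds element type suggests), and the batch size and average record size are hypothetical workload numbers that should be measured for a real job. It ignores JVM object overhead, and QueueMemoryEstimate is not part of SeaTunnel.

```java
// Hypothetical helper; it does not exist in SeaTunnel.
final class QueueMemoryEstimate {

    // Rough upper bound on buffered payload bytes when the queue is full:
    // queue slots * records per batch * average record size.
    static long worstCaseBytes(int queueSize, int recordsPerBatch, int avgRecordBytes) {
        // Cast first so the multiplication is done in 64-bit arithmetic.
        return (long) queueSize * recordsPerBatch * avgRecordBytes;
    }

    public static void main(String[] args) {
        // Assumed numbers: 1000 slots, 500 records per batch, 1 KiB per record.
        long bytes = worstCaseBytes(1000, 500, 1024);
        System.out.println("worst case: " + bytes / (1024 * 1024) + " MiB");
    }
}
```

If the estimate approaches the heap available to a worker, a smaller queue.size is the safer starting point.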

Summary

This fix not only resolved the memory overflow risk but also improved system stability and configurability. By introducing bounded queues and configurable parameters, users can better control system resource usage and avoid OOM caused by data backlog. It also reflects the virtuous cycle of open-source communities continuously improving product quality through user feedback.
