Class PulsarConsumerManager
java.lang.Object
io.numaproj.pulsar.consumer.PulsarConsumerManager
Lazily creates and caches the Pulsar consumer used by PulsarSource.
Supports two consumer shapes: raw byte[] (Schema.BYTES) or GenericRecord (Schema.AUTO_CONSUME).
Both consumers use Shared subscriptions so multiple pods can consume the same topic concurrently.
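The lazy create-and-cache behaviour described above can be illustrated with a minimal, standard-library-only sketch. LazyCache and LazyCacheDemo are hypothetical names and are not part of this class. The synchronized accessor is an assumption about how concurrent first calls might be handled, not a statement about the actual implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical stand-in for a lazily created, cached resource such as a consumer. */
final class LazyCache<T> {
    private final Supplier<T> factory;
    private T instance; // stays null until the first get()

    LazyCache(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Creates the instance on the first call and returns the same one afterwards. */
    synchronized T get() {
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }

    /** Reports whether the factory has run, without triggering it. */
    synchronized boolean isCreated() {
        return instance != null;
    }
}

final class LazyCacheDemo {
    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        LazyCache<String> cache =
                new LazyCache<>(() -> "consumer-" + created.incrementAndGet());

        System.out.println(cache.isCreated()); // false: nothing built yet
        String first = cache.get();
        String second = cache.get();
        System.out.println(first == second);   // true: the cached instance is reused
        System.out.println(created.get());     // 1: the factory ran exactly once
    }
}
```

Deferring creation this way means a consumer shape that is never requested is never built.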
-
Constructor Summary
Constructors
Constructor    Description
PulsarConsumerManager(PulsarConsumerProperties pulsarConsumerProperties, org.apache.pulsar.client.api.PulsarClient pulsarClient)    Creates a new manager.
-
Method Summary
Modifier and Type    Method    Description
void    cleanup()    Closes any created consumers and the underlying Pulsar client.
org.apache.pulsar.client.api.Consumer<byte[]>    getOrCreateBytesConsumer(long count, long timeoutMillis)    Returns the byte-array consumer, creating it on first call.
org.apache.pulsar.client.api.Consumer<org.apache.pulsar.client.api.schema.GenericRecord>    getOrCreateGenericRecordConsumer(long count, long timeoutMillis)    Returns the AUTO_CONSUME (GenericRecord) consumer, creating it on first call.
-
Constructor Details
-
PulsarConsumerManager
public PulsarConsumerManager(PulsarConsumerProperties pulsarConsumerProperties, org.apache.pulsar.client.api.PulsarClient pulsarClient)
Creates a new manager. Consumers are created lazily on first use.
- Parameters:
pulsarConsumerProperties - parsed pulsar.consumer section
pulsarClient - the Pulsar client
-
-
Method Details
-
getOrCreateBytesConsumer
public org.apache.pulsar.client.api.Consumer<byte[]> getOrCreateBytesConsumer(long count, long timeoutMillis) throws org.apache.pulsar.client.api.PulsarClientException
Returns the byte-array consumer, creating it on first call.
- Parameters:
count - maximum number of messages per batch
timeoutMillis - maximum time to wait for the batch to fill
- Returns:
the byte-array consumer
- Throws:
org.apache.pulsar.client.api.PulsarClientException - if consumer creation fails
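To make the meaning of count and timeoutMillis concrete, here is a standard-library-only sketch of a batch that stops filling once either limit is reached. BatchLimits and BatchLimitsDemo are hypothetical names. The loop only illustrates the two limits as documented above. It is not how the Pulsar client implements batching, and it makes no claim about how this class passes the values to the client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: collects a batch bounded by a message count and a wait time. */
final class BatchLimits {
    static <T> List<T> collect(BlockingQueue<T> source, long count, long timeoutMillis) {
        List<T> batch = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (batch.size() < count) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // time limit reached before the batch filled
                }
                T item = source.poll(remaining, TimeUnit.NANOSECONDS);
                if (item == null) {
                    break; // nothing arrived within the remaining time
                }
                batch.add(item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag, return what we have
        }
        return batch;
    }
}

final class BatchLimitsDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(List.of(1, 2, 3, 4, 5));
        // The count limit is hit first: five items are waiting, three are taken.
        System.out.println(BatchLimits.collect(queue, 3, 200)); // [1, 2, 3]
        // The time limit is hit first: only two items remain, so the call returns after about 50 ms.
        System.out.println(BatchLimits.collect(queue, 10, 50)); // [4, 5]
    }
}
```

A partial batch is a normal result here: the caller gets whatever arrived before the time limit rather than waiting indefinitely for count messages.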
-
getOrCreateGenericRecordConsumer
public org.apache.pulsar.client.api.Consumer<org.apache.pulsar.client.api.schema.GenericRecord> getOrCreateGenericRecordConsumer(long count, long timeoutMillis) throws org.apache.pulsar.client.api.PulsarClientException
Returns the AUTO_CONSUME (GenericRecord) consumer, creating it on first call. Each message is validated against the registered topic schema.
- Parameters:
count - maximum number of messages per batch
timeoutMillis - maximum time to wait for the batch to fill
- Returns:
the schema-aware consumer
- Throws:
org.apache.pulsar.client.api.PulsarClientException - if consumer creation fails
-
cleanup
public void cleanup()
Closes any created consumers and the underlying Pulsar client. Individual close failures are logged rather than thrown.
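The "logged rather than thrown" behaviour can be sketched with the standard library alone. QuietCloser and QuietCloserDemo are hypothetical names, and java.util.logging is used only as a stand-in, since the logging framework this class uses is not stated here. The point of the pattern is that one failing close() does not stop the remaining resources from being closed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper: closes resources one by one, logging failures instead of throwing. */
final class QuietCloser {
    private static final Logger LOG = Logger.getLogger(QuietCloser.class.getName());

    /** Closes every non-null resource and returns how many close() calls failed. */
    static int closeAll(List<? extends AutoCloseable> resources) {
        int failures = 0;
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue; // never created, so there is nothing to close
            }
            try {
                resource.close();
            } catch (Exception e) {
                failures++;
                LOG.log(Level.WARNING, "Failed to close resource", e);
            }
        }
        return failures;
    }
}

final class QuietCloserDemo {
    public static void main(String[] args) {
        AutoCloseable failing = () -> { throw new java.io.IOException("boom"); };
        AutoCloseable healthy = () -> System.out.println("closed cleanly");
        // The failing resource is logged; the healthy one is still closed afterwards.
        int failures = QuietCloser.closeAll(Arrays.asList(failing, null, healthy));
        System.out.println(failures); // 1
    }
}
```

Skipping null entries mirrors the lazy-creation model: a consumer that was never requested has nothing to release.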
-