Class PulsarSource
java.lang.Object
io.numaproj.numaflow.sourcer.Sourcer
io.numaproj.pulsar.consumer.PulsarSource
public class PulsarSource
extends io.numaproj.numaflow.sourcer.Sourcer
Numaflow user-defined source that reads messages from one or more Pulsar topics.
Supports raw byte[] messages (Schema.BYTES) or schema-validated messages (Schema.AUTO_CONSUME),
selected via PulsarConsumerProperties.useAutoConsumeSchema.
Pulsar message metadata is forwarded to Numaflow as x-pulsar-* headers.
Constructor Summary
Constructor
    Description
PulsarSource(PulsarConsumerManager pulsarConsumerManager, org.apache.pulsar.client.admin.PulsarAdmin pulsarAdmin, PulsarConsumerProperties pulsarConsumerProperties)
    Creates a new source.
Method Summary
Modifier and Type  Method
    Description
void ack(io.numaproj.numaflow.sourcer.AckRequest request)
    Acknowledges all messages delivered in the previous read call.
void cleanup()
    Closes the underlying Pulsar consumers, client, and admin.
getPartitions()
    Returns partition indices across all configured topics as a flat list [0, 1, 2, ...].
long getPending()
    Returns the total subscription backlog across all configured topics.
void nack(io.numaproj.numaflow.sourcer.NackRequest request)
    Not yet implemented.
void read(io.numaproj.numaflow.sourcer.ReadRequest request, io.numaproj.numaflow.sourcer.OutputObserver observer)
    Reads a batch of messages from the configured Pulsar topic(s) and forwards them to Numaflow.
void startServer()
    Starts the Numaflow gRPC server and blocks until it terminates.
Methods inherited from class io.numaproj.numaflow.sourcer.Sourcer
defaultPartitions
Constructor Details
PulsarSource
public PulsarSource(PulsarConsumerManager pulsarConsumerManager, org.apache.pulsar.client.admin.PulsarAdmin pulsarAdmin, PulsarConsumerProperties pulsarConsumerProperties)
Creates a new source.
Parameters:
pulsarConsumerManager - factory for the byte-array or GenericRecord consumer
pulsarAdmin - admin client used to query topic partitions and backlog
pulsarConsumerProperties - parsed pulsar.consumer section of the config
Method Details
startServer
Starts the Numaflow gRPC server and blocks until it terminates.
read
public void read(io.numaproj.numaflow.sourcer.ReadRequest request, io.numaproj.numaflow.sourcer.OutputObserver observer)
Reads a batch of messages from the configured Pulsar topic(s) and forwards them to Numaflow. Returns early without reading if the previous batch has not yet been acknowledged.
Specified by:
read in class io.numaproj.numaflow.sourcer.Sourcer
Parameters:
request - the read request (count and timeout)
observer - output channel that emits messages back to Numaflow
ack
public void ack(io.numaproj.numaflow.sourcer.AckRequest request)
Acknowledges all messages delivered in the previous read call. The request offsets must match the messages from the previous read; otherwise the ack is skipped.
Specified by:
ack in class io.numaproj.numaflow.sourcer.Sourcer
Parameters:
request - the ack request containing the offsets to acknowledge
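The read and ack contract described above (one unacknowledged batch at a time, and an ack that is skipped when its offsets do not match) can be illustrated with a minimal sketch. InFlightBatchTracker and its methods are hypothetical names invented for this illustration; message IDs are modeled as plain strings, and this is not the PulsarSource implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the source's bookkeeping; not the PulsarSource code.
class InFlightBatchTracker {
    // IDs handed out by the last read and not yet acknowledged.
    private final Set<String> inFlight = new LinkedHashSet<>();

    // Returns the IDs delivered by this read, or an empty list when the
    // previous batch is still waiting for its ack.
    synchronized List<String> read(List<String> available, int count) {
        if (!inFlight.isEmpty()) {
            return List.of(); // previous batch not acknowledged: return early
        }
        int n = Math.min(count, available.size());
        List<String> batch = new ArrayList<>(available.subList(0, n));
        inFlight.addAll(batch);
        return batch;
    }

    // Applies the ack only if the offsets are exactly the in-flight batch.
    // Returns true when the ack was applied, false when it was skipped.
    synchronized boolean ack(List<String> offsets) {
        if (!inFlight.equals(new LinkedHashSet<>(offsets))) {
            return false; // mismatch: skip the ack and keep the batch in flight
        }
        inFlight.clear();
        return true;
    }
}
```

In this sketch a partial ack leaves the batch in flight, so the next read still returns early until a matching ack arrives.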
nack
public void nack(io.numaproj.numaflow.sourcer.NackRequest request)
Not yet implemented.
Specified by:
nack in class io.numaproj.numaflow.sourcer.Sourcer
Throws:
UnsupportedOperationException - always
getPending
public long getPending()
Returns the total subscription backlog across all configured topics.
Specified by:
getPending in class io.numaproj.numaflow.sourcer.Sourcer
Returns:
the total number of unacknowledged messages, or -1 if the admin call fails
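The "sum the backlog, or return -1 on failure" behavior can be sketched as follows. PendingSketch, totalPending, and the backlogLookup parameter are hypothetical; the lookup stands in for whatever admin query the real source performs, which this page does not show.

```java
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical illustration; not the PulsarSource code.
class PendingSketch {
    // Sums the backlog reported for every topic.
    // Returns -1 if any lookup throws, mirroring the documented fallback.
    static long totalPending(List<String> topics, ToLongFunction<String> backlogLookup) {
        try {
            long total = 0;
            for (String topic : topics) {
                total += backlogLookup.applyAsLong(topic);
            }
            return total;
        } catch (RuntimeException e) {
            return -1; // lookup failed: signal "unknown" instead of a partial sum
        }
    }
}
```

Returning -1 rather than a partial sum keeps a failed lookup distinguishable from a genuinely empty backlog of 0.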
getPartitions
Returns partition indices across all configured topics as a flat list [0, 1, 2, ...]. Each Pulsar partition contributes one index. Non-partitioned topics count as one partition. Falls back to defaultPartitions() if partition lookup fails.
Specified by:
getPartitions in class io.numaproj.numaflow.sourcer.Sourcer
Returns:
the list of partition indices
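As an illustration of the flat index list, the sketch below takes one partition count per configured topic and produces consecutive indices. PartitionIndexSketch and flatIndices are hypothetical names, and the convention that a count of 0 marks a non-partitioned topic is an assumption made for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not the PulsarSource code.
class PartitionIndexSketch {
    // partitionCounts holds one entry per configured topic.
    // Assumption: a count of 0 marks a non-partitioned topic, which still
    // contributes exactly one index.
    static List<Integer> flatIndices(List<Integer> partitionCounts) {
        int total = 0;
        for (int count : partitionCounts) {
            total += Math.max(count, 1);
        }
        List<Integer> indices = new ArrayList<>(total);
        for (int i = 0; i < total; i++) {
            indices.add(i);
        }
        return indices;
    }
}
```

For example, topics with 3, 0, and 2 partitions contribute 3 + 1 + 2 = 6 indices, giving [0, 1, 2, 3, 4, 5].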
cleanup
public void cleanup()
Closes the underlying Pulsar consumers, client, and admin. Errors during shutdown are logged rather than thrown.
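The "log, do not throw" shutdown pattern can be sketched generically. QuietCloser and closeAll are hypothetical names; the returned list of messages stands in for log output so the example stays self-contained, and the real cleanup() may order or handle its resources differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not the PulsarSource code.
class QuietCloser {
    // Closes every resource in order. A failure on one resource is recorded
    // (standing in for a log call) and does not stop the remaining closes.
    static List<String> closeAll(List<AutoCloseable> resources) {
        List<String> errors = new ArrayList<>();
        for (AutoCloseable resource : resources) {
            try {
                resource.close();
            } catch (Exception e) {
                errors.add(String.valueOf(e.getMessage()));
            }
        }
        return errors;
    }
}
```

Catching per resource means that a consumer that fails to close does not prevent the client and admin handles from being released.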