Class PulsarSource

java.lang.Object
io.numaproj.numaflow.sourcer.Sourcer
io.numaproj.pulsar.consumer.PulsarSource

public class PulsarSource extends io.numaproj.numaflow.sourcer.Sourcer
Numaflow user-defined source that reads messages from one or more Pulsar topics. Supports raw byte[] messages (Schema.BYTES) or schema-validated messages (Schema.AUTO_CONSUME), selected via PulsarConsumerProperties.useAutoConsumeSchema. Pulsar message metadata is forwarded to Numaflow as x-pulsar-* headers.
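The header-forwarding rule can be illustrated with a minimal sketch. It assumes the Pulsar metadata has already been collected into a String map. The class name PulsarHeaderSketch and the metadata key used in the usage note ("topic") are hypothetical. This page does not list the concrete x-pulsar-* header names that PulsarSource emits.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of PulsarSource: prefixes each metadata key
// with "x-pulsar-" to illustrate the header naming scheme described above.
final class PulsarHeaderSketch {
    static final String PREFIX = "x-pulsar-";

    static Map<String, String> toHeaders(Map<String, String> metadata) {
        // LinkedHashMap keeps the caller's iteration order in the result.
        Map<String, String> headers = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : metadata.entrySet()) {
            headers.put(PREFIX + e.getKey(), e.getValue());
        }
        return headers;
    }
}
```

For example, toHeaders(Map.of("topic", "persistent://public/default/orders")) returns a single-entry map keyed by x-pulsar-topic.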
  • Constructor Summary

    Constructors
    Constructor
    Description
    PulsarSource(PulsarConsumerManager pulsarConsumerManager, org.apache.pulsar.client.admin.PulsarAdmin pulsarAdmin, PulsarConsumerProperties pulsarConsumerProperties)
    Creates a new source.
  • Method Summary

    Modifier and Type
    Method
    Description
    void
    ack(io.numaproj.numaflow.sourcer.AckRequest request)
    Acknowledges all messages delivered in the previous read call.
    void
    cleanup()
    Closes the underlying Pulsar consumers, client, and admin.
    List<Integer>
    getPartitions()
    Returns partition indices across all configured topics as a flat list [0, 1, 2, ...].
    long
    getPending()
    Returns the total subscription backlog across all configured topics.
    void
    nack(io.numaproj.numaflow.sourcer.NackRequest request)
    Not yet implemented.
    void
    read(io.numaproj.numaflow.sourcer.ReadRequest request, io.numaproj.numaflow.sourcer.OutputObserver observer)
    Reads a batch of messages from the configured Pulsar topic(s) and forwards them to Numaflow.
    void
    startServer()
    Starts the Numaflow gRPC server and blocks until it terminates.

    Methods inherited from class io.numaproj.numaflow.sourcer.Sourcer

    defaultPartitions

    Methods inherited from class java.lang.Object

    equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • Constructor Details

    • PulsarSource

      public PulsarSource(PulsarConsumerManager pulsarConsumerManager, org.apache.pulsar.client.admin.PulsarAdmin pulsarAdmin, PulsarConsumerProperties pulsarConsumerProperties)
      Creates a new source.
      Parameters:
      pulsarConsumerManager - factory that creates either the byte[] consumer or the GenericRecord consumer
      pulsarAdmin - admin client used to query topic partitions and backlog
      pulsarConsumerProperties - the parsed pulsar.consumer section of the configuration
  • Method Details

    • startServer

      public void startServer() throws Exception
      Starts the Numaflow gRPC server and blocks until it terminates.
      Throws:
      Exception - if the gRPC server fails
    • read

      public void read(io.numaproj.numaflow.sourcer.ReadRequest request, io.numaproj.numaflow.sourcer.OutputObserver observer)
      Reads a batch of messages from the configured Pulsar topic(s) and forwards them to Numaflow. Returns early without reading if the previous batch has not yet been acknowledged.
      Specified by:
      read in class io.numaproj.numaflow.sourcer.Sourcer
      Parameters:
      request - the read request (count and timeout)
      observer - output channel that emits messages back to Numaflow
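      The "return early" rule means at most one batch is outstanding at a time. The following is a minimal in-memory model of that rule, not the actual implementation. Strings stand in for Pulsar messages, and the class BatchGateSketch and its methods are hypothetical. No Pulsar or Numaflow API is used.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of the "one outstanding batch" rule described for read():
// a new batch is handed out only after the previous one has been acknowledged.
final class BatchGateSketch {
    private final Queue<String> upstream;
    private List<String> unacked = null; // batch delivered but not yet acknowledged

    BatchGateSketch(List<String> messages) {
        this.upstream = new ArrayDeque<>(messages);
    }

    /** Returns up to count messages, or an empty list if the previous batch is still unacked. */
    List<String> read(int count) {
        if (unacked != null) {
            return List.of(); // return early: previous batch not acknowledged yet
        }
        List<String> batch = new ArrayList<>();
        while (batch.size() < count && !upstream.isEmpty()) {
            batch.add(upstream.poll());
        }
        if (!batch.isEmpty()) {
            unacked = batch; // remember what must be acked before the next read
        }
        return batch;
    }

    /** Marks the outstanding batch as acknowledged so the next read can proceed. */
    void ackAll() {
        unacked = null;
    }
}
```

      Gating reads this way keeps the set of in-flight messages small and makes it easy to match a later ack against exactly one batch.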
    • ack

      public void ack(io.numaproj.numaflow.sourcer.AckRequest request)
      Acknowledges all messages delivered in the previous read call. The request offsets must match the messages from the previous read; otherwise the ack is skipped.
      Specified by:
      ack in class io.numaproj.numaflow.sourcer.Sourcer
      Parameters:
      request - the ack request containing the offsets to acknowledge
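      The offset-matching rule can be sketched as follows. This sketch assumes that "match" means the request carries the same set of offsets as the previous read, with order ignored. Offsets are modeled as plain strings, and AckMatchSketch is a hypothetical name. The real AckRequest and offset types are not used.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the ack rule: offsets in the request must match the
// offsets delivered by the previous read(); otherwise nothing is acknowledged.
final class AckMatchSketch {
    private Set<String> lastReadOffsets = Set.of();
    private int acknowledgedCount = 0;

    void recordRead(List<String> offsets) {
        lastReadOffsets = new HashSet<>(offsets);
    }

    /** Returns true if the batch was acknowledged, false if the ack was skipped. */
    boolean ack(List<String> requestOffsets) {
        // Assumption: comparison is order-insensitive (set equality).
        if (lastReadOffsets.isEmpty() || !new HashSet<>(requestOffsets).equals(lastReadOffsets)) {
            return false; // mismatch or nothing outstanding: skip the ack
        }
        acknowledgedCount += lastReadOffsets.size();
        lastReadOffsets = Set.of(); // batch is done; the next read may proceed
        return true;
    }

    int acknowledgedCount() {
        return acknowledgedCount;
    }
}
```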
    • nack

      public void nack(io.numaproj.numaflow.sourcer.NackRequest request)
      Not yet implemented.
      Specified by:
      nack in class io.numaproj.numaflow.sourcer.Sourcer
      Throws:
      UnsupportedOperationException - always
    • getPending

      public long getPending()
      Returns the total subscription backlog across all configured topics.
      Specified by:
      getPending in class io.numaproj.numaflow.sourcer.Sourcer
      Returns:
      the total number of un-acknowledged messages, or -1 if the admin call fails
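      The aggregation described above can be modeled in a small sketch: sum the backlog of each configured topic, and return -1 if any lookup fails. BacklogLookup is a hypothetical stand-in for the PulsarAdmin call. It is not a Pulsar interface.

```java
import java.util.List;

// Hypothetical sketch of getPending(): total backlog across topics, or -1 on failure.
final class PendingSketch {
    /** Stand-in for the admin lookup of one topic's subscription backlog. */
    @FunctionalInterface
    interface BacklogLookup {
        long backlog(String topic) throws Exception;
    }

    static long totalPending(List<String> topics, BacklogLookup lookup) {
        long total = 0;
        try {
            for (String topic : topics) {
                total += lookup.backlog(topic);
            }
        } catch (Exception e) {
            return -1; // admin call failed: report "unknown" rather than a partial sum
        }
        return total;
    }
}
```

      Returning -1 instead of a partial sum avoids under-reporting the backlog when only some topics could be queried.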
    • getPartitions

      public List<Integer> getPartitions()
      Returns partition indices across all configured topics as a flat list [0, 1, 2, ...]. Each Pulsar partition contributes one index. Non-partitioned topics count as one partition. Falls back to defaultPartitions() if partition lookup fails.
      Specified by:
      getPartitions in class io.numaproj.numaflow.sourcer.Sourcer
      Returns:
      the list of partition indices
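      The flattening rule can be expressed as a sketch. It assumes the partition-count lookup reports 0 for a non-partitioned topic, as Pulsar's partitioned-topic metadata does. PartitionsSketch and PartitionCountLookup are hypothetical names, and the fallback supplier stands in for defaultPartitions().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of getPartitions(): one index per Pulsar partition across all
// topics, with a non-partitioned topic counted as one partition.
final class PartitionsSketch {
    /** Stand-in for the admin lookup; assumed to return 0 for a non-partitioned topic. */
    @FunctionalInterface
    interface PartitionCountLookup {
        int partitions(String topic) throws Exception;
    }

    static List<Integer> partitions(List<String> topics, PartitionCountLookup lookup,
                                    Supplier<List<Integer>> defaultPartitions) {
        int total = 0;
        try {
            for (String topic : topics) {
                total += Math.max(lookup.partitions(topic), 1); // 0 partitions -> count as 1
            }
        } catch (Exception e) {
            return defaultPartitions.get(); // lookup failed: use the fallback
        }
        List<Integer> indices = new ArrayList<>(total);
        for (int i = 0; i < total; i++) {
            indices.add(i); // flat list 0, 1, 2, ..., total - 1
        }
        return indices;
    }
}
```

      For example, a topic with 3 partitions plus one non-partitioned topic yields [0, 1, 2, 3].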
    • cleanup

      public void cleanup()
      Closes the underlying Pulsar consumers, client, and admin. Errors during shutdown are logged rather than thrown.
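      The "log rather than throw" shutdown behavior follows a common close-quietly pattern. In the sketch below, the resources are arbitrary AutoCloseable instances standing in for the consumers, client, and admin, and the close order is whatever the caller passes in. CleanupSketch is a hypothetical name, and java.util.logging is used only to keep the example dependency-free.

```java
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch of cleanup(): close each resource in turn, logging failures
// so that one failing close does not prevent the others from running.
final class CleanupSketch {
    private static final Logger LOG = Logger.getLogger(CleanupSketch.class.getName());

    /** Closes every resource in order and returns how many failed. Never throws. */
    static int closeQuietly(List<? extends AutoCloseable> resources) {
        int failures = 0;
        for (AutoCloseable r : resources) {
            try {
                r.close();
            } catch (Exception e) {
                failures++;
                LOG.log(Level.WARNING, "Failed to close resource: " + r, e);
            }
        }
        return failures;
    }
}
```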