Byte Array Source (Consumer)
This guide walks you through consuming raw byte array messages from a Pulsar topic into a Numaflow pipeline.
The example builds a pipeline that reads from a Pulsar topic and prints received messages using the built-in Numaflow log sink.
How the consumer reads and acks
The consumer follows a strict read → ack cycle with no readahead. Numaflow calls read() and ack() on the source in alternating fashion, and this consumer enforces that pattern strictly:

- No readahead. If the previous batch has not yet been fully acknowledged, read() returns immediately without pulling a new batch from Pulsar. A new batch is fetched only once messagesToAck is empty.
- Strict ack matching. ack() compares the offsets in the request against the set of pending message IDs from the last read. If the two sets are not identical (an ID is missing or an extra ID is present), the ack is skipped and logged as an error. There are no partial acks, and Pulsar will eventually redeliver those messages.
- No re-ack of already-acked messages. Because messagesToAck is cleared after a successful ack, a repeated ack for the same batch is treated as a mismatch and ignored.
This design keeps the consumer's state simple: at any moment, either a batch is in flight waiting to be acked, or nothing is pending and the next read can proceed. There is no overlap between batches and no prefetch beyond Pulsar's own receiverQueueSize.
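The gating logic above can be modelled as a small state machine. The sketch below is a hypothetical, self-contained illustration and not the consumer's actual code. The class name StrictReadAckGate is an assumption, as is the Supplier that stands in for a Pulsar batch fetch, and so is the use of plain String IDs instead of Pulsar MessageIds. The sketch shows the three rules:

- read() returns nothing while a batch is pending.
- ack() applies only when the requested IDs form exactly the pending set.
- A repeated ack is rejected because the pending set has already been cleared.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

/**
 * Hypothetical model of the strict read -> ack cycle.
 * IDs are plain strings here; the real consumer tracks Pulsar message IDs.
 */
class StrictReadAckGate {
    // IDs returned by the last read() that have not been acked yet.
    private final Set<String> messagesToAck = new LinkedHashSet<>();
    // Stand-in for pulling one batch from Pulsar (assumption for this sketch).
    private final Supplier<List<String>> fetchBatch;

    StrictReadAckGate(Supplier<List<String>> fetchBatch) {
        this.fetchBatch = fetchBatch;
    }

    /** Fetches a new batch only when nothing is pending; otherwise returns an empty list. */
    List<String> read() {
        if (!messagesToAck.isEmpty()) {
            return Collections.emptyList(); // no readahead while a batch is in flight
        }
        List<String> batch = fetchBatch.get();
        messagesToAck.addAll(batch);
        return batch;
    }

    /** Applies the ack only if the requested IDs equal the pending set; returns whether it was applied. */
    boolean ack(List<String> offsets) {
        Set<String> requested = new LinkedHashSet<>(offsets);
        if (messagesToAck.isEmpty() || !requested.equals(messagesToAck)) {
            // Mismatch (partial, extra, or repeated ack): log and skip; Pulsar redelivers later.
            System.err.println("ack mismatch, skipping: requested=" + requested
                    + " pending=" + messagesToAck);
            return false;
        }
        messagesToAck.clear(); // the next read() may now fetch a new batch
        return true;
    }
}
```

The sketch compares sets rather than lists, so the order of offsets in an ack request does not matter. This matches the description of the pending IDs as a set.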
Prerequisites
- A running Pulsar cluster: run one locally with docker-compose up (see the Home page) or deploy one on StreamNative.
- If you want a partitioned topic, create it before deploying the pipeline.
1. Create the ConfigMap
The ConfigMap holds application.yml, which configures the Pulsar client, consumer, and admin.
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: pulsar-config
data:
  application.yml: |
    pulsar:
      client:
        clientConfig:
          serviceUrl: "https://pc-xxxxx.streamnative.aws.snio.cloud"
          authPluginClassName: org.apache.pulsar.client.impl.auth.AuthenticationToken
          authParams: "${PULSAR_AUTH_TOKEN}"
          # Example: OAuth2 - authParams: "${PULSAR_OAUTH_CLIENT_SECRET}"
      consumer:
        enabled: true
        useAutoConsumeSchema: true
        consumerConfig:
          # Single topic (string) or multiple topics (comma-separated string)
          topicNames: "persistent://public/default/test-topic"
          # topicNames: "persistent://public/default/topic-a, persistent://public/default/topic-b"
          subscriptionName: "test-subscription"
          # Keep equal to the Pipeline's limits.readBatchSize (1 in this guide)
          receiverQueueSize: 1
      admin:
        adminConfig:
          serviceUrl: "https://pc-xxxxx.streamnative.aws.snio.cloud"
          authPluginClassName: org.apache.pulsar.client.impl.auth.AuthenticationToken
          authParams: "${PULSAR_AUTH_TOKEN}"
```
Both client and admin are required
The application uses the Pulsar client to consume messages and the Pulsar admin API to inspect topics (partition counts, subscription backlog). Both sections must be present in the ConfigMap — omitting admin will cause the consumer to fail on startup.
How ${PULSAR_AUTH_TOKEN} works
${PULSAR_AUTH_TOKEN} is not resolved by Kubernetes. The application resolves it at runtime. The Pipeline spec uses envFrom to expose each key of the Secret as an environment variable in the container. When the application loads application.yml, it replaces ${PULSAR_AUTH_TOKEN} with the value of that environment variable.
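The sketch below is a rough illustration of that runtime substitution. It replaces ${NAME} placeholders using a map of environment variables. Treat it as an assumption-laden model: the application most likely delegates this step to its configuration framework. The class PlaceholderResolver is hypothetical, and so is its decision to fail fast on a missing variable; both are choices made for this example.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical model of resolving ${VAR} placeholders in a config value
 * from environment variables. Not the application's actual implementation.
 */
class PlaceholderResolver {
    // Matches ${NAME} where NAME is letters, digits, or underscores.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([A-Za-z0-9_]+)\\}");

    static String resolve(String value, Map<String, String> env) {
        Matcher m = PLACEHOLDER.matcher(value);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String name = m.group(1);
            String replacement = env.get(name);
            if (replacement == null) {
                // Design choice for this sketch: fail fast instead of leaving the placeholder in place.
                throw new IllegalStateException("Missing environment variable: " + name);
            }
            // quoteReplacement keeps '$' or '\' inside tokens from being treated as group references.
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

In a container you would pass System.getenv() as the map. A Secret key injected through envFrom is then visible under the same name.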
Why authParams and not authParamMap?
Pulsar generally supports both authParams (single string) and authParamMap (key/value map) for auth plugins. For AuthenticationToken specifically, only the string form works — the class has no constructor that accepts a Map, so authParamMap will fail at startup. See the AuthenticationToken source.
Key fields
The table below highlights the most common fields. For the full list of accepted keys under each section, see the official Pulsar docs:
- clientConfig → all client configurations
- consumerConfig → all consumer configurations
- adminConfig → accepts the same keys as clientConfig
| Field | Required | Notes |
|---|---|---|
| client.clientConfig.serviceUrl | yes | The broker URL. |
| consumer.consumerConfig.topicNames | yes | A single topic as a string, or multiple topics as a comma-separated string (e.g. "persistent://public/default/topic-a, persistent://public/default/topic-b"). |
| consumer.consumerConfig.subscriptionName | no | Defaults to {PipelineName}-{VertexName}-sub. |
| consumer.consumerConfig.subscriptionInitialPosition | no | Earliest or Latest. Defaults to Latest; use Earliest to replay messages produced before the subscription existed. |
| admin.adminConfig.serviceUrl | yes | The admin (web service) URL. For StreamNative Cloud and most managed clusters, this is the same HTTPS URL as clientConfig.serviceUrl. For self-hosted Pulsar, it is typically the HTTP web service URL (e.g. http://broker:8080), which differs from the binary-protocol client URL (e.g. pulsar://broker:6650). |
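The hypothetical helper below makes the topicNames and subscriptionName rules from the table concrete. The class ConsumerConfigDefaults and both of its methods are illustrative assumptions. The consumer's real parsing may differ, for example in how it treats empty entries.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helpers mirroring the topicNames and subscriptionName rules in the table. */
class ConsumerConfigDefaults {
    /** Splits a comma-separated topicNames string, trimming whitespace and dropping empty entries. */
    static List<String> parseTopicNames(String topicNames) {
        return Arrays.stream(topicNames.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    /** Returns the configured subscription name, or {PipelineName}-{VertexName}-sub when unset. */
    static String subscriptionName(String configured, String pipelineName, String vertexName) {
        if (configured != null && !configured.isBlank()) {
            return configured;
        }
        return pipelineName + "-" + vertexName + "-sub";
    }
}
```

Suppose subscriptionName were omitted from the ConfigMap. For the example Pipeline in this guide (pipeline raw-consumer-pipeline, source vertex in), this rule would yield raw-consumer-pipeline-in-sub.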
Object types in the ConfigMap
Enum-typed Pulsar fields (for example, subscriptionInitialPosition) can be given as plain strings. Other complex types must be written as YAML lists or maps. Check the Pulsar docs if a field isn't behaving as expected.
Keep receiverQueueSize equal to readBatchSize
The Pulsar consumer maintains a local prefetch buffer (receiverQueueSize) separate from Numaflow's readBatchSize. When these two values differ, behavior changes across Pulsar client versions due to apache/pulsar#22619:
- Pulsar client 3.x: the smaller receiverQueueSize silently overrides a larger readBatchSize.
- Pulsar client 4.x+: the larger readBatchSize rewrites receiverQueueSize to match.
To avoid version-dependent surprises, always set consumer.consumerConfig.receiverQueueSize equal to the pipeline's limits.readBatchSize.
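If you template your manifests, a check like the one below can catch a mismatch before deployment. It is a hypothetical sketch: BatchSizeCheck is not part of the consumer, and you would supply the two values from your own ConfigMap and Pipeline.

```java
/** Hypothetical pre-deploy check: fail fast if receiverQueueSize and readBatchSize differ. */
class BatchSizeCheck {
    static void requireMatching(int receiverQueueSize, int readBatchSize) {
        if (receiverQueueSize != readBatchSize) {
            // Differing values behave differently across Pulsar client versions (apache/pulsar#22619).
            throw new IllegalArgumentException(
                    "consumer.consumerConfig.receiverQueueSize (" + receiverQueueSize
                    + ") must equal limits.readBatchSize (" + readBatchSize + ")");
        }
    }
}
```

For the example in this guide, limits.readBatchSize is 1, so receiverQueueSize should also be 1.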
2. Create the Secret
Create a Kubernetes Secret with your Pulsar credentials. The ConfigMap references these via ${PULSAR_AUTH_TOKEN}.
```yaml
apiVersion: v1
kind: Secret
metadata:
  name: pulsar-consumer-auth-secret
type: Opaque
stringData:
  # API Token Authentication (StreamNative Cloud, DataStax Astra, self-hosted with tokens)
  PULSAR_AUTH_TOKEN: "YOUR-API-KEY-HERE"
  # Example: OAuth2 client secret
  # PULSAR_OAUTH_CLIENT_SECRET: "your-oauth-client-secret"
```
Local clusters without auth
For local Pulsar clusters without authentication, skip the Secret. Remove authPluginClassName and authParams from both clientConfig and adminConfig in the ConfigMap, and remove envFrom from the Pipeline.
3. Create the Pipeline
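Update the image field to the version you want, either from the Quay.io tags or a locally built image. The example sets imagePullPolicy: Never, which only works if the image is already present on the node (for example, a locally built one), so change it if you pull from Quay.io. Make sure the --config path in args under the in vertex matches where the ConfigMap's application.yml is mounted: the volume's mountPath followed by the item's path.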
```yaml
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: raw-consumer-pipeline
spec:
  limits:
    readBatchSize: 1
  vertices:
    - name: in
      scale:
        min: 1
      volumes:
        - name: pulsar-config-volume
          configMap:
            name: pulsar-config
            items:
              - key: application.yml
                path: application.yml
      source:
        udsource:
          container:
            image: apache-pulsar-java:v0.3.0
            args: ["--config=file:/conf/application.yml"]
            imagePullPolicy: Never
            env:
              # Uncomment to enable debug-level per-message logs
              # - name: LOGGING_LEVEL_IO_NUMAPROJ_PULSAR
              #   value: "DEBUG"
              # Uncomment to switch from JSON to plain-text log output
              # - name: NUMAFLOW_DEBUG
              #   value: "true"
            envFrom:
              - secretRef:
                  name: pulsar-consumer-auth-secret
            volumeMounts:
              - name: pulsar-config-volume
                mountPath: /conf
    - name: out
      scale:
        min: 1
      sink:
        log: {}
  edges:
    - from: in
      to: out
```
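The --config argument must point at the file that the volume mount exposes. That path is the mountPath (/conf) joined with the ConfigMap item's path (application.yml). The hypothetical helper below, ConfigPathCheck, derives the expected argument so you can compare it with args. It assumes the --config=file: prefix used in the example.

```java
/** Hypothetical helper: derive the expected --config argument from the volume mount settings. */
class ConfigPathCheck {
    static String expectedConfigArg(String mountPath, String itemPath) {
        // Drop a trailing slash on mountPath so "/conf" and "/conf/" give the same result.
        String dir = mountPath.endsWith("/")
                ? mountPath.substring(0, mountPath.length() - 1)
                : mountPath;
        return "--config=file:" + dir + "/" + itemPath;
    }
}
```

With the values from the Pipeline above, expectedConfigArg("/conf", "application.yml") equals the args entry. If you rename the item path or change mountPath, update args to match.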
4. Observe the messages
Wait for the pipeline to be up and running, then produce messages to the topic. You should see them printed in the log sink.