Documentation
¶
Overview ¶
Package pubsub provides an easy way to publish and receive Google Cloud Pub/Sub messages, hiding the details of the underlying server RPCs. Pub/Sub is a many-to-many, asynchronous messaging system that decouples senders and receivers.
If you are migrating from the v1 library, follow the migration guide for a faster way of using this version https://github.com/googleapis/google-cloud-go/blob/main/pubsub/MIGRATING.md
More information about Pub/Sub is available at https://cloud.google.com/pubsub/docs.
See https://godoc.org/cloud.google.com/go for authentication, timeouts, connection pooling and similar aspects of this package.
Publishing ¶
Pub/Sub messages are published to topics via publishers. A Topic may be created like so:
ctx := context.Background() client, _ := pubsub.NewClient(ctx, "my-project") topic, err := client.TopicAdminClient.CreateTopic(ctx, &pubsubpb.Topic{ Name: "projects/my-project/topics/my-topic", })
A Publisher client can then be instantiated and used to publish messages.
publisher := client.Publisher(topic.GetName()) res := publisher.Publish(ctx, &pubsub.Message{Data: []byte("payload")})
Publisher.Publish queues the message for publishing and returns immediately. When enough messages have accumulated, or enough time has elapsed, the batch of messages is sent to the Pub/Sub service.
Publisher.Publish returns a PublishResult, which behaves like a future: its Get method blocks until the message has been sent to the service.
The first time you call Publisher.Publish on a Publisher, goroutines are started in the background. To clean up these goroutines, call Publisher.Stop:
publisher.Stop()
Receiving ¶
To receive messages published to a topic, clients create a subscription for the topic. There may be more than one subscription per topic; each message that is published to the topic will be delivered to all associated subscriptions.
You then need to create a Subscriber client to pull messages from a subscription.
A subscription may be created like so:
ctx := context.Background() client, _ := pubsub.NewClient(ctx, "my-project") subscription, err := client.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{ Name: "projects/my-project/subscriptions/my-sub", Topic: "projects/my-project/topics/my-topic"} ), }
A Subscriber client can be instantiated like so:
sub := client.Subscriber(subscription.GetName())
You then provide a callback to Subscriber which processes the messages.
err := sub.Receive(ctx, func(ctx context.Context, m *Message) { log.Printf("Got message: %s", m.Data) m.Ack() }) if err != nil && !errors.Is(err, context.Canceled) { // Handle error. }
The callback is invoked concurrently by multiple goroutines, maximizing throughput. To terminate a call to Subscriber.Receive, cancel its context.
Once client code has processed the Message, it must call Message.Ack or Message.Nack. If Ack is not called, the Message will eventually be redelivered. Ack/Nack MUST be called within the Subscriber.Receive handler function, and not from a goroutine. Otherwise, flow control (e.g. ReceiveSettings.MaxOutstandingMessages) will not be respected, and messages can get orphaned when cancelling Receive, and redelivered slowly.
If the client cannot or doesn't want to process the message, it can call Message.Nack to speed redelivery. For more information and configuration options, see Ack Deadlines below.
Note: It is possible for a Message to be redelivered even if Message.Ack has been called unless exactly once delivery is enabled. Applications should be aware of these deliveries.
Note: This uses pubsub's streaming pull feature. This feature has properties that may be surprising. Please take a look at https://cloud.google.com/pubsub/docs/pull#streamingpull for more details on how streaming pull behaves compared to the synchronous pull method.
Streams Management ¶
The number of StreamingPull connections can be configured by setting NumGoroutines in ReceiveSettings. The default value of 1 means the client library will maintain 1 StreamingPull connection. This is more than sufficient for most use cases, as StreamingPull connections can handle up to 10 MB/s https://cloud.google.com/pubsub/quotas#resource_limits. In some cases, using too many streams can lead to client library behaving poorly as the application becomes I/O bound.
By default, the number of connections in the gRPC conn pool is min(4,GOMAXPROCS). Each connection supports up to 100 streams. Thus, if you have 4 or more CPU cores, the default setting allows a maximum of 400 streams which is already excessive for most use cases. If you want to change the limits on the number of streams, you can change the number of connections in the gRPC connection pool as shown below:
opts := []option.ClientOption{ option.WithGRPCConnectionPool(2), } client, err := pubsub.NewClient(ctx, projID, opts...)
Ack Deadlines ¶
The default pubsub deadlines are suitable for most use cases, but may be overridden. This section describes the tradeoffs that should be considered when overriding the defaults.
Behind the scenes, each message returned by the Pub/Sub server has an associated lease, known as an "ack deadline". Unless a message is acknowledged within the ack deadline, or the client requests that the ack deadline be extended, the message will become eligible for redelivery.
As a convenience, the pubsub client will automatically extend deadlines until either:
- Message.Ack or Message.Nack is called, or
- The "MaxExtension" duration elapses from the time the message is fetched from the server. This defaults to 60m.
Ack deadlines are extended periodically by the client. The period between extensions, as well as the length of the extension, automatically adjusts based on the time it takes the subscriber application to ack messages (based on the 99th percentile of ack latency). By default, this extension period is capped at 10m, but this limit can be configured by the Min/MaxDurationPerAckExtension settings. This has the effect that subscribers that process messages quickly have their message ack deadlines extended for a short amount, whereas subscribers that process message slowly have their message ack deadlines extended for a large amount. The net effect is fewer RPCs sent from the client library.
For example, consider a subscriber that takes 3 minutes to process each message. Since the library has already recorded several 3-minute "ack latencies"s in a percentile distribution, future message extensions are sent with a value of 3 minutes, every 3 minutes. Suppose the application crashes 5 seconds after the library sends such an extension: the Pub/Sub server would wait the remaining 2m55s before re-sending the messages out to other subscribers.
Please note that by default, the client library does not use the subscription's AckDeadline for the MaxExtension value.
Slow Message Processing ¶
Since long-lived streams are periodically killed by firewalls, we recommend avoiding message processing that takes longer than 30 minutes. Otherwise, you are more likely to experience message redeliveries.
Emulator ¶
To use an emulator with this library, you can set the PUBSUB_EMULATOR_HOST environment variable to the address at which your emulator is running. This will send requests to that address instead of to Pub/Sub. You can then create and use a client as usual:
// Set PUBSUB_EMULATOR_HOST environment variable. err := os.Setenv("PUBSUB_EMULATOR_HOST", "localhost:9000") if err != nil { // TODO: Handle error. } // Create client as usual. client, err := pubsub.NewClient(ctx, "my-project-id") if err != nil { // TODO: Handle error. } defer client.Close()
Index ¶
- Constants
- Variables
- func NewMessageCarrierFromPB(msg *pb.PubsubMessage) propagation.TextMapCarrier
- type AckResult
- type AcknowledgeStatus
- type Client
- type ClientConfig
- type ErrPublishingPaused
- type FlowControlSettings
- type LimitExceededBehavior
- type Message
- type PublishResult
- type PublishSettings
- type Publisher
- type ReceiveSettings
- type Subscriber
Examples ¶
Constants ¶
const ( // MaxPublishRequestCount is the maximum number of messages that can be in // a single publish request, as defined by the PubSub service. MaxPublishRequestCount = 1000 // MaxPublishRequestBytes is the maximum size of a single publish request // in bytes, as defined by the PubSub service. MaxPublishRequestBytes = 1e7 )
const ( // ScopePubSub grants permissions to view and manage Pub/Sub // topics and subscriptions. ScopePubSub = "https://www.googleapis.com/auth/pubsub" // ScopeCloudPlatform grants permissions to view and manage your data // across Google Cloud Platform services. ScopeCloudPlatform = "https://www.googleapis.com/auth/cloud-platform" )
const DetectProjectID = "*detect-project-id*"
DetectProjectID is a sentinel value that instructs NewClient to detect the project ID. It is given in place of the projectID argument. NewClient will use the project ID from the given credentials or the default credentials (https://developers.google.com/accounts/docs/application-default-credentials) if no credentials were provided. When providing credentials, not all options will allow NewClient to extract the project ID. Specifically a JWT does not have the project ID encoded.
Variables ¶
var ( // ErrFlowControllerMaxOutstandingMessages indicates that outstanding messages exceeds MaxOutstandingMessages. ErrFlowControllerMaxOutstandingMessages = errors.New("pubsub: MaxOutstandingMessages flow controller limit exceeded") // ErrFlowControllerMaxOutstandingBytes indicates that outstanding bytes of messages exceeds MaxOutstandingBytes. ErrFlowControllerMaxOutstandingBytes = errors.New("pubsub: MaxOutstandingBytes flow control limit exceeded") )
var ( // PublishedMessages is a measure of the number of messages published, which may include errors. // It is EXPERIMENTAL and subject to change or removal without notice. PublishedMessages = stats.Int64(statsPrefix+"published_messages", "Number of PubSub message published", stats.UnitDimensionless) // PublishLatency is a measure of the number of milliseconds it took to publish a bundle, // which may consist of one or more messages. // It is EXPERIMENTAL and subject to change or removal without notice. PublishLatency = stats.Float64(statsPrefix+"publish_roundtrip_latency", "The latency in milliseconds per publish batch", stats.UnitMilliseconds) // PullCount is a measure of the number of messages pulled. // It is EXPERIMENTAL and subject to change or removal without notice. PullCount = stats.Int64(statsPrefix+"pull_count", "Number of PubSub messages pulled", stats.UnitDimensionless) // AckCount is a measure of the number of messages acked. // It is EXPERIMENTAL and subject to change or removal without notice. AckCount = stats.Int64(statsPrefix+"ack_count", "Number of PubSub messages acked", stats.UnitDimensionless) // NackCount is a measure of the number of messages nacked. // It is EXPERIMENTAL and subject to change or removal without notice. NackCount = stats.Int64(statsPrefix+"nack_count", "Number of PubSub messages nacked", stats.UnitDimensionless) // ModAckCount is a measure of the number of messages whose ack-deadline was modified. // It is EXPERIMENTAL and subject to change or removal without notice. ModAckCount = stats.Int64(statsPrefix+"mod_ack_count", "Number of ack-deadlines modified", stats.UnitDimensionless) // ModAckTimeoutCount is a measure of the number ModifyAckDeadline RPCs that timed out. // It is EXPERIMENTAL and subject to change or removal without notice. ModAckTimeoutCount = stats.Int64(statsPrefix+"mod_ack_timeout_count", "Number of ModifyAckDeadline RPCs that timed out", stats.UnitDimensionless) // StreamOpenCount is a measure of the number of times a streaming-pull stream was opened. // It is EXPERIMENTAL and subject to change or removal without notice. StreamOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of calls opening a new streaming pull", stats.UnitDimensionless) // StreamRetryCount is a measure of the number of times a streaming-pull operation was retried. // It is EXPERIMENTAL and subject to change or removal without notice. StreamRetryCount = stats.Int64(statsPrefix+"stream_retry_count", "Number of retries of a stream send or receive", stats.UnitDimensionless) // StreamRequestCount is a measure of the number of requests sent on a streaming-pull stream. // It is EXPERIMENTAL and subject to change or removal without notice. StreamRequestCount = stats.Int64(statsPrefix+"stream_request_count", "Number gRPC StreamingPull request messages sent", stats.UnitDimensionless) // StreamResponseCount is a measure of the number of responses received on a streaming-pull stream. // It is EXPERIMENTAL and subject to change or removal without notice. StreamResponseCount = stats.Int64(statsPrefix+"stream_response_count", "Number of gRPC StreamingPull response messages received", stats.UnitDimensionless) // OutstandingMessages is a measure of the number of outstanding messages held by the client before they are processed. // It is EXPERIMENTAL and subject to change or removal without notice. OutstandingMessages = stats.Int64(statsPrefix+"outstanding_messages", "Number of outstanding Pub/Sub messages", stats.UnitDimensionless) // OutstandingBytes is a measure of the number of bytes all outstanding messages held by the client take up. // It is EXPERIMENTAL and subject to change or removal without notice. OutstandingBytes = stats.Int64(statsPrefix+"outstanding_bytes", "Number of outstanding bytes", stats.UnitDimensionless) // PublisherOutstandingMessages is a measure of the number of published outstanding messages held by the client before they are processed. // It is EXPERIMENTAL and subject to change or removal without notice. PublisherOutstandingMessages = stats.Int64(statsPrefix+"publisher_outstanding_messages", "Number of outstanding publish messages", stats.UnitDimensionless) // PublisherOutstandingBytes is a measure of the number of bytes all outstanding publish messages held by the client take up. // It is EXPERIMENTAL and subject to change or removal without notice. PublisherOutstandingBytes = stats.Int64(statsPrefix+"publisher_outstanding_bytes", "Number of outstanding publish bytes", stats.UnitDimensionless) )
The following are measures recorded in publish/subscribe flows.
var ( // PublishedMessagesView is a cumulative sum of PublishedMessages. // It is EXPERIMENTAL and subject to change or removal without notice. PublishedMessagesView *view.View // PublishLatencyView is a distribution of PublishLatency. // It is EXPERIMENTAL and subject to change or removal without notice. PublishLatencyView *view.View // PullCountView is a cumulative sum of PullCount. // It is EXPERIMENTAL and subject to change or removal without notice. PullCountView *view.View // AckCountView is a cumulative sum of AckCount. // It is EXPERIMENTAL and subject to change or removal without notice. AckCountView *view.View // NackCountView is a cumulative sum of NackCount. // It is EXPERIMENTAL and subject to change or removal without notice. NackCountView *view.View // ModAckCountView is a cumulative sum of ModAckCount. // It is EXPERIMENTAL and subject to change or removal without notice. ModAckCountView *view.View // ModAckTimeoutCountView is a cumulative sum of ModAckTimeoutCount. // It is EXPERIMENTAL and subject to change or removal without notice. ModAckTimeoutCountView *view.View // StreamOpenCountView is a cumulative sum of StreamOpenCount. // It is EXPERIMENTAL and subject to change or removal without notice. StreamOpenCountView *view.View // StreamRetryCountView is a cumulative sum of StreamRetryCount. // It is EXPERIMENTAL and subject to change or removal without notice. StreamRetryCountView *view.View // StreamRequestCountView is a cumulative sum of StreamRequestCount. // It is EXPERIMENTAL and subject to change or removal without notice. StreamRequestCountView *view.View // StreamResponseCountView is a cumulative sum of StreamResponseCount. // It is EXPERIMENTAL and subject to change or removal without notice. StreamResponseCountView *view.View // OutstandingMessagesView is the last value of OutstandingMessages // It is EXPERIMENTAL and subject to change or removal without notice. OutstandingMessagesView *view.View // OutstandingBytesView is the last value of OutstandingBytes // It is EXPERIMENTAL and subject to change or removal without notice. OutstandingBytesView *view.View // PublisherOutstandingMessagesView is the last value of OutstandingMessages // It is EXPERIMENTAL and subject to change or removal without notice. PublisherOutstandingMessagesView *view.View // PublisherOutstandingBytesView is the last value of OutstandingBytes // It is EXPERIMENTAL and subject to change or removal without notice. PublisherOutstandingBytesView *view.View )
var ( DefaultPublishViews []*view.View DefaultSubscribeViews []*view.View )
These arrays hold the default OpenCensus views that keep track of publish/subscribe operations. It is EXPERIMENTAL and subject to change or removal without notice.
var DefaultPublishSettings = PublishSettings{ DelayThreshold: 10 * time.Millisecond, CountThreshold: 100, ByteThreshold: 1e6, Timeout: 60 * time.Second, FlowControlSettings: FlowControlSettings{ MaxOutstandingMessages: 1000, MaxOutstandingBytes: -1, LimitExceededBehavior: FlowControlIgnore, }, EnableCompression: false, CompressionBytesThreshold: 240, }
DefaultPublishSettings holds the default values for topics' PublishSettings.
var DefaultReceiveSettings = ReceiveSettings{ MaxExtension: 60 * time.Minute, MaxDurationPerAckExtension: 0, MinDurationPerAckExtension: 0, MaxOutstandingMessages: 1000, MaxOutstandingBytes: 1e9, NumGoroutines: 1, }
DefaultReceiveSettings holds the default values for ReceiveSettings.
var ErrEmptyProjectID = errors.New("pubsub: projectID string is empty")
ErrEmptyProjectID denotes that the project string passed into NewClient was empty. Please provide a valid project ID or use the DetectProjectID sentinel value to detect project ID from well defined sources.
var ErrOversizedMessage = bundler.ErrOversizedItem
ErrOversizedMessage indicates that a message's size exceeds MaxPublishRequestBytes.
var ErrPublisherStopped = errors.New("pubsub: Stop has been called for this publisher")
ErrPublisherStopped indicates that topic has been stopped and further publishing will fail.
Functions ¶
func NewMessageCarrierFromPB ¶
func NewMessageCarrierFromPB(msg *pb.PubsubMessage) propagation.TextMapCarrier
NewMessageCarrierFromPB creates a propagation.TextMapCarrier that can be used to extract the trace context from a protobuf PubsubMessage.
Example: ctx = propagation.TraceContext{}.Extract(ctx, pubsub.NewMessageCarrierFromPB(msg))
Types ¶
type AckResult ¶
AckResult holds the result from a call to Ack or Nack.
Call Get to obtain the result of the Ack/NackWithResult call. Example:
// Get blocks until Ack/NackWithResult completes or ctx is done. ackStatus, err := r.Get(ctx) if err != nil { // TODO: Handle error. }
type AcknowledgeStatus ¶
type AcknowledgeStatus = ipubsub.AcknowledgeStatus
AcknowledgeStatus represents the status of an Ack or Nack request.
const ( // AcknowledgeStatusSuccess indicates the request was a success. AcknowledgeStatusSuccess AcknowledgeStatus = iota // AcknowledgeStatusPermissionDenied indicates the caller does not have sufficient permissions. AcknowledgeStatusPermissionDenied // AcknowledgeStatusFailedPrecondition indicates the request encountered a FailedPrecondition error. AcknowledgeStatusFailedPrecondition // AcknowledgeStatusInvalidAckID indicates one or more of the ack IDs sent were invalid. AcknowledgeStatusInvalidAckID // AcknowledgeStatusOther indicates another unknown error was returned. AcknowledgeStatusOther )
type Client ¶
type Client struct { // TopicAdminClient is a convenience exposure of the underlying gRPC client // for making topic admin calls (CRUDL operations). While this client also // contains data APIs (Publish), it is highly recommended to use Publisher.Publish instead. TopicAdminClient *vkit.TopicAdminClient // SubscriptionAdminClient is a convenience exposure of the underlying gRPC client // for making subscription admin calls (CRUDL operations). While this client also // contains data APIs (StreamingPull), it is highly recommended to use the methods // in Client.Subscriber instead. SubscriptionAdminClient *vkit.SubscriptionAdminClient // contains filtered or unexported fields }
Client is a Pub/Sub client scoped to a single project.
Clients should be reused rather than being created as needed. A Client may be shared by multiple goroutines.
func NewClient ¶
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error)
NewClient creates a new PubSub client. It uses a default configuration.
Example ¶
ctx := context.Background() _, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } // See the other examples to learn how to use the Client.
func NewClientWithConfig ¶
func NewClientWithConfig(ctx context.Context, projectID string, config *ClientConfig, opts ...option.ClientOption) (*Client, error)
NewClientWithConfig creates a new PubSub client.
func (*Client) Close ¶
Close releases any resources held by the client, such as memory and goroutines.
If the client is available for the lifetime of the program, then Close need not be called at exit.
func (*Client) Project ¶
Project returns the project ID or number for this instance of the client, which may have either been explicitly specified or autodetected.
func (*Client) Publisher ¶
Publisher constructs a publisher client from either a topicID or a topic name, otherwise known as a full path.
The client created is a reference and does not return any errors if the topic does not exist. Errors will be returned when attempting to Publish instead. If a Publisher's Publish method is called, it has background goroutines associated with it. Clean them up by calling Publisher.Stop.
It is best practice to reuse the Publisher when publishing to the same topic. Avoid creating many Publisher instances if you use them to publish.
Example ¶
Use Publisher to refer to a topic that is not in the client's project, such as a public topic.
ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } otherProjectID := "another-project-id" publisher := client.Publisher(fmt.Sprintf("projects/%s/topics/%s", otherProjectID, "my-topic")) _ = publisher // TODO: use the publisher client.
func (*Client) Subscriber ¶
func (c *Client) Subscriber(nameOrID string) *Subscriber
Subscriber creates a subscriber client which references a single subscription.
type ClientConfig ¶
type ClientConfig struct { // TopicAdminCallOptions controls the behavior (e.g. retries) of RPC // calls for the underlying gRPC publisher/topic admin client. // This includes both admin and data plane calls, even though // this is just named SubscriptionAdminCallOptions. TopicAdminCallOptions *vkit.TopicAdminCallOptions // SubscriptionAdminCallOptions controls the behavior (e.g. retries) // of RPC calls for the underlying gRPC subscriber/subscription admin client. // This includes both admin and data plane calls, even though // this is just named SubscriptionAdminCallOptions. SubscriptionAdminCallOptions *vkit.SubscriptionAdminCallOptions // EnableOpenTelemetryTracing enables tracing for this client. // This option allows selectively disabling Pub/Sub traces. // This defaults to false. // OpenTelemetry tracing standards are in active development, and thus // attributes, links, and span names are EXPERIMENTAL and subject to // change or removal without notice. EnableOpenTelemetryTracing bool }
ClientConfig has configurations for the client.
type ErrPublishingPaused ¶
type ErrPublishingPaused struct {
OrderingKey string
}
ErrPublishingPaused is a custom error indicating that the publish paused for the specified ordering key.
func (ErrPublishingPaused) Error ¶
func (e ErrPublishingPaused) Error() string
type FlowControlSettings ¶
type FlowControlSettings struct { // MaxOutstandingMessages is the maximum number of buffered messages to be published. // If less than or equal to zero, this is disabled. MaxOutstandingMessages int // MaxOutstandingBytes is the maximum size of buffered messages to be published. // If less than or equal to zero, this is disabled. MaxOutstandingBytes int // LimitExceededBehavior configures the behavior when trying to publish // additional messages while the flow controller is full. The available options // are Ignore (disable, default), Block, and SignalError (publish // results will return an error). LimitExceededBehavior LimitExceededBehavior }
FlowControlSettings controls flow control for messages while publishing or subscribing.
type LimitExceededBehavior ¶
type LimitExceededBehavior int
LimitExceededBehavior configures the behavior that flowController can use in case the flow control limits are exceeded.
const ( // FlowControlIgnore disables flow control. FlowControlIgnore LimitExceededBehavior = iota // FlowControlBlock signals to wait until the request can be made without exceeding the limit. FlowControlBlock // FlowControlSignalError signals an error to the caller of acquire. FlowControlSignalError )
type Message ¶
Message represents a Pub/Sub message.
Message can be passed to Publisher.Publish for publishing.
If received in the callback passed to Subscription.Receive, client code must call Message.Ack or Message.Nack when finished processing the Message. Calls to Ack or Nack have no effect after the first call.
Ack indicates successful processing of a Message. If message acknowledgement fails, the Message will be redelivered. Nack indicates that the client will not or cannot process a Message. Nack will result in the Message being redelivered more quickly than if it were allowed to expire.
If using exactly once delivery, you should call Message.AckWithResult and Message.NackWithResult instead. These methods will return an AckResult, which tracks the state of acknowledgement operation. If the AckResult returns successful, the message is guaranteed NOT to be re-delivered. Otherwise, the AckResult will return an error with more details about the failure and the message may be re-delivered.
type PublishResult ¶
type PublishResult = ipubsub.PublishResult
A PublishResult holds the result from a call to Publish.
Call Get to obtain the result of the Publish call. Example:
// Get blocks until Publish completes or ctx is done. id, err := r.Get(ctx) if err != nil { // TODO: Handle error. }
type PublishSettings ¶
type PublishSettings struct { // Publish a non-empty batch after this delay has passed. DelayThreshold time.Duration // Publish a batch when it has this many messages. The maximum is // MaxPublishRequestCount. CountThreshold int // Publish a batch when its size in bytes reaches this value. ByteThreshold int // The number of goroutines used in each of the data structures that are // involved along the the Publish path. Adjusting this value adjusts // concurrency along the publish path. // // Defaults to a multiple of GOMAXPROCS. NumGoroutines int // The maximum time that the client will attempt to publish a bundle of messages. Timeout time.Duration // FlowControlSettings defines publisher flow control settings. FlowControlSettings FlowControlSettings // EnableCompression enables transport compression for Publish operations EnableCompression bool // CompressionBytesThreshold defines the threshold (in bytes) above which messages // are compressed for transport. Only takes effect if EnableCompression is true. CompressionBytesThreshold int }
PublishSettings control the bundling of published messages.
type Publisher ¶
type Publisher struct { // Settings for publishing messages. All changes must be made before the // first call to Publish. The default is DefaultPublishSettings. PublishSettings PublishSettings // EnableMessageOrdering enables delivery of ordered keys. EnableMessageOrdering bool // contains filtered or unexported fields }
Publisher is a reference to a PubSub publisher, associated with a single topic.
The methods of Publisher are safe for use by multiple goroutines.
func (*Publisher) Flush ¶
func (t *Publisher) Flush()
Flush blocks until all remaining messages are sent.
func (*Publisher) Publish ¶
func (t *Publisher) Publish(ctx context.Context, msg *Message) *PublishResult
Publish publishes msg to the topic asynchronously. Messages are batched and sent according to the topic's PublishSettings. Publish never blocks.
Publish returns a non-nil PublishResult which will be ready when the message has been sent (or has failed to be sent) to the server.
Publish creates goroutines for batching and sending messages. These goroutines need to be stopped by calling t.Stop(). Once stopped, future calls to Publish will immediately return a PublishResult with an error.
Example ¶
ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } publisher := client.Publisher("topicName") defer publisher.Stop() var results []*pubsub.PublishResult r := publisher.Publish(ctx, &pubsub.Message{ Data: []byte("hello world"), }) results = append(results, r) // Do other work ... for _, r := range results { id, err := r.Get(ctx) if err != nil { // TODO: Handle error. } fmt.Printf("Published a message with a message ID: %s\n", id) }
func (*Publisher) ResumePublish ¶
ResumePublish resumes accepting messages for the provided ordering key. Publishing using an ordering key might be paused if an error is encountered while publishing, to prevent messages from being published out of order.
type ReceiveSettings ¶
type ReceiveSettings struct { // MaxExtension is the maximum period for which the subscriber should // automatically extend the ack deadline for each message. // // The subscriber will automatically extend the ack deadline of all // fetched Messages up to the duration specified. Automatic deadline // extension beyond the initial receipt may be disabled by specifying a // duration less than 0. MaxExtension time.Duration // MaxDurationPerAckExtension is the maximum duration by which to extend the ack // deadline at a time. The ack deadline will continue to be extended by up // to this duration until MaxExtension is reached. Setting MaxDurationPerAckExtension // bounds the maximum amount of time before a message redelivery in the // event the subscriber fails to extend the deadline. // // MaxDurationPerAckExtension must be between 10s and 600s (inclusive). This configuration // can be disabled by specifying a duration less than (or equal to) 0. MaxDurationPerAckExtension time.Duration // MinDurationPerAckExtension is the the min duration for a single lease extension attempt. // By default the 99th percentile of ack latency is used to determine lease extension // periods but this value can be set to minimize the number of extraneous RPCs sent. // // MinDurationPerAckExtension must be between 10s and 600s (inclusive). This configuration // can be disabled by specifying a duration less than (or equal to) 0. // Disabled by default but set to 60 seconds if the subscription has exactly-once delivery enabled. MinDurationPerAckExtension time.Duration // MaxOutstandingMessages is the maximum number of unprocessed messages // (unacknowledged but not yet expired). If MaxOutstandingMessages is 0, it // will be treated as if it were DefaultReceiveSettings.MaxOutstandingMessages. // If the value is negative, then there will be no limit on the number of // unprocessed messages. MaxOutstandingMessages int // MaxOutstandingBytes is the maximum size of unprocessed messages // (unacknowledged but not yet expired). If MaxOutstandingBytes is 0, it will // be treated as if it were DefaultReceiveSettings.MaxOutstandingBytes. If // the value is negative, then there will be no limit on the number of bytes // for unprocessed messages. MaxOutstandingBytes int // NumGoroutines sets the number of StreamingPull streams to pull messages // from the subscription. // // NumGoroutines defaults to DefaultReceiveSettings.NumGoroutines. // // NumGoroutines does not limit the number of messages that can be processed // concurrently. Even with one goroutine, many messages might be processed at // once, because that goroutine may continually receive messages and invoke the // function passed to Receive on them. To limit the number of messages being // processed concurrently, set MaxOutstandingMessages. NumGoroutines int }
ReceiveSettings configure the Receive method. A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings.
type Subscriber ¶
type Subscriber struct { // Settings for pulling messages. Configure these before calling Receive. ReceiveSettings ReceiveSettings // contains filtered or unexported fields }
Subscriber is a subscriber client that references a subscription.
func (*Subscriber) ID ¶
func (s *Subscriber) ID() string
ID returns the unique identifier of the subscription within its project.
func (*Subscriber) Receive ¶
Receive calls f with the outstanding messages from the subscription. It blocks until ctx is done, or the service returns a non-retryable error.
The standard way to terminate a Receive is to cancel its context:
cctx, cancel := context.WithCancel(ctx) err := sub.Receive(cctx, callback) // Call cancel from callback, or another goroutine.
If the service returns a non-retryable error, Receive returns that error after all of the outstanding calls to f have returned. If ctx is done, Receive returns nil after all of the outstanding calls to f have returned and all messages have been acknowledged or have expired.
Receive calls f concurrently from multiple goroutines. It is encouraged to process messages synchronously in f, even if that processing is relatively time-consuming; Receive will spawn new goroutines for incoming messages, limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings.
The context passed to f will be canceled when ctx is Done or there is a fatal service error.
Receive will send an ack deadline extension on message receipt, then automatically extend the ack deadline of all fetched Messages up to the period specified by s.ReceiveSettings.MaxExtension.
Each Subscriber may have only one invocation of Receive active at a time.
Example ¶
ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } // Can use either "projects/project-id/subscriptions/sub-id" or just "sub-id" here sub := client.Subscriber("sub-id") err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { // TODO: Handle message. // NOTE: May be called concurrently; synchronize access to shared memory. m.Ack() }) if err != nil && !errors.Is(err, context.Canceled) { // TODO: Handle error. }
Example (MaxExtension) ¶
This example shows how to configure keepalive so that unacknowledged messages expire quickly, allowing other subscribers to take them.
ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } sub := client.Subscriber("subNameOrID") // This program is expected to process and acknowledge messages in 30 seconds. If // not, the Pub/Sub API will assume the message is not acknowledged. sub.ReceiveSettings.MaxExtension = 30 * time.Second err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { // TODO: Handle message. m.Ack() }) if err != nil && !errors.Is(err, context.Canceled) { // TODO: Handle error. }
Example (MaxOutstanding) ¶
This example shows how to throttle Subscription.Receive, which aims for high throughput by default. By limiting the number of messages and/or bytes being processed at once, you can bound your program's resource consumption.
ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } sub := client.Subscriber("subNameOrID") sub.ReceiveSettings.MaxOutstandingMessages = 5 sub.ReceiveSettings.MaxOutstandingBytes = 10e6 err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { // TODO: Handle message. m.Ack() }) if err != nil && !errors.Is(err, context.Canceled) { // TODO: Handle error. }
func (*Subscriber) String ¶
func (s *Subscriber) String() string
String returns the globally unique printable name of the subscription.
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
Package pubsub is an auto-generated package for the Cloud Pub/Sub API.
|
Package pubsub is an auto-generated package for the Cloud Pub/Sub API. |
benchwrapper
Package main wraps the client library in a gRPC interface that a benchmarker can communicate through.
|
Package main wraps the client library in a gRPC interface that a benchmarker can communicate through. |
Package pstest provides a fake Cloud PubSub service for testing.
|
Package pstest provides a fake Cloud PubSub service for testing. |