Skip to content

Event Streams SDK allows you to integrate your application with Event Streams RESTful APIs.

License

Notifications You must be signed in to change notification settings

IBM/eventstreams-go-sdk

Repository files navigation

Build Status semantic-release

IBM Cloud Event Streams Go SDK Version 1.3.1

Introduction

IBM Event Streams for IBM Cloud™ is a high-throughput message bus built with Apache Kafka. It is optimized for event ingestion into IBM Cloud and event stream distribution between your services and applications.

Event Streams provides a REST API to help connect your existing systems to your Event Streams Kafka cluster. Using the API, you can integrate Event Streams with any system that supports RESTful APIs.

Documentation IBM Cloud Event Streams Service APIs.

This is the Event Streams Software Development Kit for Go It includes a library of functions used to access an Event Streams service instance.

Table of Contents

Overview

The IBM Cloud Event Streams Go SDK allows developers to programmatically interact with the following IBM Cloud services:

Service Name Package name
Admin Rest pkg/adminrestv1
Schema Registry pkg/schemaregistryv1

Prerequisites

  • An IBM Cloud account.
  • The IBM Cloud CLI.
  • An IAM API key to allow the SDK to access your account. Create one here.
  • An IBM Cloud Event Streams Instance Create one here
  • Go version 1.14 or above.

Installation

The current version of this SDK: 1.3.1

There are a few different ways to download and install the Event Streams Go SDK project for use by your Go application:

go get command

Use this command to download and install the SDK to allow your Go application to use it:

go get -u github.ibm.com/IBM/eventstreams-go-sdk 

Go modules

If your application is using Go modules, you can add a suitable import to your Go application, like this:

import (
	"github.ibm.com/IBM/eventstreams-go-sdk/pkg/adminrestv1"
)

then run go mod tidy to download and install the new dependency and update your Go application's go.mod file.

dep dependency manager

If your application is using the dep dependency management tool, you can add a dependency to your Gopkg.toml file. Here is an example:

[[constraint]]
  name = "github.ibm.com/IBM/eventstreams-go-sdk"
  version = "1.3.1"

then run dep ensure.

Using the SDK

For general SDK usage information, please see this link

For details on using the SDK for administering Kafka topics, please see kafka_topic_operations.md.

For details on using the SDK for schema operations, please see schema_operations.md

See examples for examples on using adminrest and schema SDKs.

Questions

If you are having difficulties using this SDK or have a question about the IBM Cloud services, please ask a question at Stack Overflow.

Issues

If you encounter an issue with the project, you are welcome to submit a bug report. Before that, please search for similar issues. It's possible that someone has already reported the problem.

Open source @ IBM

Find more open source projects on the IBM Github Page

Contributing

See CONTRIBUTING.

License

This SDK project is released under the Apache 2.0 license. The license's full text can be found in LICENSE.

Event Streams Administration REST API

This REST API allows users of the IBM Event Streams service to administer Kafka topics associated with an instance of the service. You can use this API to perform the following operations:

The Admin REST API is also documented using swagger.

Access control


All requests support below authorization methods:

  • Basic authorization with user and password. ( For both standard, enterprise and lite plans, user is 'token', password is the API key from ibmcloud resource service-keys for the service instance.)
  • Bearer authorization with bearer token. (This token can be either API key or JWT token obtained from IAM upon login to IBM Cloud. Use ibmcloud iam oauth-tokens to retrieve the token after ibmcloud login)
  • X-Auth-Token header to be set to the API key. This header is deprecated.

Administration API endpoint


Administration API endpoint is the kafka_admin_url property in the service key for the service instance. This command can be used to retrieve this property.

$ibmcloud resource service-key "${service_instance_key_name}" --output json > jq -r '.[]|.credentials.kafka_admin_url'

Environment Setup

In the examples you must set and export environment variables as follows:

  • Either the API_KEY or BEARER_TOKEN to use for authentication.
  • KAFKA_ADMIN_URL to point to your Event Streams administration endpoint.

In addition, the Content-type header has to be set to application/json.

Common HTTP status codes:

  • 200: Request succeeded.
  • 202: Request was accepted.
  • 400: Invalid request JSON.
  • 401: The authentication header is not set or provided information is not valid.
  • 403: Not authorized to perform the operation. Usually it means the API key used is missing a certain role. More details on what role can perform what operation refers to this document.
  • 404: Unable to find the topic with topic name given by user.
  • 422: Semantically invalid request.
  • 503: An error occurred handling the request.

Error responses carry a JSON body like the following:

{"error_code":50301,"message":"Unknown Kafka Error", "incident_id": "17afe715-0ff5-4c49-9acc-a4204244a331"}

Error codes are of the format HHHKK where HHH is the HTTP Status Code and KK is the Kafka protocol error.

For E2E debugging purposes, the transaction ID of every request is returned in the HTTP header X-Global-Transaction-Id. If the header is set on the request, it will be honored. If not, it will be generated. In the event of a non-200 error return code, the transaction ID is also returned in the JSON error response as incident_id.

Using the REST API to administer Kafka topics


To run the example :-

Compile the code.

go build -o example

Or simply

make build

Set the required environment variables

# Set your API KEY (or a bearer token could be used by setting the BEARER_TOKEN environment variable instead, but not both)
export API_KEY="abc123456789"

# Set the Admin Endpoint to point to your cluster.
export KAFKA_ADMIN_URL="https://xyzclustername.svc01.region.eventstreams.test.cloud.ibm.com"

Run the example

./example 

REST API


The following sections explain how the REST API works with examples.

Code Setup

// Code Setup
import (
	"fmt"
	"net/http"
	"os"

	"github.com/IBM/eventstreams-go-sdk/pkg/adminrestv1"
	"github.com/IBM/go-sdk-core/v5/core"
)

// End Code Setup

Authentication


Use one of the following methods to authenticate:

  • To authenticate using Basic Auth: Place these values into the Authorization header of the HTTP request in the form Basic where is the username and password joined by a single colon : base64 encoded. For example:

    echo -n "token:<APIKEY>" | base64
  • To authenticate using a bearer token: To obtain your token using the IBM Cloud CLI, first log in to IBM Cloud, then run the following command:

    ibmcloud iam oauth-tokens
    

    Place this token in the Authorization header of the HTTP request in the form Bearer. Both API key or JWT tokens are supported.

  • To authenticate directly using the api_key: Place the key directly as the value of the X-Auth-Token HTTP header.

Example

Here's an example of how to create the authenticator using either an API key or a BEARER_TOKEN

	// Create Authenticator
	var authenticator core.Authenticator

	if apiKey != "" {
		var err error
		// Create an Basic IAM authenticator.
		authenticator, err = core.NewBasicAuthenticator("token", apiKey)
		if err != nil {
			fmt.Printf("failed to create new basic authenticator: %s\n", err.Error())
			os.Exit(1)
		}
	} else {
		var err error
		// Create an IAM Bearer Token authenticator.
		authenticator, err = core.NewBearerTokenAuthenticator(bearerToken)
		if err != nil {
			fmt.Printf("failed to create new bearer token authenticator: %s\n", err.Error())
			os.Exit(1)
		}
	}
	// End Authenticator

Creating a client for the Admin REST API.


Create a new service object.

	// Create Service
	serviceAPI, serviceErr := adminrestv1.NewAdminrestV1(&adminrestv1.AdminrestV1Options{
		URL:           URL,
		Authenticator: authenticator,
	})
	// End Create Service

Creating a Kafka topic


To create a Kafka topic the admin REST SDK issues a POST request to the /admin/topics path. The body of the request contains a JSON document, for example:

{
    "name": "topicname",
    "partitions": 1,
    "configs": {
        "retentionMs": 86400000,
        "cleanupPolicy": "delete"
    }
}

The only required field is name. The partitions fields defaults to 1 if not set.

Expected HTTP status codes:

  • 202: Topic creation request was accepted.
  • 400: Invalid request JSON.
  • 403: Not authorized to create topic.
  • 422: Semantically invalid request.

If the request to create a Kafka topic succeeds then HTTP status code 202 (Accepted) is returned. If the operation fails then a HTTP status code of 422 (Un-processable Entity) is returned, and a JSON object containing additional information about the failure is returned as the body of the response.

Example

func createTopic(serviceAPI *adminrestv1.AdminrestV1) error {
	// Construct an instance of the createTopicOptionsModel.
	createTopicOptionsModel := new(adminrestv1.CreateTopicOptions)
	createTopicOptionsModel.Name = core.StringPtr("test-topic")
	createTopicOptionsModel.PartitionCount = core.Int64Ptr(int64(1))

	// Create the Topic.
	response, operationErr := serviceAPI.CreateTopic(createTopicOptionsModel)
	if operationErr != nil {
		return fmt.Errorf("Error Creating Topics: %s\n", operationErr.Error())
	}

	// Check the result.
	if response.StatusCode != http.StatusAccepted {
		return fmt.Errorf("Error Creating Topic: status %d\n", response.StatusCode)
	}

	fmt.Printf("\tname: %s created\n", *createTopicOptionsModel.Name)

	return nil
} 

Deleting a Kafka topic


To delete a Kafka topic, the admin REST SDK issues a DELETE request to the /admin/topics/TOPICNAME path (where TOPICNAME is the name of the Kafka topic that you want to delete).

Expected return codes:

  • 202: Topic deletion request was accepted.
  • 403: Not authorized to delete topic.
  • 404: Topic does not exist.
  • 422: Semantically invalid request.

A 202 (Accepted) status code is returned if the REST API accepts the delete request or status code 422 (Un-processable Entity) if the delete request is rejected. If a delete request is rejected then the body of the HTTP response will contain a JSON object which provides additional information about why the request was rejected.

Kafka deletes topics asynchronously. Deleted topics may still appear in the response to a list topics request for a short period of time after the completion of a REST request to delete the topic.

Example

func deleteTopic(serviceAPI *adminrestv1.AdminrestV1) error {
	// Construct an instance of the DeleteTopicOptions model
	deleteTopicOptionsModel := new(adminrestv1.DeleteTopicOptions)
	deleteTopicOptionsModel.TopicName = core.StringPtr("test-topic")

	// Delete Topic
	response, operationErr := serviceAPI.DeleteTopic(deleteTopicOptionsModel)
	if operationErr != nil {
		return fmt.Errorf("Error Deleting Topic: %s\n", operationErr.Error())
	}

	// Check the result.
	if response.StatusCode != http.StatusAccepted {
		return fmt.Errorf("Error Deleting Topic: status %d\n", response.StatusCode)
	}

	fmt.Printf("\tname: %s deleted\n", *deleteTopicOptionsModel.TopicName)
	return nil
} 

Listing Kafka topics


You can list all of your Kafka topics by issuing a GET request to the /admin/topics path.

Expected status codes:

  • 200: the topic list is returned as JSON in the following format:
[
  {
    "name": "topic1",
    "partitions": 1,
    "retentionMs": 86400000,
    "cleanupPolicy": "delete"
  },
  { "name": "topic2",
    "partitions": 2,
    "retentionMs": 86400000,
    "cleanupPolicy": "delete"
  }
]

A successful response will have HTTP status code 200 (OK) and contain an array of JSON objects, where each object represents a Kafka topic and has the following properties:

Property name Description
name The name of the Kafka topic.
partitions The number of partitions of the Kafka topic.
retentionsMs The retention period for messages on the topic (in ms).
cleanupPolicy The cleanup policy of the Kafka topic.

Example

func listTopics(serviceAPI *adminrestv1.AdminrestV1) error {
	// Construct an instance of the ListTopicsOptions model
	listTopicsOptionsModel := new(adminrestv1.ListTopicsOptions)

	// Call ListTopics.
	result, response, operationErr := serviceAPI.ListTopics(listTopicsOptionsModel)
	if operationErr != nil {
		return fmt.Errorf("Error Listing Topics" + operationErr.Error())
	}

	// Check the result.
	if response.StatusCode != http.StatusOK {
		return fmt.Errorf("Error Listing Topics: status %d\n", response.StatusCode)
	}

	// Loop and print topics.
	for _, topicDetail := range result {
		fmt.Printf("\tname: %s\n", *topicDetail.Name)
	}
	return nil
} 

Getting a Kafka topic


To get a Kafka topic detail information, issue a GET request to the /admin/topics/TOPICNAME path (where TOPICNAME is the name of the Kafka topic that you want to get).

Expected status codes

  • 200: Retrieve topic details successfully in following format:
{
  "name": "MYTOPIC",
  "partitions": 1,
  "replicationFactor": 3,
  "retentionMs": 86400000,
  "cleanupPolicy": "delete",
  "configs": {
    "cleanup.policy": "delete",
    "min.insync.replicas": "2",
    "retention.bytes": "1073741824",
    "retention.ms": "86400000",
    "segment.bytes": "536870912"
  },
  "replicaAssignments": [
    {
      "id": 0,
      "brokers": {
        "replicas": [
          3,
          2,
          4
        ]
      }
    }
  ]
}
  • 403: Not authorized.
  • 404: Topic does not exist.

Example

func topicDetails(serviceAPI *adminrestv1.AdminrestV1) error {
	// Construct an instance of the GetTopicOptions model
	getTopicOptionsModel := new(adminrestv1.GetTopicOptions)
	getTopicOptionsModel.TopicName = core.StringPtr("test-topic")

	// Call List Topic Details.
	result, response, operationErr := serviceAPI.GetTopic(getTopicOptionsModel)
	if operationErr != nil {
		return fmt.Errorf("Error Listing Topic Details" + operationErr.Error())
	}

	// Check the result.
	if response.StatusCode != http.StatusOK {
		return fmt.Errorf("Error Listing Topic Details: status %d\n", response.StatusCode)
	}

	// Print topics details.
	fmt.Printf("\tname: \t\t\t%s\n", *result.Name)

	// The number of partitions.
	fmt.Printf("\tno of partitions: \t%d\n", *result.Partitions)

	// The number of replication factor.
	fmt.Printf("\treplication factor: \t%d\n", *result.ReplicationFactor)

	// // The value of config property 'retention.ms'.
	fmt.Printf("\tretention (ms): \t%d\n", *result.RetentionMs)

	// // The value of config property 'cleanup.policy'.
	fmt.Printf("\tcleanup policy: \t%s\n", *result.CleanupPolicy)

	// Configs *TopicConfigs
	fmt.Printf("\ttopic configs: \t\t%+v\n", *result.Configs)

	// The replia assignment of the topic.
	// ReplicaAssignments []ReplicaAssignment
	for _, assignment := range result.ReplicaAssignments {
		fmt.Printf("\tassignment:  \t\tid:%d,  \tbrokers: %+v\n", assignment.ID, assignment.Brokers)
	}

	return nil

} 

Updating Kafka topic's configuration


To increase a Kafka topic's partition number or to update a Kafka topic's configuration, issue a PATCH request to /admin/topics/TOPICNAME with the following body: (where TOPICNAME is the name of the Kafka topic that you want to update).

{
  "new_total_partition_count": 4,
  "configs": [
    {
      "name": "cleanup.policy",
      "value": "compact"
    }
  ]
}

Supported configuration keys are 'cleanup.policy', 'retention.ms', 'retention.bytes', 'segment.bytes', 'segment.ms', 'segment.index.bytes'. And partition number can only be increased, not decreased.

Expected status codes

  • 202: Update topic request was accepted.
  • 400: Invalid request JSON/number of partitions is invalid.
  • 404: Topic specified does not exist.
  • 422: Semantically invalid request.

Example

func updateTopicDetails(serviceAPI *adminrestv1.AdminrestV1) error {
	// Construct an instance of the UpdateTopicOptions model
	updateTopicOptionsModel := new(adminrestv1.UpdateTopicOptions)
	updateTopicOptionsModel.TopicName = core.StringPtr("test-topic")
	updateTopicOptionsModel.NewTotalPartitionCount = core.Int64Ptr(int64(6))

	// Invoke operation with valid options model.
	response, operationErr := serviceAPI.UpdateTopic(updateTopicOptionsModel)
	if operationErr != nil {
		return fmt.Errorf("Error Updating Topic: %s\n", operationErr.Error())
	}

	// Check the result.
	if response.StatusCode != http.StatusAccepted {
		return fmt.Errorf("Error Updating Topics: status %d\n", response.StatusCode)
	}

	fmt.Printf("\tname: %s updated\n", *updateTopicOptionsModel.TopicName)

	return nil
} 

List current mirroring topic selection

Mirroring user controls are only available on the target cluster in a mirroring environment.

To get the current topic selection, issue an GET request to /admin/mirroring/topic-selection

Expected status codes

  • 200: Retrieved topic selection successfully in following format:
{
  "includes": [
    "^prefix1_.*",
    "^prefix2_.*"
  ]
}
  • 403: Unauthorized to use mirroring user controls.
  • 404: Mirroring not enabled. The mirroring user control APIs are only available on the target cluster of a mirrored pair.
  • 503: An error occurred handling the request.

Example

func listMirroringTopicSelection(serviceAPI *adminrestv1.AdminrestV1) error {
	// Construct an instance of the GetMirroringTopicSelectionOptions model
	getMirroringTopicSelectionOptionsModel := new(adminrestv1.GetMirroringTopicSelectionOptions)

	// Call GetMirroringTopicSelection.
	result, response, operationErr := serviceAPI.GetMirroringTopicSelection(getMirroringTopicSelectionOptionsModel)
	if operationErr != nil {
		return fmt.Errorf("Error Listing Mirroring Topics: %s\n", operationErr.Error())
	}

	// Check the result.
	if response.StatusCode != http.StatusAccepted {
		return fmt.Errorf("Error Listing Mirroring Topics: status %d\n", response.StatusCode)
	}

	// Loop and print mirroring topics.
	for _, topicName := range result.Includes {
		fmt.Printf("\tname: %s\n", topicName)
	}

	return nil
} 

Replace selection of topics which are mirrored

Replace mirroring topic selection

Mirroring user controls are available on the target cluster in a mirroring environment.

To replace the current topic selection, issue a POST request to /admin/mirroring/topic-selection

Expected status codes

  • 200: Replaced topic selection successfully. The new selection is returned in following format:
{
  "includes": [
    "^prefix1_.*",
    "^prefix2_.*"
  ]
}
  • 400: Invalid request. The request data cannot be parsed and used to replace the topic selection.
  • 403: Unauthorized to use mirroring user controls.
  • 404: Mirroring not enabled. The mirroring user control APIs are only available on the target cluster of a mirrored pair.
  • 415: Unsupported media type. Content-Type header with application/json is required.
  • 503: An error occurred handling the request.

Example

func replaceMirroringTopicSelection(serviceAPI *adminrestv1.AdminrestV1) error {
	// Construct an instance of the ReplaceMirroringTopicSelectionOptions model
	replaceMirroringTopicSelectionOptionsModel := new(adminrestv1.ReplaceMirroringTopicSelectionOptions)
	replaceMirroringTopicSelectionOptionsModel.Includes = []string{"test-topic"}

	// Invoke operation with valid options model.
	result, response, operationErr := serviceAPI.ReplaceMirroringTopicSelection(replaceMirroringTopicSelectionOptionsModel)
	if operationErr != nil {
		return fmt.Errorf("Error Replacing Mirroring Topics: %s\n", operationErr.Error())
	}

	// Check the result.
	if response.StatusCode != http.StatusAccepted {
		return fmt.Errorf("Error Replacing Mirroring Topics: status %d\n", response.StatusCode)
	}

	// Loop and print mirroring topics.
	for _, topicName := range result.Includes {
		fmt.Printf("\ttopic added: %s\n", topicName)
	}

	return nil
} 

List active mirroring topics


Mirroring user controls are available on the target cluster in a mirroring environment.

To get the list of currently mirrored topics, issue an GET request to /admin/mirroring/active-topics

Expected status codes

  • 200: Retrieved active topics successfully in following format:
{
  "active_topics": [
    "topic1",
    "topic2"
  ]
}
  • 403: Unauthorized to use mirroring user controls.
  • 404: Mirroring not enabled. The mirroring user control APIs are only available on the target cluster of a mirrored pair.
  • 503: An error occurred handling the request.

Example

func getMirroringActiveTopics(serviceAPI *adminrestv1.AdminrestV1) error {
	// Construct an instance of the GetMirroringActiveTopicsOptions model
	getMirroringActiveTopicsOptionsModel := new(adminrestv1.GetMirroringActiveTopicsOptions)

	// Call GetMirroringActiveTopics.
	result, response, operationErr := serviceAPI.GetMirroringActiveTopics(getMirroringActiveTopicsOptionsModel)
	if operationErr != nil {
		return fmt.Errorf("Error Listing Active Mirroring Topics: %s\n", operationErr.Error())
	}

	// Check the result.
	if response.StatusCode != http.StatusAccepted {
		return fmt.Errorf("Error Listing Active Mirroring Topics: status %d\n", response.StatusCode)
	}

	// Loop and print mirroring topics.
	for _, topicName := range result.ActiveTopics {
		fmt.Printf("\tname: %s\n", topicName)
	}

	return nil
} 

Creating a Kafka quota


To create a Kafka quota the admin REST SDK issues a POST request to the /admin/quotas/ENTITYNAME path (where ENTITYNAME is the name of the entity that you want to create. The entity name of the quota can be default or an IAM Service ID that starts with an iam-ServiceId prefix). The body of the request contains a JSON document, for example:

{
    "producer_byte_rate": 1024,
    "consumer_byte_rate": 1024
}

Create Quota would create either 1 or 2 quotas depending on what data is passed in.

Expected HTTP status codes:

  • 201: Quota creation request was created.
  • 400: Invalid request JSON.
  • 403: Not authorized to create quota.
  • 422: Semantically invalid request.

If the request to create a Kafka quota succeeds then HTTP status code 201 (Created) is returned. If the operation fails then a HTTP status code of 422 (Un-processable Entity) is returned, and a JSON object containing additional information about the failure is returned as the body of the response.

Example

func createQuota(serviceAPI *adminrestv1.AdminrestV1) error {
	// Construct an instance of the createQuotaOptionsModel
	createQuotaOptionsModel := new(adminrestv1.CreateQuotaOptions)
	createQuotaOptionsModel.SetEntityName("iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890af")
	createQuotaOptionsModel.SetProducerByteRate(1024)
	createQuotaOptionsModel.SetConsumerByteRate(1024)

	// Create Quota
	response, operationErr := serviceAPI.CreateQuota(createQuotaOptionsModel)
	if operationErr != nil {
		return fmt.Errorf("error creating quota: %s", operationErr.Error())
	}

	// Check the result
	if response.StatusCode != http.StatusCreated {
		return fmt.Errorf("error creating quota with status %d", response.StatusCode)
	}

	fmt.Printf("\n\tquota for the entity '%s' has been created", *createQuotaOptionsModel.EntityName)

	return nil
} 

Deleting a Kafka quota


To delete a Kafka quota, the admin REST SDK issues a DELETE request to the /admin/quotas/ENTITYNAME path (where ENTITYNAME is the name of the entity that you want to delete. The entity name of the quota can be default or an IAM Service ID that starts with an iam-ServiceId prefix).

Expected return codes:

  • 202: Quota deletion request was accepted.
  • 403: Not authorized to delete quota.
  • 404: Entity Quota does not exist.
  • 422: Semantically invalid request.

A 202 (Accepted) status code is returned if the REST API accepts the delete request or status code 422 (Un-processable Entity) if the delete request is rejected. If a delete request is rejected then the body of the HTTP response will contain a JSON object which provides additional information about why the request was rejected.

Example

func deleteQuota(serviceAPI *adminrestv1.AdminrestV1) error {
	// Construct an instance of the DeleteQuotaOptions model
	deleteQuotaOptionsModel := new(adminrestv1.DeleteQuotaOptions)
	deleteQuotaOptionsModel.EntityName = core.StringPtr("iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890af")

	// Delete Quotas
	response, operationErr := serviceAPI.DeleteQuota(deleteQuotaOptionsModel)
	if operationErr != nil {
		return fmt.Errorf("error deleting quota: %s", operationErr.Error())
	}

	// Check the result
	if response.StatusCode != http.StatusAccepted {
		return fmt.Errorf("error deleting quota with status %d", response.StatusCode)
	}

	fmt.Printf("\n\tquota for the entity '%s' has been deleted", *deleteQuotaOptionsModel.EntityName)
	return nil
} 

Listing Kafka quotas


You can list all of your Kafka quotas by issuing a GET request to the /admin/quotas path.

Expected status codes:

  • 200: quotas list is returned as JSON in the following format:
{
  "data": [
    {
      "entity_name": "default",
      "producer_byte_rate": 1024,
      "consumer_byte_rate": 1024
    },
    {
      "entity_name": "iam-ServiceId-38288dac-1f80-46dd-b135-a56153296bcd",
      "producer_byte_rate": 1024
    },
    {
      "entity_name": "iam-ServiceId-38288dac-1f80-46dd-b135-e56153296fgh",
      "consumer_byte_rate": 2048
    },
    {
      "entity_name": "iam-ServiceId-38288dac-1f80-46dd-b135-f56153296bfa",
      "producer_byte_rate": 2048,
      "consumer_byte_rate": 1024
    }
  ]
}

A successful response will have HTTP status code 200 (OK) and contain an array of JSON objects, where each object represents a Kafka quota and has the following properties:

Property name Description
entity_name The entity name of the quota can be default or an IAM Service ID that starts with an iam-ServiceId prefix.
producer_byte_rate The producer byte rate quota value.
consumer_byte_rate The consumer byte rate quota value.

Example

func listQuotas(serviceAPI *adminrestv1.AdminrestV1) error {
	// Construct an instance of the ListQuotasOptions model
	listQuotasOptionsModel := new(adminrestv1.ListQuotasOptions)

	// Call ListQuotas
	result, response, operationErr := serviceAPI.ListQuotas(listQuotasOptionsModel)
	if operationErr != nil {
		return fmt.Errorf("error listing quotas: %s" + operationErr.Error())
	}

	// Check the result
	if response.StatusCode != http.StatusOK {
		return fmt.Errorf("error listing quotas with status %d", response.StatusCode)
	}

	// Loop and print quota
	for _, quota := range result.Data {
		fmt.Printf("\n\tentity_name: %s, %s", *quota.EntityName, prepareQuotaDetails(quota.ProducerByteRate, quota.ConsumerByteRate))
	}
	return nil
} 

Getting a Kafka quota


To get a Kafka quota detail information, issue a GET request to the /admin/quotas/ENTITYNAME path (where ENTITYNAME is the name of the entity that you want to get. The entity name of the quota can be default or an IAM Service ID that starts with an iam-ServiceId prefix).

Expected status codes

  • 200: Retrieve quota details successfully in following format:
{
  "producer_byte_rate": 1024,
  "consumer_byte_rate": 1024
}
  • 403: Not authorized.

Example

func getQuota(serviceAPI *adminrestv1.AdminrestV1) error {
	// Construct an instance of the GetQuotaOptions model
	getQuotaOptionsModel := new(adminrestv1.GetQuotaOptions)
	getQuotaOptionsModel.EntityName = core.StringPtr("iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890af")

	// Call Get Quota
	quota, response, operationErr := serviceAPI.GetQuota(getQuotaOptionsModel)
	if operationErr != nil {
		return fmt.Errorf("error getting quota: %s" + operationErr.Error())
	}

	// Check the result
	if response.StatusCode != http.StatusOK {
		return fmt.Errorf("error getting quota with status %d", response.StatusCode)
	}

	fmt.Print("\n\t" + prepareQuotaDetails(quota.ProducerByteRate, quota.ConsumerByteRate))
	return nil

} 

Updating Kafka quota's information


To Update an entity's quota, issue a PATCH request to /admin/quotas/ENTITYNAME with the following body: (where ENTITYNAME is the name of the entity that you want to update. The entity name of the quota can be default or an IAM Service ID that starts with an iam-ServiceId prefix).

{
  "producer_byte_rate": 2048,
  "consumer_byte_rate": 2048
}

Expected status codes

  • 202: Update quota request was accepted.
  • 400: Invalid request JSON.
  • 404: Entity quota specified does not exist.
  • 422: Semantically invalid request.

Example

func updateQuota(serviceAPI *adminrestv1.AdminrestV1) error {
	// Construct an instance of the updateQuotaOptionsModel
	updateQuotaOptionsModel := new(adminrestv1.UpdateQuotaOptions)
	updateQuotaOptionsModel.SetEntityName("iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890af")
	updateQuotaOptionsModel.SetProducerByteRate(2048)
	updateQuotaOptionsModel.SetConsumerByteRate(2048)

	// Update Quota
	response, operationErr := serviceAPI.UpdateQuota(updateQuotaOptionsModel)
	if operationErr != nil {
		return fmt.Errorf("error updating quota: %s", operationErr.Error())
	}

	// Check the result
	if response.StatusCode != http.StatusAccepted {
		return fmt.Errorf("error updating quota with status %d", response.StatusCode)
	}

	fmt.Printf("\n\tquota for the entity '%s' has been updated", *updateQuotaOptionsModel.EntityName)

	return nil
} 

About

Event Streams SDK allows you to integrate your application with Event Streams RESTful APIs.

Resources

License

Code of conduct

Stars

Watchers

Forks

Packages

No packages published

Languages