Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions pkg/ctl/topic/set_replication_clusters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/streamnative/pulsarctl/pkg/cmdutils"
)

func SetReplicationClustersCmd(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Set the replication clusters for a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
setReplication := cmdutils.Example{
Desc: "Set the replication clusters for a topic",
Command: "pulsarctl topics set-replication-clusters tenant/namespace/topic --clusters cluster1,cluster2",
}
examples = append(examples, setReplication)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "Set the replication clusters for [topic] successfully",
}

noTopicName := cmdutils.Output{
Desc: "you must specify a tenant/namespace/topic name, please check if the tenant/namespace/topic name is provided",
Out: "[✖] the topic name is not specified or the topic name is specified more than one",
}

tenantNotExistError := cmdutils.Output{
Desc: "the tenant does not exist",
Out: "[✖] code: 404 reason: Tenant does not exist",
}

nsNotExistError := cmdutils.Output{
Desc: "the namespace does not exist",
Out: "[✖] code: 404 reason: Namespace (tenant/namespace) does not exist",
}

out = append(out, successOut, noTopicName, tenantNotExistError, nsNotExistError)
desc.CommandOutput = out

vc.SetDescription(
"set-replication-clusters",
"Set the replication clusters for a topic",
desc.ToString(),
desc.ExampleToString(),
"set-replication-clusters",
)

var clusters []string

vc.FlagSetGroup.InFlagSet("Set replication clusters", func(flagSet *pflag.FlagSet) {
flagSet.StringSliceVarP(&clusters, "clusters", "c", nil,
"Replication cluster ids.")
_ = cobra.MarkFlagRequired(flagSet, "clusters")
})
vc.EnableOutputFlagSet()

vc.SetRunFuncWithNameArg(func() error {
return doSetReplicationClusters(vc, clusters)
}, "the topic name is not specified or the topic name is specified more than one")
}

func doSetReplicationClusters(vc *cmdutils.VerbCmd, clusters []string) error {
topic := vc.NameArg

if len(clusters) == 0 {
return errors.New("clusters cannot be empty")
}

admin := cmdutils.NewPulsarClient()
topicName, err := utils.GetTopicName(topic)
if err != nil {
return err
}

err = admin.Topics().SetReplicationClusters(*topicName, clusters)
if err == nil {
vc.Command.Printf("Set the replication clusters successfully on [%s]\n", topic)
}

return err
}
48 changes: 48 additions & 0 deletions pkg/ctl/topic/set_replication_clusters_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"fmt"
"testing"

"github.com/onsi/gomega"
"github.com/streamnative/pulsarctl/pkg/test"
)

func TestSetReplicationClustersCmd(t *testing.T) {
g := gomega.NewWithT(t)

topic := fmt.Sprintf("test-replication-clusters-topic-%s", test.RandomSuffix())

args := []string{"create", topic, "0"}
_, execErr, _, _ := TestTopicCommands(CreateTopicCmd, args)
g.Expect(execErr).Should(gomega.BeNil())

args = []string{"set-replication-clusters", topic, "--clusters", "standalone"}
out, execErr, nameErr, cmdErr := TestTopicCommands(SetReplicationClustersCmd, args)
g.Expect(execErr).Should(gomega.BeNil())
g.Expect(nameErr).Should(gomega.BeNil())
g.Expect(cmdErr).Should(gomega.BeNil())
g.Expect(out).ShouldNot(gomega.BeNil())
g.Expect(out.String()).ShouldNot(gomega.BeEmpty())

// Since there is no get-replication-clusters command in this PR, we only test the set command success.
// In a real scenario, we might want to verify using the client or adding a get command.
// The set command output verification implies the call was successful.
}
1 change: 1 addition & 0 deletions pkg/ctl/topic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
GetInactiveTopicCmd,
SetInactiveTopicCmd,
RemoveInactiveTopicCmd,
SetReplicationClustersCmd,
}

cmdutils.AddVerbCmds(flagGrouping, resourceCmd, commands...)
Expand Down
Loading