Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small fixes and refactors #266

Merged
merged 12 commits into from
Jun 6, 2024
6 changes: 0 additions & 6 deletions _examples/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ func (client *Client) handleReconnect(addr string) {
client.infolog.Println("attempting to connect")

conn, err := client.connect(addr)

if err != nil {
client.errlog.Println("failed to connect. Retrying...")

Expand All @@ -201,7 +200,6 @@ func (client *Client) handleReconnect(addr string) {
// connect will create a new AMQP connection
func (client *Client) connect(addr string) (*amqp.Connection, error) {
conn, err := amqp.Dial(addr)

if err != nil {
return nil, err
}
Expand All @@ -220,7 +218,6 @@ func (client *Client) handleReInit(conn *amqp.Connection) bool {
client.m.Unlock()

err := client.init(conn)

if err != nil {
client.errlog.Println("failed to initialize channel, retrying...")

Expand Down Expand Up @@ -250,13 +247,11 @@ func (client *Client) handleReInit(conn *amqp.Connection) bool {
// init will initialize channel & declare queue
func (client *Client) init(conn *amqp.Connection) error {
ch, err := conn.Channel()

if err != nil {
return err
}

err = ch.Confirm(false)

if err != nil {
return err
}
Expand All @@ -268,7 +263,6 @@ func (client *Client) init(conn *amqp.Connection) error {
false, // No-wait
nil, // Arguments
)

if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion _examples/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ package main

import (
"flag"
amqp "github.com/rabbitmq/amqp091-go"
"log"
"os"
"os/signal"
"syscall"
"time"

amqp "github.com/rabbitmq/amqp091-go"
)

var (
Expand Down
3 changes: 2 additions & 1 deletion _examples/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import (
"crypto/sha1"
"flag"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"io"
"log"
"os"

amqp "github.com/rabbitmq/amqp091-go"
)

var url = flag.String("url", "amqp:///", "AMQP url for both the publisher and subscriber")
Expand Down
1 change: 0 additions & 1 deletion allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func TestAllocatorShouldNotReuseEarly(t *testing.T) {
if want, got := first, third; want != got {
t.Fatalf("expected third allocation to be %d, got: %d", want, got)
}

}

func TestAllocatorReleasesKeepUpWithAllocationsForAllSizes(t *testing.T) {
Expand Down
4 changes: 1 addition & 3 deletions auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ func (auth *AMQPlainAuth) Response() string {
}

// ExternalAuth for RabbitMQ-auth-mechanism-ssl.
type ExternalAuth struct {
}
type ExternalAuth struct{}

// Mechanism returns "EXTERNAL"
func (*ExternalAuth) Mechanism() string {
Expand All @@ -70,7 +69,6 @@ func (*ExternalAuth) Response() string {

// Finds the first mechanism preferred by the client that the server supports.
func pickSASLMechanism(client []Authentication, serverMechanisms []string) (auth Authentication, ok bool) {

for _, auth = range client {
for _, mech := range serverMechanisms {
if auth.Mechanism() == mech {
Expand Down
2 changes: 1 addition & 1 deletion channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -1790,7 +1790,7 @@ it must be redelivered or dropped.

See also Delivery.Nack
*/
func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error {
func (ch *Channel) Nack(tag uint64, multiple, requeue bool) error {
ch.m.Lock()
defer ch.m.Unlock()

Expand Down
25 changes: 10 additions & 15 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ type server struct {
tune connectionTuneOk
}

var defaultLogin = "guest"
var defaultPassword = "guest"
var defaultPlainAuth = &PlainAuth{defaultLogin, defaultPassword}
var defaultAMQPlainAuth = &AMQPlainAuth{defaultLogin, defaultPassword}
var (
defaultLogin = "guest"
defaultPassword = "guest"
defaultPlainAuth = &PlainAuth{defaultLogin, defaultPassword}
defaultAMQPlainAuth = &AMQPlainAuth{defaultLogin, defaultPassword}
)

func defaultConfigWithAuth(auth Authentication) Config {
return Config{
Expand All @@ -48,6 +50,8 @@ func amqplainConfig() Config {
}

func newServer(t *testing.T, serverIO, clientIO io.ReadWriteCloser) *server {
t.Helper()

return &server{
T: t,
r: reader{serverIO},
Expand All @@ -58,6 +62,8 @@ func newServer(t *testing.T, serverIO, clientIO io.ReadWriteCloser) *server {
}

func newSession(t *testing.T) (io.ReadWriteCloser, *server) {
t.Helper()

rs, wc := io.Pipe()
rc, ws := io.Pipe()

Expand Down Expand Up @@ -228,7 +234,6 @@ func TestDefaultClientProperties(t *testing.T) {

go func() {
srv.connectionOpen()

}()

if c, err := Open(rwc, defaultConfig()); err != nil {
Expand All @@ -246,7 +251,6 @@ func TestDefaultClientProperties(t *testing.T) {
if want, got := defaultLocale, srv.start.Locale; want != got {
t.Errorf("expected locale %s got: %s", want, got)
}

}

func TestCustomClientProperties(t *testing.T) {
Expand All @@ -261,7 +265,6 @@ func TestCustomClientProperties(t *testing.T) {

go func() {
srv.connectionOpen()

}()

if c, err := Open(rwc, config); err != nil {
Expand All @@ -275,15 +278,13 @@ func TestCustomClientProperties(t *testing.T) {
if want, got := config.Properties["version"], srv.start.ClientProperties["version"]; want != got {
t.Errorf("expected version %s got: %s", want, got)
}

}

func TestOpen(t *testing.T) {
rwc, srv := newSession(t)
t.Cleanup(func() { rwc.Close() })
go func() {
srv.connectionOpen()

}()

if c, err := Open(rwc, defaultConfig()); err != nil {
Expand Down Expand Up @@ -333,7 +334,6 @@ func TestChannelOpen(t *testing.T) {
go func() {
srv.connectionOpen()
srv.channelOpen(1)

}()

c, err := Open(rwc, defaultConfig())
Expand All @@ -345,7 +345,6 @@ func TestChannelOpen(t *testing.T) {
if err != nil {
t.Fatalf("could not open channel: %v (%s)", ch, err)
}

}

func TestOpenFailedSASLUnsupportedMechanisms(t *testing.T) {
Expand All @@ -361,7 +360,6 @@ func TestOpenFailedSASLUnsupportedMechanisms(t *testing.T) {
if err != ErrSASL {
t.Fatalf("expected ErrSASL got: %+v on %+v", err, c)
}

}

func TestOpenAMQPlainAuth(t *testing.T) {
Expand Down Expand Up @@ -393,7 +391,6 @@ func TestOpenAMQPlainAuth(t *testing.T) {
if table["PASSWORD"] != defaultPassword {
t.Fatalf("unexpected password: want: %s, got: %s", defaultPassword, table["PASSWORD"])
}

}

func TestOpenFailedCredentials(t *testing.T) {
Expand All @@ -410,7 +407,6 @@ func TestOpenFailedCredentials(t *testing.T) {
if err != ErrCredentials {
t.Fatalf("expected ErrCredentials got: %+v on %+v", err, c)
}

}

func TestOpenFailedVhost(t *testing.T) {
Expand Down Expand Up @@ -527,7 +523,6 @@ func TestConfirmMultipleOrdersDeliveryTags(t *testing.T) {
t.Fatalf("failed ack, expected ack#%d to be %d, got %d", i, tag, ack.DeliveryTag)
}
}

}

func TestDeferredConfirmations(t *testing.T) {
Expand Down
2 changes: 0 additions & 2 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,6 @@ func (c *Connection) dispatch0(f frame) {
return
case c.rpc <- m:
}

}
case *heartbeatFrame:
// kthx - all reads reset our deadline. so we can drop this
Expand Down Expand Up @@ -755,7 +754,6 @@ func (c *Connection) reader(r io.Reader) {

for {
frame, err := frames.ReadFrame()

if err != nil {
c.shutdown(&Error{Code: FrameError, Reason: err.Error()})
return
Expand Down
2 changes: 2 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,8 @@ func TestConnection_Close_WhenMemoryAlarmIsActive(t *testing.T) {
}

func rabbitmqctl(t *testing.T, args ...string) error {
t.Helper()

rabbitmqctlPath, found := os.LookupEnv(rabbitmqctlEnvKey)
if !found {
t.Skipf("variable for %s for rabbitmqctl not found, skipping", rabbitmqctlEnvKey)
Expand Down
2 changes: 1 addition & 1 deletion consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (subs *consumers) buffer(in chan *Delivery, out chan Delivery) {
defer close(out)
defer subs.Done()

var inflight = in
inflight := in
var queue []*Delivery

for delivery := range in {
Expand Down
2 changes: 1 addition & 1 deletion delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var errDeliveryNotInitialized = errors.New("delivery not initialized")
// Applications can provide mock implementations in tests of Delivery handlers.
type Acknowledger interface {
Ack(tag uint64, multiple bool) error
Nack(tag uint64, multiple bool, requeue bool) error
Nack(tag uint64, multiple, requeue bool) error
Reject(tag uint64, requeue bool) error
}

Expand Down
6 changes: 0 additions & 6 deletions example_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ func (client *Client) handleReconnect(addr string) {
client.logger.Println("Attempting to connect")

conn, err := client.connect(addr)

if err != nil {
client.logger.Println("Failed to connect. Retrying...")

Expand All @@ -195,7 +194,6 @@ func (client *Client) handleReconnect(addr string) {
// connect will create a new AMQP connection
func (client *Client) connect(addr string) (*amqp.Connection, error) {
conn, err := amqp.Dial(addr)

if err != nil {
return nil, err
}
Expand All @@ -214,7 +212,6 @@ func (client *Client) handleReInit(conn *amqp.Connection) bool {
client.m.Unlock()

err := client.init(conn)

if err != nil {
client.logger.Println("Failed to initialize channel. Retrying...")

Expand Down Expand Up @@ -244,13 +241,11 @@ func (client *Client) handleReInit(conn *amqp.Connection) bool {
// init will initialize channel & declare queue
func (client *Client) init(conn *amqp.Connection) error {
ch, err := conn.Channel()

if err != nil {
return err
}

err = ch.Confirm(false)

if err != nil {
return err
}
Expand All @@ -262,7 +257,6 @@ func (client *Client) init(conn *amqp.Connection) error {
false, // No-wait
nil, // Arguments
)

if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ func ExampleChannel_Confirm_bridge() {
// And the body
Body: msg.Body,
})

if err != nil {
if e := msg.Nack(false, false); e != nil {
log.Printf("nack error: %+v", e)
Expand Down
Loading
Loading