Skip to content

Commit

Permalink
fix tests, add limit option to consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
vetcher committed Mar 22, 2018
1 parent 0bb248b commit 07d0921
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 72 deletions.
6 changes: 6 additions & 0 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ func (c *Channel) consume(queueName string, cfg Consumer) (<-chan amqp.Delivery,
)
}

func (c *Channel) qos(count, size int) error {
c.callMx.Lock()
defer c.callMx.Unlock()
return c.channel.Qos(count, size, false)
}

func (c *Channel) declareExchange(exchange Exchange) error {
c.callMx.Lock()
defer c.callMx.Unlock()
Expand Down
22 changes: 11 additions & 11 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,16 @@ type Client struct {
bindings []Binding
conn *conn.Connection
observer *observer
//connector conn.Connector
logger logger.Logger
done func()
ctx context.Context
logger logger.Logger
done func()
ctx context.Context
}

func NewClient(connector conn.Connector, decls ...Declaration) (cl Client, err error) {
cl.constructorBefore(decls...)
ctx, done := context.WithCancel(context.Background())
cl.done = done
cl.ctx = ctx
//cl.connector = connector
cl.conn, err = connector()
if err != nil {
return
Expand Down Expand Up @@ -218,12 +216,14 @@ func (b Binding) declare(c *Client) {
}

type Consumer struct {
Consumer string
AutoAck bool
Exclusive bool
NoLocal bool
NoWait bool
Args amqp.Table
Consumer string
AutoAck bool
Exclusive bool
NoLocal bool
NoWait bool
Args amqp.Table
LimitCount int
LimitSize int
}

type Publish struct {
Expand Down
88 changes: 31 additions & 57 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package amqp

import (
"context"
"flag"
"fmt"
"net/http"
_ "net/http/pprof"
"sync"
"testing"
"time"
Expand All @@ -11,6 +14,8 @@ import (
"github.com/devimteam/amqp/logger"
)

var DEBUG = flag.Bool("debug", false, "Activate pprof")

const testExchangeName = "amqp-client-test"

type X struct {
Expand Down Expand Up @@ -42,8 +47,9 @@ func (s *XStorage) Consume(val int) {
func (s *XStorage) Check() bool {
s.m.Lock()
defer s.m.Unlock()
for _, v := range s.storage {
for k, v := range s.storage {
if v < 0 || v > 0 {
fmt.Println(k, ":", v)
return true
}
}
Expand All @@ -54,6 +60,17 @@ func (s *XStorage) Error() string {
return fmt.Sprint(s.storage)
}

func TestMain(t *testing.M) {
flag.Parse()
if *DEBUG {
go func() {
fmt.Println("serving profiler")
fmt.Println(http.ListenAndServe(":6060", nil))
}()
}
t.Run()
}

func TestNewClient(t *testing.T) {
ch := make(chan []interface{})
store := NewXStorage(1)
Expand Down Expand Up @@ -89,37 +106,12 @@ func TestNewClient2(t *testing.T) {
}
}

/*
func TestHighLoad(t *testing.T) {
ch := make(chan []interface{})
store := NewXStorage(8)
queuecfg := DefaultQueueConfig()
queuecfg.AutoDelete = true
go listenAndPrintlnSuff("recon", ch)
cl1, err := NewClient("amqp://localhost:5672",
WithOptions(
SetMessageIdBuilder(CommonMessageIdBuilder),
),
WithConnOptions(
conn.WithLogger(logger.NewChanLogger(ch)),
),
SetQueueConfig(queuecfg),
)
if err != nil {
t.Fatal(err)
}
cl2, err := NewClient("amqp://localhost:5672",
WithOptions(
SetMessageIdBuilder(CommonMessageIdBuilder),
),
WithConnOptions(
conn.WithLogger(logger.NewChanLogger(ch)),
),
SetQueueConfig(queuecfg),
)
if err != nil {
t.Fatal(err)
}
cl1 := initClient(t, TemporaryExchange(testExchangeName))
cl2 := initClient(t, TemporaryExchange(testExchangeName))
var wg sync.WaitGroup
wg.Add(12)
{
Expand Down Expand Up @@ -156,14 +148,8 @@ func TestHighLoad(t *testing.T) {
func TestLong(t *testing.T) {
ch := make(chan []interface{})
store := NewXStorage(1)
queuecfg := DefaultQueueConfig()
queuecfg.AutoDelete = true
go listenAndPrintlnSuff("recon", ch)
cl, err := NewClient("amqp://localhost:5672", WithConnOptions(conn.WithLogger(logger.NewChanLogger(ch))), SetQueueConfig(queuecfg),
WithOptions(DebugLogger(logger.NewChanLogger(ch))))
if err != nil {
t.Fatal(err)
}
cl := initClient(t, TemporaryExchange(testExchangeName))
go subFunc("sub", cl, store)
var wg sync.WaitGroup
wg.Add(2)
Expand All @@ -178,38 +164,26 @@ func TestLong(t *testing.T) {

func TestLimits(t *testing.T) {
ch := make(chan []interface{})
store := NewXStorage(1)
queuecfg := DefaultQueueConfig()
queuecfg.Name = "aaaa"
store := NewXStorage(2)
go listenAndPrintlnSuff("recon", ch)
cl, err := NewClient("amqp://localhost:5672",
WithConnOptions(conn.WithLogger(logger.NewChanLogger(ch))), SetQueueConfig(queuecfg),
WithObserverOptions(LimitCount(5)),
WithOptions(DebugLogger(logger.NewChanLogger(ch))))
if err != nil {
t.Fatal(err)
}
go fatSubFunc("sub", cl, store)
go fatSubFunc("sub2", cl, store)
go fatSubFunc("sub3", cl, store)
go fatSubFunc("sub4", cl, store)
go fatSubFunc("sub5", cl, store)
cl := initClient(t, TemporaryExchange(testExchangeName))
go fatSubFunc("sub", cl, store, SubscriberLogger(logger.NewChanLogger(ch)))
go fatSubFunc("sub2", cl, store, SubscriberLogger(logger.NewChanLogger(ch)))
var wg sync.WaitGroup
wg.Add(1)
go pubFuncGroup("c1p1", 0, 100, cl, time.Millisecond, &wg, store)
go pubFuncGroup("c1p1", 0, 30, cl, time.Millisecond, &wg, store)
wg.Wait()
time.Sleep(time.Second * 25)
time.Sleep(time.Second * 35)
if store.Check() {
t.Fatal(store.Error())
}
}
*/

func subFunc(prefix string, client Client, storage *XStorage) { //, options ...ClientConfig) {
ch := make(chan []interface{})
go listenAndPrintln(ch)
s := client.Subscriber(SubscriberLogger(logger.NewChanLogger(ch)))
events := s.SubscribeToExchange(context.Background(), testExchangeName, X{}, Consumer{})
//events, _ := client.Subscriber()testExchangeName, X{}, append(options, WithOptions(AllLoggers(logger.NewChanLogger(ch))))...)
for ev := range events {
fmt.Println(prefix, "event data: ", ev.Data)
storage.Consume(ev.Data.(*X).Num)
Expand All @@ -218,11 +192,11 @@ func subFunc(prefix string, client Client, storage *XStorage) { //, options ...C
fmt.Println("end of events")
}

func fatSubFunc(prefix string, client Client, storage *XStorage) { //, options ...ClientConfig) {
func fatSubFunc(prefix string, client Client, storage *XStorage, options ...SubscriberOption) {
ch := make(chan []interface{})
go listenAndPrintln(ch)
s := client.Subscriber()
events := s.SubscribeToExchange(context.Background(), testExchangeName, X{}, Consumer{})
s := client.Subscriber(options...)
events := s.SubscribeToExchange(context.Background(), testExchangeName, X{}, Consumer{LimitCount: 5})
for ev := range events {
fmt.Println(prefix, "event data: ", ev.Data)
storage.Consume(ev.Data.(*X).Num)
Expand Down
10 changes: 6 additions & 4 deletions subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func (s Subscriber) SubscribeToExchange(ctx context.Context, exchangeName string

func (s Subscriber) listen(ctx context.Context, channel *Channel, exchangeName, queueName string, dataType interface{}, eventChan chan<- Event, cfg Consumer) {
for {
fmt.Println("listen: select")
select {
case <-ctx.Done():
if channel != nil {
Expand All @@ -57,19 +56,16 @@ func (s Subscriber) listen(ctx context.Context, channel *Channel, exchangeName,
return
default:
if s.opts.wait.flag {
fmt.Println("listen: wait")
err := s.conn.NotifyConnected(s.opts.wait.timeout)
if err != nil {
s.opts.log.Log(err)
}
}
fmt.Println("listen: prepare")
deliveryCh, err := s.prepareDeliveryChan(channel, exchangeName, queueName, cfg)
if err != nil {
s.opts.log.Log(err)
continue
}
fmt.Println("listen: pool")
s.workersPool(ctx, queueName, deliveryCh, dataType, eventChan)
}
}
Expand Down Expand Up @@ -99,6 +95,12 @@ func (s Subscriber) prepareDeliveryChan(
return nil, WrapError("bind", queue.Name, "to", exchangeName, err)
}
}
if cfg.LimitCount > 0 || cfg.LimitSize > 0 {
err := channel.qos(cfg.LimitCount, cfg.LimitSize)
if err != nil {
return nil, WrapError("channel qos err", err)
}
}
ch, err := channel.consume(queueName, cfg)
if err != nil {
return nil, WrapError("channel consume err", err)
Expand Down

0 comments on commit 07d0921

Please sign in to comment.