Skip to content

Pub/Sub for Dynamic Consumers #1436

@DavidKmn

Description

@DavidKmn

I am trying to understand how we could use notify dynamically created subscribers about messages being published. Reviewing previous issues (#785) I followed the advice and integrated a singe pubSub for the whole application. (by the way the ring client still requires a dummy channel, issue #459 only removed the dummy channel requirement for the normal redis client). What we have now is:


func (svc *StandardService) Start() error {
	if !svc.isRunning() {
		svc.cli = redis.NewClient(&redis.Options{
			Addr:        svc.Configuration.Address,
			PoolSize:    svc.Configuration.PoolSize,
			IdleTimeout: 240 * time.Second,
		})
		err := svc.Ping()
		if err != nil {
			return errors.Wrap(err, "Failed to start Redis ClusterService")
		}
		if svc.ps == nil {
			svc.ps = svc.cli.Subscribe(context.Background())
		}
		go svc.startListening()
		svc.setRunning(true)
	}

	return nil
}

func (svc *StandardService) startListening() {
	ch := svc.ps.Channel()
	defer svc.ps.Close()
	for msg := range ch {
		consumers := getConsumers(msg.Channel)
		for i, _ := range consumers {
			err := consumers[i](msg.Channel, []byte(msg.Payload))
			if err != nil {
				print(err)
			}
		}
	}
}

func subscribe(pubSub *redis.PubSub, ctx context.Context, subscription SubscriptionHandler, channels ...string) error {
	err := pubSub.Subscribe(ctx, channels...)

	if err != nil {
		return err
	}

	for _, c := range channels {
		subscribers[c] = append(subscribers[c], subscription)
	}

	return nil
}

// SubscriptionHandler is called for each new message.
type SubscriptionHandler func(channel string, data []byte) error

var subscribers = make(map[string][]SubscriptionHandler)

func getConsumers(c string) []SubscriptionHandler {
	return subscribers[c]
}

However this does not seem like a great way, and has issues. First off we have a map that is prone to concurrent read/writes, this can be fixed by obviously we have performance implications. Second of all now we also have memory and GC implications due to a map that has ref types as both keys and values, with a large number of subscribers we would be putting a lot of pressure on the GC. Any suggestions on how we could solve this without the bottlenecks outlined?

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions