Description
I am trying to understand how we could notify dynamically created subscribers about messages being published. Reviewing previous issues (#785) I followed the advice and integrated a single PubSub for the whole application. (By the way, the ring client still requires a dummy channel; issue #459 only removed the dummy-channel requirement for the normal Redis client.) What we have now is:
```go
func (svc *StandardService) Start() error {
	if !svc.isRunning() {
		svc.cli = redis.NewClient(&redis.Options{
			Addr:        svc.Configuration.Address,
			PoolSize:    svc.Configuration.PoolSize,
			IdleTimeout: 240 * time.Second,
		})
		err := svc.Ping()
		if err != nil {
			return errors.Wrap(err, "Failed to start Redis ClusterService")
		}
		if svc.ps == nil {
			svc.ps = svc.cli.Subscribe(context.Background())
		}
		go svc.startListening()
		svc.setRunning(true)
	}
	return nil
}

func (svc *StandardService) startListening() {
	ch := svc.ps.Channel()
	defer svc.ps.Close()
	for msg := range ch {
		consumers := getConsumers(msg.Channel)
		for i := range consumers {
			err := consumers[i](msg.Channel, []byte(msg.Payload))
			if err != nil {
				print(err)
			}
		}
	}
}

func subscribe(pubSub *redis.PubSub, ctx context.Context, subscription SubscriptionHandler, channels ...string) error {
	err := pubSub.Subscribe(ctx, channels...)
	if err != nil {
		return err
	}
	for _, c := range channels {
		subscribers[c] = append(subscribers[c], subscription)
	}
	return nil
}

// SubscriptionHandler is called for each new message.
type SubscriptionHandler func(channel string, data []byte) error

var subscribers = make(map[string][]SubscriptionHandler)

func getConsumers(c string) []SubscriptionHandler {
	return subscribers[c]
}
```
However, this does not seem like a great approach, and it has issues. First, the subscribers map is prone to concurrent reads and writes; that can be fixed with locking, but obviously that has performance implications. Second, we now also have memory and GC implications: the map's keys and values both contain pointers (strings and slices of handlers), so with a large number of subscribers we would be putting a lot of pressure on the GC. Any suggestions on how we could solve this without the bottlenecks outlined?
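For concreteness, below is a minimal sketch of the straightforward locking fix I am referring to: the global subscribers map replaced by a registry guarded by a sync.RWMutex. The registry type and its add/consumers methods are just placeholder names for illustration, not something in go-redis; SubscriptionHandler is the type defined above.

```go
import "sync"

// registry is a hypothetical replacement for the global subscribers map,
// guarded by a sync.RWMutex so subscribe calls and message delivery
// can run concurrently without racing on the map.
type registry struct {
	mu          sync.RWMutex
	subscribers map[string][]SubscriptionHandler
}

func newRegistry() *registry {
	return &registry{subscribers: make(map[string][]SubscriptionHandler)}
}

// add registers a handler for the given channels under the write lock.
func (r *registry) add(handler SubscriptionHandler, channels ...string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, c := range channels {
		r.subscribers[c] = append(r.subscribers[c], handler)
	}
}

// consumers returns a copy of the handlers for a channel under the read lock,
// so the caller can invoke them without holding the lock.
func (r *registry) consumers(c string) []SubscriptionHandler {
	r.mu.RLock()
	defer r.mu.RUnlock()
	handlers := r.subscribers[c]
	out := make([]SubscriptionHandler, len(handlers))
	copy(out, handlers)
	return out
}
```

Even with the read/write split, every delivered message still goes through the lock and the map only ever grows, so this mitigates rather than removes the contention and GC concerns above.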