Skip to content

Commit 3cb2df8

Browse files
committed
do not remove publications of slot defined in manifest
1 parent 2a4be1c commit 3cb2df8

File tree

2 files changed

+22
-13
lines changed

2 files changed

+22
-13
lines changed

pkg/cluster/streams.go

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,10 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
114114
}
115115

116116
for slotName, slotAndPublication := range databaseSlotsList {
117-
tables := slotAndPublication.Publication
118-
tableNames := make([]string, len(tables))
117+
newTables := slotAndPublication.Publication
118+
tableNames := make([]string, len(newTables))
119119
i := 0
120-
for t := range tables {
120+
for t := range newTables {
121121
tableName, schemaName := getTableSchema(t)
122122
tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName)
123123
i++
@@ -126,6 +126,12 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
126126
tableList := strings.Join(tableNames, ", ")
127127

128128
currentTables, exists := currentPublications[slotName]
129+
// if newTables is empty it means that it's definition was removed from streams section
130+
// but when slot is defined in manifest we should sync publications, too
131+
// by reusing current tables we make sure it is not
132+
if len(newTables) == 0 {
133+
tableList = currentTables
134+
}
129135
if !exists {
130136
createPublications[slotName] = tableList
131137
} else if currentTables != tableList {
@@ -351,15 +357,6 @@ func (c *Cluster) syncStreams() error {
351357
}
352358

353359
databaseSlots := make(map[string]map[string]zalandov1.Slot)
354-
slotsToSync := make(map[string]map[string]string)
355-
requiredPatroniConfig := c.Spec.Patroni
356-
357-
if len(requiredPatroniConfig.Slots) > 0 {
358-
for slotName, slotConfig := range requiredPatroniConfig.Slots {
359-
slotsToSync[slotName] = slotConfig
360-
}
361-
}
362-
363360
if err := c.initDbConn(); err != nil {
364361
return fmt.Errorf("could not init database connection")
365362
}
@@ -379,6 +376,18 @@ func (c *Cluster) syncStreams() error {
379376
}
380377
}
381378

379+
// need to take explicitly defined slots into account whey syncing Patroni config
380+
slotsToSync := make(map[string]map[string]string)
381+
requiredPatroniConfig := c.Spec.Patroni
382+
if len(requiredPatroniConfig.Slots) > 0 {
383+
for slotName, slotConfig := range requiredPatroniConfig.Slots {
384+
slotsToSync[slotName] = slotConfig
385+
if _, exists := databaseSlots[slotConfig["database"]]; exists {
386+
databaseSlots[slotConfig["database"]][slotName] = zalandov1.Slot{Slot: slotConfig}
387+
}
388+
}
389+
}
390+
382391
// get list of required slots and publications, group by database
383392
for _, stream := range c.Spec.Streams {
384393
if _, exists := databaseSlots[stream.Database]; !exists {

pkg/cluster/util_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -650,7 +650,7 @@ func Test_trimCronjobName(t *testing.T) {
650650
}
651651
}
652652

653-
func TestisInMaintenanceWindow(t *testing.T) {
653+
func TestIsInMaintenanceWindow(t *testing.T) {
654654
now := time.Now()
655655
futureTimeStart := now.Add(1 * time.Hour)
656656
futureTimeStartFormatted := futureTimeStart.Format("15:04")

0 commit comments

Comments
 (0)