Skip to content

Commit 8497978

Browse files
authored
Fix ordering of publishing (#140)
* Fix ordering of publishing * fix suite test startup * More topics updates * Update test deps * Fix deps
1 parent 5ed59f7 commit 8497978

File tree

9 files changed

+286
-68
lines changed

9 files changed

+286
-68
lines changed

coherence/topics.go

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
pb1 "github.com/oracle/coherence-go-client/v2/proto/v1"
2020
"google.golang.org/protobuf/types/known/anypb"
2121
"google.golang.org/protobuf/types/known/wrapperspb"
22+
"math/rand/v2"
2223
"strings"
2324
"sync"
2425
)
@@ -161,8 +162,6 @@ func GetNamedTopic[V any](ctx context.Context, session *Session, topicName strin
161162
return nil, getExistingError("NamedTopic", topicName)
162163
}
163164

164-
// check any topic options
165-
166165
session.debug("using existing NamedTopic: %v", existing)
167166
return existing, nil
168167
}
@@ -190,6 +189,7 @@ type topicPublisher[V any] struct {
190189
proxyID int32
191190
publisherID int64
192191
channelCount int32
192+
defaultOrderingSeed int32
193193
options *publisher.Options
194194
valueSerializer Serializer[V]
195195
mutex sync.RWMutex
@@ -213,7 +213,12 @@ func (tp *topicPublisher[V]) Publish(ctx context.Context, value V) (*publisher.P
213213
return nil, ErrPublisherClosed
214214
}
215215

216-
publishChannel := tp.ensureTopicChannel()
216+
var publishChannel = tp.defaultOrderingSeed // use defaultOrderingSeed as default
217+
218+
if tp.defaultOrderingSeed == -1 {
219+
// use the hash from ordering option
220+
publishChannel = tp.ensureTopicChannel()
221+
}
217222

218223
binValue, err := tp.valueSerializer.Serialize(value)
219224
if err != nil {
@@ -327,16 +332,26 @@ func (bt *baseTopicsClient[V]) setReleased() {
327332

328333
func newPublisher[V any](session *Session, bt *baseTopicsClient[V], result *publisher.EnsurePublisherResult, topicName string, options *publisher.Options) (Publisher[V], error) {
329334
tp := &topicPublisher[V]{
330-
namedTopic: bt,
331-
publisherID: result.PublisherID,
332-
session: session,
333-
options: options,
334-
valueSerializer: NewSerializer[V](session.sessOpts.Format),
335-
topicName: topicName,
336-
proxyID: result.ProxyID,
337-
channelCount: result.ChannelCount,
338-
isClosed: false,
335+
namedTopic: bt,
336+
publisherID: result.PublisherID,
337+
session: session,
338+
options: options,
339+
valueSerializer: NewSerializer[V](session.sessOpts.Format),
340+
topicName: topicName,
341+
proxyID: result.ProxyID,
342+
defaultOrderingSeed: -1,
343+
channelCount: result.ChannelCount,
344+
isClosed: false,
345+
}
346+
347+
ordering := options.GetOrdering()
348+
if _, ok := ordering.(*publisher.OrderByDefault); ok {
349+
// set the defaultOrderSeed to a non -1 value which means to use this number for the channel
350+
// hash all the time.
351+
// #nosec G404 -- math/rand is fine here for non-security use
352+
tp.defaultOrderingSeed = rand.Int32() % tp.channelCount
339353
}
354+
340355
session.mapMutex.Lock()
341356
defer session.mapMutex.Unlock()
342357
session.publishers[result.PublisherID] = tp

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ toolchain go1.23.7
1111

1212
require (
1313
github.com/google/uuid v1.6.0
14-
golang.org/x/text v0.26.0
14+
golang.org/x/text v0.28.0
1515
google.golang.org/grpc v1.73.0
1616
google.golang.org/protobuf v1.36.6
1717
)

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw=
2424
golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA=
2525
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
2626
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
27-
golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
28-
golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
27+
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
28+
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
2929
google.golang.org/genproto/googleapis/rpc v0.0.0-20250528174236-200df99c418a h1:v2PbRU4K3llS09c7zodFpNePeamkAwG3mPrAery9VeE=
3030
google.golang.org/genproto/googleapis/rpc v0.0.0-20250528174236-200df99c418a/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
3131
google.golang.org/grpc v1.73.0 h1:VIWSmpI2MegBtTuFt5/JWy2oXxtjJ/e89Z70ImfD2ok=

java/coherence-go-test/src/main/java/com/oracle/coherence/go/testing/RestServer.java

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,29 +16,31 @@
1616
import java.util.HashSet;
1717
import java.util.Map;
1818
import java.util.Set;
19-
import java.util.concurrent.ExecutionException;
20-
import java.util.logging.Logger;
19+
import java.util.concurrent.CancellationException;
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicInteger;
2123
import java.util.stream.Collectors;
2224

23-
import com.tangosol.net.CacheFactory;
24-
import com.tangosol.net.Cluster;
25-
import com.tangosol.net.Coherence;
26-
import com.tangosol.net.CoherenceConfiguration;
27-
import com.tangosol.net.DefaultCacheServer;
28-
import com.tangosol.net.NamedCache;
25+
import com.tangosol.net.*;
2926

3027
import com.sun.net.httpserver.HttpExchange;
3128
import com.sun.net.httpserver.HttpServer;
32-
import com.tangosol.net.NamedMap;
33-
import com.tangosol.net.SessionConfiguration;
29+
3430
import com.tangosol.net.management.MBeanServerProxy;
31+
3532
import com.tangosol.net.topic.NamedTopic;
33+
import com.tangosol.net.topic.Publisher;
34+
import com.tangosol.net.topic.Subscriber;
35+
36+
import static com.tangosol.net.topic.Subscriber.Name.inGroup;
37+
import static com.tangosol.util.Base.log;
3638

3739
/**
3840
* A simple Http server that is deployed into a Coherence cluster
3941
* and can be used to perform various tests.
4042
*
41-
* @author jk 2019.08.09
43+
* @author jk 2019.08.09
4244
* @author tam 2022.02.08
4345
*/
4446
public class RestServer {
@@ -74,6 +76,7 @@ public static void main(String[] args) {
7476
server.createContext("/isIsReadyPresent", RestServer::isIsReadyPresent);
7577
server.createContext("/populateQueue", RestServer::populateQueue);
7678
server.createContext("/destroyTopic", RestServer::destroyTopic);
79+
server.createContext("/createCustomerTopic", RestServer::createCustomerTopic);
7780

7881
server.setExecutor(null); // creates a default executor
7982
server.start();
@@ -143,6 +146,45 @@ private static void destroyTopic(HttpExchange t) throws IOException {
143146
send(t, 200, "OK");
144147
}
145148

149+
private static void createCustomerTopic(HttpExchange t) throws IOException {
150+
try {
151+
URI uri = t.getRequestURI();
152+
String path = uri.getPath();
153+
String[] pathComponents = path.split("/");
154+
155+
if (pathComponents.length < 4 || !pathComponents[pathComponents.length - 3].equals("createCustomerTopic")) {
156+
t.sendResponseHeaders(400, -1); // Bad Request
157+
return;
158+
}
159+
160+
String topicName = pathComponents[pathComponents.length - 2];
161+
int count = Integer.parseInt(pathComponents[pathComponents.length - 1]);
162+
163+
Coherence coherence = Coherence.getInstance();
164+
if (coherence == null) {
165+
coherence = Coherence.clusterMember().start().get();
166+
}
167+
Session session = coherence.getSession();
168+
169+
NamedTopic<Customer> topic = session.getTopic(topicName);
170+
Publisher<Customer> publisher = topic.createPublisher();
171+
172+
for (int i = 0; i < count; i++) {
173+
Customer customer = new Customer(i, "name-" + i, getAddress(i), getAddress(i), Customer.GOLD, 1000* i);
174+
publisher.publish(customer).join();
175+
}
176+
publisher.close();
177+
} catch (Exception e) {
178+
e.printStackTrace();
179+
send(t, 404, "Error: " + e.getMessage());
180+
}
181+
send(t, 200, "OK");
182+
}
183+
184+
private static Address getAddress(int id) {
185+
return new Address("address-line-1-" + id, "address-line-2-" + id, "Suburb-" + id, "City-" + id, "State-" + id, id);
186+
}
187+
146188
private static void ready(HttpExchange t) throws IOException {
147189
send(t, 200, "OK");
148190
}

test/e2e/standalone/java_object_test.go

Lines changed: 9 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,26 +13,6 @@ import (
1313
"testing"
1414
)
1515

16-
type Customer struct {
17-
Class string `json:"@class"`
18-
ID int `json:"id"`
19-
CustomerName string `json:"customerName"`
20-
HomeAddress CustomerAddress `json:"homeAddress"`
21-
PostalAddress CustomerAddress `json:"postalAddress"`
22-
CustomerType string `json:"customerType"`
23-
OutstandingBalance float32 `json:"outstandingBalance"`
24-
}
25-
26-
type CustomerAddress struct {
27-
Class string `json:"@class"`
28-
AddressLine1 string `json:"addressLine1"`
29-
AddressLine2 string `json:"addressLine2"`
30-
Suburb string `json:"suburb"`
31-
City string `json:"city"`
32-
State string `json:"state"`
33-
PostCode int `json:"postCode"`
34-
}
35-
3616
// TestBasicOperationsAgainstMapAndCache runs all tests against NamedMap and NamedCache
3717
func TestJavaSerializationAgainstMapAndCache(t *testing.T) {
3818
g := gomega.NewWithT(t)
@@ -42,11 +22,11 @@ func TestJavaSerializationAgainstMapAndCache(t *testing.T) {
4222

4323
testCases := []struct {
4424
testName string
45-
nameMap coherence.NamedMap[int, Customer]
46-
test func(t *testing.T, namedCache coherence.NamedMap[int, Customer])
25+
nameMap coherence.NamedMap[int, utils.Customer]
26+
test func(t *testing.T, namedCache coherence.NamedMap[int, utils.Customer])
4727
}{
48-
{"NamedMapSerializationTest", GetNamedMap[int, Customer](g, session, "customer-map"), RunSerializationTest},
49-
{"NamedCacheSerializationTest", GetNamedCache[int, Customer](g, session, "customer-cache"), RunSerializationTest},
28+
{"NamedMapSerializationTest", GetNamedMap[int, utils.Customer](g, session, "customer-map"), RunSerializationTest},
29+
{"NamedCacheSerializationTest", GetNamedCache[int, utils.Customer](g, session, "customer-cache"), RunSerializationTest},
5030
}
5131
for _, tc := range testCases {
5232
t.Run(tc.testName, func(t *testing.T) {
@@ -55,10 +35,10 @@ func TestJavaSerializationAgainstMapAndCache(t *testing.T) {
5535
}
5636
}
5737

58-
func RunSerializationTest(t *testing.T, namedMap coherence.NamedMap[int, Customer]) {
38+
func RunSerializationTest(t *testing.T, namedMap coherence.NamedMap[int, utils.Customer]) {
5939
var (
6040
g = gomega.NewWithT(t)
61-
result *Customer
41+
result *utils.Customer
6242
err error
6343
)
6444

@@ -71,7 +51,7 @@ func RunSerializationTest(t *testing.T, namedMap coherence.NamedMap[int, Custome
7151
err = namedMap.Clear(ctx)
7252
g.Expect(err).ShouldNot(gomega.HaveOccurred())
7353

74-
homeAddress := CustomerAddress{
54+
homeAddress := utils.CustomerAddress{
7555
Class: addressClass,
7656
AddressLine1: "123 James Street",
7757
Suburb: "Balcatta",
@@ -80,7 +60,7 @@ func RunSerializationTest(t *testing.T, namedMap coherence.NamedMap[int, Custome
8060
PostCode: 6000,
8161
}
8262

83-
postalAddress := CustomerAddress{
63+
postalAddress := utils.CustomerAddress{
8464
Class: addressClass,
8565
AddressLine1: "PO Box 1000",
8666
AddressLine2: "Balcatta Post Office",
@@ -90,7 +70,7 @@ func RunSerializationTest(t *testing.T, namedMap coherence.NamedMap[int, Custome
9070
PostCode: 6000,
9171
}
9272

93-
customer := Customer{
73+
customer := utils.Customer{
9474
Class: customerClass,
9575
ID: 1,
9676
CustomerName: "Tim",

0 commit comments

Comments
 (0)