50
50
import java .util .concurrent .Executor ;
51
51
import java .util .concurrent .ScheduledExecutorService ;
52
52
import java .util .concurrent .TimeUnit ;
53
+ import java .util .concurrent .atomic .AtomicInteger ;
53
54
import java .util .concurrent .atomic .AtomicReference ;
54
55
import java .util .function .BiConsumer ;
55
56
import java .util .function .Consumer ;
85
86
import org .whispersystems .textsecuregcm .securevaluerecovery .SecureValueRecoveryClient ;
86
87
import org .whispersystems .textsecuregcm .util .ExceptionUtils ;
87
88
import org .whispersystems .textsecuregcm .util .Pair ;
89
+ import org .whispersystems .textsecuregcm .util .RegistrationIdValidator ;
88
90
import org .whispersystems .textsecuregcm .util .SystemMapper ;
89
91
import org .whispersystems .textsecuregcm .util .Util ;
90
92
import reactor .core .publisher .Flux ;
@@ -111,6 +113,8 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
111
113
private static final String DELETE_COUNTER_NAME = name (AccountsManager .class , "deleteCounter" );
112
114
private static final String COUNTRY_CODE_TAG_NAME = "country" ;
113
115
private static final String DELETION_REASON_TAG_NAME = "reason" ;
116
+ private static final String TIMESTAMP_BASED_TRANSFER_ARCHIVE_KEY_COUNTER_NAME = name (AccountsManager .class , "timestampRedisKeyCounter" );
117
+ private static final String REGISTRATION_ID_BASED_TRANSFER_ARCHIVE_KEY_COUNTER_NAME = name (AccountsManager .class ,"registrationIdRedisKeyCounter" );
114
118
115
119
private static final Logger logger = LoggerFactory .getLogger (AccountsManager .class );
116
120
@@ -140,7 +144,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
140
144
private final Map <String , CompletableFuture <Optional <DeviceInfo >>> waitForDeviceFuturesByTokenIdentifier =
141
145
new ConcurrentHashMap <>();
142
146
143
- private final Map <TimestampedDeviceIdentifier , CompletableFuture <Optional <TransferArchiveResult >>> waitForTransferArchiveFuturesByDeviceIdentifier =
147
+ private final Map <DeviceIdentifier , CompletableFuture <Optional <TransferArchiveResult >>> waitForTransferArchiveFuturesByDeviceIdentifier =
144
148
new ConcurrentHashMap <>();
145
149
146
150
private final Map <String , CompletableFuture <Optional <RestoreAccountRequest >>> waitForRestoreAccountRequestFuturesByToken =
@@ -155,6 +159,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
155
159
private static final Duration RECENTLY_ADDED_TRANSFER_ARCHIVE_TTL = Duration .ofHours (1 );
156
160
private static final String TRANSFER_ARCHIVE_PREFIX = "transfer_archive::" ;
157
161
private static final String TRANSFER_ARCHIVE_KEYSPACE_PATTERN = "__keyspace@0__:" + TRANSFER_ARCHIVE_PREFIX + "*" ;
162
+ private static final String TRANSFER_ARCHIVE_REGISTRATION_ID_PATTERN = "registrationId" ;
158
163
159
164
private static final Duration RESTORE_ACCOUNT_REQUEST_TTL = Duration .ofHours (1 );
160
165
private static final String RESTORE_ACCOUNT_REQUEST_PREFIX = "restore_account::" ;
@@ -194,7 +199,14 @@ public enum DeletionReason {
194
199
}
195
200
}
196
201
197
- private record TimestampedDeviceIdentifier (UUID accountIdentifier , byte deviceId , Instant deviceCreationTimestamp ) {
202
+ private interface DeviceIdentifier {}
203
+
204
+ private record TimestampDeviceIdentifier (UUID accountIdentifier , byte deviceId , Instant deviceCreationTimestamp )
205
+ implements DeviceIdentifier {
206
+ }
207
+
208
+ private record RegistrationIdDeviceIdentifier (UUID accountIdentifier , byte deviceId ,
209
+ int registrationId ) implements DeviceIdentifier {
198
210
}
199
211
200
212
public AccountsManager (final Accounts accounts ,
@@ -1509,34 +1521,66 @@ private static String getLinkedDeviceKey(final String linkDeviceTokenIdentifier)
1509
1521
}
1510
1522
1511
1523
public CompletableFuture <Optional <TransferArchiveResult >> waitForTransferArchive (final Account account , final Device device , final Duration timeout ) {
1512
- final TimestampedDeviceIdentifier deviceIdentifier =
1513
- new TimestampedDeviceIdentifier (account .getIdentifier (IdentityType .ACI ),
1514
- device .getId (),
1515
- Instant .ofEpochMilli (device .getCreated ()));
1516
-
1517
- return waitForPubSubKey (waitForTransferArchiveFuturesByDeviceIdentifier ,
1518
- deviceIdentifier ,
1519
- getTransferArchiveKey (account .getIdentifier (IdentityType .ACI ), device .getId (), Instant .ofEpochMilli (device .getCreated ())),
1524
+ final DeviceIdentifier timestampDeviceIdentifier = new TimestampDeviceIdentifier (account .getIdentifier (IdentityType .ACI ), device .getId (), Instant .ofEpochMilli (device .getCreated ()));
1525
+ final String timestampTransferArchiveKey = getTimestampTransferArchiveKey (account .getIdentifier (IdentityType .ACI ), device .getId (), Instant .ofEpochMilli (device .getCreated ()));
1526
+
1527
+ final DeviceIdentifier registrationIdDeviceIdentifier = new RegistrationIdDeviceIdentifier (account .getIdentifier (IdentityType .ACI ), device .getId (), device .getRegistrationId (IdentityType .ACI ));
1528
+ final String registrationIdTransferArchiveKey = getRegistrationIdTransferArchiveKey (account .getIdentifier (IdentityType .ACI ), device .getId (), device .getRegistrationId (IdentityType .ACI ));
1529
+
1530
+ final CompletableFuture <Optional <TransferArchiveResult >> timestampFuture = waitForPubSubKey (waitForTransferArchiveFuturesByDeviceIdentifier ,
1531
+ timestampDeviceIdentifier ,
1532
+ timestampTransferArchiveKey ,
1520
1533
timeout ,
1521
1534
this ::handleTransferArchiveAdded );
1535
+
1536
+ final CompletableFuture <Optional <TransferArchiveResult >> registrationIdFuture = waitForPubSubKey (waitForTransferArchiveFuturesByDeviceIdentifier ,
1537
+ registrationIdDeviceIdentifier ,
1538
+ registrationIdTransferArchiveKey ,
1539
+ timeout ,
1540
+ this ::handleTransferArchiveAdded );
1541
+ return firstSuccessfulTransferArchiveFuture (List .of (timestampFuture , registrationIdFuture ));
1542
+ }
1543
+
1544
+ @ VisibleForTesting
1545
+ static CompletableFuture <Optional <TransferArchiveResult >> firstSuccessfulTransferArchiveFuture (
1546
+ final List <CompletableFuture <Optional <TransferArchiveResult >>> futures ) {
1547
+ final CompletableFuture <Optional <TransferArchiveResult >> result = new CompletableFuture <>();
1548
+ final AtomicInteger remaining = new AtomicInteger (futures .size ());
1549
+
1550
+ for (CompletableFuture <Optional <TransferArchiveResult >> future : futures ) {
1551
+ future .whenComplete ((value , _ ) -> {
1552
+ if (value .isPresent ()) {
1553
+ result .complete (value );
1554
+ } else if (remaining .decrementAndGet () == 0 ) {
1555
+ result .complete (Optional .empty ());
1556
+ }
1557
+ });
1558
+ }
1559
+
1560
+ return result ;
1522
1561
}
1523
1562
1524
1563
public CompletableFuture <Void > recordTransferArchiveUpload (final Account account ,
1525
1564
final byte destinationDeviceId ,
1526
- final Instant destinationDeviceCreationTimestamp ,
1565
+ @ SuppressWarnings ("OptionalUsedAsFieldOrParameterType" ) final Optional <Instant > destinationDeviceCreationTimestamp ,
1566
+ @ SuppressWarnings ("OptionalUsedAsFieldOrParameterType" ) final Optional <Integer > maybeRegistrationId ,
1527
1567
final TransferArchiveResult transferArchiveResult ) {
1528
-
1529
- final String key = getTransferArchiveKey (account .getIdentifier (IdentityType .ACI ),
1530
- destinationDeviceId ,
1531
- destinationDeviceCreationTimestamp );
1532
-
1533
1568
try {
1534
1569
final String transferArchiveJson = SystemMapper .jsonMapper ().writeValueAsString (transferArchiveResult );
1535
1570
1536
- return pubSubRedisClient .withConnection (connection ->
1537
- connection .async ().set (key , transferArchiveJson , SetArgs .Builder .ex (RECENTLY_ADDED_TRANSFER_ARCHIVE_TTL )))
1538
- .thenRun (Util .NOOP )
1539
- .toCompletableFuture ();
1571
+ return pubSubRedisClient .withConnection (connection -> {
1572
+ final String key = destinationDeviceCreationTimestamp
1573
+ .map (timestamp -> getTimestampTransferArchiveKey (account .getIdentifier (IdentityType .ACI ), destinationDeviceId , timestamp ))
1574
+ .orElseGet (() -> maybeRegistrationId
1575
+ .map (registrationId -> getRegistrationIdTransferArchiveKey (account .getIdentifier (IdentityType .ACI ), destinationDeviceId , registrationId ))
1576
+ // We validate the request object so this should never happen
1577
+ .orElseThrow (() -> new AssertionError ("No creation timestamp or registration ID provided" )));
1578
+
1579
+ return connection .async ()
1580
+ .set (key , transferArchiveJson , SetArgs .Builder .ex (RECENTLY_ADDED_TRANSFER_ARCHIVE_TTL ))
1581
+ .thenRun (Util .NOOP )
1582
+ .toCompletableFuture ();
1583
+ });
1540
1584
} catch (final JsonProcessingException e ) {
1541
1585
// This should never happen for well-defined objects we control
1542
1586
throw new UncheckedIOException (e );
@@ -1552,15 +1596,27 @@ private void handleTransferArchiveAdded(final CompletableFuture<Optional<Transfe
1552
1596
}
1553
1597
}
1554
1598
1555
- private static String getTransferArchiveKey (final UUID accountIdentifier ,
1599
+ private static String getTimestampTransferArchiveKey (final UUID accountIdentifier ,
1556
1600
final byte destinationDeviceId ,
1557
1601
final Instant destinationDeviceCreationTimestamp ) {
1602
+ Metrics .counter (TIMESTAMP_BASED_TRANSFER_ARCHIVE_KEY_COUNTER_NAME ).increment ();
1558
1603
1559
1604
return TRANSFER_ARCHIVE_PREFIX + accountIdentifier .toString () +
1560
1605
":" + destinationDeviceId +
1561
1606
":" + destinationDeviceCreationTimestamp .toEpochMilli ();
1562
1607
}
1563
1608
1609
+ private static String getRegistrationIdTransferArchiveKey (final UUID accountIdentifier ,
1610
+ final byte destinationDeviceId ,
1611
+ final int registrationId ) {
1612
+ Metrics .counter (REGISTRATION_ID_BASED_TRANSFER_ARCHIVE_KEY_COUNTER_NAME ).increment ();
1613
+
1614
+ return TRANSFER_ARCHIVE_PREFIX + accountIdentifier .toString () +
1615
+ ":" + destinationDeviceId +
1616
+ ":" + TRANSFER_ARCHIVE_REGISTRATION_ID_PATTERN +
1617
+ ":" + registrationId ;
1618
+ }
1619
+
1564
1620
public CompletableFuture <Optional <RestoreAccountRequest >> waitForRestoreAccountRequest (final String token , final Duration timeout ) {
1565
1621
return waitForPubSubKey (waitForRestoreAccountRequestFuturesByToken ,
1566
1622
token ,
@@ -1648,23 +1704,36 @@ public void message(final String pattern, final String channel, final String mes
1648
1704
} else if (TRANSFER_ARCHIVE_KEYSPACE_PATTERN .equals (pattern ) && "set" .equalsIgnoreCase (message )) {
1649
1705
// The `- 1` here compensates for the '*' in the pattern
1650
1706
final String [] deviceIdentifierComponents =
1651
- channel .substring (TRANSFER_ARCHIVE_KEYSPACE_PATTERN .length () - 1 ).split (":" , 3 );
1707
+ channel .substring (TRANSFER_ARCHIVE_KEYSPACE_PATTERN .length () - 1 ).split (":" , 4 );
1652
1708
1653
- if (deviceIdentifierComponents .length != 3 ) {
1654
- logger .error ("Could not parse timestamped device identifier; unexpected component count" );
1709
+ if (deviceIdentifierComponents .length != 3 && deviceIdentifierComponents . length != 4 ) {
1710
+ logger .error ("Could not parse device identifier; unexpected component count" );
1655
1711
return ;
1656
1712
}
1657
1713
1714
+ final DeviceIdentifier deviceIdentifier ;
1715
+ final String transferArchiveKey ;
1658
1716
try {
1659
- final TimestampedDeviceIdentifier deviceIdentifier ;
1660
- final String transferArchiveKey ;
1661
- {
1662
- final UUID accountIdentifier = UUID . fromString ( deviceIdentifierComponents [ 0 ]);
1663
- final byte deviceId = Byte . parseByte ( deviceIdentifierComponents [ 1 ]);
1717
+ final UUID accountIdentifier = UUID . fromString ( deviceIdentifierComponents [ 0 ]) ;
1718
+ final byte deviceId = Byte . parseByte ( deviceIdentifierComponents [ 1 ]) ;
1719
+
1720
+ if ( deviceIdentifierComponents . length == 3 ) {
1721
+ // Parse the old transfer archive Redis key format
1664
1722
final Instant deviceCreationTimestamp = Instant .ofEpochMilli (Long .parseLong (deviceIdentifierComponents [2 ]));
1665
1723
1666
- deviceIdentifier = new TimestampedDeviceIdentifier (accountIdentifier , deviceId , deviceCreationTimestamp );
1667
- transferArchiveKey = getTransferArchiveKey (accountIdentifier , deviceId , deviceCreationTimestamp );
1724
+ deviceIdentifier = new TimestampDeviceIdentifier (accountIdentifier , deviceId , deviceCreationTimestamp );
1725
+ transferArchiveKey = getTimestampTransferArchiveKey (accountIdentifier , deviceId , deviceCreationTimestamp );
1726
+ } else {
1727
+ final String maybeRegistrationIdPattern = deviceIdentifierComponents [2 ];
1728
+ if (!maybeRegistrationIdPattern .equals (TRANSFER_ARCHIVE_REGISTRATION_ID_PATTERN )) {
1729
+ throw new IllegalArgumentException ("Could not parse Redis key with pattern " + maybeRegistrationIdPattern );
1730
+ }
1731
+ final int registrationId = Integer .parseInt (deviceIdentifierComponents [3 ]);
1732
+ if (!RegistrationIdValidator .validRegistrationId (registrationId )) {
1733
+ throw new IllegalArgumentException ("Invalid registration ID: " + registrationId );
1734
+ }
1735
+ deviceIdentifier = new RegistrationIdDeviceIdentifier (accountIdentifier , deviceId , registrationId );
1736
+ transferArchiveKey = getRegistrationIdTransferArchiveKey (accountIdentifier , deviceId , registrationId );
1668
1737
}
1669
1738
1670
1739
Optional .ofNullable (waitForTransferArchiveFuturesByDeviceIdentifier .remove (deviceIdentifier ))
@@ -1677,7 +1746,7 @@ public void message(final String pattern, final String channel, final String mes
1677
1746
}
1678
1747
}));
1679
1748
} catch (final IllegalArgumentException e ) {
1680
- logger .error ("Could not parse timestamped device identifier" , e );
1749
+ logger .error ("Could not parse device identifier" , e );
1681
1750
}
1682
1751
} else if (RESTORE_ACCOUNT_REQUEST_KEYSPACE_PATTERN .equalsIgnoreCase (pattern ) && "set" .equalsIgnoreCase (message )) {
1683
1752
// The `- 1` here compensates for the '*' in the pattern
0 commit comments