Skip to content

Commit 1c6b5ff

Browse files
committed
wip
1 parent 6b77a29 commit 1c6b5ff

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+404
-565
lines changed

outbox/outbox.go renamed to ap/inbox.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
Copyright 2024 Dima Krasner
2+
Copyright 2025 Dima Krasner
33
44
Licensed under the Apache License, Version 2.0 (the "License");
55
you may not use this file except in compliance with the License.
@@ -14,7 +14,14 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
// Package outbox handles user actions and translates them into outgoing activities.
18-
//
19-
// Outgoing activities are queued and delivered by [fed.Queue].
20-
package outbox
17+
package ap
18+
19+
import (
20+
"context"
21+
"database/sql"
22+
)
23+
24+
type Inbox interface {
25+
NewID(actorID, prefix string) (string, error)
26+
Unfollow(ctx context.Context, db *sql.DB, follower *Actor, followed, followID string) error
27+
}

cluster/deleter_test.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
"strings"
2121
"testing"
2222

23-
"github.com/dimkr/tootik/outbox"
23+
"github.com/dimkr/tootik/inbox"
2424
)
2525

2626
func TestDeleter_OldData(t *testing.T) {
@@ -115,10 +115,8 @@ func TestDeleter_OldData(t *testing.T) {
115115
t.Fatal("Failed to set post #1 insertion time: no rows affected")
116116
}
117117

118-
deleter := outbox.Deleter{
119-
Domain: "b.localdomain",
120-
Config: cluster["b.localdomain"].Config,
121-
DB: cluster["b.localdomain"].DB,
118+
deleter := inbox.Deleter{
119+
Queue: cluster["b.localdomain"].Incoming,
122120
}
123121

124122
if err := deleter.Run(t.Context()); err != nil {
@@ -269,10 +267,8 @@ func TestDeleter_Disabled(t *testing.T) {
269267
Follow("Never").
270268
Contains(Line{Type: Text, Text: "Current setting: old posts are not deleted automatically."})
271269

272-
deleter := outbox.Deleter{
273-
Domain: "b.localdomain",
274-
Config: cluster["b.localdomain"].Config,
275-
DB: cluster["b.localdomain"].DB,
270+
deleter := inbox.Deleter{
271+
Queue: cluster["b.localdomain"].Incoming,
276272
}
277273

278274
if err := deleter.Run(t.Context()); err != nil {

cluster/followers_sync_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ func TestCluster_FollowersSyncMissingRemoteFollow(t *testing.T) {
6767
DB: cluster["a.localdomain"].DB,
6868
Resolver: cluster["a.localdomain"].Resolver,
6969
Keys: cluster["a.localdomain"].NobodyKeys,
70+
Queue: cluster["a.localdomain"].Incoming,
7071
}
7172
if _, err := syncer.ProcessBatch(t.Context()); err != nil {
7273
t.Fatalf("Failed to synchronize followers: %v", err)
@@ -126,6 +127,7 @@ func TestCluster_FollowersSyncMissingLocalFollow(t *testing.T) {
126127
DB: cluster["a.localdomain"].DB,
127128
Resolver: cluster["a.localdomain"].Resolver,
128129
Keys: cluster["a.localdomain"].NobodyKeys,
130+
Queue: cluster["a.localdomain"].Incoming,
129131
}
130132
if _, err := syncer.ProcessBatch(t.Context()); err != nil {
131133
t.Fatalf("Failed to synchronize followers: %v", err)

cluster/followers_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func TestCluster_PostToFollowers_Rejected(t *testing.T) {
145145
Follow("🐕 Followers").
146146
Follow("🔴 Reject").
147147
NotContains(Line{Type: Link, Text: "🟢 Accept", URL: "/users/followers/accept/a.localdomain/user/alice"}).
148-
Contains(Line{Type: Link, Text: "🔴 Reject", URL: "/users/followers/reject/a.localdomain/user/alice"}).
148+
NotContains(Line{Type: Link, Text: "🔴 Reject", URL: "/users/followers/reject/a.localdomain/user/alice"}).
149149
NotContains(Line{Type: Link, Text: "🟢 Accept", URL: "/users/followers/accept/a.localdomain/user/bob"}).
150150
NotContains(Line{Type: Link, Text: "🔴 Reject", URL: "/users/followers/reject/a.localdomain/user/bob"})
151151
cluster.Settle(t)

cluster/move_test.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package cluster
1919
import (
2020
"testing"
2121

22-
"github.com/dimkr/tootik/outbox"
22+
"github.com/dimkr/tootik/inbox"
2323
)
2424

2525
func TestCluster_MovedAccount(t *testing.T) {
@@ -53,11 +53,8 @@ func TestCluster_MovedAccount(t *testing.T) {
5353

5454
bob.FollowInput("🔭 View profile", "carol@c.localdomain").OK()
5555

56-
mover := outbox.Mover{
57-
Domain: "b.localdomain",
58-
DB: cluster["b.localdomain"].DB,
59-
Resolver: cluster["b.localdomain"].Resolver,
60-
Keys: cluster["b.localdomain"].NobodyKeys,
56+
mover := inbox.Mover{
57+
Queue: cluster["b.localdomain"].Incoming,
6158
}
6259
if err := mover.Run(t.Context()); err != nil {
6360
t.Fatalf("Failed to process moved accounts: %v", err)

cluster/poll_test.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package cluster
1919
import (
2020
"testing"
2121

22-
"github.com/dimkr/tootik/outbox"
22+
"github.com/dimkr/tootik/inbox"
2323
)
2424

2525
func TestCluster_Poll(t *testing.T) {
@@ -55,10 +55,8 @@ func TestCluster_Poll(t *testing.T) {
5555
OK()
5656
cluster.Settle(t)
5757

58-
poller := outbox.Poller{
59-
Domain: "b.localdomain",
60-
DB: cluster["b.localdomain"].DB,
61-
Config: cluster["b.localdomain"].Config,
58+
poller := inbox.Poller{
59+
Queue: cluster["b.localdomain"].Incoming,
6260
}
6361
if err := poller.Run(t.Context()); err != nil {
6462
t.Fatalf("Failed to process votes: %v", err)

cluster/server.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,15 @@ func NewServer(ctx context.Context, t *testing.T, domain string, client fed.Clie
122122
t.Fatalf("Failed to run create the nobody user: %v", err)
123123
}
124124

125-
handler, err := front.NewHandler(domain, false, &cfg, resolver, db)
125+
incoming := &inbox.Queue{
126+
Domain: domain,
127+
Config: &cfg,
128+
DB: db,
129+
Resolver: resolver,
130+
Keys: nobodyKeys,
131+
}
132+
133+
handler, err := front.NewHandler(domain, false, &cfg, resolver, db, incoming)
126134
if err != nil {
127135
t.Fatalf("Failed to run create a Handler: %v", err)
128136
}
@@ -172,14 +180,8 @@ func NewServer(ctx context.Context, t *testing.T, domain string, client fed.Clie
172180
DB: db,
173181
Handler: handler,
174182
},
175-
Backend: backend,
176-
Incoming: &inbox.Queue{
177-
Domain: domain,
178-
Config: &cfg,
179-
DB: db,
180-
Resolver: resolver,
181-
Keys: nobodyKeys,
182-
},
183+
Backend: backend,
184+
Incoming: incoming,
183185
Outgoing: &fed.Queue{
184186
Domain: domain,
185187
Config: &cfg,

cmd/tootik/main.go

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ import (
4949
"github.com/dimkr/tootik/icon"
5050
"github.com/dimkr/tootik/inbox"
5151
"github.com/dimkr/tootik/migrations"
52-
"github.com/dimkr/tootik/outbox"
5352
_ "github.com/mattn/go-sqlite3"
5453
)
5554

@@ -202,6 +201,15 @@ func main() {
202201
panic(err)
203202
}
204203

204+
queue := &inbox.Queue{
205+
Domain: *domain,
206+
Config: &cfg,
207+
BlockList: blockList,
208+
DB: db,
209+
Resolver: resolver,
210+
Keys: nobodyKeys,
211+
}
212+
205213
switch cmd {
206214
case "add-community":
207215
_, _, err := user.Create(ctx, *domain, db, flag.Arg(1), ap.Group, nil)
@@ -241,7 +249,7 @@ func main() {
241249
panic(err)
242250
}
243251

244-
if err := outbox.UpdateActor(ctx, *domain, tx, actorID); err != nil {
252+
if err := queue.UpdateActor(ctx, tx, actorID); err != nil {
245253
panic(err)
246254
}
247255

@@ -300,7 +308,7 @@ func main() {
300308
panic(err)
301309
}
302310

303-
if err := outbox.UpdateActor(ctx, *domain, tx, actorID); err != nil {
311+
if err := queue.UpdateActor(ctx, tx, actorID); err != nil {
304312
panic(err)
305313
}
306314

@@ -311,7 +319,7 @@ func main() {
311319
return
312320
}
313321

314-
handler, err := front.NewHandler(*domain, *closed, &cfg, resolver, db)
322+
handler, err := front.NewHandler(*domain, *closed, &cfg, resolver, db, queue)
315323
if err != nil {
316324
panic(err)
317325
}
@@ -393,14 +401,7 @@ func main() {
393401
}{
394402
{
395403
"incoming",
396-
&inbox.Queue{
397-
Domain: *domain,
398-
Config: &cfg,
399-
BlockList: blockList,
400-
DB: db,
401-
Resolver: resolver,
402-
Keys: nobodyKeys,
403-
},
404+
queue,
404405
},
405406
{
406407
"outgoing",
@@ -439,20 +440,15 @@ func main() {
439440
{
440441
"poller",
441442
pollResultsUpdateInterval,
442-
&outbox.Poller{
443-
Domain: *domain,
444-
Config: &cfg,
445-
DB: db,
443+
&inbox.Poller{
444+
Queue: queue,
446445
},
447446
},
448447
{
449448
"mover",
450449
followMoveInterval,
451-
&outbox.Mover{
452-
Domain: *domain,
453-
DB: db,
454-
Resolver: resolver,
455-
Keys: nobodyKeys,
450+
&inbox.Mover{
451+
Queue: queue,
456452
},
457453
},
458454
{
@@ -464,15 +460,14 @@ func main() {
464460
DB: db,
465461
Resolver: resolver,
466462
Keys: nobodyKeys,
463+
Queue: queue,
467464
},
468465
},
469466
{
470467
"deleter",
471468
deleterInterval,
472-
&outbox.Deleter{
473-
Domain: *domain,
474-
Config: &cfg,
475-
DB: db,
469+
&inbox.Deleter{
470+
Queue: queue,
476471
},
477472
},
478473
{

fed/deliver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,7 @@ func (q *Queue) queueTasks(
489489
}
490490

491491
// if this is an activity by a portable actor, forward it to all gateways
492-
if ap.IsPortable(job.Sender.ID) && ap.Canonical(job.Sender.ID) == ap.Canonical(job.Activity.Actor) && len(job.Sender.Gateways) > 1 {
492+
if ap.IsPortable(job.Sender.ID) && (job.Activity.IsPublic() || ap.Canonical(job.Sender.ID) == ap.Canonical(job.Activity.Actor)) && len(job.Sender.Gateways) > 1 {
493493
for _, gw := range job.Sender.Gateways[1:] {
494494
slog.Info("Forwarding activity to gateway", "activity", job.Activity.ID, "sender", job.Sender.ID, "gateway", gw)
495495

fed/followers.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ import (
3434
"github.com/dimkr/tootik/ap"
3535
"github.com/dimkr/tootik/cfg"
3636
"github.com/dimkr/tootik/httpsig"
37-
"github.com/dimkr/tootik/outbox"
3837
)
3938

4039
type partialFollowers map[string]map[string]string
@@ -45,12 +44,14 @@ type Syncer struct {
4544
DB *sql.DB
4645
Resolver *Resolver
4746
Keys [2]httpsig.Key
47+
Queue ap.Inbox
4848
}
4949

5050
type followersDigest struct {
5151
Followed string
5252
URL string
5353
Digest string
54+
Queue ap.Inbox
5455
}
5556

5657
var followersSyncRegex = regexp.MustCompile(`\b([^"=]+)="([^"]+)"`)
@@ -297,20 +298,26 @@ func (d *followersDigest) Sync(ctx context.Context, domain string, cfg *cfg.Conf
297298
continue
298299
}
299300

301+
var actor ap.Actor
302+
if err := db.QueryRowContext(ctx, `SELECT JSON(persons.actor) FROM persons WHERE id = ? AND persons.ed25519privkey IS NOT NULL`, follower).Scan(&actor); err != nil {
303+
slog.Warn("Failed to fetch actor of unknown remote follow", "followed", d.Followed, "follower", follower, "error", err)
304+
continue
305+
}
306+
300307
var followID string
301-
if err := db.QueryRowContext(ctx, `SELECT id FROM follows WHERE follower = ? AND followed = ?`, follower, d.Followed).Scan(&followID); err != nil && errors.Is(err, sql.ErrNoRows) {
302-
followID, err = outbox.NewID(d.Followed, domain, "follow")
308+
if err := db.QueryRowContext(ctx, `SELECT follows.id FROM follows WHERE follows.follower = ? AND follows.followed = ?`, follower, d.Followed).Scan(&followID); errors.Is(err, sql.ErrNoRows) {
309+
followID, err = d.Queue.NewID(d.Followed, "follow")
303310
if err != nil {
304311
slog.Warn("Failed to generate fake follow ID", "followed", d.Followed, "follower", follower, "error", err)
305312
continue
306313
}
307314
slog.Warn("Using fake follow ID to remove unknown remote follow", "followed", d.Followed, "follower", follower, "id", followID)
308315
} else if err != nil {
309-
slog.Warn("Failed to fetch follow ID of unknown remote follow", "followed", d.Followed, "follower", follower, "error", err)
316+
slog.Warn("Failed to fetch actor and follow ID of unknown remote follow", "followed", d.Followed, "follower", follower, "error", err)
310317
continue
311318
}
312319

313-
if err := outbox.Unfollow(ctx, domain, db, follower, d.Followed, followID); err != nil {
320+
if err := d.Queue.Unfollow(ctx, db, &actor, d.Followed, followID); err != nil {
314321
slog.Warn("Failed to remove remote follow", "followed", d.Followed, "follower", follower, "error", err)
315322
}
316323
}
@@ -336,7 +343,9 @@ func (s *Syncer) ProcessBatch(ctx context.Context) (int, error) {
336343
jobs := make([]followersDigest, 0, s.Config.FollowersSyncBatchSize)
337344

338345
for rows.Next() {
339-
var job followersDigest
346+
job := followersDigest{
347+
Queue: s.Queue,
348+
}
340349
if err := rows.Scan(&job.Followed, &job.URL, &job.Digest); err != nil {
341350
slog.Error("Failed to scan digest", "error", err)
342351
continue

0 commit comments

Comments
 (0)