forked from heroiclabs/nakama
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcore_friend.go
488 lines (431 loc) · 17.7 KB
/
core_friend.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
// Copyright 2018 The Nakama Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"bytes"
"context"
"database/sql"
"encoding/base64"
"encoding/gob"
"encoding/json"
"errors"
"fmt"
"strconv"
"time"
"github.com/gofrs/uuid/v5"
"github.com/heroiclabs/nakama-common/api"
"github.com/heroiclabs/nakama-common/runtime"
"github.com/jackc/pgtype"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"
)
type edgeListCursor struct {
// ID fields.
State int64
Position int64
}
// Only used to get all friend IDs for the console. NOTE: Not intended for use in client/runtime APIs.
func GetFriendIDs(ctx context.Context, logger *zap.Logger, db *sql.DB, userID uuid.UUID) (*api.FriendList, error) {
query := `
SELECT id, state
FROM users, user_edge WHERE id = destination_id AND source_id = $1`
rows, err := db.QueryContext(ctx, query, userID)
if err != nil {
logger.Error("Error retrieving friends.", zap.Error(err))
return nil, err
}
defer rows.Close()
friends := make([]*api.Friend, 0, 10)
for rows.Next() {
var id string
var state sql.NullInt64
if err = rows.Scan(&id, &state); err != nil {
logger.Error("Error retrieving friend IDs.", zap.Error(err))
return nil, err
}
friendID := uuid.FromStringOrNil(id)
user := &api.User{
Id: friendID.String(),
}
friends = append(friends, &api.Friend{
User: user,
State: &wrapperspb.Int32Value{
Value: int32(state.Int64),
},
})
}
if err = rows.Err(); err != nil {
logger.Error("Error retrieving friend IDs.", zap.Error(err))
return nil, err
}
return &api.FriendList{Friends: friends}, nil
}
func ListFriends(ctx context.Context, logger *zap.Logger, db *sql.DB, statusRegistry *StatusRegistry, userID uuid.UUID, limit int, state *wrapperspb.Int32Value, cursor string) (*api.FriendList, error) {
var incomingCursor *edgeListCursor
if cursor != "" {
cb, err := base64.StdEncoding.DecodeString(cursor)
if err != nil {
return nil, runtime.ErrFriendInvalidCursor
}
incomingCursor = &edgeListCursor{}
if err := gob.NewDecoder(bytes.NewReader(cb)).Decode(incomingCursor); err != nil {
return nil, runtime.ErrFriendInvalidCursor
}
// Cursor and filter mismatch. Perhaps the caller has sent an old cursor with a changed filter.
if state != nil && int64(state.Value) != incomingCursor.State {
return nil, runtime.ErrFriendInvalidCursor
}
}
params := make([]interface{}, 0, 4)
query := `
SELECT id, username, display_name, avatar_url,
lang_tag, location, timezone, metadata,
create_time, users.update_time, user_edge.update_time, state, position,
facebook_id, google_id, gamecenter_id, steam_id, facebook_instant_game_id, apple_id
FROM users, user_edge WHERE id = destination_id AND source_id = $1`
params = append(params, userID)
if state != nil {
// Assumes the state has already been validated before this function.
query += " AND state = $2"
params = append(params, state.Value)
}
if incomingCursor != nil {
query += " AND (source_id, state, position) >= ($1, $2, $3)"
if state == nil {
params = append(params, incomingCursor.State)
}
params = append(params, incomingCursor.Position)
}
query += " ORDER BY state ASC, position ASC"
if limit != 0 {
// Console API can select all friends in one request. Client/runtime calls will set a non-0 limit.
params = append(params, limit+1)
query += " LIMIT $" + strconv.Itoa(len(params))
}
rows, err := db.QueryContext(ctx, query, params...)
if err != nil {
logger.Error("Error retrieving friends.", zap.Error(err))
return nil, err
}
defer rows.Close()
friends := make([]*api.Friend, 0, limit)
var outgoingCursor string
for rows.Next() {
var id string
var username sql.NullString
var displayName sql.NullString
var avatarURL sql.NullString
var lang sql.NullString
var location sql.NullString
var timezone sql.NullString
var metadata []byte
var createTime pgtype.Timestamptz
var updateTime pgtype.Timestamptz
var edgeUpdateTime pgtype.Timestamptz
var state sql.NullInt64
var position sql.NullInt64
var facebookID sql.NullString
var googleID sql.NullString
var gamecenterID sql.NullString
var steamID sql.NullString
var facebookInstantGameID sql.NullString
var appleID sql.NullString
if err = rows.Scan(&id, &username, &displayName, &avatarURL, &lang, &location, &timezone, &metadata,
&createTime, &updateTime, &edgeUpdateTime, &state, &position,
&facebookID, &googleID, &gamecenterID, &steamID, &facebookInstantGameID, &appleID); err != nil {
logger.Error("Error retrieving friends.", zap.Error(err))
return nil, err
}
if limit != 0 && len(friends) >= limit {
cursorBuf := new(bytes.Buffer)
if err := gob.NewEncoder(cursorBuf).Encode(&edgeListCursor{State: state.Int64, Position: position.Int64}); err != nil {
logger.Error("Error creating friend list cursor", zap.Error(err))
return nil, err
}
outgoingCursor = base64.StdEncoding.EncodeToString(cursorBuf.Bytes())
break
}
user := &api.User{
Id: id,
Username: username.String,
DisplayName: displayName.String,
AvatarUrl: avatarURL.String,
LangTag: lang.String,
Location: location.String,
Timezone: timezone.String,
Metadata: string(metadata),
CreateTime: ×tamppb.Timestamp{Seconds: createTime.Time.Unix()},
UpdateTime: ×tamppb.Timestamp{Seconds: updateTime.Time.Unix()},
// Online filled below.
FacebookId: facebookID.String,
GoogleId: googleID.String,
GamecenterId: gamecenterID.String,
SteamId: steamID.String,
FacebookInstantGameId: facebookInstantGameID.String,
AppleId: appleID.String,
}
friends = append(friends, &api.Friend{
User: user,
State: &wrapperspb.Int32Value{
Value: int32(state.Int64),
},
UpdateTime: ×tamppb.Timestamp{Seconds: edgeUpdateTime.Time.Unix()},
})
}
if err = rows.Err(); err != nil {
logger.Error("Error retrieving friends.", zap.Error(err))
return nil, err
}
if statusRegistry != nil {
statusRegistry.FillOnlineFriends(friends)
}
return &api.FriendList{Friends: friends, Cursor: outgoingCursor}, nil
}
func AddFriends(ctx context.Context, logger *zap.Logger, db *sql.DB, tracker Tracker, messageRouter MessageRouter, userID uuid.UUID, username string, friendIDs []string) error {
uniqueFriendIDs := make(map[string]struct{})
for _, fid := range friendIDs {
uniqueFriendIDs[fid] = struct{}{}
}
var notificationToSend map[string]bool
if err := ExecuteInTx(ctx, db, func(tx *sql.Tx) error {
// If the transaction is retried ensure we wipe any notifications that may have been prepared by previous attempts.
notificationToSend = make(map[string]bool)
for id := range uniqueFriendIDs {
// Check to see if user has already blocked friend, if so, don't add friend or send notification.
var blockState int
err := tx.QueryRowContext(ctx, "SELECT state FROM user_edge WHERE source_id = $1 AND destination_id = $2 AND state = 3", userID, id).Scan(&blockState)
// ignore if the error is sql.ErrNoRows as means block was not found - continue as intended.
if err != nil && err != sql.ErrNoRows {
// genuine DB error was found.
logger.Debug("Failed to check edge state.", zap.Error(err), zap.String("user", userID.String()), zap.String("friend", id))
return err
} else if err == nil {
// the block was found, don't add friend or send notification.
logger.Info("Ignoring previously blocked friend. Delete friend first before attempting to add.", zap.String("user", userID.String()), zap.String("friend", id))
continue
}
isFriendAccept, addFriendErr := addFriend(ctx, logger, tx, userID, id)
if addFriendErr == nil {
notificationToSend[id] = isFriendAccept
} else if addFriendErr != sql.ErrNoRows { // Check to see if friend had blocked user.
return addFriendErr
}
}
return nil
}); err != nil {
logger.Error("Error adding friends.", zap.Error(err))
return err
}
notifications := make(map[uuid.UUID][]*api.Notification)
content, _ := json.Marshal(map[string]interface{}{"username": username})
for id, isFriendAccept := range notificationToSend {
uid := uuid.FromStringOrNil(id)
code := NotificationCodeFriendRequest
subject := fmt.Sprintf("%v wants to add you as a friend", username)
if isFriendAccept {
code = NotificationCodeFriendAccept
subject = fmt.Sprintf("%v accepted your friend request", username)
}
notifications[uid] = []*api.Notification{{
Id: uuid.Must(uuid.NewV4()).String(),
Subject: subject,
Content: string(content),
SenderId: userID.String(),
Code: code,
Persistent: true,
CreateTime: ×tamppb.Timestamp{Seconds: time.Now().UTC().Unix()},
}}
}
// Any error is already logged before it's returned here.
_ = NotificationSend(ctx, logger, db, tracker, messageRouter, notifications)
return nil
}
// Returns "true" if accepting an invite, otherwise false
func addFriend(ctx context.Context, logger *zap.Logger, tx *sql.Tx, userID uuid.UUID, friendID string) (bool, error) {
// Mark an invite as accepted, if one was in place.
res, err := tx.ExecContext(ctx, `
UPDATE user_edge SET state = 0, update_time = now()
WHERE (source_id = $1 AND destination_id = $2 AND state = 1)
OR (source_id = $2 AND destination_id = $1 AND state = 2)
`, friendID, userID)
if err != nil {
logger.Debug("Failed to update user state.", zap.Error(err), zap.String("user", userID.String()), zap.String("friend", friendID))
return false, err
}
// If both edges were updated, it was accepting an invite was successful.
if rowsAffected, _ := res.RowsAffected(); rowsAffected == 2 {
logger.Debug("Accepting friend invitation.", zap.String("user", userID.String()), zap.String("friend", friendID))
return true, nil
}
position := fmt.Sprintf("%v", time.Now().UTC().UnixNano())
// If no edge updates took place, it's either a new invite being set up, or user was blocked off by friend.
_, err = tx.ExecContext(ctx, `
INSERT INTO user_edge (source_id, destination_id, state, position, update_time)
SELECT source_id, destination_id, state, position, update_time
FROM (VALUES
($1::UUID, $2::UUID, 1, $3::BIGINT, now()),
($2::UUID, $1::UUID, 2, $3::BIGINT, now())
) AS ue(source_id, destination_id, state, position, update_time)
WHERE
EXISTS (SELECT id FROM users WHERE id = $2::UUID)
AND
NOT EXISTS
(SELECT state
FROM user_edge
WHERE source_id = $2::UUID AND destination_id = $1::UUID AND state = 3
)
ON CONFLICT (source_id, destination_id) DO NOTHING
`, userID, friendID, position)
if err != nil {
logger.Debug("Failed to insert new user edge link.", zap.Error(err), zap.String("user", userID.String()), zap.String("friend", friendID))
return false, err
}
// Update friend count if we've just created the relationship.
// This check is done by comparing the the timestamp(position) to the timestamp available.
// i.e. only increase count when the relationship was first formed.
// This is caused by an existing bug in CockroachDB: https://github.com/cockroachdb/cockroach/issues/10264
if res, err = tx.ExecContext(ctx, `
UPDATE users
SET edge_count = edge_count +1, update_time = now()
WHERE
(id = $1::UUID OR id = $2::UUID)
AND EXISTS
(SELECT state
FROM user_edge
WHERE
(source_id = $1::UUID AND destination_id = $2::UUID AND position = $3::BIGINT)
OR
(source_id = $2::UUID AND destination_id = $1::UUID AND position = $3::BIGINT)
)
`, userID, friendID, position); err != nil {
logger.Debug("Failed to update user count.", zap.Error(err), zap.String("user", userID.String()), zap.String("friend", friendID))
return false, err
}
// An invite was successfully added if both components were inserted.
if rowsAffected, _ := res.RowsAffected(); rowsAffected != 2 {
logger.Debug("Did not add new friend as friend connection already exists or user is blocked.", zap.String("user", userID.String()), zap.String("friend", friendID))
return false, sql.ErrNoRows
}
logger.Debug("Added new friend invitation.", zap.String("user", userID.String()), zap.String("friend", friendID))
return false, nil
}
func DeleteFriends(ctx context.Context, logger *zap.Logger, db *sql.DB, currentUser uuid.UUID, ids []string) error {
uniqueFriendIDs := make(map[string]struct{})
for _, fid := range ids {
uniqueFriendIDs[fid] = struct{}{}
}
if err := ExecuteInTx(ctx, db, func(tx *sql.Tx) error {
for id := range uniqueFriendIDs {
if deleteFriendErr := deleteFriend(ctx, logger, tx, currentUser, id); deleteFriendErr != nil {
return deleteFriendErr
}
}
return nil
}); err != nil {
logger.Error("Error deleting friends.", zap.Error(err))
return err
}
return nil
}
func deleteFriend(ctx context.Context, logger *zap.Logger, tx *sql.Tx, userID uuid.UUID, friendID string) error {
res, err := tx.ExecContext(ctx, "DELETE FROM user_edge WHERE (source_id = $1 AND destination_id = $2) OR (source_id = $2 AND destination_id = $1 AND state <> 3)", userID, friendID)
if err != nil {
logger.Debug("Failed to delete user edge relationships.", zap.Error(err), zap.String("user", userID.String()), zap.String("friend", friendID))
return err
}
if rowsAffected, _ := res.RowsAffected(); rowsAffected == 0 {
logger.Debug("Could not delete user relationships as prior relationship did not exist.", zap.String("user", userID.String()), zap.String("friend", friendID))
return nil
} else if rowsAffected == 1 {
if _, err = tx.ExecContext(ctx, "UPDATE users SET edge_count = edge_count - 1, update_time = now() WHERE id = $1::UUID", userID); err != nil {
logger.Debug("Failed to update user edge counts.", zap.Error(err), zap.String("user", userID.String()), zap.String("friend", friendID))
return err
}
} else if rowsAffected == 2 {
if _, err = tx.ExecContext(ctx, "UPDATE users SET edge_count = edge_count - 1, update_time = now() WHERE id IN ($1, $2)", userID, friendID); err != nil {
logger.Debug("Failed to update user edge counts.", zap.Error(err), zap.String("user", userID.String()), zap.String("friend", friendID))
return err
}
} else {
logger.Debug("Unexpected number of edges were deleted.", zap.String("user", userID.String()), zap.String("friend", friendID), zap.Int64("rows_affected", rowsAffected))
return errors.New("unexpected number of edges were deleted")
}
return nil
}
func BlockFriends(ctx context.Context, logger *zap.Logger, db *sql.DB, currentUser uuid.UUID, ids []string) error {
uniqueFriendIDs := make(map[string]struct{})
for _, fid := range ids {
uniqueFriendIDs[fid] = struct{}{}
}
if err := ExecuteInTx(ctx, db, func(tx *sql.Tx) error {
for id := range uniqueFriendIDs {
if blockFriendErr := blockFriend(ctx, logger, tx, currentUser, id); blockFriendErr != nil {
return blockFriendErr
}
}
return nil
}); err != nil {
logger.Error("Error blocking friends.", zap.Error(err))
return err
}
return nil
}
func blockFriend(ctx context.Context, logger *zap.Logger, tx *sql.Tx, userID uuid.UUID, friendID string) error {
// Try to update any previous edge between these users.
res, err := tx.ExecContext(ctx, "UPDATE user_edge SET state = 3, update_time = now() WHERE source_id = $1 AND destination_id = $2",
userID, friendID)
if err != nil {
logger.Debug("Failed to update user edge state.", zap.Error(err), zap.String("user", userID.String()), zap.String("friend", friendID))
return err
}
position := fmt.Sprintf("%v", time.Now().UTC().UnixNano())
if rowsAffected, _ := res.RowsAffected(); rowsAffected == 0 {
// If there was no previous edge then create one.
query := `
INSERT INTO user_edge (source_id, destination_id, state, position, update_time)
SELECT source_id, destination_id, state, position, update_time
FROM (VALUES
($1::UUID, $2::UUID, 3, $3::BIGINT, now())
) AS ue(source_id, destination_id, state, position, update_time)
WHERE EXISTS (SELECT id FROM users WHERE id = $2::UUID)`
res, err = tx.ExecContext(ctx, query, userID, friendID, position)
if err != nil {
logger.Debug("Failed to block user.", zap.Error(err), zap.String("user", userID.String()), zap.String("friend", friendID))
return err
}
if rowsAffected, _ := res.RowsAffected(); rowsAffected == 0 {
logger.Debug("Could not block user as user may not exist.", zap.String("user", userID.String()), zap.String("friend", friendID))
return nil
}
// Update the edge count.
if _, err = tx.ExecContext(ctx, "UPDATE users SET edge_count = edge_count + 1, update_time = now() WHERE id = $1", userID); err != nil {
logger.Debug("Failed to update user edge count.", zap.Error(err), zap.String("user", userID.String()), zap.String("friend", friendID))
return err
}
}
// Delete opposite relationship if user hasn't blocked you already
res, err = tx.ExecContext(ctx, "DELETE FROM user_edge WHERE source_id = $1 AND destination_id = $2 AND state != 3", friendID, userID)
if err != nil {
logger.Debug("Failed to update user edge state.", zap.Error(err), zap.String("user", userID.String()), zap.String("friend", friendID))
return err
}
if rowsAffected, _ := res.RowsAffected(); rowsAffected == 1 {
if _, err = tx.ExecContext(ctx, "UPDATE users SET edge_count = edge_count - 1, update_time = now() WHERE id = $1", friendID); err != nil {
logger.Debug("Failed to update user edge count.", zap.Error(err), zap.String("user", userID.String()), zap.String("friend", friendID))
return err
}
}
return nil
}