forked from adap/flower
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcommon.py
103 lines (82 loc) · 3.17 KB
/
common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
from typing import List, Tuple
import numpy as np
import tensorflow as tf
from tensorflow_privacy.privacy.analysis.rdp_accountant import compute_rdp
from tensorflow_privacy.privacy.analysis.rdp_accountant import get_privacy_spent
# Type aliases shared across this module.
XY = Tuple[np.ndarray, np.ndarray]  # A (features, labels) pair.
XYList = List[XY]  # One (features, labels) pair per partition/client.
PartitionedDataset = List[Tuple[XY, XY]]  # Per-partition (train, test) pairs.
def compute_epsilon(
    epochs: int, num_train_examples: int, batch_size: int, noise_multiplier: float
) -> float:
    """Computes epsilon value for given hyperparameters.

    Based on
    github.com/tensorflow/privacy/blob/master/tutorials/mnist_dpsgd_tutorial_keras.py

    NOTE(review): this uses the legacy `rdp_accountant` module, which newer
    tensorflow-privacy releases have deprecated — confirm against the pinned
    version before upgrading.
    """
    # No added noise means no differential-privacy guarantee at all.
    if noise_multiplier == 0.0:
        return float("inf")

    total_steps = epochs * num_train_examples // batch_size
    # Candidate Renyi orders the accountant optimizes over (same grid as the
    # upstream tutorial).
    rdp_orders = [1 + x / 10.0 for x in range(1, 100)] + list(range(12, 64))
    # Poisson subsampling probability per step.
    q = batch_size / num_train_examples

    rdp_values = compute_rdp(
        q=q,
        noise_multiplier=noise_multiplier,
        steps=total_steps,
        orders=rdp_orders,
    )
    # Delta is set to approximate 1 / (number of training points); the first
    # element of the returned tuple is epsilon.
    return get_privacy_spent(rdp_orders, rdp_values, target_delta=1 / num_train_examples)[0]
def create_cnn_model() -> tf.keras.Model:
    """Returns a sequential keras CNN Model."""
    layers = tf.keras.layers
    model = tf.keras.Sequential()
    # Two conv + pool stages, then a small dense head with 10 logits
    # (no softmax — the loss is expected to work on raw logits).
    model.add(
        layers.Conv2D(
            16,
            8,
            strides=2,
            padding="same",
            activation="relu",
            input_shape=(28, 28, 1),
        )
    )
    model.add(layers.MaxPool2D(2, 1))
    model.add(layers.Conv2D(32, 4, strides=2, padding="valid", activation="relu"))
    model.add(layers.MaxPool2D(2, 1))
    model.add(layers.Flatten())
    model.add(layers.Dense(32, activation="relu"))
    model.add(layers.Dense(10))
    return model
def shuffle(X: np.ndarray, y: np.ndarray) -> XY:
    """Shuffle X and y jointly along the first axis.

    The same random permutation is applied to both arrays so that each
    feature row stays paired with its label.
    """
    order = np.random.default_rng().permutation(len(X))
    return X[order], y[order]
def partition(X: np.ndarray, y: np.ndarray, num_partitions: int) -> XYList:
    """Split X and y into a number of partitions.

    Both arrays are split with `np.array_split`, which tolerates lengths
    that are not an exact multiple of `num_partitions`.
    """
    x_chunks = np.array_split(X, num_partitions)
    y_chunks = np.array_split(y, num_partitions)
    return [(x_chunk, y_chunk) for x_chunk, y_chunk in zip(x_chunks, y_chunks)]
def preprocess(X: np.ndarray, y: np.ndarray) -> XY:
    """Basic preprocessing for MNIST dataset.

    Scales pixel values to [0, 1] as float32, reshapes the images to
    (N, 28, 28, 1), and one-hot encodes the labels into 10 classes.
    """
    features = np.array(X, dtype=np.float32) / 255
    features = features.reshape((features.shape[0], 28, 28, 1))
    labels = tf.keras.utils.to_categorical(
        np.array(y, dtype=np.int32), num_classes=10
    )
    return features, labels
def create_partitions(source_dataset: XY, num_partitions: int) -> XYList:
    """Create partitioned version of a source dataset.

    Shuffles and preprocesses the (X, y) pair, then splits it into
    `num_partitions` disjoint partitions.
    """
    features, labels = shuffle(*source_dataset)
    features, labels = preprocess(features, labels)
    return partition(features, labels, num_partitions)
def load(
    num_partitions: int,
) -> PartitionedDataset:
    """Create partitioned version of MNIST.

    Downloads MNIST via Keras, partitions train and test sets
    independently, and pairs up the i-th train and test partitions.
    """
    raw_train, raw_test = tf.keras.datasets.mnist.load_data()
    train_parts = create_partitions(raw_train, num_partitions)
    test_parts = create_partitions(raw_test, num_partitions)
    return list(zip(train_parts, test_parts))