In this tutorial, we explore how federated learning behaves when the traditional centralized aggregation server is removed and replaced with a fully decentralized, peer-to-peer gossip mechanism. We implement both centralized FedAvg and decentralized Gossip Federated Learning from scratch and introduce client-side differential privacy by injecting calibrated noise into local model updates. By running controlled experiments on non-IID MNIST data, we examine how privacy strength, measured by different epsilon values, directly affects convergence speed, stability, and final model accuracy. We also study the practical trade-offs between privacy guarantees and learning efficiency in real-world decentralized learning systems. Check out the Full Codes here.
```python
from dataclasses import dataclass
from typing import Dict, List, Tuple
import subprocess, sys
def pip_install(pkgs):
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q"] + pkgs)

pip_install(["torch", "torchvision", "numpy", "matplotlib", "networkx", "tqdm"])
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import networkx as nx
from tqdm import trange
import random
import math

SEED = 7
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.deterministic = False
torch.backends.cudnn.benchmark = True

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
transform = transforms.Compose([transforms.ToTensor()])
train_ds = datasets.MNIST(root="/content/data", train=True, download=True, transform=transform)
test_ds = datasets.MNIST(root="/content/data", train=False, download=True, transform=transform)
```
We set up the execution environment and install all required dependencies. We initialize random seeds and device settings to maintain reproducibility across experiments. We also load the MNIST dataset, which serves as a lightweight yet effective benchmark for federated learning experiments. Check out the Full Codes here.
```python
def make_noniid_clients(dataset, num_clients, shards_per_client, seed=SEED):
    # Sort samples by label, cut them into shards, and give each client a few shards (non-IID split).
    rng = np.random.default_rng(seed)
    y = np.array([dataset[i][1] for i in range(len(dataset))])
    idx = np.arange(len(dataset))
    idx_sorted = idx[np.argsort(y)]
    num_shards = num_clients * shards_per_client
    shard_size = len(dataset) // num_shards
    shards = [idx_sorted[i*shard_size:(i+1)*shard_size] for i in range(num_shards)]
    rng.shuffle(shards)
    client_indices = []
    for c in range(num_clients):
        take = shards[c*shards_per_client:(c+1)*shards_per_client]
        client_indices.append(np.concatenate(take))
    return client_indices
NUM_CLIENTS = 20
client_indices = make_noniid_clients(train_ds, num_clients=NUM_CLIENTS, shards_per_client=2)
test_loader = DataLoader(test_ds, batch_size=1024, shuffle=False, num_workers=2, pin_memory=True)
class MLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(28*28, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 10)

    def forward(self, x):
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)
```
We construct a non-IID data distribution by partitioning the training dataset into label-based shards across multiple clients. We define a compact neural network model that balances expressiveness and computational efficiency. This setup enables us to realistically simulate data heterogeneity, a critical challenge in federated learning systems. Check out the Full Codes here.
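Before moving on, it can help to verify that the shard-based split really is skewed. The short check below is not part of the original pipeline; it is a quick sanity-check sketch that only assumes the `train_ds` and `client_indices` objects defined above.

```python
# Optional sanity check: count how many distinct labels each client holds.
# With shards_per_client=2, most clients should see only one or two digit classes.
labels = np.array([train_ds[i][1] for i in range(len(train_ds))])
for cid in range(5):  # the first few clients are enough to illustrate the skew
    client_labels = labels[client_indices[cid]]
    uniq, counts = np.unique(client_labels, return_counts=True)
    print(f"client {cid}: labels={uniq.tolist()}, counts={counts.tolist()}")
```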
```python
def get_model_params(model):
    return {k: v.detach().clone() for k, v in model.state_dict().items()}

def set_model_params(model, params):
    model.load_state_dict(params, strict=True)

def add_params(a, b):
    return {k: a[k] + b[k] for k in a.keys()}

def sub_params(a, b):
    return {k: a[k] - b[k] for k in a.keys()}

def scale_params(a, s):
    return {k: a[k] * s for k in a.keys()}
def mean_params(params_list):
    out = {k: torch.zeros_like(params_list[0][k]) for k in params_list[0].keys()}
    for p in params_list:
        for k in out.keys():
            out[k] += p[k]
    for k in out.keys():
        out[k] /= len(params_list)
    return out

def l2_norm_params(delta):
    sq = 0.0
    for v in delta.values():
        sq += float(torch.sum(v.float() * v.float()).item())
    return math.sqrt(sq)
def dp_sanitize_update(delta, clip_norm, epsilon, delta_dp, rng):
    # Clip the update to a maximum L2 norm, then add Gaussian noise (the Gaussian mechanism).
    norm = l2_norm_params(delta)
    scale = min(1.0, clip_norm / (norm + 1e-12))
    clipped = scale_params(delta, scale)
    if epsilon is None or math.isinf(epsilon) or epsilon <= 0:
        return clipped
    sigma = clip_norm * math.sqrt(2.0 * math.log(1.25 / delta_dp)) / epsilon
    noised = {}
    for k, v in clipped.items():
        noise = torch.normal(mean=0.0, std=sigma, size=v.shape, generator=rng, device=v.device, dtype=v.dtype)
        noised[k] = v + noise
    return noised
```
We implement parameter manipulation utilities that enable addition, subtraction, scaling, and averaging of model weights across clients. We introduce differential privacy by clipping local updates and injecting Gaussian noise whose scale is calibrated by the chosen privacy budget. This serves as the core privacy mechanism that enables us to study the privacy–utility trade-off in both centralized and decentralized settings. Check out the Full Codes here.
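To get a feel for how the privacy budget maps to noise, we can plug a few illustrative epsilon values into the Gaussian-mechanism formula used inside `dp_sanitize_update` (sigma = C * sqrt(2 * ln(1.25 / delta)) / epsilon). The epsilon grid below is only an example, not the tutorial's experimental setting.

```python
# Per-update noise scale implied by the Gaussian mechanism, for illustration only.
# sigma = clip_norm * sqrt(2 * ln(1.25 / delta)) / epsilon
import math

clip_norm, delta_dp = 2.0, 1e-5
for eps in [0.5, 1.0, 5.0, 10.0]:
    sigma = clip_norm * math.sqrt(2.0 * math.log(1.25 / delta_dp)) / eps
    print(f"epsilon={eps:>4}: sigma={sigma:.3f}")
# Smaller epsilon (stronger privacy) -> larger sigma -> noisier averaged updates.
```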
```python
def local_train_one_client(base_params, client_id, epochs, lr, batch_size, weight_decay=0.0):
    model = MLP().to(device)
    set_model_params(model, base_params)
    model.train()
    loader = DataLoader(
        Subset(train_ds, client_indices[client_id].tolist() if hasattr(client_indices[client_id], "tolist") else client_indices[client_id]),
        batch_size=batch_size,
        shuffle=True,
        num_workers=2,
        pin_memory=True
    )
    opt = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=weight_decay)
    for _ in range(epochs):
        for xb, yb in loader:
            xb, yb = xb.to(device), yb.to(device)
            opt.zero_grad(set_to_none=True)
            logits = model(xb)
            loss = F.cross_entropy(logits, yb)
            loss.backward()
            opt.step()
    return get_model_params(model)
@torch.no_grad()
def evaluate(params):
    model = MLP().to(device)
    set_model_params(model, params)
    model.eval()
    total, correct = 0, 0
    loss_sum = 0.0
    for xb, yb in test_loader:
        xb, yb = xb.to(device), yb.to(device)
        logits = model(xb)
        loss = F.cross_entropy(logits, yb, reduction="sum")
        loss_sum += float(loss.item())
        pred = torch.argmax(logits, dim=1)
        correct += int((pred == yb).sum().item())
        total += int(yb.numel())
    return loss_sum / total, correct / total
```
We define the local training loop that each client executes independently on its private data. We also implement a unified evaluation routine to measure test loss and accuracy for any given model state. Together, these functions simulate realistic federated learning behavior where training and evaluation are fully decoupled from data ownership. Check out the Full Codes here.
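As a quick, optional smoke test of these two routines, we can train a single client for one epoch and compare test accuracy before and after; the client index and hyperparameters below are arbitrary illustrative choices, not settings prescribed by the tutorial.

```python
# Minimal smoke test: train client 0 for one epoch and compare test accuracy
# of the initial vs. locally trained parameters.
init_params = get_model_params(MLP().to(device))
loss_before, acc_before = evaluate(init_params)

trained_params = local_train_one_client(init_params, client_id=0, epochs=1, lr=0.06, batch_size=64)
loss_after, acc_after = evaluate(trained_params)

print(f"before local training: loss={loss_before:.3f}, acc={acc_before:.3f}")
print(f"after  local training: loss={loss_after:.3f}, acc={acc_after:.3f}")
# Accuracy typically stays modest here: client 0 only holds a couple of digit classes (non-IID).
```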
```python
@dataclass
class FedAvgConfig:
    rounds: int = 25
    clients_per_round: int = 10
    local_epochs: int = 1
    lr: float = 0.06
    batch_size: int = 64
    clip_norm: float = 2.0
    epsilon: float = math.inf
    delta_dp: float = 1e-5
def run_fedavg(cfg):
    global_params = get_model_params(MLP().to(device))
    history = {"test_loss": [], "test_acc": []}
    for r in trange(cfg.rounds):
        chosen = random.sample(range(NUM_CLIENTS), k=cfg.clients_per_round)
        start_params = global_params
        updates = []
        for cid in chosen:
            local_params = local_train_one_client(start_params, cid, cfg.local_epochs, cfg.lr, cfg.batch_size)
            delta = sub_params(local_params, start_params)
            rng = torch.Generator(device=device)
            rng.manual_seed(SEED * 10000 + r * 100 + cid)
            delta_dp = dp_sanitize_update(delta, cfg.clip_norm, cfg.epsilon, cfg.delta_dp, rng)
            updates.append(delta_dp)
        # Central aggregation: average the sanitized client updates and apply them to the global model.
        avg_update = mean_params(updates)
        global_params = add_params(start_params, avg_update)
        tl, ta = evaluate(global_params)
        history["test_loss"].append(tl)
        history["test_acc"].append(ta)
    return history, global_params
```
We implement the centralized FedAvg algorithm, where a subset of clients trains locally and sends differentially private updates to a central aggregator. We track model performance across communication rounds to observe convergence behavior under varying privacy budgets. This serves as the baseline against which decentralized gossip-based learning is compared. Next, we implement decentralized Gossip Federated Learning, in which peers exchange model parameters over a predefined network topology. Check out the Full Codes here.
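A minimal invocation might look like the sketch below, which runs the non-private baseline; `epsilon=math.inf` disables the Gaussian noise inside `dp_sanitize_update` while update clipping is still applied. The round count simply mirrors the config default.

```python
# Example invocation: train the FedAvg baseline without differential privacy noise.
baseline_cfg = FedAvgConfig(rounds=25, epsilon=math.inf)
baseline_history, baseline_params = run_fedavg(baseline_cfg)
print(f"final test accuracy (no DP noise): {baseline_history['test_acc'][-1]:.4f}")
```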
```python
@dataclass
class GossipConfig:
    rounds: int = 25
    local_epochs: int = 1
    lr: float = 0.06
    batch_size: int = 64
    clip_norm: float = 2.0
    epsilon: float = math.inf
    delta_dp: float = 1e-5
    topology: str = "ring"
    p: float = 0.2
    gossip_pairs_per_round: int = 10
def build_topology(cfg):
    if cfg.topology == "ring":
        G = nx.cycle_graph(NUM_CLIENTS)
    elif cfg.topology == "erdos_renyi":
        G = nx.erdos_renyi_graph(NUM_CLIENTS, cfg.p, seed=SEED)
        if not nx.is_connected(G):
            # Stitch disconnected components together so gossip can reach every node.
            comps = list(nx.connected_components(G))
            for i in range(len(comps) - 1):
                a = next(iter(comps[i]))
                b = next(iter(comps[i+1]))
                G.add_edge(a, b)
    else:
        raise ValueError(f"Unknown topology: {cfg.topology}")
    return G
def run_gossip(cfg):
    node_params = [get_model_params(MLP().to(device)) for _ in range(NUM_CLIENTS)]
    G = build_topology(cfg)
    history = {"avg_test_loss": [], "avg_test_acc": []}
    for r in trange(cfg.rounds):
        # Local step: every node trains on its own shard and sanitizes its update.
        new_params = []
        for cid in range(NUM_CLIENTS):
            p0 = node_params[cid]
            p_local = local_train_one_client(p0, cid, cfg.local_epochs, cfg.lr, cfg.batch_size)
            delta = sub_params(p_local, p0)
            rng = torch.Generator(device=device)
            rng.manual_seed(SEED * 10000 + r * 100 + cid)
            delta_dp = dp_sanitize_update(delta, cfg.clip_norm, cfg.epsilon, cfg.delta_dp, rng)
            p_local_dp = add_params(p0, delta_dp)
            new_params.append(p_local_dp)
        node_params = new_params
        # Gossip step: randomly chosen neighboring pairs average their parameters.
        edges = list(G.edges())
        for _ in range(cfg.gossip_pairs_per_round):
            i, j = random.choice(edges)
            avg = mean_params([node_params[i], node_params[j]])
            node_params[i] = avg
            node_params[j] = avg
        losses, accs = [], []
        for cid in range(NUM_CLIENTS):
            tl, ta = evaluate(node_params[cid])
            losses.append(tl)
            accs.append(ta)
        history["avg_test_loss"].append(float(np.mean(losses)))
        history["avg_test_acc"].append(float(np.mean(accs)))
    return history, node_params
```
This code allows us to analyze how privacy noise propagates through decentralized communication patterns and affects convergence.
We also run controlled experiments across multiple privacy levels for both centralized and decentralized training strategies, visualize convergence trends, and compute convergence speed metrics to compare how the two aggregation schemes respond to increasing privacy constraints.
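A harness along the following lines would reproduce that kind of comparison; the epsilon grid, topology choice, and plot layout here are illustrative assumptions rather than the exact settings behind the reported results.

```python
# Sketch of the privacy sweep: run both aggregation schemes at several epsilon
# values and plot test accuracy per round (epsilon grid is illustrative).
eps_grid = [math.inf, 5.0, 1.0]
results = {"fedavg": {}, "gossip": {}}

for eps in eps_grid:
    results["fedavg"][eps], _ = run_fedavg(FedAvgConfig(epsilon=eps))
    results["gossip"][eps], _ = run_gossip(GossipConfig(epsilon=eps, topology="ring"))

fig, axes = plt.subplots(1, 2, figsize=(11, 4), sharey=True)
for eps in eps_grid:
    axes[0].plot(results["fedavg"][eps]["test_acc"], label=f"eps={eps}")
    axes[1].plot(results["gossip"][eps]["avg_test_acc"], label=f"eps={eps}")
axes[0].set_title("Centralized FedAvg")
axes[1].set_title("Gossip (ring topology)")
for ax in axes:
    ax.set_xlabel("round")
    ax.legend()
axes[0].set_ylabel("test accuracy")
plt.tight_layout()
plt.show()
```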
In conclusion, we observed that while centralized FedAvg converges faster under weak privacy constraints, gossip-based federated learning is more robust to noisy updates at the cost of slower convergence. Stronger privacy guarantees significantly slow learning in both settings, with decentralized topologies experiencing delayed information mixing.




