Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Chapter 21: Distributed Systems

Introduction

The Hologram’s content-addressable memory and receipt-based verification naturally extend to distributed systems. This chapter explores how the 12,288 model enables novel approaches to distributed storage, consensus, and network protocols, all while maintaining the same lawfulness guarantees that apply to local computation.

Content-Addressed Storage

Universal Address Space

In the Hologram, every lawful object has a unique address determined by its content:

#![allow(unused)]
fn main() {
pub struct DistributedCAM {
    local_store: LocalStore,
    peer_registry: PeerRegistry,
    address_resolver: AddressResolver,
}

impl DistributedCAM {
    pub async fn get(&self, address: Address) -> Result<Object, CamError> {
        // Check local store first
        if let Some(obj) = self.local_store.get(&address) {
            return Ok(obj);
        }

        // Query peer network
        let peer = self.address_resolver.find_peer(&address)?;
        let obj = peer.fetch(address).await?;

        // Verify object matches address
        if self.compute_address(&obj) != address {
            return Err(CamError::AddressMismatch);
        }

        // Cache locally
        self.local_store.put(address, &obj);
        Ok(obj)
    }

    pub async fn put(&mut self, obj: Object) -> Address {
        // Compute canonical address
        let address = self.compute_address(&obj);

        // Store locally
        self.local_store.put(address, &obj);

        // Announce to peers
        self.peer_registry.announce(address, self.local_id()).await;

        address
    }
}
}

Deduplication Network

Content addressing enables perfect deduplication across the network:

#![allow(unused)]
fn main() {
pub struct DeduplicationNetwork {
    shard_map: ConsistentHash<Address, PeerId>,
    replication_factor: usize,
}

impl DeduplicationNetwork {
    pub async fn store_unique(&mut self, obj: Object) -> StoreResult {
        let address = Address::from_object(&obj);

        // Check if already exists in network
        if self.exists(address).await {
            return StoreResult::Duplicate(address);
        }

        // Determine storage shards
        let primary_shard = self.shard_map.get_node(&address);
        let replicas = self.shard_map.get_replicas(&address, self.replication_factor);

        // Store with replication
        let mut futures = Vec::new();
        futures.push(primary_shard.store(address, obj.clone()));
        for replica in replicas {
            futures.push(replica.store(address, obj.clone()));
        }

        // Wait for quorum
        let results = join_all(futures).await;
        let successes = results.iter().filter(|r| r.is_ok()).count();

        if successes > self.replication_factor / 2 {
            StoreResult::Stored(address, successes)
        } else {
            StoreResult::Failed(address)
        }
    }
}
}

Content Routing

The DHT-based routing leverages the lattice structure:

#![allow(unused)]
fn main() {
pub struct LatticeRouter {
    routing_table: RoutingTable,
    lattice_coords: (u8, u8), // (page, byte) coordinates
}

impl LatticeRouter {
    pub fn route_to_address(&self, target: Address) -> Vec<PeerId> {
        let target_coords = target.to_lattice_coords();
        let mut candidates = Vec::new();

        // Find peers in same page
        let page_peers = self.routing_table.get_page_peers(target_coords.0);
        candidates.extend(page_peers);

        // Find peers in adjacent pages (toroidal wrap)
        for offset in [-1, 1] {
            let adjacent_page = (target_coords.0 as i16 + offset).rem_euclid(48) as u8;
            candidates.extend(self.routing_table.get_page_peers(adjacent_page));
        }

        // Sort by distance in lattice
        candidates.sort_by_key(|peer| {
            self.lattice_distance(peer.coords(), target_coords)
        });

        candidates.truncate(3); // Return 3 closest peers
        candidates
    }

    fn lattice_distance(&self, a: (u8, u8), b: (u8, u8)) -> u16 {
        // Toroidal distance metric
        let page_dist = ((a.0 as i16 - b.0 as i16).abs()).min(
            48 - (a.0 as i16 - b.0 as i16).abs()
        );
        let byte_dist = ((a.1 as i16 - b.1 as i16).abs()).min(
            256 - (a.1 as i16 - b.1 as i16).abs()
        );
        (page_dist * 256 + byte_dist) as u16
    }
}
}

Consensus via Receipts

Receipt-Based Consensus

Receipts provide a natural consensus mechanism:

#![allow(unused)]
fn main() {
pub struct ReceiptConsensus {
    validators: Vec<ValidatorNode>,
    threshold: usize, // Byzantine fault tolerance threshold
}

impl ReceiptConsensus {
    pub async fn achieve_consensus(&self, proposal: Proposal) -> ConsensusResult {
        // Phase 1: Proposal broadcast
        let proposal_receipt = proposal.compute_receipt();
        let mut prepare_votes = Vec::new();

        for validator in &self.validators {
            let vote = validator.prepare_vote(proposal.clone()).await;
            prepare_votes.push(vote);
        }

        // Phase 2: Receipt validation
        let valid_receipts: Vec<_> = prepare_votes
            .into_iter()
            .filter(|vote| self.verify_receipt(&vote.receipt))
            .collect();

        if valid_receipts.len() < self.threshold {
            return ConsensusResult::InsufficientVotes;
        }

        // Phase 3: Commit if receipts match
        let canonical_receipt = self.compute_canonical_receipt(&valid_receipts);
        let commit_futures: Vec<_> = self.validators
            .iter()
            .map(|v| v.commit(canonical_receipt.clone()))
            .collect();

        let commits = join_all(commit_futures).await;
        let committed_count = commits.iter().filter(|c| c.is_ok()).count();

        if committed_count >= self.threshold {
            ConsensusResult::Committed(canonical_receipt)
        } else {
            ConsensusResult::Failed
        }
    }

    fn compute_canonical_receipt(&self, receipts: &[Vote]) -> Receipt {
        // Deterministic selection of canonical receipt
        // All valid receipts should be identical for lawful proposals
        receipts[0].receipt.clone()
    }
}
}

Byzantine Fault Tolerance

The receipt system naturally handles Byzantine faults:

#![allow(unused)]
fn main() {
pub struct ByzantineDetector {
    history: ReceiptHistory,
    fault_threshold: f64,
}

impl ByzantineDetector {
    pub fn detect_byzantine_node(&self, node_id: NodeId, receipt: &Receipt) -> bool {
        // Check receipt validity
        if !receipt.verify() {
            return true; // Invalid receipt = Byzantine
        }

        // Check for equivocation
        if let Some(previous) = self.history.get_last_receipt(node_id) {
            if previous.conflicts_with(receipt) {
                return true; // Conflicting receipts = Byzantine
            }
        }

        // Check for impossible claims
        if receipt.budget_ledger > 47 {
            return true; // Negative budget = Byzantine
        }

        // Statistical anomaly detection
        let node_stats = self.history.get_stats(node_id);
        if node_stats.deviation_score() > self.fault_threshold {
            return true; // Statistical outlier = likely Byzantine
        }

        false
    }
}
}

Consensus Optimization

Optimizations for high-throughput consensus:

#![allow(unused)]
fn main() {
pub struct OptimizedConsensus {
    pipelined_rounds: VecDeque<ConsensusRound>,
    max_pipeline_depth: usize,
}

impl OptimizedConsensus {
    pub async fn pipelined_consensus(&mut self, proposals: Vec<Proposal>) -> Vec<ConsensusResult> {
        let mut results = Vec::new();

        for proposal in proposals {
            // Start new round if pipeline not full
            if self.pipelined_rounds.len() < self.max_pipeline_depth {
                let round = self.start_round(proposal);
                self.pipelined_rounds.push_back(round);
            }

            // Process pipeline stages in parallel
            let mut completed = Vec::new();
            for round in &mut self.pipelined_rounds {
                round.advance_stage().await;
                if round.is_complete() {
                    completed.push(round.id());
                    results.push(round.result());
                }
            }

            // Remove completed rounds
            self.pipelined_rounds.retain(|r| !completed.contains(&r.id()));
        }

        results
    }
}
}

Network Protocol Design

Lattice-Aware Networking

Network protocols that exploit lattice structure:

#![allow(unused)]
fn main() {
pub struct LatticeProtocol {
    local_page: u8,
    page_neighbors: Vec<PeerId>,
    gossip_fanout: usize,
}

impl LatticeProtocol {
    pub async fn broadcast(&self, message: Message) -> BroadcastResult {
        // Compute message receipt
        let receipt = message.compute_receipt();

        // Phase 1: Broadcast to page neighbors
        let page_broadcast = self.broadcast_to_page(message.clone(), receipt.clone()).await;

        // Phase 2: Inter-page gossip
        let selected_pages = self.select_gossip_targets();
        let gossip_futures: Vec<_> = selected_pages
            .iter()
            .map(|page| self.gossip_to_page(*page, message.clone(), receipt.clone()))
            .collect();

        let gossip_results = join_all(gossip_futures).await;

        BroadcastResult {
            page_coverage: page_broadcast.coverage,
            network_coverage: self.estimate_coverage(&gossip_results),
            receipt,
        }
    }

    fn select_gossip_targets(&self) -> Vec<u8> {
        // Use schedule rotation for deterministic gossip
        let mut targets = Vec::new();
        let rotation = ScheduleRotation::at_time(SystemTime::now());

        for i in 0..self.gossip_fanout {
            let target_page = rotation.map_page(self.local_page, i);
            targets.push(target_page);
        }

        targets
    }
}
}

Receipt-Authenticated Messages

All network messages carry verifiable receipts:

#![allow(unused)]
fn main() {
pub struct AuthenticatedMessage {
    payload: Vec<u8>,
    sender_id: NodeId,
    receipt: Receipt,
    witness: Witness,
}

impl AuthenticatedMessage {
    pub fn verify(&self) -> bool {
        // Verify receipt matches payload
        let computed_receipt = Receipt::from_bytes(&self.payload);
        if computed_receipt != self.receipt {
            return false;
        }

        // Verify witness chain
        if !self.witness.verify() {
            return false;
        }

        // Verify sender authorization
        self.witness.authorizes(self.sender_id)
    }

    pub fn forward(&self, next_hop: NodeId) -> AuthenticatedMessage {
        // Extend witness chain for forwarding
        let forward_witness = self.witness.extend(next_hop);

        AuthenticatedMessage {
            payload: self.payload.clone(),
            sender_id: self.sender_id,
            receipt: self.receipt.clone(),
            witness: forward_witness,
        }
    }
}
}

Adaptive Topology

The network topology adapts based on receipts:

#![allow(unused)]
fn main() {
pub struct AdaptiveTopology {
    connections: HashMap<NodeId, Connection>,
    performance_tracker: PerformanceTracker,
}

impl AdaptiveTopology {
    pub async fn optimize_topology(&mut self) {
        // Collect performance receipts
        let mut performance_receipts = Vec::new();
        for (node_id, conn) in &self.connections {
            let perf = self.performance_tracker.get_metrics(node_id);
            performance_receipts.push((*node_id, perf));
        }

        // Sort by performance (receipt-based)
        performance_receipts.sort_by_key(|(_, perf)| perf.latency_percentile(95));

        // Drop poor performers
        let drop_threshold = performance_receipts.len() * 3 / 4;
        for (node_id, _) in &performance_receipts[drop_threshold..] {
            self.connections.remove(node_id);
        }

        // Discover new peers
        let new_peers = self.discover_peers().await;
        for peer in new_peers.iter().take(5) {
            self.connect_to_peer(peer).await;
        }
    }
}
}

Distributed Transactions

Receipt-Coordinated Transactions

Distributed transactions using receipt coordination:

#![allow(unused)]
fn main() {
pub struct DistributedTransaction {
    transaction_id: TransactionId,
    participants: Vec<Participant>,
    coordinator_receipt: Receipt,
}

impl DistributedTransaction {
    pub async fn execute(&mut self) -> TransactionResult {
        // Phase 1: Prepare
        let prepare_futures: Vec<_> = self.participants
            .iter()
            .map(|p| p.prepare(self.transaction_id))
            .collect();

        let prepare_results = join_all(prepare_futures).await;

        // Check all participants ready
        let all_prepared = prepare_results.iter().all(|r| r.is_ok());
        if !all_prepared {
            return self.abort().await;
        }

        // Collect prepare receipts
        let prepare_receipts: Vec<_> = prepare_results
            .into_iter()
            .map(|r| r.unwrap())
            .collect();

        // Phase 2: Commit with coordinated receipt
        let commit_receipt = self.compute_commit_receipt(&prepare_receipts);
        let commit_futures: Vec<_> = self.participants
            .iter()
            .map(|p| p.commit(self.transaction_id, commit_receipt.clone()))
            .collect();

        let commit_results = join_all(commit_futures).await;

        // Verify commit receipts match
        let all_committed = commit_results.iter().all(|r| {
            r.as_ref().map(|receipt| receipt == &commit_receipt).unwrap_or(false)
        });

        if all_committed {
            TransactionResult::Committed(commit_receipt)
        } else {
            self.abort().await
        }
    }

    async fn abort(&mut self) -> TransactionResult {
        let abort_futures: Vec<_> = self.participants
            .iter()
            .map(|p| p.abort(self.transaction_id))
            .collect();

        join_all(abort_futures).await;
        TransactionResult::Aborted
    }
}
}

State Machine Replication

Deterministic State Machines

State machines with receipt-based determinism:

#![allow(unused)]
fn main() {
pub struct ReplicatedStateMachine {
    state: State,
    log: Vec<LogEntry>,
    receipt_chain: ReceiptChain,
}

impl ReplicatedStateMachine {
    pub fn apply_command(&mut self, command: Command) -> Receipt {
        // Compute command receipt
        let command_receipt = command.compute_receipt();

        // Apply to state
        let new_state = self.state.apply(&command);

        // Compute state transition receipt
        let transition_receipt = Receipt::from_transition(
            &self.state,
            &new_state,
            &command_receipt
        );

        // Update state and log
        self.state = new_state;
        self.log.push(LogEntry {
            command,
            receipt: transition_receipt.clone(),
            timestamp: SystemTime::now(),
        });

        // Extend receipt chain
        self.receipt_chain.extend(transition_receipt.clone());

        transition_receipt
    }

    pub fn verify_replica(&self, other: &ReplicatedStateMachine) -> bool {
        // Replicas are consistent if receipt chains match
        self.receipt_chain == other.receipt_chain
    }
}
}

Network Sharding

Receipt-Based Sharding

Sharding strategy based on receipt distribution:

#![allow(unused)]
fn main() {
pub struct ReceiptSharding {
    shard_count: usize,
    shard_map: HashMap<ShardId, ShardInfo>,
}

impl ReceiptSharding {
    pub fn compute_shard(&self, receipt: &Receipt) -> ShardId {
        // Use R96 digest for shard assignment
        let digest_hash = receipt.r96_digest.as_u64();
        (digest_hash % self.shard_count as u64) as ShardId
    }

    pub fn rebalance_shards(&mut self, load_stats: &LoadStatistics) {
        // Compute target load per shard
        let total_load = load_stats.total_load();
        let target_load = total_load / self.shard_count;

        // Identify overloaded shards
        let overloaded: Vec<_> = self.shard_map
            .iter()
            .filter(|(_, info)| info.load > target_load * 1.2)
            .map(|(id, _)| *id)
            .collect();

        // Split overloaded shards
        for shard_id in overloaded {
            self.split_shard(shard_id);
        }
    }

    fn split_shard(&mut self, shard_id: ShardId) {
        let new_shard_id = self.shard_count;
        self.shard_count += 1;

        // Update shard map with split point
        let split_receipt = self.compute_split_point(shard_id);
        self.shard_map.insert(new_shard_id, ShardInfo {
            range: ReceiptRange::from_split(split_receipt),
            load: 0,
        });
    }
}
}

Exercises

  1. Epidemic Broadcast: Design an epidemic broadcast protocol that uses receipts to prove message delivery to a threshold of nodes.

  2. Sybil Resistance: Implement a Sybil-resistant peer discovery mechanism using receipt-based proof of work.

  3. Cross-Shard Transactions: Design a protocol for atomic transactions across multiple shards using two-phase commit with receipts.

  4. Network Partitioning: Implement a partition-tolerant consensus algorithm that can merge decisions when partitions heal.

  5. Load Balancing: Create a dynamic load balancing algorithm that migrates objects between nodes based on access patterns recorded in receipts.

Summary

The Hologram’s foundations naturally extend to distributed systems, providing content-addressed storage with perfect deduplication, receipt-based consensus that handles Byzantine faults, and network protocols that exploit the lattice structure. The receipt system serves as both a verification mechanism and a coordination primitive, enabling novel approaches to distributed transactions, state machine replication, and network sharding. These patterns demonstrate that the same lawfulness principles governing local computation can orchestrate global distributed systems.

Further Reading

  • Chapter 4: Content-Addressable Memory - For CAM foundations
  • Chapter 9: Security, Safety, and Correctness - For Byzantine fault tolerance
  • Chapter 20: Verification System - For receipt verification
  • Chapter 22: Database Systems - For storage patterns